In [1]:
webraw = sc.textFile("data")

In [2]:
# parsing fields with regex
import re

pattern = "([^\"]\S*|\".+?\")\s*"
web = webraw.map(lambda x: re.findall(pattern, x))

web_valid = web.filter(lambda x: len(x) == 15)

In [3]:
print "Successully parsed logs %d out of %d" % (web_valid.count(), web.count())

Successully parsed logs 1158473 out of 1158500


In [3]:
web_valid.first()

[u'2015-07-22T09:00:28.019143Z',
 u'marketpalce-shop',
 u'123.242.248.130:54635',
 u'10.0.6.158:80',
 u'0.000022',
 u'0.026109',
 u'0.00002',
 u'200',
 u'200',
 u'0',
 u'699',
 u'"GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1"',
 u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36"',
 u'ECDHE-RSA-AES128-GCM-SHA256',
 u'TLSv1.2']

In [9]:
from datetime import datetime

# requires time_str1 > time_str2
def time_difference_mins(time_str1, time_str2):
    return (datetime.strptime(time_str1, '%Y-%m-%dT%H:%M:%S.%fZ') -
            datetime.strptime(time_str2, '%Y-%m-%dT%H:%M:%S.%fZ')).seconds /60.0

# sesseionization by iterating through logs sorted by time
# sessionized logs have an int field appended, denoting the session number
def sessionize(logs, session_mins=30):
    logs.sort(key=lambda x: x[0])
    last_session_time = logs[0][0]
    current_session = 0
    for log in logs:
        if time_difference_mins(log[0], last_session_time) > session_mins:
            current_session += 1
        last_session_time = log[0]
        log.append(current_session)
    return logs


In [10]:
# sessionized logs per IP address
logs_per_ip = web_valid.groupBy(lambda x:x[2].split(':')[0]).map(lambda x: (x[0], sessionize(list(x[1]))))

In [11]:
# calculate the session time for each session, then return the average
def average_session_time(logs):
    # get all session numbers
    sessions = set([i[-1] for i in logs])
    session_times = []
    for session in sessions:
        this_session = [i for i in logs if i[-1] == session]
        session_times.append(time_difference_mins(this_session[-1][0], this_session[0][0]))
    return sum(session_times)/len(session_times)

In [12]:
session_time_per_ip = logs_per_ip.map(lambda x: (x[0], average_session_time(x[1])))

In [53]:
session_time_per_ip.take(10)

[(u'14.195.29.144', 0.9333333333333333),
 (u'49.204.102.63', 0.5166666666666667),
 (u'14.96.97.101', 0.7333333333333333),
 (u'106.221.131.75', 0.0),
 (u'101.61.118.87', 0.0),
 (u'112.110.12.156', 0.06666666666666667),
 (u'122.176.115.129', 7.716666666666667),
 (u'117.198.176.102', 0.1),
 (u'49.238.53.122', 0.06666666666666667),
 (u'223.227.45.25', 0.03333333333333333)]

In [54]:
# top 10 most engaged users
session_time_per_ip.sortBy(lambda x: -x[1]).take(10)

[(u'103.29.159.138', 34.416666666666664),
 (u'125.16.218.194', 34.4),
 (u'14.99.226.79', 34.36666666666667),
 (u'117.217.94.18', 34.35),
 (u'122.169.141.4', 34.333333333333336),
 (u'117.218.61.172', 34.333333333333336),
 (u'180.151.32.147', 34.333333333333336),
 (u'59.97.160.225', 34.31666666666667),
 (u'223.182.246.141', 34.31666666666667),
 (u'59.91.252.41', 34.31666666666667)]

In [13]:
def get_unique_urls(logs):
    return set([(log[-1], log[11].split(' ')[1]) for log in logs])

In [15]:
# unique URL's visited per session, per IP address
unique_urls = logs_per_ip.map(lambda x: (x[0], get_unique_urls(x[1])))

In [18]:
# taking the user 125.16.218.194 as an example
unique_urls.filter(lambda x : x[0] == '14.195.29.144').first()

(u'14.195.29.144',
 {(0, u'https://paytm.com:443/shop/cart'),
  (0, u'https://paytm.com:443/shop/h/baby-kids-toys/baby-care-maternity'),
  (0, u'https://paytm.com:443/shop/log')})