In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext, SparkConf
sc = SparkContext()

In [3]:
import re
import json
import dateutil.parser

# for this part, I'll skip the gory part of cleaning the log
# and focus on the logic
P1 = re.compile(r'"" ?:')
P2 = re.compile(r'created_at|craetedAt')
P3 = re.compile(r'session_id')
P4 = re.compile(r'item_id|itemId')
P5 = re.compile(r'payload')

def clean_logs(log):
    log = P1.sub('\":', log)
    log = P2.sub('createdAt', log)
    log = P3.sub('sessionID', log)
    log = P4.sub('itemID', log)
    logj = json.loads(log)
    try:
        logj.pop('userAgent')
        logj.pop('user_agent')
    except KeyError:
        pass
    return logj
    
logs_raw = sc.textFile('/Users/apple/Downloads/datasets/web.log')
logs_clean = logs_raw.filter(lambda log: P5.search(log)).map(clean_logs).filter(lambda log: len(log['payload'])!=0)
logs_clean.cache()
logs_clean.take(1)

[{u'auth': u'15a63c4:e66189ba',
  u'createdAt': u'2013-05-12T00:00:01-08:00',
  u'payload': {u'itemID': u'15607', u'marker': 240},
  u'refId': u'47c7e2f6',
  u'sessionID': u'82ada851-0b3c-4e9d-b8cf-0f0a2ebed278',
  u'type': u'Play',
  u'user': 22700996}]

In [5]:
sc.appName

u'pyspark-shell'

In [4]:
types = logs_clean.map(lambda log: (log['type'], 1))
types_count = types.reduceByKey(lambda x,y: x+y)
types_count.takeOrdered(30, key=lambda x: -x[1])

[(u'Play', 543129),
 (u'Hover', 19617),
 (u'Stop', 7178),
 (u'Home', 5425),
 (u'AddToQueue', 5091),
 (u'Pause', 4424),
 (u'Advance', 3062),
 (u'Resume', 1774),
 (u'Recommendations', 1344),
 (u'Search', 1328),
 (u'Rate', 652),
 (u'ItemPage', 274),
 (u'WriteReview', 274),
 (u'Account', 177),
 (u'Position', 164)]

In [5]:
payloads = logs_clean.map(lambda log: set(log.get('payload', None))).reduce(lambda x,y: x.union(y))
payloads

{u'itemID',
 u'length',
 u'marker',
 u'new',
 u'old',
 u'popular',
 u'rating',
 u'recent',
 u'recommended',
 u'recs',
 u'results',
 u'subAction'}

In [6]:
user_num = logs_clean.map(lambda log: log['user']).distinct().count()
user_num

2195

In [7]:
def map_types(log):
    if log['payload'].get('itemID'):
        action =  log['type']
        actions = ['Play', 'Hover', 'Stop', 'AddToQueue', 'Pause', 'Advance', 'Resume', 'ItemPage', 'Position']
        if action in actions:
            log['payload'][action] = log['payload']['itemID']
            del log['payload']['itemID']
        if action == 'Rate':
            log['payload'] = {'Rate': (log['payload']['itemID'], log['payload']['rating'])}
        if action == 'WriteReview':
            v1, v2, v3 = log['payload'].get('itemID'),log['payload'].get('rating'), log['payload'].get('length') 
            log['payload'] = {'WriteReview': (v1, v2, v3)}
    return log

sessions_pre = logs_clean.map(map_types)
sessions_pre.cache()
sessions_pre.take(1)

[{u'auth': u'15a63c4:e66189ba',
  u'createdAt': u'2013-05-12T00:00:01-08:00',
  u'payload': {u'Play': u'15607', u'marker': 240},
  u'refId': u'47c7e2f6',
  u'sessionID': u'82ada851-0b3c-4e9d-b8cf-0f0a2ebed278',
  u'type': u'Play',
  u'user': 22700996}]

In [8]:
def red_session(x, y):
    z = {}
    for k in set(x) | set(y):
        xval = x.get(k, [])
        yval = y.get(k, [])
        if not isinstance(xval, list):
            xval = [xval]
        if not isinstance(yval, list):
            yval = [yval]
        z[k] = xval + yval
    return z

def map_value(x):
    actions = ['Play', 'Hover', 'Stop', 'AddToQueue', 'Pause', 'Advance', 'Resume', 'ItemPage', 'Position']
    for action in actions:
        if x.get(action):
            x[action] = set(x[action])
    return x
   
def map_marker(x):
    if x[1].get('marker'):
        low = min(x[1]['marker'])
        high = max(x[1]['marker'])
        x[1]['marker'] = [low, high]
    return x

sessions = sessions_pre.map(lambda log: ((log['sessionID'],log['user']),log['payload']))\
                       .filter(lambda (k,v): isinstance(v, dict))\
                       .reduceByKey(red_session)\
                       .mapValues(map_value)\
                       .map(map_marker)
sessions.cache()
sessions.take(1)

[((u'3c6472fd-7ddd-415f-b42a-626d92e6214e', 26513925),
  {u'Pause': {u'28379'},
   u'Play': {u'28379'},
   u'Position': {u'28379'},
   u'Resume': {u'28379'},
   u'Stop': {u'28379'},
   u'marker': [0, 4945],
   u'popular': [u'33172', u'7097', u'38420', u'32212', u'28379'],
   u'recent': [u'2485e9'],
   u'recommended': [u'6282', u'22571', u'8276', u'32065', u'2528']})]

In [9]:
sessions_labeled = sessions.filter(lambda s: s[1].get('new', None))
sessions_labeled.cache()
sessions_labeled.take(1)

[((u'83c28331-2499-43c8-bbe8-f6a7271434dc', 93515290),
  {u'AddToQueue': {u'12458', u'18872', u'30973'},
   u'Hover': {u'12458', u'18872', u'30973', u'4111'},
   u'Pause': {u'35399'},
   u'Play': {u'35399'},
   u'Stop': {u'35399'},
   u'marker': [0, 6454],
   u'new': [u'kid'],
   u'old': [u'adult'],
   u'popular': [u'37830',
    u'10759',
    u'14687',
    u'14696',
    u'4111',
    u'19550',
    u'2815',
    u'7130',
    u'22487',
    u'32353'],
   u'recent': [u'30243'],
   u'recommended': [u'1226',
    u'9740',
    u'18872',
    u'9008e111',
    u'12458',
    u'15630',
    u'13069',
    u'21710',
    u'4004',
    u'30973'],
   u'results': [u'36633e111',
    u'31390',
    u'12319',
    u'4970',
    u'29473e23',
    u'9379',
    u'8235',
    u'14360',
    u'20059e1',
    u'32513',
    u'15315',
    u'14140',
    u'38278',
    u'22134e9',
    u'33254',
    u'10901',
    u'35399',
    u'32568',
    u'16659',
    u'32199'],
   u'subAction': [u'parentalControls']})]

In [10]:
sessions_unlabeled = sessions.filter(lambda s: not s[1].get('new', None))
sessions_unlabeled.cache()
sessions_unlabeled.take(1)

[((u'3c6472fd-7ddd-415f-b42a-626d92e6214e', 26513925),
  {u'Pause': {u'28379'},
   u'Play': {u'28379'},
   u'Position': {u'28379'},
   u'Resume': {u'28379'},
   u'Stop': {u'28379'},
   u'marker': [0, 4945],
   u'popular': [u'33172', u'7097', u'38420', u'32212', u'28379'],
   u'recent': [u'2485e9'],
   u'recommended': [u'6282', u'22571', u'8276', u'32065', u'2528']})]

In [11]:
items = sessions.map(lambda s: set(s[1].get('recommended', []))).reduce(lambda x,y: x.union(y))
len(items)

1447

In [12]:
users = sessions.map(lambda s: set([s[0][1]])).reduce(lambda x,y: x.union(y))
len(users)

2195

In [13]:
subActs = sessions.filter(lambda s: s[1].get('subAction', None))\
                  .map(lambda s: (s[1]['subAction'][0], 1))\
                  .reduceByKey(lambda x,y: x+y)
subActs.takeOrdered(10, key=lambda x: -x[1])

[(u'parentalControls', 134),
 (u'updatePaymentInfo', 26),
 (u'updatePassword', 17)]

In [14]:
s = sessions.take(1)
s1id, s1usr = s[0][0]
s1mk = s[0][1]['marker']
s1 = logs_clean.filter(lambda log: log['sessionID']==s1id and log['user']==s1usr)

t1 = s1.map(lambda s: s['createdAt'])\
       .map(lambda t: dateutil.parser.parse(t))
start = t1.min()
end = t1.max()

print s1mk
print start, end
# turns out the marker's time and session interval are of approximately the same length

[0, 4945]
2013-05-09 02:02:29+00:00 2013-05-09 03:26:09+00:00


In [15]:
def map_features(x):
    res = {
        'Play':0, 'Hover':0, 'Stop':0, 'AddToQueue':0, 'Pause':0, 'Advance':0, 'Resume':0, 'Itempage':0, 'Position':0,\
        'marker':0, 'Rate':0, 'results':0, 'recommended':0, 'WriteReview':0, 'subAction':0,\
        'play_recommended':0, 'play_search':0, 'play_popular':0
    }
    for key in x[1].keys():
        if key == 'marker':
            res[key] = x[1][key][1]-x[1][key][0]
        elif key == 'subAction' and x[1][key] == 'parentalControls':
            res[key] = 1
        elif key == 'Play':
            played = x[1][key]
            res[key] = len(played)
            for p in played:
                if x[1].get('recommended') and p in x[1]['recommended']:
                    res['play_recommended'] += 1
                if x[1].get('results') and p in x[1]['results']:
                    res['play_search'] += 1
                if x[1].get('popular') and p in x[1]['popular']:
                    res['play_popular'] += 1
        elif key in res.keys():
            res[key] = len(x[1][key])
    return (x[0], res)
    
sessions4cluster = sessions.map(map_features)
sessions4cluster.cache()
sessions4cluster.take(1)

[((u'3c6472fd-7ddd-415f-b42a-626d92e6214e', 26513925),
  {'AddToQueue': 0,
   'Advance': 0,
   'Hover': 0,
   'Itempage': 0,
   'Pause': 1,
   'Play': 1,
   'Position': 1,
   'Rate': 0,
   'Resume': 1,
   'Stop': 1,
   'WriteReview': 0,
   'marker': 4945,
   'play_popular': 1,
   'play_recommended': 0,
   'play_search': 0,
   'recommended': 5,
   'results': 0,
   'subAction': 0})]

In [17]:
from math import sqrt
import numpy as np

vec4cluster = sessions4cluster.map(lambda s: sorted(s[1].iteritems()))\
                              .map(lambda s: [v for (k,v) in s])

L2NormSquares = vec4cluster.map(lambda vec: [x**2 for x in vec]).reduce(lambda vec1,vec2: np.add(vec1, vec2))

def normalize(x):
    for i in range(len(x)):
        if L2NormSquares[i] != 0:
            x[i] /= float(sqrt(L2NormSquares[i]))
    return x

vecs = vec4cluster.map(normalize)
vecs.take(1)

[[0.0,
  0.0,
  0.0,
  0,
  0.014282799726001058,
  0.0038016218897750985,
  0.07559289460184544,
  0.0,
  0.022721405353294154,
  0.005969963069325685,
  0.0,
  0.014390258637276391,
  0.0255488065153798,
  0.0,
  0.0,
  0.013170441819759387,
  0.0,
  0.0]]

In [18]:
n = vec4cluster.count()
means = vec4cluster.map(lambda vec: [x/float(n) for x in vec]).reduce(lambda v1,v2: np.add(v1, v2))
variances = vec4cluster.map(lambda vec: np.subtract(vec, means))\
                       .map(lambda vec: [x**2/float(n) for x in vec])\
                       .reduce(lambda v1,v2: np.add(v1, v2))

In [19]:
def standardize(vec, means, variances):
    res = np.zeros(len(vec))
    for i in range(len(vec)):
        if stdevs[i] != 0:
            res[i] = (vec[i]-means[i])/sqrt(variances[i])
    return res
        

vecs3 = vec4cluster.map(lambda vec: standardize(vec, means, stdevs))
clusters3 = KMeans.train(vecs3, 5, maxIterations=10, runs=10)
cost3 = clusters3.computeCost(vecs3)
print cost3

NameError: name 'KMeans' is not defined

In [23]:
sessions_rated = sessions4cluster.filter(lambda s: s[1]['Rate'] != 0)
sessions_rated.count()

629

In [24]:
training_explicit = sessions.filter(lambda s: s[1].get('Rate') is not None)\
                            .map(lambda s: (str(s[0][1]), s[1]['Rate'][0][0], s[1]['Rate'][0][1]))\
                            .map(lambda s: (re.sub('e', '0', s[0]), re.sub('e', '0', s[1]), s[2]))
training_explicit.take(5)

[('16323703', u'20556', 4),
 ('38639248', u'17128', 2),
 ('34053991', u'26716', 4),
 ('53470359', u'16875', 4),
 ('12379735', u'13069', 5)]

In [25]:
def get_implicit(session):
    actions = ['AddToQueue', 'Play', 'WriteReview', 'Rate', 'Itempage']
    uid = session[0][1]
    res = []
    for key in session[1].keys():
        if key in actions:
            for item in session[1][key]:
                res.append((uid, item))
    return res

training_implicit = sessions.flatMap(get_implicit)
training_implicit.take(10)

[(26513925, u'28379'),
 (15133328, u'26212'),
 (15133328, u'11822e111'),
 (15133328, u'23956'),
 (40492473, u'20731'),
 (40492473, u'37830'),
 (53763287, u'19474'),
 (49436657, u'2296e4'),
 (49436657, u'2296e5'),
 (49436657, u'2296e6')]

In [26]:
training_explicit_avgs = training_explicit.map(lambda x: x[2]).reduce(lambda x,y: x+y) / training_explicit.count()
training_explicit_RMSE = training_explicit.map(lambda x: pow(x[2]-training_explicit_avgs, 2)).reduce(lambda x,y: x+y)
training_explicit_RMSE

1182

In [22]:
sessions.take(2)

[((u'3c6472fd-7ddd-415f-b42a-626d92e6214e', 26513925),
  {u'Pause': {u'28379'},
   u'Play': {u'28379'},
   u'Position': {u'28379'},
   u'Resume': {u'28379'},
   u'Stop': {u'28379'},
   u'marker': [0, 4945],
   u'popular': [u'33172', u'7097', u'38420', u'32212', u'28379'],
   u'recent': [u'2485e9'],
   u'recommended': [u'6282', u'22571', u'8276', u'32065', u'2528']}),
 ((u'21b70caf-a85d-4b9d-87c7-a8a32fa5f7f4', 15133328),
  {u'AddToQueue': {u'11822e111', u'23956'},
   u'Hover': {u'11822e111', u'23956', u'29358'},
   u'Pause': {u'26212'},
   u'Play': {u'26212'},
   u'marker': [0, 6555],
   u'popular': [u'14696', u'28448', u'7347', u'18484', u'9116'],
   u'recent': [u'25092'],
   u'recommended': [u'37254', u'10694', u'26212', u'34094', u'24821'],
   u'results': [u'8565',
    u'34980',
    u'38947',
    u'17411',
    u'16819',
    u'37938',
    u'29358',
    u'4026',
    u'23956',
    u'15774',
    u'26237',
    u'10917',
    u'11822e111',
    u'26212',
    u'12117',
    u'36810e23',
    u

In [43]:
from pyspark.mllib.feature import HashingTF
tf = HashingTF(10000)
sk = sessions.map(lambda x: x[1].keys())
cnt = sk.map(lambda x: tf.transform(x))
sk.take(2)

[[u'Play',
  u'Pause',
  u'Resume',
  u'Stop',
  u'popular',
  u'recommended',
  u'marker',
  u'Position',
  u'recent'],
 [u'Play',
  u'Hover',
  u'AddToQueue',
  u'results',
  u'marker',
  u'recommended',
  u'Pause',
  u'popular',
  u'recent']]

In [44]:
cnt.take(2)

[SparseVector(10000, {119: 1.0, 153: 1.0, 1731: 1.0, 4130: 1.0, 5042: 1.0, 5260: 1.0, 6904: 1.0, 7400: 1.0, 9951: 1.0}),
 SparseVector(10000, {1731: 1.0, 2537: 1.0, 3213: 1.0, 4130: 1.0, 5042: 1.0, 6904: 1.0, 6979: 1.0, 7400: 1.0, 9951: 1.0})]

In [45]:
# integrate into spark to scale up
from pyspark.mllib.regression import LabeledPoint
lp = cnt.map(lambda x: LabeledPoint(5, x))
lp.take(2)

[LabeledPoint(5.0, (10000,[119,153,1731,4130,5042,5260,6904,7400,9951],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])),
 LabeledPoint(5.0, (10000,[1731,2537,3213,4130,5042,6904,6979,7400,9951],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]