In [1]:
file_path = 'input_files/access_log'

In [2]:
input_rdd=sc.textFile(file_path)

In [3]:
def parser(line):
    fields = line.split(' ',1)
    ip = fields[0]
    log={}
    fields = fields[1].split('"')
    address = fields[1].split(' ')
    if len(address) != 3:
        log['method']=address[0]
        log['endpoint']=address[0]
        log['protocol']=address[0]
    else:
        log['method']=address[0]
        log['endpoint']=address[1]
        log['protocol']=address[2]
    address = fields[2].split(' ')
    log['response_code'] = address[1]
    log['content_size'] = address[2]
    
    return (ip,log)

In [4]:
def safe(func,x):
    try:
        return func(x)
    except Exception as ex:
        return(x,ex.args)
    
def safe_parse(line):
    return safe(parser,line)

In [5]:
log_parsed = input_rdd.map(safe_parse)
log_parsed.persist()

PythonRDD[2] at RDD at PythonRDD.scala:43

In [6]:
log_parsed.map(lambda x: isinstance(x[1],dict)).countByValue()

defaultdict(int, {True: 1546})

In [7]:
log_parsed.take(1)

[(u'64.242.88.10',
  {'content_size': u'12846',
   'endpoint': u'/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables',
   'method': u'GET',
   'protocol': u'HTTP/1.1',
   'response_code': u'401'})]

In [8]:
log_size = log_parsed.filter(lambda x: x[1]['content_size']!='-').map(lambda x: int(x[1]['content_size']))

def seqOp(acc,val):
    return(acc[0]+val,min(acc[1],val),max(acc[2],val),acc[3]+1)

def combOp(acc1,acc2):
    return(acc1[0]+acc2[0],min(acc1[1],acc2[1]),max(acc1[2],acc2[2]),acc1[3]+acc2[3])

count_log = log_size.aggregate((0,10**10,0,0),seqOp,combOp)


print 'average content size: ',float(count_log[0])/count_log[-1]
print 'min of content size: ',count_log[1]
print 'max of content size: ',count_log[2]


average content size:  7777.84221748
min of content size:  0
max of content size:  138789


In [9]:
log_parsed.map(lambda x: x[1]['response_code']).countByValue()

defaultdict(int,
            {u'200': 1274,
             u'302': 6,
             u'304': 137,
             u'401': 123,
             u'404': 5,
             u'408': 1})

In [10]:
ip_count = log_parsed.map(lambda x: x[0]).countByValue()
dict((key,value) for key, value in ip_count.iteritems() if value > 25)

{u'10.0.0.153': 270,
 u'64.242.88.10': 452,
 u'cr020r01-3.sac.overture.com': 44,
 u'h24-70-69-74.ca.shawcable.net': 32,
 u'h24-71-236-129.ca.shawcable.net': 51,
 u'market-mail.panduit.com': 29,
 u'ts04-ip92.hevanet.com': 28}

In [11]:
import operator
endpoint_count = log_parsed.map(lambda x: x[1]['endpoint']).countByValue()
sorted(endpoint_count.items(), key=operator.itemgetter(1),reverse=True)

[(u'/twiki/pub/TWiki/TWikiLogos/twikiRobot46x50.gif', 64),
 (u'/', 47),
 (u'/twiki/bin/view/Main/WebHome', 41),
 (u'/icons/mailman.jpg', 37),
 (u'/icons/PythonPowered.png', 37),
 (u'/icons/gnu-head-tiny.jpg', 37),
 (u'/favicon.ico', 28),
 (u'/robots.txt', 27),
 (u'/razor.html', 26),
 (u'/twiki/bin/view/Main/SpamAssassinTaggingOnly', 18),
 (u'/twiki/bin/view/Main/SpamAssassinAndPostFix', 17),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_2_err.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_3_err.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_1_err.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_3.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_1.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_0.png', 16),
 (u'/cgi-bin/mailgraph2.cgi', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_0_err.png', 16),
 (u'/cgi-bin/mailgraph.cgi/mailgraph_2.png', 16),
 (u'/twiki/bin/view/Main/SpamAssassinDeleting', 15),
 (u'/dccstats/stats-spam.1day.png', 15),
 (u'/dccstats/stats-hashes.1year.png', 15),
 (u'/dccstats/st

In [12]:
file_path_end = 'input_files/end_song_sample.csv'
file_path_user = 'input_files/user_data_sample.csv'

In [13]:
from dateutil.parser import parse
import datetime

In [14]:
end_rdd = sc.textFile(file_path_end)
user_rdd = sc.textFile(file_path_user)

In [15]:
def parser_end(line):
    fields = line.split(',')
    id = fields[-1]
    dic = {}
    dic['ms']=int(fields[0])
    dic['context']=fields[1]
    dic['track']=fields[2]
    dic['product']=fields[3]
    #In seconds? to the nearest hundredth
    dic['end']=float(fields[4])
    return (id,dic)

def parser_user(line):
    fields = line.split(',')
    id = fields[-1]
    dic = {}
    dic['gender']=fields[0]
    dic['age_range']=fields[1]
    dic['country']=fields[2]
    dic['acct_age']=fields[3]
    return(id,dic)

In [16]:
def safe_parse_end(line):
    return safe(parser_end,line)

def safe_parse_user(line):
    return safe(parser_user,line)

In [17]:
#Ignore the first line
end_parsed = end_rdd.filter(lambda x: 'ms_played' not in x).map(safe_parse_end)
end_parsed.persist()

user_parsed = user_rdd.map(safe_parse_user)
user_parsed.persist()


PythonRDD[14] at RDD at PythonRDD.scala:43

In [18]:
print end_parsed.map(lambda x: isinstance(x[1],dict)).countByValue()
print user_parsed.map(lambda x: isinstance(x[1],dict)).countByValue()

defaultdict(<type 'int'>, {True: 1342891})
defaultdict(<type 'int'>, {True: 9566})


In [19]:
joined = end_parsed.join(user_parsed)
joined.persist()
del user_parsed,end_parsed

In [20]:
joined.take(1)

[(u'aae4289e475c448c883443cd6ee74fd7',
  ({'context': u'playlist',
    'end': 1444326481.28,
    'ms': 250400,
    'product': u'open',
    'track': u'a356a1c56ace4a5f99cb4079be9743a6'},
   {'acct_age': u'15',
    'age_range': u'45 - 54',
    'country': u'PE',
    'gender': u'female'}))]

In [21]:
def createCombiner(value):
    return (value,1)

def mergeValue(acc,value):
    return (value+acc[0],acc[1]+1)

def mergeCombiner(acc1,acc2):
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

sum_count = joined.map(lambda x: (x[1][1]['gender'],x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner).take(2)

print 'total number of songs for males: ',sum_count[1][1][1]
print 'total average total listening time for males: ',float(sum_count[1][1][0])/sum_count[1][1][1]
print 'total number of songs for females: ',sum_count[0][1][1]
print 'total average total listening time for females: ',float(sum_count[0][1][0])/sum_count[0][1][1]

total number of songs for males:  691479
total average total listening time for males:  128950.805728
total number of songs for females:  649178
total average total listening time for females:  128347.948082


In [22]:
import numpy as np

In [23]:
def createCombiner(value):
    return [value]

def mergeValue(acc,value):
    acc.append(value)
    return acc

def mergeCombiner(acc1,acc2):
    return acc1+acc2

sum_count = joined.map(lambda x: ((x[1][1]['gender'],x[1][1]['age_range']),x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner).mapValues(lambda x: (np.median(x),np.mean(x)))

In [24]:
#Format: <(gender,age_range),(median,average)>
sum_count.collect()

[((u'unknown', u'35 - 44'), (228120.0, 222282.52506596307)),
 ((u'male', u'25 - 29'), (150279.0, 129650.59934334279)),
 ((u'unknown', u''), (66199.5, 112717.89358974359)),
 ((u'male', u'55+'), (145946.0, 135935.70400395166)),
 ((u'female', u'0 - 17'), (95704.5, 114569.02350113122)),
 ((u'female', u'35 - 44'), (191133.0, 153974.20628912715)),
 ((u'female', u'25 - 29'), (138533.5, 124441.19689354212)),
 ((u'female', u'18 - 24'), (120790.0, 120488.54605723236)),
 ((u'female', u'30 - 34'), (176610.0, 141273.56059230183)),
 ((u'male', u''), (17009.5, 87962.633333333331)),
 ((u'unknown', u'45 - 54'), (30232.0, 141282.96875)),
 ((u'female', u'45 - 54'), (190250.0, 155221.79916861522)),
 ((u'male', u'30 - 34'), (180613.0, 151631.11020256457)),
 ((u'female', u'55+'), (181653.0, 154511.96058238635)),
 ((u'male', u'35 - 44'), (187249.0, 158281.96195030669)),
 ((u'male', u'0 - 17'), (43762.5, 104776.17423361684)),
 ((u'male', u'18 - 24'), (71620.0, 113719.48990781416)),
 ((u'unknown', u'25 - 29'),

In [25]:
import operator
country_count = joined.map(lambda x: x[1][1]['country']).countByValue()
max(country_count.iteritems(), key=operator.itemgetter(1))

(u'US', 514371)

In [26]:
def createCombiner(value):
    return (value,1)

def mergeValue(acc,value):
    return (acc[0]+value,acc[1]+1)

def mergeCombiner(acc1,acc2):
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

country_count2 = joined.map(lambda x: (x[1][1]['country'],x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner).mapValues(lambda x: float(x[0])/x[1])
country_count2.top(1,key=lambda x: x[1])

[(u'NI', 212247.59668109668)]

In [27]:
def createCombiner(value):
    return (value,1)

def mergeValue(acc,value):
    return (acc[0]+value,acc[1]+1)

def mergeCombiner(acc1,acc2):
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

country_count_male = joined.filter(lambda x: x[1][1]['gender']=='male').map(lambda x: (x[1][1]['country'],x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner).mapValues(lambda x: float(x[0])/x[1])
print 'male: ',country_count_male.top(1,key=lambda x: x[1])
country_count_female = joined.filter(lambda x: x[1][1]['gender']=='female').map(lambda x: (x[1][1]['country'],x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner).mapValues(lambda x: float(x[0])/x[1])
print 'female: ',country_count_female.top(1,key=lambda x: x[1])


male:  [(u'A1', 218347.42510121458)]
female:  [(u'MT', 207817.92753623187)]


In [28]:
country_count_male = joined.filter(lambda x: x[1][1]['gender']=='male').map(lambda x: (x[1][1]['country'],(x[1][0]['ms']))).combineByKey(createCombiner,mergeValue,mergeCombiner)
country_count_female = joined.filter(lambda x: x[1][1]['gender']=='female').map(lambda x: (x[1][1]['country'],x[1][0]['ms'])).combineByKey(createCombiner,mergeValue,mergeCombiner)

joined_country = country_count_male.join(country_count_female)

print 'country with max difference for total listening time: ', joined_country.top(1,key=lambda x: abs(x[1][1][0]-x[1][0][0]))
print 'country with max difference for total number of songs: ', joined_country.top(1,key=lambda x: abs(x[1][1][1]-x[1][0][1]))

country with max difference for total listening time:  [(u'DE', ((6246786330, 51028), (3883585735, 30640)))]
country with max difference for total number of songs:  [(u'DE', ((6246786330, 51028), (3883585735, 30640)))]


In [29]:
#Checking that the two are in fact different

print '10 countries with max difference for total listening time: ', joined_country.top(10,key=lambda x: abs(x[1][1][0]-x[1][0][0]))
print '10 countries with max difference for total number of songs: ', joined_country.top(10,key=lambda x: abs(x[1][1][1]-x[1][0][1]))

10 countries with max difference for total listening time:  [(u'DE', ((6246786330, 51028), (3883585735, 30640))), (u'BR', ((4372532169, 33567), (2293929390, 18463))), (u'IT', ((2958320012, 21324), (1370377921, 9938))), (u'CA', ((1252255088, 9000), (2749479826, 18330))), (u'NL', ((1772226148, 14961), (3054178356, 19952))), (u'ES', ((5190275176, 33056), (4061521115, 31424))), (u'US', ((33179691110, 255472), (32120119010, 257770))), (u'BE', ((405546932, 3502), (1209198261, 6783))), (u'CL', ((798017465, 5372), (1445067843, 9778))), (u'NZ', ((800461078, 5417), (257414302, 3011)))]
10 countries with max difference for total number of songs:  [(u'DE', ((6246786330, 51028), (3883585735, 30640))), (u'BR', ((4372532169, 33567), (2293929390, 18463))), (u'IT', ((2958320012, 21324), (1370377921, 9938))), (u'CA', ((1252255088, 9000), (2749479826, 18330))), (u'NO', ((575488630, 3958), (867066227, 9150))), (u'NL', ((1772226148, 14961), (3054178356, 19952))), (u'GB', ((6167828313, 50498), (5853357936, 

In [30]:
count_55 = joined.filter(lambda x: x[1][1]['age_range']=='55+').map(lambda x: (x[1][1]['country'],(x[1][0]['ms']))).combineByKey(createCombiner,mergeValue,mergeCombiner).mapValues(lambda x: float(x[0])/x[1])

print 'country with maximum average time for users over 55: ', count_55.top(1,key=lambda x:x[1])

country with maximum average time for users over 55:  [(u'NI', 214206.41537324464)]


In [57]:
def create_sessions(iterable):
    i=0
    #Declare them here since it looks like we have an empty session or two
    userid = 0 
    current_session = 0
    first = 0
    second = 0
    all_sessions = []
    session_id = 0
    #Start time (one to save, one to update)
    
    for item in iterable:
        if i == 0:
            userid = item[0][0] 
            current_session = item[0][1]
            first = item[0][1]
            i=1
        #Create new session
        if item[0][0]!=userid:
            #If the beginning of the session is the last thing recorded
            if current_session!=first:
                all_sessions.append(((userid,session_id),first-current_session))
            #Reset all parameters for new user
            session_id=0
            userid=item[0][0]
            first = item[0][1]
            current_session = item[0][1]
        second = item[0][1]
        #If difference from last dropoff is more than 4 hours
        if (second - first > (3*3600)):
            #Append current session to list of sessions
            all_sessions.append(((userid,session_id),first-current_session))
            #Increment session id
            session_id += 1
            #Start new session
            current_session = second
        #Update end of previous song
        first = second   
    #If, by the end of the for loop, our session has not ended, append the session to the list
    if current_session!=second:
        all_sessions.append(((userid,session_id),first-current_session))
    
    return all_sessions

In [58]:
def groupByKeyAndSortByValues(rdd,numPartitions):
    rdd_with_time=rdd.map(lambda (ip,dics):((ip,dics[0]['end']),dics[0]['end']))
    rdd_sorted = rdd_with_time.repartitionAndSortWithinPartitions(numPartitions,lambda x:hash(x[0])%numPartitions,ascending=True)
    
    return rdd_sorted.mapPartitions(create_sessions)

In [59]:
sessions = groupByKeyAndSortByValues(joined,40)
sessions.persist()

PythonRDD[127] at RDD at PythonRDD.scala:43

In [60]:
sessions.take(100)

[((u'0179cf1fa7b941fdbbc214e61f01e283', 0), 1428.5099999904633),
 ((u'0179cf1fa7b941fdbbc214e61f01e283', 1), 1619.3100001811981),
 ((u'0179cf1fa7b941fdbbc214e61f01e283', 2), 1639.75),
 ((u'01a642d83b3249ffbc0fdd2f1188ffc7', 0), 1042.1600000858307),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 0), 8892.580000162125),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 1), 24676.96999979019),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 2), 0.0),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 3), 20388.44000005722),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 4), 801.9900000095367),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 5), 323.26000022888184),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 6), 8297.490000009537),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 7), 8346.430000066757),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 8), 12131.66000008583),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 9), 2977.5),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 10), 4175.509999990463),
 ((u'01ea4bb0fc3641a3aae36f9898aab2f2', 11), 3632.7000000476837),
 ((u

In [63]:
sessions_male = groupByKeyAndSortByValues(joined.filter(lambda x: x[1][1]['gender']=='male'),40)
sessions_male.persist()
print 'max male session duration: ', sessions_male.top(1,key=lambda x:x[1])
print 'min male session duration: ', sessions_male.top(1,key=lambda x:-x[1])
print 'average male session duration: ', sessions_male.map(lambda x: x[1]).mean()

max male session duration:  [((u'3b4478b896634468984355f29a977a29', 0), 426718.71000003815)]
min male session duration:  [((u'01ea4bb0fc3641a3aae36f9898aab2f2', 2), 0.0)]
average male session duration:  5200.34454784


In [64]:
sessions_female = groupByKeyAndSortByValues(joined.filter(lambda x: x[1][1]['gender']=='female'),40)
sessions_female.persist()
print 'max female session duration: ', sessions_female.top(1,key=lambda x:x[1])
print 'min female session duration: ', sessions_female.top(1,key=lambda x:-x[1])
print 'average female session duration: ', sessions_female.map(lambda x: x[1]).mean()

max female session duration:  [((u'01d7789847e9489aaca7230b8cf7f145', 0), 165160.24000000954)]
min female session duration:  [((u'02ae4d79e89a4186bf43084638bfb325', 1), 0.0)]
average female session duration:  4967.13046781


In [75]:
def create_sessions_country(iterable):
    i=0
    #Declare them here since it looks like we have an empty session or two
    userid = 0 
    current_session = 0
    first = 0
    second = 0
    all_sessions = []
    session_id = 0
    country = 0
    #Start time (one to save, one to update)
    
    for item in iterable:
        if i == 0:
            userid = item[0][0] 
            country = item[0][1]
            current_session = item[0][2]
            first = item[0][2]
            i=1
        #Create new session
        if item[0][0]!=userid:
            #If the beginning of the session is the last thing recorded
            if current_session!=first:
                all_sessions.append(((userid,country,session_id),first-current_session))
            #Reset all parameters for new user
            session_id=0
            userid=item[0][0]
            country = item[0][1]
            first = item[0][2]
            current_session = item[0][2]
        second = item[0][2]
        #If difference from last dropoff is more than 4 hours
        if (second - first > (3*3600)):
            #Append current session to list of sessions
            all_sessions.append(((userid,country,session_id),first-current_session))
            #Increment session id
            session_id += 1
            #Start new session
            current_session = second
        #Update end of previous song
        first = second   
    #If, by the end of the for loop, our session has not ended, append the session to the list
    if current_session!=second:
        all_sessions.append(((userid,country,session_id),first-current_session))
    
    return all_sessions

def groupByKeyAndSortByValuesCountry(rdd,numPartitions):
    rdd_with_time=rdd.map(lambda (ip,dics):((ip,dics[1]['country'],dics[0]['end']),dics[0]['end']))
    rdd_sorted = rdd_with_time.repartitionAndSortWithinPartitions(numPartitions,lambda x:hash(x[0])%numPartitions,ascending=True)
    
    return rdd_sorted.mapPartitions(create_sessions_country)

sessions_male_country = groupByKeyAndSortByValuesCountry(joined.filter(lambda x: x[1][1]['gender']=='male'),40)
sessions_female_country = groupByKeyAndSortByValuesCountry(joined.filter(lambda x: x[1][1]['gender']=='female'),40)



In [80]:
print sessions_male_country.take(1)
print sessions_female_country.take(1)

[((u'01a642d83b3249ffbc0fdd2f1188ffc7', u'ES', 0), 1042.1600000858307)]
[((u'0179cf1fa7b941fdbbc214e61f01e283', u'US', 0), 1428.5099999904633)]


In [84]:
def createCombiner(value):
    return (value,1)

def mergeValue(acc,value):
    return (value+acc[0],acc[1]+1)

def mergeCombiner(acc1,acc2):
    return (acc1[0]+acc2[0],acc1[1]+acc2[1])

#Average male session duration by country
sessions_male_avg = sessions_male_country.map(lambda x: (x[0][1],x[1])).combineByKey(createCombiner,mergeValue,mergeCombiner).map(lambda x: (x[0],float(x[1][0])/x[1][1]))
sessions_female_avg = sessions_female_country.map(lambda x: (x[0][1],x[1])).combineByKey(createCombiner,mergeValue,mergeCombiner).map(lambda x: (x[0],float(x[1][0])/x[1][1]))
sessions_male_avg.join(sessions_female_avg).top(1,key=lambda x:abs(x[1][1]-x[1][0]))

[(u'NI', (20489.341499996186, 1509.2766666412354))]