In [2]:
# Dataset root directory, taken from the YAHOO_DATA environment variable.
# File names are appended by plain string concatenation (no separator is
# inserted), so the value has to end with a path separator.
eventsPath = os.environ["YAHOO_DATA"]
# One RDD element per text line of the raw installs file.
rawEventsRdd = sc.textFile(eventsPath + "installs.txt")
# Peek at the first three raw lines (tab-separated, see output below).
rawEventsRdd.take(3)

[u'001e6d8e-cbe7-4374-8c38-f37962a457e9\tair.com.buffalo_studios.bingorush2\t1420974077\t47.237461\t-122.530907\tTacoma\t6\t3\tApp_Installed\tinstall',
 u'009ef415-eb39-4f3e-8714-2b22a6490fbb\tcom.pandora.android\t1421228526\t37.353397\t-121.944977\tSanta Clara\t2\t1\tApp_Installed\tinstall',
 u'01fb3e32-5e7f-498d-aa51-aa8eabdb12ac\tcom.fractal360.go.launcherex.theme.gfl\t1421543961\t33.842270\t-84.211357\tTucker\t5\t20\tApp_Installed\tinstall']

In [3]:
from collections import namedtuple

# One parsed event. The field order is part of the schema.
EventDataRow = namedtuple(
    "EventDataRow",
    "userId itemId ts latitude longitude city day_of_week time_of_day event_type",
)


def parseRawData(line):
    """Parse one tab-separated raw log line into an EventDataRow.

    Columns 0-7 map, in order, onto userId, itemId, ts, latitude, longitude,
    city, day_of_week and time_of_day. `event_type` is read from the LAST
    column, whatever its position (the sample lines carry an extra
    "App_Installed" column in front of it).

    Raises IndexError when the line has fewer than eight columns and
    ValueError when a numeric column cannot be converted.
    """
    columns = line.split("\t")
    # Converter per leading column; None keeps the raw string untouched.
    casts = (None, None, int, float, float, None, int, int)
    leading = [columns[position] if cast is None else cast(columns[position])
               for position, cast in enumerate(casts)]
    return EventDataRow(*(leading + [columns[-1]]))
    

# Parse every raw line into an EventDataRow. The RDD is cached because the
# following cells traverse it several times (three distinct() passes plus the
# id conversion).
eventsRdd = rawEventsRdd.map(parseRawData).cache()
# Show the first three parsed rows.
eventsRdd.take(3)

[EventDataRow(userId=u'001e6d8e-cbe7-4374-8c38-f37962a457e9', itemId=u'air.com.buffalo_studios.bingorush2', ts=1420974077, latitude=47.237461, longitude=-122.530907, city=u'Tacoma', day_of_week=6, time_of_day=3, event_type=u'install'),
 EventDataRow(userId=u'009ef415-eb39-4f3e-8714-2b22a6490fbb', itemId=u'com.pandora.android', ts=1421228526, latitude=37.353397, longitude=-121.944977, city=u'Santa Clara', day_of_week=2, time_of_day=1, event_type=u'install'),
 EventDataRow(userId=u'01fb3e32-5e7f-498d-aa51-aa8eabdb12ac', itemId=u'com.fractal360.go.launcherex.theme.gfl', ts=1421543961, latitude=33.84227, longitude=-84.211357, city=u'Tucker', day_of_week=5, time_of_day=20, event_type=u'install')]

In [4]:
def _buildConversionDictionary(field):
    """Map each distinct value of `field` in eventsRdd to an integer index.

    The indices come from RDD.zipWithIndex() and the whole mapping is
    collected to the driver as a plain dict.
    """
    return (eventsRdd
            .map(lambda row: getattr(row, field))
            .distinct()
            .zipWithIndex()
            .collectAsMap())


# Each dictionary is broadcast right after it is built so the executors can
# look ids up locally in the conversion step.
userIdConversionDictionary = _buildConversionDictionary("userId")
userIdConversionDictionaryBroadcast = sc.broadcast(userIdConversionDictionary)
itemIdConversionDictionary = _buildConversionDictionary("itemId")
itemIdConversionDictionaryBroadcast = sc.broadcast(itemIdConversionDictionary)
cityConversionDictionary = _buildConversionDictionary("city")
cityConversionDictionaryBroadcast = sc.broadcast(cityConversionDictionary)

In [9]:
def _toIntegerIds(row):
    """Return a copy of `row` with its string columns replaced by integers.

    userId, itemId and city are looked up in the broadcast conversion
    dictionaries; event_type becomes 1 for "install" and 0 for anything else.
    All other fields are carried over unchanged.
    """
    return row._replace(
        userId=userIdConversionDictionaryBroadcast.value[row.userId],
        itemId=itemIdConversionDictionaryBroadcast.value[row.itemId],
        city=cityConversionDictionaryBroadcast.value[row.city],
        event_type=1 if row.event_type == "install" else 0,
    )


eventsConvertedRdd = eventsRdd.map(_toIntegerIds)

eventsConvertedRdd.take(3)

[EventDataRow(userId=205, itemId=32750, ts=1420974077, latitude=47.237461, longitude=-122.530907, city=2560, day_of_week=6, time_of_day=3, event_type=1),
 EventDataRow(userId=117275, itemId=32815, ts=1421228526, latitude=37.353397, longitude=-121.944977, city=5433, day_of_week=2, time_of_day=1, event_type=1),
 EventDataRow(userId=107155, itemId=53490, ts=1421543961, latitude=33.84227, longitude=-84.211357, city=7925, day_of_week=5, time_of_day=20, event_type=1)]

In [10]:
def _toSavedTuple(eventRaw):
    """Flatten a converted event into the tuple written to disk.

    Note the column order differs from EventDataRow: city, day_of_week and
    time_of_day come before latitude and longitude.
    """
    return (eventRaw.userId, eventRaw.itemId, eventRaw.ts,
            eventRaw.city, eventRaw.day_of_week, eventRaw.time_of_day,
            eventRaw.latitude, eventRaw.longitude,
            eventRaw.event_type)


# Each tuple is written as one text line (its string form).
eventsConvertedRdd.map(_toSavedTuple).saveAsTextFile(eventsPath + "events_parsed_installed")

In [11]:
import json
# Persist the three original-value -> integer-id mappings next to the data so
# the integer ids can be translated back later. The content is JSON even
# though the files carry a .txt extension.
for fileName, conversionDictionary in (
        ('userIdConversionDictionaryInstalls.txt', userIdConversionDictionary),
        ('itemIdConversionDictionaryInstalls.txt', itemIdConversionDictionary),
        ('cityConversionDictionaryInstalls.txt', cityConversionDictionary)):
    with open(eventsPath + fileName, 'w') as outfile:
        json.dump(conversionDictionary, outfile)

# Format data into context information

In [7]:
# Run the shared helper script inside the notebook namespace. Helpers used
# below that are not defined in this notebook (loadInstalledDataset, ...) are
# presumably provided by it.
execfile("../script/utils.py")


def _sortUserEvents(userAndEvents):
    """(userId, events) -> (userId, list of events ordered by ascending ts)."""
    userId, userEvents = userAndEvents
    return (userId, sorted(userEvents, key=lambda ev: ev.ts))


# One element per user: (userId, chronologically ordered list of events).
eventRDD = (loadInstalledDataset(eventsPath + "events_parsed_installed")
            .groupBy(lambda ev: ev.userId)
            .map(_sortUserEvents))
#eventRDD = eventRDD.map(lambda x: (x[0], 
#                        map(lambda y : TrainRow(itemId=y.itemId, 
#                                               context = ContextRow(ts=y.ts,city=y.city,
#                                                                   lat=y.lat, lon=y.lon, moving = 1)), x[1])))
eventRDD.take(3)

[(0,
  [event(userId=0, itemId=69367, ts=1421203932, city=3326, lat=33.201946, lon=-96.883171, event_type=1),
   event(userId=0, itemId=38479, ts=1421254896, city=3326, lat=33.201874, lon=-96.883179, event_type=1)]),
 (3,
  [event(userId=3, itemId=48349, ts=1421556490, city=4407, lat=35.41518, lon=-97.571648, event_type=1),
   event(userId=3, itemId=10575, ts=1421557037, city=4407, lat=35.415195, lon=-97.571678, event_type=1),
   event(userId=3, itemId=10575, ts=1421557428, city=4407, lat=35.415195, lon=-97.571655, event_type=0),
   event(userId=3, itemId=48349, ts=1421557450, city=4407, lat=35.415195, lon=-97.571655, event_type=0)]),
 (6,
  [event(userId=6, itemId=44644, ts=1421519654, city=3201, lat=27.73037, lon=-97.381813, event_type=1),
   event(userId=6, itemId=32063, ts=1421521824, city=3201, lat=27.748331, lon=-97.392975, event_type=1)])]

In [8]:
import datetime
# Positional layout of the incoming events (see the eventRDD output):
#   0 userId, 1 itemId, 2 ts, 3 city, 4 lat, 5 lon, 6 event_type
# haversine() is called as haversine(lon1, lat1, lon2, lat2); its result is
# multiplied by 1000 to obtain metres below, i.e. it is in kilometres.


def _eventHour(ts):
    """Return the hour of day (0-23) of the Unix timestamp `ts`.

    NOTE(review): fromtimestamp() converts to the local timezone of the
    machine running the code, not to the timezone of the place where the
    event happened, so every hour-based value depends on the cluster's TZ.
    """
    return datetime.datetime.fromtimestamp(int(ts)).hour


def _timeBucket(hour):
    """Bucket an hour of day: 1 for 6..13, 2 for 14..18, 3 otherwise."""
    if 6 <= hour <= 13:
        return 1
    if 13 < hour <= 18:
        return 2
    return 3


def _densestEventIndex(events):
    """Return the index of the event with most events closer than 0.1 km.

    Every event counts itself; ties go to the earliest event. Returns -1 when
    `events` is empty.
    """
    if not events:
        return -1
    neighbours = [sum(1 for other in events
                      if haversine(ev[5], ev[4], other[5], other[4]) < 0.1)
                  for ev in events]
    return neighbours.index(max(neighbours))


def _locationLabels(events):
    """Label each event 1 (near work), 2 (near home) or 3 (elsewhere).

    The work anchor is the densest event among those with an hour in 6..18,
    the home anchor the densest among the remaining ones. An event is "near"
    an anchor when it lies closer than 0.01 km; work wins when both match.
    When either anchor cannot be determined every event is labelled 3.
    """
    hours = [_eventHour(ev[2]) for ev in events]
    workGroup = [ev for ev, hour in zip(events, hours) if 6 <= hour <= 18]
    homeGroup = [ev for ev, hour in zip(events, hours) if not 6 <= hour <= 18]
    index_work = _densestEventIndex(workGroup)
    index_home = _densestEventIndex(homeGroup)
    if index_work == -1 or index_home == -1:
        return [3] * len(events)

    work = workGroup[index_work]
    home = homeGroup[index_home]
    labels = []
    for ev in events:
        # The anchors are compared by lon/lat (indices 5 and 4). The previous
        # code passed indices 3 and 2 (city id and timestamp) here.
        if haversine(ev[5], ev[4], work[5], work[4]) < 0.01:
            labels.append(1)
        elif haversine(ev[5], ev[4], home[5], home[4]) < 0.01:
            labels.append(2)
        else:
            labels.append(3)
    return labels


def _movementClass(previous, event):
    """Classify the movement between two consecutive events of one user.

    1 = not available (events not strictly ordered in time, or more than 300
    seconds apart), 2 = standing still (<= 1 m/s), 3 = walking speed
    (<= 2.4 m/s), 4 = faster.
    """
    time_difference = event[2] - previous[2]  # in seconds
    if time_difference <= 0 or time_difference > 300:
        return 1
    distance = haversine(event[5], event[4], previous[5], previous[4]) * 1000  # in meters
    velocity = distance / time_difference
    if velocity <= 1:
        return 2
    if velocity <= 2.4:
        return 3
    return 4


def detectMovement(line, useLocationLabel=False):
    """Attach movement and context columns to one user's events.

    Parameters
    ----------
    line : (userId, events)
        `events` is a list ordered by ascending timestamp whose items follow
        the positional layout documented at the top of this cell.
    useLocationLabel : bool, optional
        Selects what is stored in slot 6 of every output row.
        False (default): the hour-of-day bucket from `_timeBucket`. This is
        what the function has always emitted, because the work/home label
        used to be overwritten by the bucket before it reached the output,
        so existing saved datasets contain this value.
        True: the work/home/other label from `_locationLabels`.

    Returns
    -------
    (userId, rows) with one row per event, in input order:
        (itemId, ts, city, lat, lon, moving, slot 6, convertTime(ts), event_type)
    `moving` is 1 for the first event and `_movementClass` of the previous
    and current event otherwise. An empty event list yields (userId, []).
    """
    userId, data = line[0], line[1]
    if useLocationLabel:
        contextLabels = _locationLabels(data)
    else:
        # Skips the quadratic clustering whose result would be unused.
        contextLabels = [_timeBucket(_eventHour(event[2])) for event in data]

    newData = []
    previous = None
    for event, label in zip(data, contextLabels):
        moving = 1 if previous is None else _movementClass(previous, event)
        newData.append((event[1], event[2], event[3], event[4], event[5],
                        moving, label, convertTime(event[2]), event[6]))
        previous = event
    return (userId, newData)
# Per user: (userId, rows), each row being
#   (itemId, ts, city, lat, lon, moving, hour-of-day bucket, convertTime(ts), event_type)
eventRDD_context = eventRDD.map(detectMovement)
eventRDD_context.take(2)
# Example of the named form these rows take once parsed back later on:
#train(itemId=60075, context=context(ts=1421371713, city=12940, lat=43.503536, lon=-88.558907, moving=1, location=3))

[(0,
  [(69367, 1421203932, 3326, 33.201946, -96.883171, 1, 3, 4, 1),
   (38479, 1421254896, 3326, 33.201874, -96.883179, 1, 2, 3, 1)]),
 (3,
  [(48349, 1421556490, 4407, 35.41518, -97.571648, 1, 3, 1, 1),
   (10575, 1421557037, 4407, 35.415195, -97.571678, 1, 3, 1, 1),
   (10575, 1421557428, 4407, 35.415195, -97.571655, 1, 1, 1, 0),
   (48349, 1421557450, 4407, 35.415195, -97.571655, 2, 1, 1, 0)])]

In [12]:
def splitInstall(line):
    """Separate one user's context rows into installs and everything else.

    `line` is (userId, rows). A row whose ninth field (index 8, the
    event_type) equals 1 is an install; any other value puts it in the
    uninstall list. Row order is preserved inside both lists.

    Returns (userId, install_rows, uninstall_rows).
    """
    byIsInstall = {True: [], False: []}
    for row in line[1]:
        byIsInstall[row[8] == 1].append(row)
    return (line[0], byIsInstall[True], byIsInstall[False])
# Per user: (userId, install rows, uninstall rows).
eventRDD_context_installsplit = eventRDD_context.map(splitInstall)
eventRDD_context_installsplit.take(2)

[(0,
  [(69367, 1421203932, 3326, 33.201946, -96.883171, 1, 3, 4, 1),
   (38479, 1421254896, 3326, 33.201874, -96.883179, 1, 2, 3, 1)],
  []),
 (3,
  [(48349, 1421556490, 4407, 35.41518, -97.571648, 1, 3, 1, 1),
   (10575, 1421557037, 4407, 35.415195, -97.571678, 1, 3, 1, 1)],
  [(10575, 1421557428, 4407, 35.415195, -97.571655, 1, 1, 1, 0),
   (48349, 1421557450, 4407, 35.415195, -97.571655, 2, 1, 1, 0)])]

In [14]:
# splitRddV2install is not defined in this notebook (presumably it comes from
# ../script/utils.py). Judging by the output below it turns
# (userId, installs, uninstalls) into (userId, train, test, uninstalls).
# NOTE(review): 0.8 looks like the share of installs kept for training - confirm
# against the helper.
splitedData = splitRddV2install(eventRDD_context_installsplit,0.8)

# Persist the split as text, one line per user.
splitedData.saveAsTextFile(eventsPath + "splitedDataInstalled")
splitedData.take(2)

[(0,
  [(69367, 1421203932, 3326, 33.201946, -96.883171, 1, 3, 4, 1)],
  [(38479, 1421254896, 3326, 33.201874, -96.883179, 1, 2, 3, 1)],
  []),
 (3,
  [(48349, 1421556490, 4407, 35.41518, -97.571648, 1, 3, 1, 1)],
  [(10575, 1421557037, 4407, 35.415195, -97.571678, 1, 3, 1, 1)],
  [(10575, 1421557428, 4407, 35.415195, -97.571655, 1, 1, 1, 0),
   (48349, 1421557450, 4407, 35.415195, -97.571655, 2, 1, 1, 0)])]

In [19]:
# Reload the persisted split from disk and parse every text line back into a
# structured record with parseContextData2install (not defined in this
# notebook).
splitedRdd = (sc.textFile(eventsPath + "splitedDataInstalled")
                .map(parseContextData2install))
splitedRdd.take(2)
#(uid,[[train],[test], [uninstalls]])

[(0,
  [[train(itemId=69367, context=context(ts=1421203932, city=3326, lat=33.201946, lon=-96.883171, moving=1, location=3, time_of_day=4), event_type=1)],
   [train(itemId=38479, context=context(ts=1421254896, city=3326, lat=33.201874, lon=-96.883179, moving=1, location=2, time_of_day=3), event_type=1)],
   []]),
 (3,
  [[train(itemId=48349, context=context(ts=1421556490, city=4407, lat=35.41518, lon=-97.571648, moving=1, location=3, time_of_day=1), event_type=1)],
   [train(itemId=10575, context=context(ts=1421557037, city=4407, lat=35.415195, lon=-97.571678, moving=1, location=3, time_of_day=1), event_type=1)],
   [train(itemId=10575, context=context(ts=1421557428, city=4407, lat=35.415195, lon=-97.571655, moving=1, location=1, time_of_day=1), event_type=0),
    train(itemId=48349, context=context(ts=1421557450, city=4407, lat=35.415195, lon=-97.571655, moving=2, location=1, time_of_day=1), event_type=0)]])]