In [1]:
import pyspark


In [2]:
# Display the SparkContext.
# NOTE(review): `sc` is never created in this notebook; presumably the PySpark kernel injects it — confirm.
sc

# Spark in a nutshell

In [3]:
# Smoke test: distribute the integers 0..999 and draw 5 of them without replacement.
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[353, 807, 740, 972, 421]

### Exploring the dataset

### GPS Log File

Load RDD 

In [45]:
from datetime import datetime, timedelta
from math import radians, sin, cos, sqrt, atan2

def distance(origin, destination):
    """Haversine distance between two locations.

    Args:
        origin: mapping with "lat" and "lon" entries, in degrees.
        destination: mapping with "lat" and "lon" entries, in degrees.

    Returns:
        The great-circle distance in kilometres, using a sphere radius of 6371 km.
    """
    lat1, lon1 = origin["lat"], origin["lon"]
    lat2, lon2 = destination["lat"], destination["lon"]
    radius = 6371 # km

    dlat = radians(lat2-lat1)
    dlon = radians(lon2-lon1)
    a = sin(dlat/2) * sin(dlat/2) + cos(radians(lat1)) \
        * cos(radians(lat2)) * sin(dlon/2) * sin(dlon/2)
    # Floating-point rounding can leave `a` a hair outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1-a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return radius * c


def parseLogLine(line):
    """Turn one comma-separated log record into a location dict.

    Fields 0 and 1 are read as latitude and longitude; fields 5 and 6 are
    joined and parsed as "%Y-%m-%d %H:%M:%S". The result has the keys
    "lat", "lon" and "ts" (a POSIX timestamp from `datetime.timestamp()`).
    """
    fields = line.strip().split(",")
    stamp_text = " ".join((fields[5], fields[6]))
    lat = float(fields[0])
    lon = float(fields[1])
    ts = datetime.strptime(stamp_text, "%Y-%m-%d %H:%M:%S").timestamp()
    return dict(lat=lat, lon=lon, ts=ts)

def isNotHeader(line):
    """Return True when the line has exactly 7 comma-separated fields."""
    field_count = len(line.split(","))
    return field_count == 7


def locTS(location):
    """Return the value stored under the 'ts' key of a location."""
    timestamp = location['ts']
    return timestamp
    

def t_duration(traceRDD):
    """Return the difference between the largest and smallest 'ts' in the trace."""
    stamps = traceRDD.map( locTS )
    latest = stamps.max()
    earliest = stamps.min()
    return latest - earliest


def t_origin(traceRDD):
    """Return the location with the smallest 'ts' in the trace.

    Uses a single keyed reduction instead of computing the minimum timestamp,
    filtering the whole RDD for it, and collecting every match to the driver.

    Args:
        traceRDD: RDD of location dicts that have a 'ts' entry.

    Returns:
        The location dict with the minimum 'ts'.
    """
    return traceRDD.min(key=lambda loc: loc["ts"])


def t_destination(traceRDD):
    """Return the location with the largest 'ts' in the trace.

    Uses a single keyed reduction instead of computing the maximum timestamp,
    filtering the whole RDD for it, and collecting every match to the driver.

    Args:
        traceRDD: RDD of location dicts that have a 'ts' entry.

    Returns:
        The location dict with the maximum 'ts'.
    """
    return traceRDD.max(key=lambda loc: loc["ts"])


def t_origin_destination(traceRDD):
    """Return the pair (t_origin(traceRDD), t_destination(traceRDD))."""
    first_loc = t_origin(traceRDD)
    last_loc = t_destination(traceRDD)
    return first_loc, last_loc


def durationAsString(duration):
    """Format a number of seconds using the string form of `timedelta`."""
    elapsed = timedelta(seconds=duration)
    return str(elapsed)
    
# Single trajectory log used for this first exploration.
file = 'data/010/Trajectory/20080618003409.plt'

# Keep only the 7-field lines, parse each into a {lat, lon, ts} dict, and order by timestamp.
rdd  = sc.textFile( file )
data = rdd.filter( isNotHeader ).map( parseLogLine ).sortBy( locTS )

### Check whether there are repeated points
# The three counts are: all records, distinct (lat, lon, ts) triples, distinct timestamps.
print(data.count())
print(data.map(lambda loc: (loc["lat"], loc["lon"], loc["ts"]) ).distinct().count())
print(data.groupBy(lambda loc: loc["ts"]).count())


# Timestamp span of the trace, and distance() between its first and last locations.
# NOTE(review): a later cell does `import time`, which rebinds the `time` name assigned here.
time = t_duration(data)
orig = t_origin( data )
dest = t_destination( data )
dist = distance(orig, dest)

time, dist



7882
7882
7882


(16869.0, 122.48572579667099)

In [46]:

# Data cleansing

# Load every file under the user's Trajectory directory (this rebinds `data` from the previous cell).
data = sc.textFile( 'data/010/Trajectory/*' ).filter( isNotHeader ).map( parseLogLine ).sortBy( locTS )

# State how large the collection is before building the heatmap.
# Explain that the collection has to be reduced.
# The three counts are: all records, distinct (lat, lon, ts) triples, distinct timestamps.
print(data.count())
print(data.map(lambda loc: (loc["lat"], loc["lon"], loc["ts"]) ).distinct().count())
print(data.groupBy(lambda loc: loc["ts"]).count())


609637
609455
607439


In [145]:
# cross the GPS points with the labels
# analysis of the labels

from collections import namedtuple

def parseLabelsLine(line):
    """Parse a tab-separated labels record into (start_ts, end_ts, label).

    The first two fields are parsed as "%Y/%m/%d %H:%M:%S" and converted with
    `datetime.timestamp()`; the third field is returned unchanged.
    """
    stamp_format = "%Y/%m/%d %H:%M:%S"
    fields = line.split("\t")
    begin = datetime.strptime(fields[0], stamp_format).timestamp()
    finish = datetime.strptime(fields[1], stamp_format).timestamp()
    return (begin, finish, fields[2])


def notLabelsHeader(line):
    """Return True unless the line begins with 'Start Time'."""
    return not line.startswith( 'Start Time' )


def locInTrajectory( label_loc ):
    """Tell whether a location's 'ts' lies within a label's time window.

    `label_loc` is a (label, location) pair; label[0] and label[1] are the
    inclusive start and end of the window.
    """
    label, loc = label_loc[0], label_loc[1]
    begin, finish = label[0], label[1]
    ts = loc['ts']
    return bool(begin <= ts <= finish)


# Labels: drop the header line and parse each record into (start_ts, end_ts, label).
labelsRDD = sc.textFile( 'data/010/labels.txt' )
labelsRDD = labelsRDD.filter( notLabelsHeader ).map( parseLabelsLine )

# Locations of a single trajectory log, ordered by timestamp.
logRDD = sc.textFile( 'data/010/Trajectory/20080618003409.plt' )
logRDD = logRDD.filter( isNotHeader ).map( parseLogLine ).sortBy( locTS )


# Record type for one labelled trajectory and the locations that fall inside it.
Trajectory = namedtuple('Trajectory', ['start', 'end', 'mode', 'locs'])

# Pair every label with every location, keep the pairs whose timestamp falls in the
# label's window, then group the locations by their (start, end, label) key.
trajectories = labelsRDD.cartesian( logRDD ).filter( locInTrajectory ).groupByKey()
trajectories = trajectories.map( lambda t: Trajectory(
    start= t[0][0],
    end  = t[0][1],
    mode = t[0][2],
    locs = t[1]
))

trajectories

PythonRDD[1071] at RDD at PythonRDD.scala:48

In [157]:
# Runs the label/location join; the count is not displayed because it is not the cell's last expression.
trajectories.count()

# Locations of the first trajectory whose mode is 'train'.
# NOTE: this rebinds `data` (an RDD in earlier cells) to the grouped iterable of location dicts.
data = trajectories.filter( lambda tj: tj.mode == 'train' ).map( lambda tj: tj.locs ).collect()[0]
type(data)

pyspark.resultiterable.ResultIterable

In [159]:
# (mode, number of locations) for every labelled trajectory.
x = trajectories.map( lambda tj: (tj.mode, len(tj.locs)) ).collect()
print(x)

# Per-trajectory location counts; a comprehension avoids the manual append loop
# and does not leave a stray loop variable in the kernel namespace.
l = [count for _, count in x]

# Sanity check: the grouped locations should add up to the size of the log.
sum(l), logRDD.count()



[('taxi', 532), ('subway', 355), ('walk', 179), ('train', 5804), ('walk', 234), ('bus', 712), ('bus', 66)]


(7882, 7882)

In [154]:
# Heatmap of the locations held in `data`.
# NOTE(review): `gmaps` is imported and configured only in cells that appear further down
# in the notebook, so this cell fails on a fresh top-to-bottom run.
fig = gmaps.figure()

# gmaps expects (lat, lon) pairs.
locs = []
for loc in data:
    locs.append( (loc['lat'], loc['lon']) )

layer = gmaps.heatmap_layer(locs)
fig.add_layer( layer )


#heatmap_layer = gmaps.heatmap_layer(locations)
#fig.add_layer(heatmap_layer)
#markers = gmaps.marker_layer(locations=locations, info_box_content=["origine", "destination"])
#fig.add_layer(markers)

fig

In [36]:
import os

import gmaps

# The API key is read from the environment so that no credential is stored in
# the notebook. The key that was previously committed here should be revoked.
gmaps.configure(api_key=os.environ["GOOGLE_MAPS_API_KEY"])

fig = gmaps.figure()

# Markers for the first and last location of the trajectory.
locations = [ (orig['lat'], orig['lon']), (dest['lat'], dest['lon']) ]

#heatmap_layer = gmaps.heatmap_layer(locations)
#fig.add_layer(heatmap_layer)

markers = gmaps.marker_layer(locations=locations, info_box_content=["origine", "destination"])
fig.add_layer(markers)

fig

In [50]:
import os

import gmaps

# The API key is read from the environment so that no credential is stored in
# the notebook. The key that was previously committed here should be revoked.
gmaps.configure(api_key=os.environ["GOOGLE_MAPS_API_KEY"])

fig = gmaps.figure()

# Bring every location to the driver as a (lat, lon) pair for the heatmap.
locations = [ (loc["lat"], loc["lon"]) for loc in data.collect() ]

heatmap_layer = gmaps.heatmap_layer(locations)
fig.add_layer(heatmap_layer)

#markers = gmaps.marker_layer(locations)
#fig.add_layer(markers)

fig

#### Exploring Single File

In Spark, you can load a file using the ```sc.textFile()``` operation. Note that this operation returns an RDD (Resilient Distributed Dataset).

In [None]:
# Load one trajectory log as an RDD of text lines and show the resulting type.
lines = sc.textFile("data/010/Trajectory/20070804033032.plt")
type(lines)

A distributed dataset behaves much like a list. Let's look at the first 10 elements of the RDD:

In [None]:
# Fetch the first 10 lines of the file to the driver.
lines.take(10)

As you can see, the first 6 lines of the file are metadata. Let's filter out the metadata by keeping only the lines that contain **locations**. A location in the file is a line composed of 7 attributes separated by commas.


In [None]:
def isNotHeader(line):
    """Return True when the line splits into exactly 7 comma-separated fields."""
    return len(line.split(",")) == 7

# Keep only the 7-field lines and preview the first four.
filtered_lines = lines.filter( isNotHeader )
filtered_lines.take(4)

Right now locations are strings. We need to parse them so that we can operate over them. 

In [None]:
from datetime import datetime

def parseLocation(line):
    """Parse a comma-separated record into (lat, lon, datetime).

    Fields 0 and 1 become floats; fields 5 and 6 are joined with a space and
    parsed as "%Y-%m-%d %H:%M:%S".
    """
    fields = line.strip().split(",")
    lat, lon = float( fields[0] ), float( fields[1] )
    when = datetime.strptime(fields[5] + " " + fields[6], "%Y-%m-%d %H:%M:%S")
    return ( lat, lon, when )

# Parse every remaining line into a (lat, lon, datetime) tuple and preview four of them.
locations = filtered_lines.map( parseLocation )
locations.take(4)

With locations in this format, we can do some interesting things. For instance, let's compute the duration of the trajectory:

In [None]:
# Convert each location's datetime (third tuple element) to a POSIX timestamp.
timestamps = locations.map( 
    lambda loc: loc[2].timestamp() 
)

# First and last timestamps of the trajectory; a later cell reuses both names.
start_time = timestamps.min()
end_time   = timestamps.max()

# Elapsed time, truncated to an integer.
duration = int( end_time - start_time )

'Duration: ' + str( duration ) + ' seg'

Knowing the first and last timestamps, we can identify their respective locations and thus, compute the distance.
https://www.movable-type.co.uk/scripts/latlong.html

In [None]:
from math import radians, sin, cos, sqrt, atan2

def distance(origin, destination):
    """Haversine distance between two (lat, lon, ...) sequences.

    Args:
        origin: sequence whose first two items are latitude and longitude in degrees.
        destination: sequence whose first two items are latitude and longitude in degrees.

    Returns:
        The great-circle distance in kilometres, using a sphere radius of 6371 km.
    """
    lat1, lon1 = origin[0], origin[1]
    lat2, lon2 = destination[0], destination[1]
    radius = 6371 # km

    dlat = radians(lat2-lat1)
    dlon = radians(lon2-lon1)
    # `radians` is imported by name; the previous `math.radians` raised
    # NameError because the `math` module itself is never imported.
    a = sin(dlat/2) * sin(dlat/2) + cos(radians(lat1)) \
        * cos(radians(lat2)) * sin(dlon/2) * sin(dlon/2)
    # Floating-point rounding can leave `a` a hair outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1-a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1-a))

    return radius * c

# Two sample (lat, lon) points used to try out distance().
loc1 = (39.921712, 116.472343)
loc2 = (39.902885, 116.4213)

# NOTE: this rebinds `dist`, which an earlier cell also assigns.
dist = distance(loc1, loc2)
str(dist) + ' km'

Now, let's complete our RDD with distance and duration:

In [None]:

# Placeholder mapping: it re-emits the same three fields unchanged (no distance or duration is added yet).
locations.map( lambda loc: (
    loc[0], loc[1], loc[2]
)).take(1)



In [None]:

# Select the locations whose timestamp equals the previously computed start_time / end_time.
initial_location = locations.filter( lambda loc: loc[2].timestamp() == start_time )
final_location   = locations.filter( lambda loc: loc[2].timestamp() == end_time )

# Take the first match of each (several locations may share a timestamp).
loc_1 = initial_location.collect()[0]
loc_n = final_location.collect()[0]

# distance() reads only the first two tuple items (lat, lon).
distance(loc_1, loc_n)


### Exploring all user trajectories

GPS logs are organized per user. Let's load all the logs of a specific user.

In [None]:

def parseLocation(line):
    """Parse a comma-separated record into (lat, lon, timestamp).

    Fields 0 and 1 become floats; fields 5 and 6 are joined with a space,
    parsed as "%Y-%m-%d %H:%M:%S" and converted with `datetime.timestamp()`.
    """
    fields = line.strip().split(",")
    lat, lon = float( fields[0] ), float( fields[1] )
    stamp_text = fields[5] + " " + fields[6]
    ts = datetime.strptime(stamp_text, "%Y-%m-%d %H:%M:%S").timestamp()
    return ( lat, lon, ts )


def t_origin_destination(trajectory):
    """Return the (earliest, latest) locations of a trajectory.

    Each location is a sequence whose third item is its timestamp. Two keyed
    reductions replace the previous min/max/filter/collect sequence, which ran
    four jobs and collected every matching element to the driver.

    Args:
        trajectory: RDD of (lat, lon, ts) tuples.

    Returns:
        A tuple (origin, destination).
    """
    by_timestamp = lambda loc: loc[2]
    origin = trajectory.min(key=by_timestamp)
    destin = trajectory.max(key=by_timestamp)
    return (origin, destin)

   
def t_distance(trajectory):
    """Return distance() between the trajectory's origin and destination."""
    endpoints = t_origin_destination(trajectory)
    return distance (endpoints[0], endpoints[1])


In [None]:
#from os import listdir, stat, remove
import os

USER = '010'
PATH = 'data/{0}/Trajectory'.format( USER )

# Names of every entry in the user's Trajectory directory.
files = list(os.listdir(PATH))


def load_trajectory(file):
    """Load one log file as an RDD of parsed locations.

    Args:
        file: path of the log file to read.

    Returns:
        RDD produced by applying `parseLocation` to every line accepted by `isNotHeader`.
    """
    return sc.textFile(file).filter( isNotHeader ).map( parseLocation )


# One (RDD, file name) pair per trajectory, kept in a plain driver-side list.
# RDDs cannot be stored inside another RDD or used from within a transformation
# (SPARK-5063), so the previous sc.parallelize(tmp) / map(t_distance) could not run.
tmp = [ (load_trajectory( '{0}/{1}'.format( PATH, f)), f) for f in files ]
user_trajectories = tmp

# (file name, origin-to-destination distance) for every trajectory, computed from the driver.
user_distances = [ (f, t_distance(traj)) for traj, f in user_trajectories ]

user_distances[:1]


In [None]:

# Load trajectory
linesRDD = sc.textFile("data/010/Trajectory/20070804033032.plt")


def isNotHeader(line):
    """Return True when the line has exactly 7 comma-separated fields."""
    return len(line.split(",")) == 7


# Remove file header
rdd = linesRDD.filter( isNotHeader )

# Compact version:
#   rdd = linesRDD.filter(
#       lambda line: len(line.split(",")) == 7
#   )

print( rdd.take(5) )


In [None]:
import calendar
import time

# we are not using altitude in this example

def parse(line):
    """Parse a comma-separated log record into a location dict.

    Args:
        line: record whose fields 0 and 1 are latitude and longitude, and whose
            fields 5 and 6 are the date and the time of day.

    Returns:
        Dict with "lat" and "lon" as floats and "ts" as the integer returned by
        `calendar.timegm`, i.e. the date-time is interpreted as UTC.
    """
    # Imported locally because this notebook also assigns a global named
    # `time` (a duration value), which would shadow the module.
    from time import strptime

    # Strip first, like the other parsers in this notebook, so a trailing
    # newline or carriage return does not break strptime on the last field.
    a = line.strip().split(",")
    lat = float( a[0] )
    lon = float( a[1] )
    date = a[5]+ " " + a[6]
    ts   = strptime(date, "%Y-%m-%d %H:%M:%S")

    return {
        "lat": lat,
        "lon": lon,
        "ts": calendar.timegm( ts )
    }


# The string literal below is an unused earlier draft of the parsing step, kept for reference.
'''
GPS_logs = GPS_logs.map(
    lambda line: {
        "latitude":  float(line.split(",")[0]),
        "longitude": float(line.split(",")[1]),
        "altitude":  float(line.split(",")[3]),
        "timestamp": datetime.strptime(line.split(",")[5] + " " + line.split(",")[6], "%Y-%m-%d %H:%M:%S")
    }
)
'''

# Parse every non-header line into a {lat, lon, ts} dict.
pointsRDD = rdd.map( parse )

print (pointsRDD.take(5))
    



In [None]:
# sort values
# sortBy() returns a new RDD and leaves the receiver untouched, so the result
# has to be kept; previously it was discarded and the sort had no effect.
sortedPointsRDD = pointsRDD.sortBy(
    lambda point: point['ts']
)

# Timestamps only, in sorted order.
tssRDD = sortedPointsRDD.map(
    lambda point : point['ts']
)

x = tssRDD
#print( x.take(10) )

# stats() gathers min / max (and other summary values) in a single pass.
s = tssRDD.stats()
print('Min ts: ' + str(s.min()))
print('Max ts: ' + str( s.max()) )
print('Duration: ' + str( s.max()-s.min()) )






# xxxxxx

In [None]:
from datetime import datetime

# Path of the single log file read with plain Python I/O at the end of this cell.
path = 'data/{}/Trajectory/{}.plt'.format('010', '20070804155303') 

def parseLogLine(line):
    """Parse one comma-separated log record into [lat, lon, ts].

    Fields 0 and 1 are latitude and longitude; fields 5 and 6 are joined with
    a space, parsed as '%Y-%m-%d %H:%M:%S' and converted with
    `datetime.timestamp()`.
    """
    # strip() removes the trailing '\n' before splitting
    fields = line.strip().split(",")

    lat, lon = float( fields[0] ), float( fields[1] )

    stamp_text = '{} {}'.format(fields[5], fields[6])
    stamp = datetime.strptime(stamp_text, '%Y-%m-%d %H:%M:%S')

    return [ lat, lon, stamp.timestamp() ]


def parseLogFile(log):
    """Parse the full text of a log file into a list of parsed locations.

    The previous slice `[7:1]` is always empty, so nothing was ever parsed.
    Location lines are now selected the same way as elsewhere in this notebook
    (exactly 7 comma-separated fields), which also skips header lines and the
    empty string left by a trailing newline.

    Args:
        log: whole file content as a single string.

    Returns:
        List with one `parseLogLine` result per location line.
    """
    return [
        parseLogLine(line)
        for line in log.splitlines()
        if len(line.split(",")) == 7
    ]
        



# Read the whole file and parse it.
# NOTE(review): the value returned by parseLogFile() is discarded here.
with open(path, 'r') as f:
    content = f.read()
    parseLogFile( content )
    
# Shows whether the file object was closed when the `with` block exited.
f.closed



In [None]:
import folium
from folium.plugins import HeatMap

# Bring every location to the driver.
_data = data.collect()

# Centre the map on the first location.
center = _data[0]
m = folium.Map(location=[ center['lat'], center['lon']])

# Keep at most the locations at positions 1..499999. A slice selects the same
# elements as indexing over range(1, 500000) but does not raise IndexError
# when fewer locations are available.
_data = _data[1:500000]


locations = [ [loc["lat"], loc["lon"]] for loc in _data ]
HeatMap( locations ).add_to(m)

m

In [None]:
import re


def parseLogFile(log):
    """Parse only the line at index 7 of the file text, if there is one.

    Returns a list holding the `parseLogLine` result for that single line, or
    an empty list when the text has fewer than 8 lines.
    """
    selected = log.split('\n')[7:8]
    return [ parseLogLine(entry) for entry in selected ]
        

# Captures the user directory and the trajectory file name from a file path.
# Written as a raw string: `\d` in a normal string literal is an invalid
# escape sequence. The pattern text itself is unchanged.
exp = r'.*/(\d.*)/Trajectory/(\d*).plt'


# One (path, full content) record per file.
rdd = sc.wholeTextFiles('data/010/Trajectory/*')

# Key every file by (user, trajectory); match the path once instead of twice.
rdd = rdd.map( lambda x: (
    re.match(exp, x[0]).group(1, 2),
    parseLogFile(x[1])
))


# One record per parsed location, still keyed by (user, trajectory).
rdd = rdd.flatMapValues( lambda x: x )

# Flatten to (user, trajectory, lat, lon, ts).
rdd = rdd.map( lambda x: (
    x[0][0],
    x[0][1],
    x[1][0],
    x[1][1],
    x[1][2]
) )

print( rdd.take(3) )




In [None]:
# Sample path used to try out the user / trajectory extraction pattern.
_str = 'file:/home/jovyan/data/010/Trajectory/20081223230455.plt'

# Raw string: `\d` in a normal string literal is an invalid escape sequence.
x = re.match(r'.*/data/(\d.*)/Trajectory/(\d*).plt', _str)

print( x.group(1, 2) ) 



In [None]:
# Load all USER trayectories 
from datetime import datetime

def parseLogLine(line):
    """Parse one comma-separated log record into (lat, lon, ts).

    Fields 0 and 1 are latitude and longitude; fields 5 and 6 are joined with
    a space, parsed as '%Y-%m-%d %H:%M:%S' and converted with
    `datetime.timestamp()`.
    """
    # strip() removes the trailing '\n' before splitting
    fields = line.strip().split(",")

    lat, lon = float( fields[0] ), float( fields[1] )

    stamp_text = '{} {}'.format(fields[5], fields[6])
    stamp = datetime.strptime(stamp_text, '%Y-%m-%d %H:%M:%S')

    return ( lat, lon, stamp.timestamp() )
    
    

def load_user_trajectory(user, trajectory):
    """Load one trajectory log of a user as an RDD of tagged locations.

    Args:
        user: user directory name under 'data/'.
        trajectory: log file name without the '.plt' extension.

    Returns:
        RDD of (user, trajectory, lat, lon, ts) tuples.
    """
    # Load file
    path = 'data/{}/Trajectory/{}.plt'.format(user, trajectory)
    rdd = sc.textFile( path )

    # Remove header
    rdd = rdd.filter(
        lambda line: len(line.split(",")) == 7
    )

    # Parse file lines
    rdd = rdd.map( parseLogLine )

    # Produce tuples (usr, trj, lat, lon, ts).
    # map() returns a new RDD; the result was previously discarded, so the
    # function returned untagged (lat, lon, ts) tuples.
    rdd = rdd.map(
        lambda loc : (user, trajectory, loc[0], loc[1], loc[2])
    )

    return rdd

    
def load_user_trajectories(user):
    """Load every trajectory log of a user into a single RDD.

    Fixes relative to the previous version: `if rdd not None` was a syntax
    error, the result of `union` was discarded, nothing was returned, `listdir`
    was not imported, and file names were passed with their '.plt' extension
    although `load_user_trajectory` appends it.

    Args:
        user: user directory name under 'data/'.

    Returns:
        Union of the RDDs returned by `load_user_trajectory` for each '.plt'
        file, or an empty RDD when the directory has none.
    """
    from os import listdir
    from os.path import splitext

    path = 'data/{}/Trajectory'.format(user)

    rdds = [
        load_user_trajectory(user, splitext(name)[0])
        for name in sorted(listdir(path))
        if name.endswith('.plt')
    ]

    if not rdds:
        return sc.emptyRDD()

    # A single n-ary union avoids building a deep chain of pairwise unions.
    return sc.union(rdds)
        
        
    
    

'''
    rdd = linesRDD.filter(
    lambda line: len(line.split(",")) == 7
)    
    
from os import listdir, stat, remove
from os.path import isfile, join, exists

PATH  = "data/010/Trajectory"

for t_file in listdir(PATH):
    FULL_PATH = join(PATH, t_file)
    rdd = sc.textFile(FULL_PATH)
    print( rdd.count() )
    if userDir.isdigit() and not isfile(userDir):
        user       = userDir
        inputPath  = join(INPUT,  userDir)
        outputPath = join(OUTPUT, userDir)

        if exists(join(INPUT, "labels.txt")):
            extractUserTrayectories(user, inputPath, outputPath)
            cleanUserDir(outputPath)
'''


# `loadTrajectoryRDD` is not defined anywhere in this notebook; the loader
# defined above with the same (user, trajectory) arguments is load_user_trajectory.
xxx = load_user_trajectory('010', '20070804033032')
xxx.take(2)

zzz = load_user_trajectory('010', '20070804155303')
zzz.take(2)

    



# Stop !!!!!

In [None]:
def extractUserTrayectories(userID, basePath):
    """Build the labelled trajectories of one user.

    Reads `<basePath>/labels.txt` and every log under `<basePath>/Trajectory`,
    pairs each label with the GPS records whose timestamp falls inside the
    label's time window, and groups the records per label.

    Args:
        userID: user identifier (currently not used by the computation).
        basePath: directory that holds 'labels.txt' and 'Trajectory'.

    Returns:
        RDD of dicts with the keys "transportationMode", "startTime",
        "endTime" and "coordinates" (list of GPS record dicts). The previous
        version built this RDD and then dropped it without returning.
    """
    labelsFilePath = basePath + "/labels.txt"
    logsPath = basePath + "/Trajectory"

    def parseLabel(line):
        # Split once instead of once per field.
        fields = line.split("\t")
        return {
            "start_time": datetime.strptime(fields[0], "%Y/%m/%d %H:%M:%S"),
            "end_time":   datetime.strptime(fields[1], "%Y/%m/%d %H:%M:%S"),
            "transportation_mode": fields[2]
        }

    def parseLog(line):
        # Split once instead of once per field.
        fields = line.split(",")
        return {
            "latitude":  float(fields[0]),
            "longitude": float(fields[1]),
            "altitude":  float(fields[3]),
            "timestamp": datetime.strptime(fields[5] + " " + fields[6], "%Y-%m-%d %H:%M:%S")
        }

    # ----------------------------------------------------
    # --  Step 1: Load Labels (Trajectories META-DATA)
    # ----------------------------------------------------

    labels = sc.textFile(labelsFilePath)

    # Remove file header
    labels = labels.filter(
        lambda line: "Start" not in line
    )

    labels = labels.map(parseLabel)

    # ----------------------------------------------------
    # --    Step 2: Load GPS logs
    # ----------------------------------------------------

    GPS_logs = sc.textFile(logsPath)

    # Remove file header
    GPS_logs = GPS_logs.filter(
        lambda line: len(line.split(",")) == 7
    )

    GPS_logs = GPS_logs.map(parseLog)

    # ----------------------------------------------------
    # -- Step 3: Find trajectories
    # ----------------------------------------------------

    # Every (label, GPS record) combination; only the ones inside the label's
    # time window survive the filter.
    CR = labels.cartesian(GPS_logs)

    CR = CR.filter(
        lambda t: t[0]["start_time"] <= t[1]["timestamp"] <= t[0]["end_time"]
    )

    GR = CR.groupBy(
        lambda t: (t[0]["start_time"], t[0]["end_time"], t[0]["transportation_mode"])
    )

    Trayectories = GR.map(
        lambda t: {
            "transportationMode": t[0][2],
            "startTime": t[0][0],
            "endTime":   t[0][1],
            "coordinates": [z[1] for z in t[1]]
        }
    )

    return Trayectories
    


In [None]:
def labels():
    """Print the first 10 non-header lines of user 105's labels file."""
    raw_lines = sc.textFile("data/105/labels.txt")

    # Drop the line that carries the column titles
    body_lines = raw_lines.filter(
        lambda line: "Start Time" not in line
    )

    print( body_lines.take(10) )
    

In [None]:
from datetime import datetime
from os import listdir, stat, remove
from os.path import isfile, join, exists

# Print a sample of the labels file (see labels() in the previous cell).
labels()

# The commented call and the string literal below are unused scratch code.
#extractUserTrayectories(104, "data/104")
'''
INPUT  = "data/104"
for userDir in listdir(INPUT):
    user = userDir
    inputPath = join(INPUT, userDir)
    print(inputPath)
   '''

        


In [None]:
# Print the first 10 raw lines (header included) of user 104's labels file.
print( sc.textFile("data/104/labels.txt").take(10) )