In [1]:
import findspark
findspark.init()
import pyspark
import os
import pandas as pd
import geohash

sc = pyspark.SparkContext()
spark = pyspark.SQLContext(sc)

working_dir = os.getcwd()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/23 11:39:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




First, we read the data processed in the notebook *"data-cleansing.ipynb"*

In [44]:
#processedDf = pd.read_csv('processed_dataset.csv')
# Remove first column which contains a no needed index
#processedDf = processedDf.drop(processedDf.columns[0], axis=1)
# Remove eventTimeEnd because we won't use it
#processedDf = processedDf.drop('eventTimeEnd', axis=1)
#processedDf.head(10)
# Load parquet file into dataframe
processedDf = spark.read.csv("file://" + working_dir + "/processed-dataset.csv", header=True)
processedDf = processedDf.select(['instante_tickado', 'lat_org','long_org', 'lat_dest', 'long_dest','tipo_usuario','linea_org','ruta_org','viaje_org','coche_org','nodo_org','nodo_dest'])
processedDf.show(10, False)

+-------------------+------------------+-------------------+------------------+-------------------+------------+---------+--------+---------+---------+--------+---------+
|instante_tickado   |lat_org           |long_org           |lat_dest          |long_dest          |tipo_usuario|linea_org|ruta_org|viaje_org|coche_org|nodo_org|nodo_dest|
+-------------------+------------------+-------------------+------------------+-------------------+------------+---------+--------+---------+---------+--------+---------+
|2022-11-07 05:18:11|43.45857735990531 |-3.830086800994463 |43.46607675451544 |-3.7955349611972777|2           |41       |1       |2        |2        |509     |76       |
|2022-11-07 05:18:16|43.45857735990531 |-3.830086800994463 |43.46353676983129 |-3.8088896413509863|2           |41       |1       |2        |2        |509     |41       |
|2022-11-07 05:46:41|43.46543910424869 |-3.7869680290511183|43.467026928269824|-3.8664002658256176|16          |41       |2       |3        |3   

# Data preparation specification

Given, a certain granularity in location (geohash length g), granularity in time (bins per day b) and a chosen wideness (w) of the neighbourhood we want to look at, the aggregated data in the end should have the following columns:

**"geohash"**

Geohash with length g (categorical feature). This column will not actually be used in the prediction. It is just an id and can be used when calculating the distances between the geohashes.

**"time_cat"**

Time of the day as a categorical feature. If $b = 24$ (one bin for every hour), then "time_cat" for a pickup at 14:20:00 should be the string "14:00". If $b = 96$ (one bin for every quarter of an hour), then "time_cat" for a pickup at 14:20:00 should be the string '14:15'.

**"time_num"**

Time of the day as a (binned!) floating point number between 0 and 1, where the center of the bin is converted to a floating point number between 0 and 1. So if $b = 24$, then "time_num" for a pickup at 14:20:00 should be $14.5\,/\,24 =  0.6042$. If $b = 96$, it should translate to $14.375\,/\,24 = 0.5990$.

**"time_cos"**

The binned "time_num" variable converted to a cosine version so that time nicely 'loops' rather than going saw-like when it traverses midnight. See the figure below. This transformation doesn't have any magic powers, but it can make it easier for a model to find the right patterns. "time_cos" = $\cos(\textrm{time_num} \cdot 2\pi)$. So for 24 bins, 14:20:00 would translate to $\cos(0.6042 \cdot 2\pi) = -0.7932$.

[cyclic-transf]: ./images/cyclic-numeric-feature-transformation.png
![alt text][cyclic-transf]

**"time_sin"**

Same thing as 4) but then with sine. So, "time_sin" = $\sin(\textrm{time_num} \cdot 2 \pi)$. For 24 bins per day, 14:20:00 would translate to $\sin(0.6042 \cdot 2 \pi) = -0.6089$.

**"day_cat"**

Day of the week as a categorical feature: "Monday", "Tuesday", etc.

**"day_num"**

Day of the week as a numerical feature going from 0 (Monday morning, start of the week) to 1 (Sunday night), European style. With 24 bins, Tuesday afternoon 14:20:00 would translate to $(1 + \frac{14.5}{24})\,/\,7 = 0.2292$.

**"day_cos"**

Binned "day_num" variable converted to a cosine version. "day_cos" = $\cos(\textrm{day_num} \cdot 2\pi)$

**"day_sin"**

Binned "day_num"variable converted to a sine version. "day_sin" = $\sin(\textrm{day_num} \cdot 2\pi)$

**"weekend"**

0 if weekday, 1 if weekend (Saturday/Sunday)

**Location features**

Latitude and longitude of the center of the geohash the record was bucketed in.

#### Helper functions for cleaning and feature extraction/generation

In [45]:
# Needed libraries
import time
from datetime import date
import math

def date_extractor(date_str,b,minutes_per_bin):
    # Takes a datetime object as a parameter
    # and extracts and returns a tuple of the form: (as per the data specification)
    # (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)
    # Split date string into list of date, time
    
    d = date_str.split()
    
    #safety check
    if len(d) != 2:
        return tuple([None,])
    
    # TIME (eg. for 16:56:20 and 15 mins per bin)
    #list of hour,min,sec (e.g. [16,56,20])
    time_list = [int(t) for t in d[1].split(':')]
    
    #safety check
    if len(time_list) != 3:
        return tuple([None,])
    
    # calculate number of minute into the day (eg. 1016)
    num_minutes = time_list[0] * 60 + time_list[1]
    
    # Time of the start of the bin
    time_bin = num_minutes / minutes_per_bin     # eg. 1005
    hour_bin = num_minutes / 60                  # eg. 16
    min_bin = (time_bin * minutes_per_bin) % 60  # eg. 45
    
    #get time_cat
    hour_str = str(hour_bin) if hour_bin / 10 > 0 else "0" + str(hour_bin)  # eg. "16"
    min_str = str(min_bin) if min_bin / 10 > 0 else "0" + str(min_bin)      # eg. "45"
    time_cat = hour_str + ":" + min_str                                     # eg. "16:45"
    
    # Get a floating point representation of the center of the time bin
    time_num = (hour_bin*60 + min_bin + minutes_per_bin / 2.0)/(60*24)      # eg. 0.7065972222222222
    
    time_cos = math.cos(time_num * 2 * math.pi)
    time_sin = math.sin(time_num * 2 * math.pi)
    
    # DATE
    # Parse year, month, day
    date_list = d[0].split('-')
    d_obj = date(int(date_list[0]),int(date_list[1]),int(date_list[2]))
    day_to_str = {0: "Monday",
                  1: "Tuesday",
                  2: "Wednesday",
                  3: "Thursday",
                  4: "Friday",
                  5: "Saturday",
                  6: "Sunday"}
    day_of_week = d_obj.weekday()
    day_cat = day_to_str[day_of_week]
    day_num = (day_of_week + time_num)/7.0
    day_cos = math.cos(day_num * 2 * math.pi)
    day_sin = math.sin(day_num * 2 * math.pi)
    
    year = d_obj.year
    month = d_obj.month
    day = d_obj.day
    
    weekend = 0
    #check if it is the weekend
    if day_of_week in [5,6]:
        weekend = 1
       
    return (year, month, day, time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend)

In [46]:
def data_cleaner(zipped_row):
    # takes a tuple (row,g,b,minutes_per_bin) as a parameter and returns a tuple of the form:
    # (time_cat, time_num, time_cos, time_sin, day_cat, day_num, day_cos, day_sin, weekend,geohash)
    row = zipped_row[0]
    g = zipped_row[1]
    b = zipped_row[2]
    minutes_per_bin = zipped_row[3]
    # The indices of trip-start datetime, latitude start, longitude start, latitude end and longitude end respectively
    indices = (0, 1, 2, 3, 4,5,6,7,8,9,10,11)
    
    #safety check: make sure row has enough features
    if len(row) < 5:
        return None
    
    #extract day of the week and hour
    date_str = row[indices[0]]
    clean_date = date_extractor(date_str,b,minutes_per_bin)
    #get geo hash

    lat_start = float(row[indices[1]])
    lon_start = float(row[indices[2]])
    lat_end = float(row[indices[3]])
    lon_end = float(row[indices[4]])
    tipo_usuario = int(row[indices[5]])
    linea = int(row[indices[6]])
    ruta = int(row[indices[7]])
    viaje = int(row[indices[8]])
    coche = int(row[indices[9]])
    nodo_org = int(row[indices[10]])
    nodo_dest = int(row[indices[11]])
    location_start = None
    location_end = None
    location_start = geohash.encode(lat_start,lon_start, g)    
    location_end = geohash.encode(lat_end,lon_end, g)
    x_start = math.cos(lat_start) * math.cos(lon_start)
    y_start = math.cos(lat_start) * math.sin(lon_start) 
    z_start = math.sin(lat_start) 
    return tuple(list(clean_date)+[x_start]+[y_start]+[z_start]+[location_start]+[location_end] + [tipo_usuario] + [linea]+ [ruta]+ [viaje]+ [coche]+ [nodo_org]+ [nodo_dest])

#### Specify Parameters

In [47]:
g = 8 #geohash length, a 5x5m square area
b = 12 # number of time bins per day
# Note: b must evenly divide 60
minutes_per_bin = int((12 / float(b)) * 60)

#### Clean data create and create features as specified above

In [48]:
processedRDD = processedDf.rdd
processedRDD = processedRDD \
                .map(lambda row: (row, g, b, minutes_per_bin)) \
                .map(data_cleaner) \
                .filter(lambda row: row != None)

In [49]:
processedRDD

PythonRDD[89] at RDD at PythonRDD.scala:53

In [50]:
featuredDf = spark.createDataFrame(processedRDD, ['year', 'month', 'day', 'time_cat', 'time_num', 'time_cos', \
                                                  'time_sin', 'day_cat', 'day_num', 'day_cos', 'day_sin', 'weekend', \
                                                  'x_start', 'y_start', 'z_start','location_start', 'location_end','linea_org','tipo_usuario','ruta_org','viaje_org','coche_org','nodo_org','nodo_dest'])

In [51]:
featuredPandas = featuredDf.toPandas()
featuredPandas.head(10)

                                                                                

Unnamed: 0,year,month,day,time_cat,time_num,time_cos,time_sin,day_cat,day_num,day_cos,...,z_start,location_start,location_end,linea_org,tipo_usuario,ruta_org,viaje_org,coche_org,nodo_org,nodo_dest
0,2022,11,7,5.3:18.0,0.254167,-0.026177,0.999657,Monday,0.03631,0.974089,...,-0.500105,eztr0z7h,eztr38u0,2,41,1,2,2,509,76
1,2022,11,7,5.3:18.0,0.254167,-0.026177,0.999657,Monday,0.03631,0.974089,...,-0.500105,eztr0z7h,eztr3263,2,41,1,2,2,509,41
2,2022,11,7,5.766666666666667:46.0,0.293056,-0.267238,0.96363,Monday,0.041865,0.965602,...,-0.494151,eztr3bdk,eztr20bt,16,41,2,3,3,36,403
3,2022,11,7,5.9:54.0,0.304167,-0.333807,0.942641,Monday,0.043452,0.962961,...,-0.495661,eztr322d,eztr3053,2,41,2,3,3,42,11
4,2022,11,7,5.983333333333333:59.0,0.311111,-0.374607,0.927184,Monday,0.044444,0.961262,...,-0.500003,eztr0z6v,eztr39e9,2,41,2,3,3,512,137
5,2022,11,7,6.366666666666666:22.0,0.301389,-0.317305,0.948324,Monday,0.043056,0.96363,...,-0.495882,eztr32kb,eztr0ret,17,3,2,2,2,40,2
6,2022,11,7,6.366666666666666:22.0,0.301389,-0.317305,0.948324,Monday,0.043056,0.96363,...,-0.495882,eztr32kb,eztr0xeg,16,3,2,2,2,40,7
7,2022,11,7,6.433333333333334:26.0,0.306944,-0.350207,0.936672,Monday,0.043849,0.962286,...,-0.496893,eztr3056,eztr0zq0,8,3,2,2,2,43,109
8,2022,11,7,6.683333333333334:41.0,0.327778,-0.469472,0.882948,Monday,0.046825,0.957031,...,-0.498129,eztr0ry9,eztr32kb,2,2,1,2,2,3,40
9,2022,11,7,6.716666666666667:43.0,0.330556,-0.48481,0.87462,Monday,0.047222,0.956305,...,-0.497513,eztr0xuj,eztr32kb,16,2,1,2,2,6,40


In [52]:
featuredPandas.to_csv('featured-dataset.csv')