Nous souhaitons étudier le comportement des trajets des taxis new yorkais. Pour cela nous
allons calculer les indicateurs ci-dessous :  
● la vitesse moyenne de chaque trajet,  
● le nombre de trajets effectués en fonction du jour de la semaine,  
● le nombre de trajets effectués en fonction de l’horaire de la journée par tranche de 4h,  
● le nombre de km parcourus par jour de la semaine.  
  
Les données et leurs descriptions sont sur le lien ci-dessous :
https://www.kaggle.com/c/nyc-taxi-trip-duration/data

In [1]:
# Install dependencies
!pip install pyspark



In [3]:
# Get Spark imports
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext     # add
from pyspark.sql import functions as f
from pyspark.sql.types import StructType,StructField, StringType, IntegerType , BooleanType
import pyspark.sql.functions as F             # to delete

# Other imports
import datetime
from math import sqrt, cos

In [4]:
# Get Spark Context
spark = SparkSession.builder.appName('pyspark - NYC Taxi Trip Duration').getOrCreate()
sc = spark.sparkContext

In [5]:
df = spark.read.csv("data/trainExtract.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



In [6]:
sc

In [7]:
df.show(1,False,vertical=True)

-RECORD 0---------------------------------
 id                 | id2875421           
 vendor_id          | 2                   
 pickup_datetime    | 2016-03-14 17:24:55 
 dropoff_datetime   | 2016-03-14 17:32:30 
 passenger_count    | 1                   
 pickup_longitude   | -73.9821548461914   
 pickup_latitude    | 40.76793670654297   
 dropoff_longitude  | -73.96463012695312  
 dropoff_latitude   | 40.765602111816406  
 store_and_fwd_flag | N                   
 trip_duration      | 455                 
-RECORD 1---------------------------------
 id                 | id2377394           
 vendor_id          | 1                   
 pickup_datetime    | 2016-06-12 00:43:35 
 dropoff_datetime   | 2016-06-12 00:54:38 
 passenger_count    | 1                   
 pickup_longitude   | -73.98041534423828  
 pickup_latitude    | 40.738563537597656  
 dropoff_longitude  | -73.99948120117188  
 dropoff_latitude   | 40.73115158081055   
 store_and_fwd_flag | N                   
 trip_durat

In [8]:
df.select("trip_duration").show(2)

+-------------+
|trip_duration|
+-------------+
|          455|
|          663|
+-------------+
only showing top 2 rows



In [10]:
#from pyspark.sql.functions import lit

In [11]:
#df_with_distance = df.withColumn("distance", 
#             lit(0))
# 1.852 * 60 * sqrt(pow((df.select("dropoff_longitude")-df.select("pickup_longitude")*cos((df.select("pickup_latitude")+df.select("dropoff_latitude"))/2)), 2)+pow(df.select("dropoff_latitude")-df.select("pickup_latitude"), 2) )
#df_with_distance.show(2,False,vertical=True)

In [12]:
def getKmDistance(pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude):
    x = (dropoff_longitude - pickup_longitude) * cos((pickup_latitude + dropoff_latitude) / 2 )
    y = dropoff_latitude - pickup_latitude
    z = sqrt(pow(x, 2) + pow(y, 2))
    return 1.852 * 60 * z

In [13]:
#res = df.select("pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude").limit(1)
#res.show(3)
distance0 = getKmDistance(-73.982154846191406, 40.767936706542969, -73.964630126953125,40.765602111816406 )
distance0

1.959277081692363

In [15]:
'''
#convert to a UDF Function by passing in the function and return type of function
udfGetDistance = F.udf(getKmDistance, StringType())
df_with_distance = df.withColumn("distance", udfGetDistance("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"))
df_with_distance.show(1,False,vertical=True)
''' # Python worker failed to connect back. accept timeout !

'\n#convert to a UDF Function by passing in the function and return type of function\nudfGetDistance = F.udf(getKmDistance, StringType())\ndf_with_distance = df.withColumn("distance", udfGetDistance("pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude"))\ndf_with_distance.show(1,False,vertical=True)\n'

In [18]:
sqlContext = SQLContext(sc)
#sqlContext

In [19]:
df.registerTempTable('df_table')
df_with_distance = sqlContext.sql('select *, \
    1.852*60*sqrt(pow((dropoff_longitude-pickup_longitude)*cos((pickup_latitude+dropoff_latitude)/2),2) + pow(dropoff_latitude-pickup_latitude,2)) \
    as distance from df_table')
df_with_distance.show(2)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|         distance|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|1.959277081692363|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|        

In [38]:
# Get Average distance
#df_with_distance.filter(df_with_distance['distance'] > 0.0).agg({"distance": "avg"})
#print(df_with_distance.filter(df_with_distance['distance'] > 0.0).groupBy().avg('distance'))
df_with_distance.select(f.avg("distance").alias("Average distance")).show()

+------------------+
|  Average distance|
+------------------+
|3.9253457473550606|
+------------------+



In [20]:
date_time_str = df_with_distance.groupby("pickup_datetime").mean().collect()[0]["pickup_datetime"]
date_time_str

'2016-01-01 03:49:48'

In [74]:
def getDayOfWeek(date_time_str):
    date_time_obj = datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S')
    return date_time_obj.strftime('%A').lower()

In [24]:
test_date = getDayOfWeek('2016-01-01 03:49:48')
test_date

'friday'

In [62]:
#convert to a UDF Function by passing in the function and return type of function
#udfsomefunc = F.udf(getDayOfWeek, StringType())
#df_with_day_of_week = df_with_distance.withColumn("day_of_week", udfsomefunc("pickup_datetime"))
#df_with_day_of_week = df_with_distance.withColumn("day_of_week", f.col("pickup_datetime")[0:9])
#df_with_day_of_week.show(1)

In [63]:
#my_udf = f.udf(lambda x: getDayOfWeek(x), StringType())

#df_with_day_of_week = df_with_distance.withColumn('day_of_week', my_udf(df_with_distance.pickup_datetime) ) 
# df_with_day_of_week.show(2,False,vertical=True) # accept time out !

In [67]:
df_with_day_of_week = df_with_distance.withColumn("pickup_datetime",
    f.to_timestamp(f.col("pickup_datetime"))).withColumn("day_of_week", f.date_format(f.col("pickup_datetime"), "EEEE"))
df_with_day_of_week.show(2)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+-----------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|         distance|day_of_week|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-----------------+-----------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|1.959277081692363|     Monday|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.9994812011718

In [41]:
#df_with_distance.registerTempTable('df_table_with_day_of_week')
#df_with_day_of_week = sqlContext.sql('select *, \
#    datetime.datetime.strptime(pickup_datetime, "%Y-%m-%d %H:%M:%S").strftime("%A").lower() \
#    as day_of_week from df_table_with_day_of_week')
#df_with_day_of_week.show(2) # mismatched input '"%A"' expecting {'ADD', 'AFTER',...

In [47]:
df_with_day_of_week.select(df_with_day_of_week["passenger_count"]).distinct().show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              6|
|              3|
|              5|
|              4|
|              2|
+---------------+



In [78]:
# le nombre de trajets effectués en fonction du jour de la semaine
df_with_day_of_week.groupBy('day_of_week').count().show()

+-----------+-----+
|day_of_week|count|
+-----------+-----+
|  Wednesday|  140|
|    Tuesday|  141|
|     Friday|  152|
|   Thursday|  142|
|   Saturday|  150|
|     Monday|  136|
|     Sunday|  138|
+-----------+-----+



In [75]:
#df("trip_duration").show(5)
#averageSpeed = distance / duration