In [3]:
%%python
import pandas as pd 
datas_py = pd.read_csv("../src/data/train.csv")

In [4]:
%%python
datas_py.head()

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982155,40.767937,-73.96463,40.765602,N,455
1,id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415,40.738564,-73.999481,40.731152,N,663
2,id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979027,40.763939,-74.005333,40.710087,N,2124
3,id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.01004,40.719971,-74.012268,40.706718,N,429
4,id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.973053,40.793209,-73.972923,40.78252,N,435


In [5]:
val df = spark
      .read
      .format("csv")
      .option("header", "true")
      .load("../src/data/train.csv")

df: org.apache.spark.sql.DataFrame = [id: string, vendor_id: string ... 9 more fields]


# Vitesse de chaque trajet 

J'utilise ici la formule de Haversine pour cacluler la distance en km entre un point A et un point B.
$$a = sin^2(\frac{lat_B-lat_A}{2})+ cos(lat_A)*cos(lat_B)*sin^2(\frac{long_B-long_A}{2})$$
$$d=2*r*arcsin(\sqrt{a})$$

avec r le rayon de la sphère sur laquelle sont placés les points. On considérera ici la terre comme sphérique, ce qui donne de bon résultat en pratique.
D'après la formule de wikipédia : https://en.wikipedia.org/wiki/Haversine_formula


On sait ensuite que :
$$vitesse = \frac{distance}{temps}$$

On obtiendra donc ici la vitesse moyenne du trajet en km/seconde, ce qui est une unité peu parlante, mais qui permettra de comparer les trajets entre eux. On pourra par exemple voir quels sont les taxis ayant une vitesse moyenne plus rapide que les autres. Si nécessaire on pourra ensuite adapter l'unité.
On sait bien qu'un taxi ne se déplace pas à allure constante dans les rues, cette notion de vitesse permet selon moi uniquement de comparer les trajets entre eux. 

In [6]:
val df_with_speed = df
    .withColumn("a", pow(sin(toRadians($"dropoff_latitude" - $"pickup_latitude") / 2), 2) + cos(toRadians($"pickup_latitude")) * cos(toRadians($"dropoff_latitude")) * pow(sin(toRadians($"dropoff_longitude" - $"pickup_longitude") / 2), 2))
    .withColumn("distance", asin(sqrt(col("a"))) * 2 * 6371)
    .withColumn("speed",$"distance" / $"trip_duration")

df_with_speed: org.apache.spark.sql.DataFrame = [id: string, vendor_id: string ... 12 more fields]


In [7]:
df_with_speed.take(5)

res0: Array[org.apache.spark.sql.Row] = Array([id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982154846191406,40.767936706542969,-73.964630126953125,40.765602111816406,N,455,1.3830896636179652E-8,1.498520779647477,0.003293452262961488], [id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415344238281,40.738563537597656,-73.999481201171875,40.731151580810547,N,663,2.0078128522717423E-8,1.805507168795824,0.002723238565302902], [id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979026794433594,40.763938903808594,-74.005332946777344,40.710086822509766,N,2124,2.511076618143245E-7,6.385098495252941,0.0030061668998366013], [id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.010040283203125,40.719970703125,-74.01226806640625,40.706718444824219,N,429,1.3591556...

# Nombre de trajets effectués en fonction du jour de la semaine

In [8]:
val df_with_week_day = df.withColumn("pickup_datetime",
    to_timestamp(col("pickup_datetime")))
    .withColumn("week_day_number", date_format(col("pickup_datetime"), "u"))
    .withColumn("week_day", date_format(col("pickup_datetime"), "E"))

df_with_week_day: org.apache.spark.sql.DataFrame = [id: string, vendor_id: string ... 11 more fields]


In [9]:
val df_with_trip_per_day = df_with_week_day.rdd
        .map(x => (x(12),1))
        .reduceByKey(_+_)

df_with_trip_per_day: org.apache.spark.rdd.RDD[(Any, Int)] = ShuffledRDD[20] at reduceByKey at <console>:28


In [10]:
df_with_trip_per_day.collect()

res1: Array[(Any, Int)] = Array((Mon,187418), (Tue,202749), (Sun,195366), (Fri,223533), (Thu,218574), (Sat,220868), (Wed,210136))


On utilise ensuite la méthode du MapReduce pour calculer le nombre de trajet en fonction du jour de la semaine.
Pourquoi utiliser le MapReduce avec le `reduceByKey` et non pas le `groupByKey`.
Lorsque le montant de données est élévé, `groupByKey` est moins performant que `reduceByKey`. Voyons pourquoi.


Lorsqu'on utilise les RDD, chaque morceau de RDD est appelé partition. Chaque partition appartient à un executeur.
`groupByKey` va chercher à regrouper les clés identiques dans la même partitition pour ensuite faire le traitement.
Il y a deux conséquences à cela :

- Cela produit un shuffle assez important des données car il doit le faire pour chaque donnée de chaque partition du RDD. Donc, si les partitions ne sont pas sur une même machine, cela provoque un gros trafic réseau.
- Si dans notre exemple les trajets avaient lieu à 99% le samedi, groupByKey rassemblera toutes les données ayant comme clé "samedi" dans une seule machine, ce qui peut causer des problèmes de mémoire.

Pour `reduceByKey`cela se passe autrement. Il y a d'abord un prétraitement dans chaque partition. Ensuite les nouvelles clé/valeur sont déplacées dans les partitions selon leur clé pour avoir leur traitement final.
La quantité de données est donc moins élevée, réduisant ainsi le temps de traitement.

# Nombre de trajets effectués en fonction de la tranche horaire


on divisera les tranche horaire en fonction de l'heure de départ du taxi : 
- 00:00 - 04:00 -> tranche horaire 1
- 04:00 - 08:00 -> tranche horaire 2 
- 08:00 - 12:00 -> tranche horaire 3
- 12:00 - 16:00 -> tranche horaire 4
- 16:00 - 20:00 -> tranche horaire 5
- 20:00 - 23:59 -> tranche horaire 6

In [11]:
val df_with_time_slice = df
    .withColumn("pickup_datetime",to_timestamp(col("pickup_datetime"))) 
    .withColumn("hour_pickup", date_format(col("pickup_datetime"), "hh:mm"))
    .withColumn("time_slice",
        when((col("hour_pickup")).geq("00:00") && (col("hour_pickup").lt("04:00")), "slice_1")
        .when((col("hour_pickup")).geq("04:00") && (col("hour_pickup").lt("08:00")), "slice_2")
        .when((col("hour_pickup")).geq("08:00") && (col("hour_pickup").lt("12:00")), "slice_3")
        .when((col("hour_pickup")).geq("12:00") && (col("hour_pickup").lt("16:00")), "slice_4")
        .when((col("hour_pickup")).geq("16:00") && (col("hour_pickup").lt("20:00")), "slice_5")
        .when((col("hour_pickup")).geq("20:00") && (col("hour_pickup").leq("23:59")), "slice_6")
        .otherwise("slice_unknown")
)

df_with_time_slice: org.apache.spark.sql.DataFrame = [id: string, vendor_id: string ... 11 more fields]


In [13]:
val df_with_trip_per_time_slice = df_with_time_slice.rdd
    .map(x => (x(12),1))
    .reduceByKey(_+_)

df_with_trip_per_time_slice: org.apache.spark.rdd.RDD[(Any, Int)] = ShuffledRDD[29] at reduceByKey at <console>:28


In [15]:
df_with_trip_per_time_slice.collect()

res3: Array[(Any, Int)] = Array((slice_1,305014), (slice_2,441346), (slice_3,587163), (slice_4,125121))


# Nombre de km par jour de la semaine 

In [16]:
import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


In [17]:
val df_with_distance_per_day_of_week = df 
    .withColumn("a", pow(sin(toRadians($"dropoff_latitude" - $"pickup_latitude") / 2), 2) + cos(toRadians($"pickup_latitude")) * cos(toRadians($"dropoff_latitude")) * pow(sin(toRadians($"dropoff_longitude" - $"pickup_longitude") / 2), 2))
    .withColumn("distance", asin(sqrt(col("a"))) * 2 * 6371)
    .withColumn("pickup_datetime",to_timestamp(col("pickup_datetime"))) 
    .withColumn("week_day_number", date_format(col("pickup_datetime"), "u")) 
    .withColumn("week_day", date_format(col("pickup_datetime"), "E"))

df_with_distance_per_day_of_week: org.apache.spark.sql.DataFrame = [id: string, vendor_id: string ... 13 more fields]


In [18]:
df_with_distance_per_day_of_week.printSchema

root
 |-- id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: string (nullable = true)
 |-- a: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- week_day_number: string (nullable = true)
 |-- week_day: string (nullable = true)



In [19]:
val week_day_abb_index = df_with_distance_per_day_of_week.columns.indexOf("week_day")
val distance_index = df_with_distance_per_day_of_week.columns.indexOf("distance")

week_day_abb_index: Int = 14
distance_index: Int = 12


Pour ce dernier calcul je ne comprends malheuresement pas où se trouve mon erreur. J'ai réussi à l'implémenter en pyspark, néanmoins je ne trouve pas comment gérer se problème de type, ni comment cast la variable dans le bon type.

In [20]:
val df_with_km_par_time_slice = df_with_distance_per_day_of_week.rdd
    .map(x => (x(week_day_abb_index),x(distance_index)))
    .reduceByKey((x,y) => x + y)

<console>: 34: error: type mismatch;

In [21]:
spark.stop()