# Projet Final Apache Spark

**Nom Etudiant :**  *SOW*

**Prenom Etudiant:**  *Mouhamadou Mansour*

**Classe :**  *Master 1 Big Data Analytics*


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Dimanche 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [36]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [2]:
!head -1 "/home/mansour/prog_fonctionnelle/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grand volume de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [3]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [4]:
val sfFireFile = "/home/mansour/prog_fonctionnelle/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = /home/mansour/prog_fonctionnelle/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [5]:
fireDF.cache()

res0: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [6]:
fireDF.count()

res1: Long = 175296


In [7]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [8]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [9]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

###### Reponse 1
*Pour afficher ce résultat, j'ai utilisé deux opérations des RDDs, à savoir `select` et `distinct` comme opération de transformation et `show` comme action* 

In [283]:
val callTypeDistinct = fireDF
    .select("CallType")
    .distinct()

callTypeDistinct.show()

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
+--------------------+
only showing top 20 rows



callTypeDistinct: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]


### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

*Ici nous avons effectuer un filtrage de tout les appels qui sont passés au service d'incendie représenté par la variable **Department Fire** du Dataframe*

In [206]:
// Reponse 2
val callTypeFPD = fireDF
    .select("CallType") 
    .where($"CallType" !== "Department Fire")
    .distinct()

callTypeFPD.show()

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
+--------------------+
only showing top 20 rows



callTypeFPD: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]


### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

*Pour cette question, on a utiliser la commande `select()` avec l'option `where` pour spécifier que le résultat à afficher concerne uniquement les delais dont le temps est supérieur à 5 minutes.* 

In [12]:
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")

val callTypeUpperFM = newFireDF
    .select("calltype", "ResponseDelayedinMins")
    .where($"ResponseDelayedinMins" > 5)

callTypeUpperFM.show()

+--------------------+---------------------+
|            calltype|ResponseDelayedinMins|
+--------------------+---------------------+
|    Medical Incident|                 5.35|
|    Medical Incident|                 6.25|
|    Medical Incident|                  5.2|
|Citizen Assist / ...|                  5.6|
|    Medical Incident|                 7.25|
|    Medical Incident|            11.916667|
|    Medical Incident|             5.116667|
|    Medical Incident|             8.633333|
|    Medical Incident|             95.28333|
|    Medical Incident|                 5.45|
|    Medical Incident|                  7.6|
|    Medical Incident|             6.133333|
|      Structure Fire|            5.1833334|
|    Medical Incident|            6.9166665|
|    Medical Incident|                  5.2|
|              Alarms|                 6.35|
|               Other|             7.983333|
|    Medical Incident|                13.55|
|    Medical Incident|                 5.15|
|      Str

newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
callTypeUpperFM: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [calltype: string, ResponseDelayedinMins: float]


##### Transformations des dates
Maintenant nous allons d'abord:
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard
2. Retourner le Dataframe transformée
3. Mettre en cache le nouveau DataFrame

In [30]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res10: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

*Les appels les plus courants, constituent les appels de type **Medical Incident** puisqu'ils possèdent le plus grand nombre d'appels.*

In [275]:
//Reponse 4
/*
Ecrire ici votre code
*/
val callTypeMC = fireTSDF
    .groupBy("callType")
    .count()
    .orderBy(
        desc("count"))
    .show


+--------------------+------+
|            callType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



callTypeMC: Unit = ()


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

*Pour répondre à cette question, il suffit simplement d'utiliser l'option `groupBy()` en ordonnant suivant selon le plus grand nombre d'appels courant.*

In [204]:
val ZipcodeMC = fireTSDF
    .groupBy("callType", "Zipcode")
    .count()
    .orderBy(
        desc("count"))
    .show

+----------------+-------+-----+
|        callType|Zipcode|count|
+----------------+-------+-----+
|Medical Incident|  94102|16130|
|Medical Incident|  94103|14775|
|Medical Incident|  94110| 9995|
|Medical Incident|  94109| 9479|
|Medical Incident|  94124| 5885|
|Medical Incident|  94112| 5630|
|Medical Incident|  94115| 4785|
|Medical Incident|  94122| 4323|
|Medical Incident|  94107| 4284|
|Medical Incident|  94133| 3977|
|Medical Incident|  94117| 3522|
|Medical Incident|  94134| 3437|
|Medical Incident|  94114| 3225|
|Medical Incident|  94118| 3104|
|Medical Incident|  94121| 2953|
|Medical Incident|  94116| 2738|
|Medical Incident|  94132| 2594|
|  Structure Fire|  94110| 2267|
|Medical Incident|  94105| 2258|
|  Structure Fire|  94102| 2229|
+----------------+-------+-----+
only showing top 20 rows



ZipcodeMC: Unit = ()


### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

*Nous avons utiliser le transformateur `select()` avec l'option de selectionner que les **Zipcode** (l'équivalent des codes postaux au Etats Unis) inclus dans la condition `where` de `select()`.* 

In [189]:
val NeighborhoodZC = fireTSDF
    .select("Neighborhood", "Zipcode")
    .where((col("Zipcode") === "94102") || (col("Zipcode") === "94103"))
    .distinct()

NeighborhoodZC.show()    

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|        Potrero Hill|  94103|
|    Western Addition|  94102|
|          Tenderloin|  94102|
|            Nob Hill|  94102|
| Castro/Upper Market|  94103|
|     South of Market|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94103|
|Financial Distric...|  94102|
|         Mission Bay|  94103|
|          Tenderloin|  94103|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
+--------------------+-------+



NeighborhoodZC: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Zipcode: int]


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

*Pour cette question, nous avons simplement utiliser la commande `describe` qui permet d'afficher les statistiques de base d'un de la colonne **ResponseDelayedinMins** de notre Dataframe*

In [232]:
val statsDelayresponse = newFireDF
    .describe("ResponseDelayedinMins")
    .show

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|               175296|
|   mean|    3.892364154521585|
| stddev|    9.378286226254197|
|    min|          0.016666668|
|    max|              1844.55|
+-------+---------------------+



statsDelayresponse: Unit = ()


### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

*Dans cette question, on a utlisé la commande `select` comme en sql afin de selectionner que les années avec l'option `year`. Puis nous avons rangé l'ordre d'apparition des années par ordre décroissant.*

In [140]:
val yearDistinct = fireTSDF
    .select(year(fireTSDF("IncidentDate")))
    .distinct()
    .orderBy(desc("year(IncidentDate)"))
    .show

+------------------+
|year(IncidentDate)|
+------------------+
|              2018|
|              2017|
|              2016|
|              2015|
|              2014|
|              2013|
|              2012|
|              2011|
|              2010|
|              2009|
|              2008|
|              2007|
|              2006|
|              2005|
|              2004|
|              2003|
|              2002|
|              2001|
|              2000|
+------------------+



yearDistinct: Unit = ()


### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

*Pour cette question, nous avons d'utiliser filter afin de filtrer l'année considérée. Ensuite, on a groupé l'année en semaines avec l'option `groupBy()` puis on a compté le nombre d'appels d'incendies pour chaque semaine et d'ordonner les semaines par ordre du plus grand nombres d'appels pour chaque semaine avec la commande `orderBy()`*

In [245]:
val fireCallsByWeek2018 = fireTSDF
    .filter(year(fireTSDF("IncidentDate"))=== "2018")
    .groupBy(weekofyear(fireTSDF("IncidentDate")))
    .count()
    .orderBy(desc("count"))

fireCallsByWeek2018.show()

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      32|  243|
|                      13|  243|
|                      11|  240|
|                       5|  236|
|                      18|  236|
|                      23|  235|
|                      31|  234|
|                       2|  234|
|                      42|  234|
|                      19|  233|
|                       8|  232|
|                      34|  232|
|                      10|  232|
|                      21|  231|
+------------------------+-----+
only showing top 20 rows



fireCallsByWeek2018: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [weekofyear(IncidentDate): int, count: bigint]


### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [271]:
//Reponse 8
/*
Ecrire ici votre code
*/
val DelayResponseByNeighborhood = fireTSDF
    .filter(year(fireTSDF("IncidentDate"))=== "2018")
    .filter(fireTSDF("Neighborhood")==="Neighborhood")
    .groupBy($"ResponseDelayedinMins")
    .count()
    .orderBy(desc("ResponseDelayedinMins"))

DelayResponseByNeighborhood.show()

+---------------------+-----+
|ResponseDelayedinMins|count|
+---------------------+-----+
+---------------------+-----+



DelayResponseByNeighborhood: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ResponseDelayedinMins: float, count: bigint]


### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

*Pour stocker les données du Dataframe sous format parquet, on utilise la commande suivante:
  `coalesce(nombredefichier).write.mode().option().format("formatdufichier").save("chemindurepertoiredesauvegarde)`*

In [272]:
fireTSDF.
    coalesce(1).
    write.
    mode("overwrite").
    option("compression", "none").
    format("parquet").
    save("/home/mansour/prog_fonctionnelle/sf-fire-calls")

### Question 10
**Comment relire les données stockée en format Parquet?**

*Pour lire un fichier sous format parquet, on peut utiliserla commande `spark.read.parquet(chemin du répértoire du fichier)`. nous pouvons utiliser l'option `show()` pour voir le contenu du fichier.*

In [274]:
spark.read.parquet("/home/mansour/prog_fonctionnelle/sf-fire-calls").show

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+-----------

## FIN