# Projet Final Apache Spark

**Nom Etudiant : Ba**  

**Prenom Etudiant:Aminata Abdoulaye**  

**Classe :M1 RESI**  


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Lundi 31 Janvier 2022  à 23h 59** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [1]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.43.210:4041
SparkContext available as 'sc' (version = 3.0.1, master = local[*], app id = local-1643597059876)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [2]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [3]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [4]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [5]:
fireDF.cache()

res0: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [6]:
fireDF.count()

res1: Long = 175296


In [7]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [8]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [8]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [9]:
// Reponse 1
val distinctCallType=fireDF
   .select("CallType")
   .filter($"CallType".isNotNull)
   .distinct()
distinctCallType.count()

distinctCallType: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string]
res4: Long = 30


j'ai utilisé filter() pour filtrer les valeurs non nulles et distinct() pour eliminer les doublons. Le nombre de types d'appels distincts qui ont été passés est 30

### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [10]:
// Reponse 2
fireDF
   .select("CallType")
   .where($"CallType".isNotNull)
   .distinct()
   .show(30,false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

ce code permet d'afficher Les types distincts de la colonne "CallType" j'ai specifié 30 dans la methode show(30, false) pour pouvoir afficher tous les 30 types d'appels sans restriction

### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [11]:
// Reponse 3
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
newFireDF
.select("ResponseDelayedinMins")
.where($"ResponseDelayedinMins" > 5)
.show(10, false)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
|11.916667            |
|5.116667             |
|8.633333             |
|95.28333             |
|5.45                 |
+---------------------+
only showing top 10 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates  
Maintenant nous allons d'abord:  
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard    
2. Retourner le Dataframe transformée  
3. Mettre en cache le nouveau DataFrame  

In [12]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res7: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

1. On recupere le nombre de type d'appel, pour chaque type d'appel;
2. On fait la statistique de ces nombres d'appels  avec la methode describe;
3. On recupre les types d'appels superieure a la moyenne.

In [13]:
//Reponse 4
//je recupere le nombre d'appel, pour chaque type d'appel
val CountCalls =  fireTSDF
.groupBy("CallType")
.count()
CountCalls.orderBy("count").show(30, false)

+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Administrative                              |3     |
|Mutual Aid / Assist Outside Agency          |9     |
|Confined Space / Structure Collapse         |13    |
|Marine Fire                                 |14    |
|Suspicious Package                          |15    |
|Oil Spill                                   |21    |
|Watercraft in Distress                      |28    |
|Extrication / Entrapped (Machinery, Vehicle)|28    |
|High Angle Rescue                           |32    |
|Assist Police                               |35    |
|Aircraft Emergency                          |36    |
|Train / Rail Incident                       |57    |
|Explosion                                   |89    |
|Industrial Accidents                        |94    |
|HazMat                                      |124   |
|Fuel Spill                 

CountCalls: org.apache.spark.sql.DataFrame = [CallType: string, count: bigint]


In [14]:
//ensuite je fais la statistique de ces nombres d'appels  en utilisant la methode describe
CountCalls
.describe()
.show()

+-------+--------------------+------------------+
|summary|            CallType|             count|
+-------+--------------------+------------------+
|  count|                  30|                30|
|   mean|                null|            5843.2|
| stddev|                null|21101.213397697593|
|    min|      Administrative|                 3|
|    max|Watercraft in Dis...|            113794|
+-------+--------------------+------------------+



In [15]:
// Enfin,On recupre les types d'appels superieure a la moyenne 
CountCalls
  .withColumn("CommonCallType", col("count").gt(5843))
  .show(30,false)

+--------------------------------------------+------+--------------+
|CallType                                    |count |CommonCallType|
+--------------------------------------------+------+--------------+
|Elevator / Escalator Rescue                 |453   |false         |
|Marine Fire                                 |14    |false         |
|Aircraft Emergency                          |36    |false         |
|Confined Space / Structure Collapse         |13    |false         |
|Administrative                              |3     |false         |
|Alarms                                      |19406 |true          |
|Odor (Strange / Unknown)                    |490   |false         |
|Citizen Assist / Service Call               |2524  |false         |
|HazMat                                      |124   |false         |
|Watercraft in Distress                      |28    |false         |
|Explosion                                   |89    |false         |
|Oil Spill                        

En conclusion les CallType, les plus courants sont ceux superieure a la moyenne donc on a :
- Traffic Collision, 
- Alarms, 
- Structure Fire, 
- Medical Incident

### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

 On  recupere les boites postaux les plus courants  en applicant un filtre sur les types d'appels les plus courants.

In [16]:
//Reponse 5-a
fireTSDF
    .filter($"CallType" === "Medical Incident" || $"CallType" === "Structure Fire" || $"CallType" ===  "Alarms" || $"CallType" === "Traffic Collision")
    .select("Zipcode")
    .distinct()
    .show()

+-------+
|Zipcode|
+-------+
|  94109|
|  94115|
|  94112|
|  94127|
|  94108|
|  94121|
|  94105|
|   null|
|  94131|
|  94116|
|  94134|
|  94124|
|  94102|
|  94114|
|  94107|
|  94111|
|  94103|
|  94117|
|  94122|
|  94110|
+-------+
only showing top 20 rows



### Question 5-b
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

on recupere les quartiers en faisant un filtre avec ces deux Zipcode

In [18]:
//Reponse 5-b
fireTSDF 
   .select("Neighborhood")
   .where($"Zipcode" === 94102 || $"Zipcode" === 94103)
   .distinct()
   .show()

+--------------------+
|        Neighborhood|
+--------------------+
|    Western Addition|
|         Mission Bay|
|        Hayes Valley|
|Financial Distric...|
|            Nob Hill|
|             Mission|
|          Tenderloin|
|        Potrero Hill|
| Castro/Upper Market|
|     South of Market|
+--------------------+



### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

- on fais la statistique des temps de réponse des appels en utilisant la methode describe
- Comme que chaque appels a un temps de reponse donc le nombre total des appels c'est ègale aunombre total des temps de reponse

In [19]:
fireTSDF
.select("ResponseDelayedinMins")
.describe()
.show()

+-------+---------------------+
|summary|ResponseDelayedinMins|
+-------+---------------------+
|  count|               175296|
|   mean|    3.892364154521585|
| stddev|    9.378286226254257|
|    min|          0.016666668|
|    max|              1844.55|
+-------+---------------------+



- nombre total d'appels = 175296 appels
- la moyenne des temps de reponses = 3.892
- Le minimum des temps de reponses = 0.016
- Le maximum des temps de reponses = 1844.55

### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [21]:
//Reponse 7-a
fireTSDF
  .select(year(col("IncidentDate")))
  .distinct()
  .count()

res16: Long = 19


j'ai appliqué la fonction year() a la colonne IncidentDate
Dans ce dataset nous avons "19 années distinctes"`

### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

On utilise la methode weekofyear() qui permet d'extraire le numero de semaine d'une année donnée  

In [39]:
//Reponse 7-b
fireTSDF
.filter(year(col("IncidentDate")) === 2018)
.groupBy(weekofyear($"IncidentDate"))
.count()
.orderBy(desc("count"))
.show()

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      32|  243|
|                      13|  243|
|                      11|  240|
|                       5|  236|
|                      18|  236|
|                      23|  235|
|                       2|  234|
|                      31|  234|
|                      42|  234|
|                      19|  233|
|                      10|  232|
|                       8|  232|
|                      34|  232|
|                      21|  231|
+------------------------+-----+
only showing top 20 rows



- La 22 ème semaine  de l'anne 2018  a connu plus d'appel d'incendie : 259

### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

1. on cherche d'abord les quartier qui un connu des incendies au cours de l'annee 2018
2. Onfait un filtre sur des temps de reponse superieur à la moyenne pour pouvoir determiner  les quartiers qui ont connu les pires temps de reponse

In [101]:
//Reponse 8
val worstDelay=fireTSDF
.select("Neighborhood", "ResponseDelayedinMins", "IncidentDate")
.filter((year($"IncidentDate") === 2018))
worstDelay.show()


+------------------------------+---------------------+-------------------+
|Neighborhood                  |ResponseDelayedinMins|IncidentDate       |
+------------------------------+---------------------+-------------------+
|Presidio Heights              |2.8833334            |2018-01-19 00:00:00|
|Mission Bay                   |6.3333335            |2018-01-19 00:00:00|
|Chinatown                     |2.65                 |2018-01-19 00:00:00|
|Financial District/South Beach|3.5333333            |2018-01-19 00:00:00|
|Tenderloin                    |1.1                  |2018-01-19 00:00:00|
|Bayview Hunters Point         |4.05                 |2018-01-19 00:00:00|
|Inner Richmond                |2.5666666            |2018-01-19 00:00:00|
|Inner Sunset                  |1.4                  |2018-01-19 00:00:00|
|Sunset/Parkside               |2.6666667            |2018-01-19 00:00:00|
|South of Market               |1.7666667            |2018-01-19 00:00:00|
|Golden Gate Park        

worstDelay: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, ResponseDelayedinMins: float ... 1 more field]


In [102]:
worstDelay.select("Neighborhood")
.where($"ResponseDelayedinMins"> 3.892)
.distinct()
.show(20, false)

+------------------------------+
|Neighborhood                  |
+------------------------------+
|Inner Sunset                  |
|Haight Ashbury                |
|Lincoln Park                  |
|Japantown                     |
|None                          |
|North Beach                   |
|Lone Mountain/USF             |
|Western Addition              |
|Bernal Heights                |
|Mission Bay                   |
|Hayes Valley                  |
|Financial District/South Beach|
|Lakeshore                     |
|Bayview Hunters Point         |
|Visitacion Valley             |
|Inner Richmond                |
|Nob Hill                      |
|Oceanview/Merced/Ingleside    |
|Outer Richmond                |
|Treasure Island               |
+------------------------------+
only showing top 20 rows



### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [105]:
//Reponse 9
fireTSDF.write.parquet("MonProjet.parquet")

### Question 10
**Comment relire les données stockée en format Parquet?**

In [106]:
//Reponse 10
val parquetFileDF = spark.read.parquet("MonProjet.parquet")

parquetFileDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


## FIN