# Apache Spark Final Project

**Student last name:** ABDOULAYE

**Student first name:** BA

**Class:** Master 1 Big Data, UVS


## Description
This project uses Apache Spark to analyze and process the **[San Francisco Fire Department Calls](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** data in order to produce a few KPIs (*Key Performance Indicators*). The **SF Fire Dataset** contains the call responses of all fire units. Each record includes the call number, incident number, address, unit identifier, call type, and disposition. All relevant time intervals are also included. Because the dataset is response-based and most calls involve several units, there are multiple records for each call number. Addresses are associated with a block number, an intersection, or a call box, not with a specific address.

**For more details on the data description, follow this [link](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Tasks
The goal of this project is to understand the **SF Fire Dataset** well enough to answer the questions correctly with appropriate Spark/Scala code.

- Create a Git repository (public or private) and share it with my email address (limahin10@gmail.com)
- Write readable, well-indented code
- Don't forget to justify your answers in comments in the Markdown cells.


## Notes
- The project is individual: each notebook belongs to a single student.
- Deadline: **Thursday, January 10, 2021** (no extensions will be granted)

### Loading the data

Importing the Spark packages

In [2]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


In [3]:
import org.apache.spark.sql.SparkSession

// In a notebook kernel a session usually already exists; getOrCreate() returns it
val spark = SparkSession
  .builder()
  .appName("Spark projet finalcalls.csv ba_Abdoulaye")
  .getOrCreate()


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4f0c055e


In [6]:
val fireDF = spark.read.format("csv").option("header", "true").load("/home/papalayeba/Bureau/spark/sf-fire-calls.csv")


fireDF: org.apache.spark.sql.DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]


Let's look at the structure of the data (its header line) before defining a schema.

In [7]:
!head -1 "/home/papalayeba/Bureau/spark/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Because the dataset is large, inferring the schema would require an extra pass over the data, which is costly. We therefore define the schema for the dataset explicitly.

In [10]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [11]:
val sfFireFile = "/home/papalayeba/Bureau/spark/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = /home/papalayeba/Bureau/spark/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Next, we cache the DataFrame. Caching is lazy: the data is only kept in memory once the first action (here `count()`) runs.

In [12]:
fireDF.cache()

res1: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [13]:
fireDF.count()

res2: Long = 175296


In [15]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [14]:
fireDF.show(10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtering out calls of type "Medical Incident"

In [15]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(5, false)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**How many distinct call types were made?**  
To be safe, null values in the column must not be counted.

In [16]:
fireDF.groupBy("CallType", "CallTypeGroup","UnitID","CallNumber", "Address").count().show(false)

+-----------------+----------------------------+------+----------+----------------------------+-----+
|CallType         |CallTypeGroup               |UnitID|CallNumber|Address                     |count|
+-----------------+----------------------------+------+----------+----------------------------+-----+
|Medical Incident |Potentially Life-Threatening|78    |120010229 |400 Block of POWELL ST      |1    |
|Medical Incident |Potentially Life-Threatening|99    |120110214 |3700 Block of MISSION ST    |1    |
|Alarms           |Alarm                       |T16   |120140338 |2100 Block of JACKSON ST    |1    |
|Traffic Collision|Potentially Life-Threatening|T14   |120170065 |200 Block of POINT LOBOS AVE|1    |
|Medical Incident |Non Life-threatening        |68    |120170232 |1500 Block of FRANCISCO ST  |1    |
|Medical Incident |Non Life-threatening        |AM04  |120390342 |400 Block of JEFFERSON ST   |1    |
|Medical Incident |Potentially Life-Threatening|66    |120460230 |CROSSOVER DR/TRA

In [20]:
fireDF.select("CallType")

res9: org.apache.spark.sql.DataFrame = [CallType: string]


In [22]:
res9.count()

res11: Long = 175296
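`res9.count()` returns the number of rows (175296), not the number of distinct call types. A minimal sketch of a distinct count that skips nulls, shown on a few hypothetical sample rows (`sampleCalls` is illustrative, not part of the dataset); on the real data the same chain would start from `fireDF`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, countDistinct}

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q1-sketch").getOrCreate()
import session.implicits._

// Hypothetical sample rows standing in for fireDF: (CallNumber, CallType)
val sampleCalls = Seq(
  (1, Option("Medical Incident")),
  (1, Option("Medical Incident")),
  (2, Option("Alarms")),
  (3, Option("Structure Fire")),
  (4, Option.empty[String])
).toDF("CallNumber", "CallType")

// Drop null call types explicitly, then count the distinct remaining values
val distinctCallTypes: Long = sampleCalls
  .where(col("CallType").isNotNull)
  .agg(countDistinct("CallType").as("DistinctCallTypes"))
  .first()
  .getLong(0)

println(s"Distinct call types: $distinctCallTypes")
```

`countDistinct` already ignores nulls; the explicit `where` simply makes the requirement from the question visible in the code.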


### Question 2

**Which different call types were made to the fire department?**

In [23]:
val FireDF = fireDF
  .select("IncidentNumber","Address", "Neighborhood" ,"CallType") 
  .where($"CallType" =!= "Structure Fire")

FireDF.show(10, false)

+--------------+------------------------+--------------------------+----------------+
|IncidentNumber|Address                 |Neighborhood              |CallType        |
+--------------+------------------------+--------------------------+----------------+
|2003241       |0 Block of SILVERVIEW DR|Bayview Hunters Point     |Medical Incident|
|2003242       |MARKET ST/MCALLISTER ST |Tenderloin                |Medical Incident|
|2003250       |APPLETON AV/MISSION ST  |Bernal Heights            |Vehicle Fire    |
|2003259       |1400 Block of SUTTER ST |Western Addition          |Alarms          |
|2003301       |0 Block of FARALLONES ST|Oceanview/Merced/Ingleside|Alarms          |
|2003304       |600 Block of POLK ST    |Tenderloin                |Alarms          |
|2003343       |1500 Block of WEBSTER ST|Japantown                 |Medical Incident|
|2003348       |DIAMOND ST/MARKET ST    |Castro/Upper Market       |Medical Incident|
|2003381       |2700 Block of MISSION ST|Mission      

FireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, Address: string ... 2 more fields]
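The filter above removes "Structure Fire" rows rather than listing the call types. One way to list each distinct, non-null call type is sketched below on hypothetical sample values (`sampleTypes` is illustrative); with the notebook's data the chain would start from `fireDF`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q2-sketch").getOrCreate()
import session.implicits._

// Hypothetical sample of the CallType column, including a null
val sampleTypes = Seq("Medical Incident", "Alarms", "Medical Incident", "Structure Fire", null)
  .toDF("CallType")

// Distinct, non-null call types, sorted so the listing is stable
val callTypes = sampleTypes
  .select("CallType")
  .where(col("CallType").isNotNull)
  .distinct()
  .orderBy("CallType")

callTypes.show(false)
val callTypeList: Seq[String] = callTypes.as[String].collect().toSeq
```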


### Question 3

**Find all responses where the delay was greater than 5 minutes.**

*Hints:*
1. Rename the column Delay -> ResponseDelayedinMins
2. Return a new DataFrame
3. Display all calls where the response at a fire site came after a delay of more than 5 minutes

In [24]:
val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")


newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [29]:
val newFireDF = spark.read.format("csv").option("header", "true").load("/home/papalayeba/Bureau/spark/sf-fire-calls.csv")


newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]


In [None]:
2. Return a new DataFrame


In [33]:
val newFireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("ResponseDelayedinMins", FloatType, true)))

newFireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,tr...


In [34]:
val sfFireFile = "/home/papalayeba/Bureau/spark/sf-fire-calls.csv"
val newFireDF = spark
  .read
  .schema(newFireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = /home/papalayeba/Bureau/spark/sf-fire-calls.csv
newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [35]:
newFireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [None]:
3. Display all calls where the response at a fire site came after a delay of more than 5 minutes


In [36]:
val FireDF = newFireDF
  .select("Neighborhood", "Location", "CallType", "ResponseDelayedinMins")
  .where($"CallType" === "Structure Fire")
  .where($"ResponseDelayedinMins" > 5) // compare against a number, not the string "5"



FireDF.show(10, false)

+------------------------------+-------------------------------------+--------------+---------------------+
|Neighborhood                  |Location                             |CallType      |ResponseDelayedinMins|
+------------------------------+-------------------------------------+--------------+---------------------+
|Potrero Hill                  |(37.7605763978135, -122.407559721688)|Structure Fire|5.1833334            |
|Financial District/South Beach|(37.787352867216, -122.399071684966) |Structure Fire|13.583333            |
|Potrero Hill                  |(37.754135226231, -122.399481585017) |Structure Fire|5.25                 |
|Bayview Hunters Point         |(37.750758868663, -122.390429362473) |Structure Fire|11.4                 |
|Japantown                     |(37.78676442656, -122.427490271072)  |Structure Fire|15.466666            |
|Inner Sunset                  |(37.7645085780924, -122.477112517357)|Structure Fire|5.6666665            |
|Bayview Hunters Point      

FireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Location: string ... 2 more fields]
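The three hints can also be followed without redefining the whole schema, since `withColumnRenamed` already returns a new DataFrame. A sketch on hypothetical rows (`sampleDelays` and its values are made up) which, unlike the cell above, keeps every call type rather than only "Structure Fire":

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q3-sketch").getOrCreate()
import session.implicits._

// Hypothetical rows: (CallType, Delay in minutes)
val sampleDelays = Seq(
  ("Structure Fire", 5.18f),
  ("Medical Incident", 2.5f),
  ("Alarms", 13.58f),
  ("Structure Fire", 4.9f)
).toDF("CallType", "Delay")

// Hints 1 and 2: withColumnRenamed returns a new DataFrame with the renamed column
val renamedDF = sampleDelays.withColumnRenamed("Delay", "ResponseDelayedinMins")

// Hint 3: keep every call whose delay exceeds 5 minutes (numeric literal, not "5")
val delayedDF = renamedDF
  .select("CallType", "ResponseDelayedinMins")
  .where(col("ResponseDelayedinMins") > 5)

delayedDF.show(false)
val delayedCount = delayedDF.count()
```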


### Date transformations
We will now:
1. Convert the String date columns to Spark Timestamps so that we can run date-based queries later
2. Return the transformed DataFrame
3. Cache the new DataFrame
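Before converting the full dataset, the two date patterns can be checked on a single hypothetical row written in the same format as the sample output (`01/11/2002 01:51:44 AM`). In these patterns `hh` is the 1-12 hour and `a` the AM/PM marker; the row and the `rawDates` name are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_timestamp, year}
import org.apache.spark.sql.types.TimestampType

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("dates-sketch").getOrCreate()
import session.implicits._

// One hypothetical row in the same string formats as CallDate and AvailableDtTm
val rawDates = Seq(("01/11/2002", "01/11/2002 01:51:44 AM")).toDF("CallDate", "AvailableDtTm")

// 1. Parse the strings, 2. drop the original columns, 3. cache the result
val parsedDates = rawDates
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
  .drop("CallDate", "AvailableDtTm")
  .cache()

parsedDates.printSchema()

// Rows left null would indicate strings that did not match their pattern
val unparsedRows = parsedDates
  .where(col("IncidentDate").isNull || col("AvailableDtTS").isNull)
  .count()
val incidentYear = parsedDates.select(year(col("IncidentDate"))).as[Int].first()
```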

In [40]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res18: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [41]:
res18.show(5)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+------

In [61]:
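// Note: this schema only renames the CSV columns by position; IncidentDate, OnWatchDate
// and AvailableDtTS remain StringType (fireTSDF above holds the real timestamps)
val FireDTFSchema = StructType(Array(StructField("CallNumber", IntegerType, true),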
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("IncidentDate", StringType, true),      
  StructField("OnWatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTS", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("ResponseDelayedinMins", FloatType, true)))

FireDTFSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(IncidentDate,StringType,true), StructField(OnWatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTS,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanT...


In [62]:
val sfFireFile = "/home/papalayeba/Bureau/sparkprojet/sf-fire-calls.csv"
val fireDTFS = spark
  .read
  .schema(FireDTFSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = /home/papalayeba/Bureau/sparkprojet/sf-fire-calls.csv
fireDTFS: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [59]:
fireDTFS.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- IncidentDate: string (nullable = true)
 |-- OnWatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTS: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = t

In [None]:
3. Cache the new DataFrame

In [63]:
fireDTFS.cache()

res26: fireDTFS.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**What are the most common call types?**
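Grouping by `CallType` together with `Address`, as in the cell below, counts call type/address pairs. Counting per call type alone and sorting in descending order answers the question more directly. A sketch on hypothetical values (`sampleCallTypes` is illustrative; on the real data the chain would start from `fireDF`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q4-sketch").getOrCreate()
import session.implicits._

// Hypothetical sample of the CallType column
val sampleCallTypes = Seq(
  "Medical Incident", "Alarms", "Medical Incident",
  "Structure Fire", "Medical Incident", "Alarms", null
).toDF("CallType")

// One row per call type with its number of records, most frequent first
val mostCommon = sampleCallTypes
  .where(col("CallType").isNotNull)
  .groupBy("CallType")
  .count()
  .orderBy(desc("count"))

mostCommon.show(10, false)
val topCallType = mostCommon.first()
```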

In [64]:
fireDF.groupBy("CallType","Address" ).count()

res27: org.apache.spark.sql.DataFrame = [CallType: string, Address: string ... 1 more field]


In [65]:
res27.show()

+-----------------+--------------------+-----+
|         CallType|             Address|count|
+-----------------+--------------------+-----+
| Medical Incident|300 Block of EDDY ST|  633|
| Medical Incident|100 Block of GOLD...|  373|
|           Alarms|1500 Block of FIL...|    1|
| Medical Incident|800 Block of OFAR...|  189|
| Medical Incident|19TH AV/JUNIPERO ...|   11|
|   Structure Fire|    5TH ST/HOWARD ST|   21|
| Medical Incident|500 Block of 16TH...|   10|
| Medical Incident|400 Block of 23RD...|   12|
| Medical Incident|MONTEREY BL/RIDGE...|    1|
| Medical Incident|400 Block of GEAR...|   81|
| Medical Incident|  18TH ST/DIAMOND ST|    4|
| Medical Incident|CALL BOX: GENEVA ...|    7|
| Medical Incident|DEL VALE AV/OSHAU...|    1|
| Medical Incident|DUNCAN ST/GUERRER...|    2|
|   Structure Fire|  EDDY ST/WEBSTER ST|    7|
| Medical Incident|1600 Block of OAK ST|    5|
|           Alarms| 300 Block of 7TH ST|    3|
|Traffic Collision|   43RD AV/FULTON ST|    2|
| Medical Inc

### Question 5-a
**Which zip codes appear in the most common calls?**
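A sketch that counts (call type, zip code) pairs and sorts them by frequency, so the most common calls and their zip codes come first. The sample rows are hypothetical; on the real data the same chain would start from `fireDF` with its `CallType` and `Zipcode` columns:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q5a-sketch").getOrCreate()
import session.implicits._

// Hypothetical rows: (CallType, Zipcode)
val sampleZips = Seq(
  ("Medical Incident", 94102),
  ("Medical Incident", 94102),
  ("Medical Incident", 94103),
  ("Alarms", 94103),
  ("Medical Incident", 94102)
).toDF("CallType", "Zipcode")

// Count each (CallType, Zipcode) pair, most frequent first
val byZip = sampleZips
  .where(col("CallType").isNotNull)
  .groupBy("CallType", "Zipcode")
  .count()
  .orderBy(desc("count"))

byZip.show(10, false)
val topPair = byZip.first()
```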

In [66]:
fireDF
 .groupBy("CallType", "UnitType","Zipcode","CallNumber", "Address")
 .agg(count( "Zipcode").as("Count")).show()

+--------------------+--------------+-------+----------+--------------------+-----+
|            CallType|      UnitType|Zipcode|CallNumber|             Address|Count|
+--------------------+--------------+-------+----------+--------------------+-----+
|    Medical Incident|         MEDIC|  94133|  20110350|500 Block of BROA...|    1|
|    Medical Incident|         MEDIC|  94102|  20120309|   OAK ST/WEBSTER ST|    1|
|    Medical Incident|         MEDIC|  94105|  20230440|   0 Block of 2ND ST|    1|
|      Structure Fire|        ENGINE|  94110|  20740052|1000 Block of POT...|    1|
|              Alarms|         TRUCK|  94103|  20950034|700 Block of NATO...|    1|
|    Medical Incident|         MEDIC|  94109|  20980218|1000 Block of GEA...|    1|
|    Medical Incident|         MEDIC|  94112|  21030024|CALL BOX: SAN JOS...|    1|
|              Alarms|         TRUCK|  94104|  21080396|500 Block of CALI...|    1|
|              Alarms|         CHIEF|  94117|  21110136|300 Block of BUEN...

### Question 5-b
**Which San Francisco neighborhoods have the zip codes 94102 and 94103?**

In [82]:
val FireDF = newFireDF
  .select("Neighborhood", "Address", "CallType", "City", "ZipCode")
  .where($"City" === "SF")
  .where($"ZipCode" === 94102) // Zipcode is an IntegerType column


FireDF.show(10, false)

+------------+-----------------------+----------------+----+-------+
|Neighborhood|Address                |CallType        |City|ZipCode|
+------------+-----------------------+----------------+----+-------+
|Tenderloin  |MARKET ST/MCALLISTER ST|Medical Incident|SF  |94102  |
|Tenderloin  |600 Block of POLK ST   |Alarms          |SF  |94102  |
|Tenderloin  |400 Block of TURK ST   |Structure Fire  |SF  |94102  |
|Hayes Valley|OAK ST/WEBSTER ST      |Medical Incident|SF  |94102  |
|Tenderloin  |0 Block of JONES ST    |Structure Fire  |SF  |94102  |
|Tenderloin  |400 Block of EDDY ST   |Medical Incident|SF  |94102  |
|Hayes Valley|500 Block of OAK ST    |Alarms          |SF  |94102  |
|Hayes Valley|HAIGHT ST/OCTAVIA ST   |Medical Incident|SF  |94102  |
|Tenderloin  |0 Block of LARKIN ST   |Structure Fire  |SF  |94102  |
|Tenderloin  |100 Block of TURK ST   |Medical Incident|SF  |94102  |
+------------+-----------------------+----------------+----+-------+
only showing top 10 rows



FireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Address: string ... 3 more fields]


In [68]:
val FireDF = newFireDF
  .select("Neighborhood", "Address", "CallType", "City", "ZipCode")
  .where($"City" === "SF")
  .where($"ZipCode" === 94103) // Zipcode is an IntegerType column


FireDF.show(10, false)

+------------------------------+--------------------------+----------------+----+-------+
|Neighborhood                  |Address                   |CallType        |City|ZipCode|
+------------------------------+--------------------------+----------------+----+-------+
|South of Market               |9TH ST/HOWARD ST          |Structure Fire  |SF  |94103  |
|Mission                       |400 Block of VALENCIA ST  |Medical Incident|SF  |94103  |
|Mission                       |16TH ST/MISSION ST        |Medical Incident|SF  |94103  |
|South of Market               |4TH ST/MISSION ST         |Medical Incident|SF  |94103  |
|South of Market               |300 Block of CLEMENTINA ST|Medical Incident|SF  |94103  |
|Financial District/South Beach|700 Block of MARKET ST    |Alarms          |SF  |94103  |
|Mission                       |100 Block of JULIAN AVE   |Medical Incident|SF  |94103  |
|South of Market               |5TH ST/MARKET ST          |Medical Incident|SF  |94103  |
|South of 

FireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Address: string ... 3 more fields]
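The two filters above list individual calls, so the same neighborhood repeats many times. A sketch that keeps each (zip code, neighborhood) pair once and uses `isin` to handle both zip codes in a single query; the sample rows are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q5b-sketch").getOrCreate()
import session.implicits._

// Hypothetical rows: (Neighborhood, Zipcode)
val sampleHoods = Seq(
  ("Tenderloin", 94102),
  ("Hayes Valley", 94102),
  ("Tenderloin", 94102),
  ("South of Market", 94103),
  ("Mission", 94103),
  ("Mission", 94110)
).toDF("Neighborhood", "Zipcode")

// Keep both zip codes, then one row per distinct (Zipcode, Neighborhood) pair
val neighborhoods = sampleHoods
  .where(col("Zipcode").isin(94102, 94103))
  .select("Zipcode", "Neighborhood")
  .distinct()
  .orderBy("Zipcode", "Neighborhood")

neighborhoods.show(false)
val neighborhoodPairs = neighborhoods.as[(Int, String)].collect().toSeq
```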


### Question 6
**Determine the total number of calls, as well as the average, minimum, and maximum call response times.**
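`groupBy(...).max()` and the related calls below compute per-call-type statistics over every numeric column. A single global aggregation is closer to the question. Because the dataset has several records per call number (one per responding unit), this sketch treats the number of distinct `CallNumber` values as the number of calls and also reports the raw record count; the sample values are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, countDistinct, max, min}

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q6-sketch").getOrCreate()
import session.implicits._

// Hypothetical rows: (CallNumber, ResponseDelayedinMins); call 1 has two responding units
val sampleResponses = Seq(
  (1, 2.0f), (1, 3.0f), (2, 10.0f), (3, 1.0f)
).toDF("CallNumber", "ResponseDelayedinMins")

// One global row: record count, distinct calls, and avg/min/max response delay
val stats = sampleResponses.agg(
  count("*").as("TotalRecords"),
  countDistinct("CallNumber").as("TotalCalls"),
  avg("ResponseDelayedinMins").as("AvgDelay"),
  min("ResponseDelayedinMins").as("MinDelay"),
  max("ResponseDelayedinMins").as("MaxDelay")
)

stats.show(false)
val statsRow = stats.first()
```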

In [74]:
fireTSDF.groupBy("CallType").max().show(5)
fireTSDF.groupBy("CallType").min().show(5)
fireTSDF.groupBy("CallType").sum().show(5)
fireTSDF.groupBy("CallType").mean().show(5)

+--------------------+---------------+-------------------+------------+------------------+--------------+-------------------------------+--------------------------+
|            CallType|max(CallNumber)|max(IncidentNumber)|max(Zipcode)|max(FinalPriority)|max(NumAlarms)|max(UnitSequenceInCallDispatch)|max(ResponseDelayedinMins)|
+--------------------+---------------+-------------------+------------+------------------+--------------+-------------------------------+--------------------------+
|Elevator / Escala...|      183043602|           18127585|       94158|                 3|             1|                              7|                 48.716667|
|         Marine Fire|      153242813|           15124770|       94158|                 3|             1|                             19|                 13.183333|
|  Aircraft Emergency|      150012799|           15000434|       94132|                 3|             1|                             30|                 13.166667|
|Confined 

### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp
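A sketch of the `year()` approach suggested above: parse the date, extract the year, and keep the distinct values. The sample dates are hypothetical; with the notebook's data the same `select` would run on `fireTSDF` and its `IncidentDate` timestamp column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_timestamp, year}

// getOrCreate() reuses the notebook's session when one already exists
val session = SparkSession.builder().master("local[*]").appName("q7a-sketch").getOrCreate()
import session.implicits._

// Hypothetical CallDate strings in the dataset's MM/dd/yyyy format
val sampleDates = Seq("01/11/2002", "03/05/2018", "12/31/2018", "07/04/2000").toDF("CallDate")

// Convert to a timestamp, extract the year, keep distinct years in order
val years = sampleDates
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .select(year(col("IncidentDate")).as("Year"))
  .distinct()
  .orderBy("Year")

years.show(false)
val yearList = years.as[Int].collect().toSeq
val distinctYearCount = years.count()
```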

In [270]:
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.SparkSession


In [271]:
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    appName("Data Processing").
    getOrCreate

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@18477109


In [272]:
spark


res164: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@18477109


In [273]:
spark.sparkContext.getConf.getAll.foreach(println)

(spark.app.id,local-1610351418767)
(spark.repl.class.outputDir,/tmp/tmp74ec6ll6)
(spark.rdd.compress,True)
(spark.serializer.objectStreamReset,100)
(spark.master,local[*])
(spark.submit.pyFiles,)
(spark.executor.id,driver)
(spark.submit.deployMode,client)
(spark.repl.class.uri,spark://papalayeba:45315/classes)
(spark.driver.port,45315)
(spark.app.name,spylon-kernel)
(spark.ui.showConsoleProgress,true)
(spark.driver.host,papalayeba)


In [292]:
// spark.read.csv with options

val df = spark.
    read.
    schema("""order_id INT, order_date TIMESTAMP,
              order_customer_id INT, order_status STRING
           """).
    option("sep", ",").
    csv("/home/papalayeba/Bureau/sparkprojet/sf-fire-calls.csv")

df: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [293]:
df


res178: org.apache.spark.sql.DataFrame = [order_id: int, order_date: timestamp ... 2 more fields]


In [294]:
df.printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [306]:
df.show()

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
+--------+----------+-----------------+------------+



In [295]:
df.
    selectExpr("order_date").
    show(100)

+-------------------+
|         order_date|
+-------------------+
|               null|
|2021-01-11 13:00:00|
|               null|
|               null|
|               null|
|               null|
|2021-01-11 08:00:00|
|               null|
|               null|
|               null|
|               null|
|               null|
|2021-01-11 15:00:00|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|2021-01-11 06:00:00|
|               null|
|               null|
|               null|
|               null|
|2021-01-11 19:00:00|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|               null|
|         

### Question 7-b
**Which week of 2018 had the most fire calls?**

In [19]:
// Answer 7-b
/*
Write your code here
*/
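A minimal sketch of one way to answer this, assuming the date column is a timestamp (named `IncidentDate` here, as in `fireTSDF`). The rows are hypothetical. Because most calls involve several units, the dataset holds several records per call number, so the sketch counts distinct `CallNumber` values per week rather than raw rows.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("BusiestWeek2018").getOrCreate()
import spark.implicits._

// Hypothetical records: one row per responding unit, so a call number can repeat
val callsDF = Seq(
  ("C1", "2018-01-02 10:00:00"), // ISO week 1
  ("C2", "2018-01-03 11:00:00"), // ISO week 1
  ("C3", "2018-01-10 09:00:00"), // ISO week 2, three units on the same call
  ("C3", "2018-01-10 09:05:00"),
  ("C3", "2018-01-10 09:10:00"),
  ("C4", "2017-01-03 12:00:00")  // not 2018, filtered out
).map { case (call, ts) => (call, Timestamp.valueOf(ts)) }
  .toDF("CallNumber", "IncidentDate")

// Keep 2018, bucket by week number, count distinct calls per week
val weeklyCalls = callsDF
  .filter(year($"IncidentDate") === 2018)
  .groupBy(weekofyear($"IncidentDate").as("Week"))
  .agg(countDistinct($"CallNumber").as("Calls"))
  .orderBy(desc("Calls"), asc("Week")) // ties resolved in favour of the earlier week

weeklyCalls.show()
val busiestWeek = weeklyCalls.first().getAs[Int]("Week")
```

With these rows `busiestWeek` is 1: week 2 has more records (3) but only one call. `weekofyear` follows ISO 8601 (weeks start on Monday), so a date such as 2018-12-31 is reported as week 1.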

In [212]:
fireTSDF.show(3)

+----------+------+--------------+----------------+------------+-----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|IncidentDate|OnWatchDate|CallFinalDisposition|       AvailableDtTS|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|
+----------+------+--------------+----------------+------------+-----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+-----

### Question 8
**Which San Francisco neighborhoods had the worst response times in 2018?**

In [80]:
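val FireDF = newFireDF
  .select("Neighborhood", "Address", "CallType", "City", "CallDate", "ResponseDelayedinMins")
  .where($"City" === "SF")
  .where($"ResponseDelayedinMins" > 10)
  // === keeps 2018 only (=!= would mean "not equal");
  // year() expects a date or timestamp, so a string CallDate should first go through to_timestamp
  .where(year($"CallDate") === 2018)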



FireDF.show(10, false)

+------------+-------+--------+----+--------+---------------------+
|Neighborhood|Address|CallType|City|CallDate|ResponseDelayedinMins|
+------------+-------+--------+----+--------+---------------------+
+------------+-------+--------+----+--------+---------------------+



FireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Neighborhood: string, Address: string ... 4 more fields]
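Ranking neighborhoods needs an aggregation per `Neighborhood`, not only a row filter. A minimal sketch, assuming a timestamp column `IncidentDate` and a numeric `ResponseDelayedinMins`; the neighborhood labels and delays are hypothetical, not taken from the dataset. The year is kept with `===` (in Spark, `=!=` is the not-equal operator).

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("WorstResponse2018").getOrCreate()
import spark.implicits._

// Hypothetical neighborhoods and delays (minutes)
val responsesDF = Seq(
  ("Area A", "2018-03-01 10:00:00", 12.0),
  ("Area A", "2018-05-01 10:00:00", 8.0),
  ("Area B", "2018-02-01 10:00:00", 4.0),
  ("Area B", "2017-02-01 10:00:00", 30.0), // 2017, excluded by the filter
  ("Area C", "2018-07-01 10:00:00", 6.0)
).map { case (hood, ts, delay) => (hood, Timestamp.valueOf(ts), delay) }
  .toDF("Neighborhood", "IncidentDate", "ResponseDelayedinMins")

// One row per neighborhood, highest average delay first
val worstDF = responsesDF
  .filter(year($"IncidentDate") === 2018)
  .groupBy("Neighborhood")
  .agg(
    avg($"ResponseDelayedinMins").as("AvgDelayMins"),
    max($"ResponseDelayedinMins").as("MaxDelayMins")
  )
  .orderBy(desc("AvgDelayMins"))

worstDF.show(false)
val worstNeighborhood = worstDF.first().getString(0)
```

Here `Area A` comes first with an average of 10.0 minutes. Ranking by the average is one reading of "worst response time"; ordering by `MaxDelayMins` is another, and the choice can be justified in the Markdown answer.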


### Question 9

**How can the DataFrame's data be stored as Parquet files?**

In [81]:
val df = spark.
    read.
    schema("""order_id INT, 
              order_date STRING, 
              order_customer_id INT, 
              order_status STRING
           """
          ).
    csv("/home/papalayeba/Bureau/spark/sf-fire-calls.csv")

df: org.apache.spark.sql.DataFrame = [order_id: int, order_date: string ... 2 more fields]


In [336]:
spark.conf.get("spark.sql.parquet.compression.codec")

res215: String = snappy


In [337]:
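// Using write.parquet
// Caution: this is the same path the CSV was read from, so mode("overwrite")
// replaces the CSV file with a directory of Parquet part files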
df.
    write.
    mode("overwrite").
    option("compression", "none").
    parquet("/home/papalayeba/Bureau/spark/sf-fire-calls.csv")

In [338]:
// Using write.format("parquet"); coalesce(1) produces a single part file
df.
    coalesce(1).
    write.
    mode("overwrite").
    option("compression", "none").
    format("parquet").
    save("/home/papalayeba/Bureau/spark/sf-fire-calls.csv")

In [339]:
import sys.process._

"ls -l /home/papalayeba/Bureau/spark/sf-fire-calls.csv".!

total 4
-rw-r--r-- 1 papalayeba papalayeba 599 san 11 11:33 part-00000-04f09e49-a944-4cb4-a7ed-211a001a9b2a-c000.parquet
-rw-r--r-- 1 papalayeba papalayeba   0 san 11 11:33 _SUCCESS


import sys.process._
res218: Int = 0
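Writing with `mode("overwrite")` to the path the DataFrame was read from replaces the input CSV with a directory, which is what the `ls -l` listing above shows (a part file and `_SUCCESS` under `sf-fire-calls.csv`). A minimal sketch that writes to a dedicated `.parquet` directory instead; the sample DataFrame and the temporary location are hypothetical.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("WriteParquet").getOrCreate()
import spark.implicits._

// Hypothetical two-row DataFrame standing in for the fire-calls data
val sampleDF = Seq((20110016, "Medical Incident"), (20110017, "Structure Fire"))
  .toDF("CallNumber", "CallType")

// Hypothetical output location: a dedicated .parquet directory, not the source CSV path
val parquetDir = Files.createTempDirectory("sf-fire-calls").resolve("sf-fire-calls.parquet").toString

sampleDF.write
  .mode("overwrite")               // replace the directory if it already exists
  .option("compression", "snappy") // Spark's default Parquet codec, stated explicitly
  .parquet(parquetDir)

// The target is a directory of part files plus a _SUCCESS marker
val writtenFiles = new File(parquetDir).listFiles.map(_.getName).sorted
writtenFiles.foreach(println)
```

Keeping the input and output paths distinct also means the CSV can be read again later.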


### Question 10
**How can data stored in Parquet format be read back?**

In [340]:
spark.read.parquet("/home/papalayeba/Bureau/spark/sf-fire-calls.csv").printSchema

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [341]:
spark.read.parquet("/home/papalayeba/Bureau/spark/sf-fire-calls.csv").show

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
+--------+----------+-----------------+------------+
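A minimal round-trip sketch: write a small DataFrame to Parquet, read it back and check that names, types and row count survive. The data and the temporary directory are hypothetical. Parquet stores the schema in the files, so no `schema()` or header option is needed on read.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("ReadParquet").getOrCreate()
import spark.implicits._

// Hypothetical data written to a temporary .parquet directory so the read has something to load
val originalDF = Seq((20110016, "Medical Incident"), (20110017, "Structure Fire"), (20110018, "Alarms"))
  .toDF("CallNumber", "CallType")
val parquetDir = Files.createTempDirectory("sf-fire-calls").resolve("sf-fire-calls.parquet").toString
originalDF.write.mode("overwrite").parquet(parquetDir)

// Read the directory back; the schema comes from the Parquet files themselves
val reloadedDF = spark.read.parquet(parquetDir)
reloadedDF.printSchema()
reloadedDF.orderBy("CallNumber").show(false)

// Spark reads Parquet columns back as nullable, so compare names and types only
val sameColumns =
  reloadedDF.schema.map(f => (f.name, f.dataType)) == originalDF.schema.map(f => (f.name, f.dataType))
```

`spark.read.format("parquet").load(parquetDir)` is the equivalent generic form, mirroring `write.format("parquet")` used for saving.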



## END