# Projet Final Apache Spark

**Nom Etudiant :**  Serigne  Saliou

**Prenom Etudiant:**  Gueye

**Classe :**  Master Intelligence Artificielle


## Description
Ce projet consiste à utiliser Apache Spark pour faire l'analyse et le traitement des données de **[San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)** afin de fournir quelques KPI (*Key Performance Indicator*). Le **SF Fire Dataset** comprend les réponses aux appels de toutes les unités d'incendie. Chaque enregistrement comprend le numéro d'appel, le numéro d'incident, l'adresse, l'identifiant de l'unité, le type d'appel et la disposition. Tous les intervalles de temps pertinents sont également inclus. Étant donné que ce Dataset est basé sur les réponses et que la plupart des appels impliquent plusieurs unités, ainsi il existe plusieurs enregistrements pour chaque numéro d'appel. Les adresses sont associées à un numéro de bloc, à une intersection ou à une boîte d'appel, et non à une adresse spécifique.

**Plus de details sur la description des données cliquer sur ce [lien](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3)**

## Travail à faire.
L'objectif de ce projet est de comprendre le **SF Fire Dataset** afin de bien répondre aux questions en utilisant les codes Spark/Scala adéquates.

- Créer un repos git (public ou privé) et partager le repos avec mon mail (limahin10@gmail.com)
- Ecrire un code lisible et bien indenté 
- N'oublier pas de mettre en commentaire la justification de vos réponses sur les cellules Markdown. 


## Note:
- Le projet est personnel, c'est-à-dire chaque notebook ne concerne qu'un seul étudiant. 
- Deadline : **Jeudi 10 janvier 2021** (Aucune de dérogation ne sera acceptée)

### Chargement des données

Importation des packages Spark

In [50]:
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions._ 
import spark.implicits._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._


Nous allons jeter un coup d'oeil sur la structure des données avant de définir un schéma

In [51]:
!head -1 "datasets/sf-fire/sf-fire-calls.csv"

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay



Vu que la taille de ces données est énormes, inferer le schema pour un très grande volumes de données s'avère un peu couteux. Nous allons ainsi définir un schema pour le Dataset.

In [52]:
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
  StructField("UnitID", StringType, true),
  StructField("IncidentNumber", IntegerType, true),
  StructField("CallType", StringType, true),                  
  StructField("CallDate", StringType, true),      
  StructField("WatchDate", StringType, true),
  StructField("CallFinalDisposition", StringType, true),
  StructField("AvailableDtTm", StringType, true),
  StructField("Address", StringType, true),       
  StructField("City", StringType, true),       
  StructField("Zipcode", IntegerType, true),       
  StructField("Battalion", StringType, true),                 
  StructField("StationArea", StringType, true),       
  StructField("Box", StringType, true),       
  StructField("OriginalPriority", StringType, true),       
  StructField("Priority", StringType, true),       
  StructField("FinalPriority", IntegerType, true),       
  StructField("ALSUnit", BooleanType, true),       
  StructField("CallTypeGroup", StringType, true),
  StructField("NumAlarms", IntegerType, true),
  StructField("UnitType", StringType, true),
  StructField("UnitSequenceInCallDispatch", IntegerType, true),
  StructField("FirePreventionDistrict", StringType, true),
  StructField("SupervisorDistrict", StringType, true),
  StructField("Neighborhood", StringType, true),
  StructField("Location", StringType, true),
  StructField("RowID", StringType, true),
  StructField("Delay", FloatType, true)))

fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), StructField(ALSUnit,BooleanType,true)...


In [53]:
val sfFireFile = "datasets/sf-fire/sf-fire-calls.csv"
val fireDF = spark
  .read
  .schema(fireSchema)
  .option("header", "true")
  .csv(sfFireFile)

sfFireFile: String = datasets/sf-fire/sf-fire-calls.csv
fireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


Nous allons mettre en cache le Dataframe

In [54]:
fireDF.cache()

res37: fireDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


In [55]:
fireDF.count()

res38: Long = 175296


In [56]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [57]:
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

Filtrage des d'appels de type "Medical Incident"

In [58]:
val fewFireDF = fireDF
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where($"CallType" =!= "Medical Incident")

fewFireDF.show(20, false)

+--------------+----------------------+-----------------------------+
|IncidentNumber|AvailableDtTm         |CallType                     |
+--------------+----------------------+-----------------------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire               |
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire                 |
|2003259       |01/11/2002 06:01:58 AM|Alarms                       |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire               |
|2003301       |01/11/2002 09:46:44 AM|Alarms                       |
|2003304       |01/11/2002 09:58:53 AM|Alarms                       |
|2003382       |01/11/2002 02:59:04 PM|Structure Fire               |
|2003408       |01/11/2002 04:09:08 PM|Structure Fire               |
|2003408       |01/11/2002 04:09:08 PM|Structure Fire               |
|2003408       |01/11/2002 04:09:08 PM|Structure Fire               |
|2003429       |01/11/2002 05:17:15 PM|Odor (Strange / Unknown)     |
|2003453       |01/1

fewFireDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]


### Question 1
**Combien de types d'appels distincts ont été passés ?**  
Pour être sûr, il ne faut pas compter les valeurs «nulles» dans la colonne.

In [59]:
// Reponse 1
/*

Ecrire ici votre code
*/
//suppresion des valeurs nulles de la colonne CallType
val noNullFewFireDFCallType=  fewFireDF.na.drop(cols=Seq("CallType"))
//on utilise dropDuplicates pour supprimer les dupplicata
val dropDisCallType = noNullFewFireDFCallType.dropDuplicates("CallType")
//on affiche le nombre de valeur distinc en utilisant la fonction count()
  println("Le nombre de type d'appel distincte : "+dropDisCallType.count())
  dropDisCallType.show(false)
noNullFewFireDFCallType.select(countDistinct("CallType"))

//noNullFewFireDFCallType.groupBy("CallType").count().show()

Le nombre de type d'appel distincte : 29
+--------------+----------------------+--------------------------------------------+
|IncidentNumber|AvailableDtTm         |CallType                                    |
+--------------+----------------------+--------------------------------------------+
|2008447       |01/29/2002 04:58:55 AM|Elevator / Escalator Rescue                 |
|5092002       |12/06/2005 04:41:49 PM|Marine Fire                                 |
|2013793       |02/15/2002 07:19:48 PM|Aircraft Emergency                          |
|11112063      |12/04/2011 04:16:46 PM|Confined Space / Structure Collapse         |
|6007230       |01/26/2006 02:37:40 PM|Administrative                              |
|2003259       |01/11/2002 06:01:58 AM|Alarms                                      |
|2003429       |01/11/2002 05:17:15 PM|Odor (Strange / Unknown)                    |
|2004152       |01/14/2002 08:16:54 AM|Citizen Assist / Service Call               |
|2057385       |07/11/20

noNullFewFireDFCallType: org.apache.spark.sql.DataFrame = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]
dropDisCallType: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [IncidentNumber: int, AvailableDtTm: string ... 1 more field]
res42: org.apache.spark.sql.DataFrame = [count(DISTINCT CallType): bigint]


In [60]:

//on peut aussi utiliser la fonction countdistinct
noNullFewFireDFCallType.select(countDistinct("CallType")).show()

+------------------------+
|count(DISTINCT CallType)|
+------------------------+
|                      29|
+------------------------+



### Question 2

**Quels types d'appels différents ont été passés au service d'incendie?**

In [61]:
// Reponse 2
/*
Ecrire ici votre code
*/
//types d'appels différents ont été passés au service d'incendie
  dropDisCallType.select("CallType").show(29)

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
|   Electrical Hazard|
|   High Angle Rescue|
|      Structure Fire|
|Industrial Accidents|
|Mutual Aid / Assi...|
|          Fuel Spill|
|Smoke Investigati...|
|Train / Rail Inci...|
+--------------------+



### Question 3

**Trouver toutes les réponses ou les délais sont supérieurs à 5 minutes?**

*Indication
1. Renommer la colonne Delay -> ReponseDelayedinMins
2. Retourner un nouveau DataFrame
3. Afficher tous les appels où le temps de réponse à un site d'incendie a eu lieu après un retard de plus de 5 minutes

In [62]:
// Reponse 3

val newFireDF = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins")
//on chosit le numero d"ppel et le temps de reponse et on filtre ce qui sont plus grand que 5
newFireDF.select("CallNumber","ResponseDelayedinMins").filter("ResponseDelayedinMins>5").show()

/*
Completer le code
*/

+----------+---------------------+
|CallNumber|ResponseDelayedinMins|
+----------+---------------------+
|  20110315|                 5.35|
|  20120147|                 6.25|
|  20130013|                  5.2|
|  20140067|                  5.6|
|  20140177|                 7.25|
|  20150056|            11.916667|
|  20150254|             5.116667|
|  20150265|             8.633333|
|  20150265|             95.28333|
|  20150380|                 5.45|
|  20150414|                  7.6|
|  20160059|             6.133333|
|  20160064|            5.1833334|
|  20170118|            6.9166665|
|  20170342|                  5.2|
|  20180129|                 6.35|
|  20180191|             7.983333|
|  20180382|                13.55|
|  20190062|                 5.15|
|  20190097|            13.583333|
+----------+---------------------+
only showing top 20 rows



newFireDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


### Transformations des dates
Maintenant nous allons d'abord:
1. Transformer les dates de type String en Spark Timestamp afin que nous puissions effectuer des requêtes basées sur la date plus tard
2. Retourner le Dataframe transformée
3. Mettre en cache le nouveau DataFrame

In [63]:
val fireTSDF = newFireDF
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
  .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate") 
  .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")

fireTSDF.cache()
//fireTSDF.printSchema()

fireTSDF: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]
res46: fireTSDF.type = [CallNumber: int, UnitID: string ... 26 more fields]


### Question 4
**Quels sont les types d'appels les plus courants?**

In [64]:
//Reponse4


/*
Ecrire ici votre code
*/
//pour cette question on regroupe les type d'appel et on compte leur nombre et onsuite on les range par ordre decroissant
val fireTSDF2= fireTSDF.groupBy("CallType").count().sort(desc("count")).show()







+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows



fireTSDF2: Unit = ()


### Question 5-a
**Quels sont boites postaux rencontrés dans les appels les plus courants?**

In [65]:
//Reponse 5-a
/*
Ecrire ici votre code
*/
val fireTSDF2= fireTSDF.groupBy("CallType","Zipcode").count().sort(desc("count")).show()

//fireTSDF2=fireTSDF.withColumn("TypeAppePlusfrequent",fireTSDF.groupBy("CallType").count().sort(desc("count")))


+----------------+-------+-----+
|        CallType|Zipcode|count|
+----------------+-------+-----+
|Medical Incident|  94102|16130|
|Medical Incident|  94103|14775|
|Medical Incident|  94110| 9995|
|Medical Incident|  94109| 9479|
|Medical Incident|  94124| 5885|
|Medical Incident|  94112| 5630|
|Medical Incident|  94115| 4785|
|Medical Incident|  94122| 4323|
|Medical Incident|  94107| 4284|
|Medical Incident|  94133| 3977|
|Medical Incident|  94117| 3522|
|Medical Incident|  94134| 3437|
|Medical Incident|  94114| 3225|
|Medical Incident|  94118| 3104|
|Medical Incident|  94121| 2953|
|Medical Incident|  94116| 2738|
|Medical Incident|  94132| 2594|
|  Structure Fire|  94110| 2267|
|Medical Incident|  94105| 2258|
|  Structure Fire|  94102| 2229|
+----------------+-------+-----+
only showing top 20 rows



fireTSDF2: Unit = ()


### Question 5-a
**Quels sont les quartiers de San Francisco dont les codes postaux sont 94102 et 94103?**

In [66]:
//Reponse 5-b
/*
Ecrire ici votre code
*/
val street_zipcode=fireTSDF.select("Zipcode","Address","city").filter($"Zipcode" === 94102 || $"Zipcode"=== 94103).show()

//fireTSDF.printSchema()


+-------+--------------------+----+
|Zipcode|             Address|city|
+-------+--------------------+----+
|  94102|MARKET ST/MCALLIS...|  SF|
|  94102|600 Block of POLK ST|  SF|
|  94103|    9TH ST/HOWARD ST|  SF|
|  94103|400 Block of VALE...|  SF|
|  94103|  16TH ST/MISSION ST|  SF|
|  94103|   4TH ST/MISSION ST|  SF|
|  94102|400 Block of TURK ST|  SF|
|  94102|   OAK ST/WEBSTER ST|  SF|
|  94102| 0 Block of JONES ST|  SF|
|  94102|400 Block of EDDY ST|  SF|
|  94103|300 Block of CLEM...|  SF|
|  94102| 500 Block of OAK ST|  SF|
|  94103|700 Block of MARK...|  SF|
|  94102|HAIGHT ST/OCTAVIA ST|  SF|
|  94103|100 Block of JULI...|  SF|
|  94102|0 Block of LARKIN ST|  SF|
|  94102|100 Block of TURK ST|  SF|
|  94102|CALL BOX: BUCHANA...|  SF|
|  94103|    5TH ST/MARKET ST|  SF|
|  94103| 100 Block of 7TH ST|  SF|
+-------+--------------------+----+
only showing top 20 rows



street_zipcode: Unit = ()


### Question 6
**Determiner le nombre total d'appels, ainsi que la moyenne, le minimum et le maximum du temps de réponse des appels?**

In [67]:
//Reponse 6
/*
Pour chaque type d'appels on compte son nombre,ensuite on fait la somme
*/
//fireTSDF.printSchema()
//Pour chaque type d'appels on compte son nombre
val fireTSDF2= fireTSDF.groupBy("CallType").count().sort(desc("count"))
//on fait le somme du colonne pour avoir le nombre total d'appel
fireTSDF2.select(sum("count")).show()
//moyenne,maximum,minumum
fireTSDF.select(mean("ResponseDelayedinMins")).show()
//minimum
fireTSDF.select(min("ResponseDelayedinMins")).show()
//maximum
fireTSDF.select(max("ResponseDelayedinMins")).show()
// heure
//reTSDF.select(hour(max("ResponseDelayedinMins"))).show()

//reTSDF.groupBy("ResponseDelayedinMins").mean().select("ResponseDelayedinMins","avg(ResponseDelayedinMins)").show()




+----------+
|sum(count)|
+----------+
|    175296|
+----------+

+--------------------------+
|avg(ResponseDelayedinMins)|
+--------------------------+
|         3.892364154521585|
+--------------------------+

+--------------------------+
|min(ResponseDelayedinMins)|
+--------------------------+
|               0.016666668|
+--------------------------+

+--------------------------+
|max(ResponseDelayedinMins)|
+--------------------------+
|                   1844.55|
+--------------------------+



fireTSDF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, count: bigint]


### Question 7-a
**Combien d'années distinctes trouve t-on dans ce Dataset?**  
Dans ce dataset nous avons des données comprises entre 2000-2018. Vous pouvez utilisez la fonction Spark `year()` pour les dates en Timestamp

In [68]:
//Reponse 7-a
/*
Ecrire ici votre code
*/
fireTSDF.select(countDistinct(year(fireTSDF("AvailableDtTS")))).show()

+-------------------------------------------------+
|count(DISTINCT year(CAST(AvailableDtTS AS DATE)))|
+-------------------------------------------------+
|                                               19|
+-------------------------------------------------+



### Question 7-b
**Quelle semaine de l'année 2018 a eu le plus d'appels d'incendie?**

In [69]:
//Reponse 7-b
/*
Ecrire ici votre code
*/
//on selectionne d'abord les types d'appels d'incendie de l' anneé 2018
val fire_call_in_2018=fireTSDF.select("CallType","AvailableDtTS").filter($"CallType"==="Structure Fire"||$"CallType"==="Outside Fire" 
 and year(fireTSDF("AvailableDtTS"))===2018)
//on ajoute le colonne des semaines de l'annee 2018
val fire_call_in_2018_2=fire_call_in_2018.withColumn("semaine",weekofyear(fire_call_in_2018("AvailableDtTS")))
//enfin on compte le nombre d'appel pour chaque semaine en utiliant groupBy et on l'ordonne par ordre decroissant
fire_call_in_2018_2.groupBy("semaine").count().sort(desc("count")).show()
//le resulat montre que c'est la semaine 1


+-------+-----+
|semaine|count|
+-------+-----+
|      1|   37|
|     43|   35|
|     25|   33|
|      8|   32|
|     27|   31|
|     44|   31|
|     38|   30|
|     26|   30|
|     11|   29|
|     40|   29|
|     15|   28|
|     28|   28|
|      7|   27|
|     16|   27|
|     18|   27|
|     22|   26|
|     33|   26|
|      6|   25|
|     23|   25|
|     31|   25|
+-------+-----+
only showing top 20 rows



fire_call_in_2018: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallType: string, AvailableDtTS: timestamp]
fire_call_in_2018_2: org.apache.spark.sql.DataFrame = [CallType: string, AvailableDtTS: timestamp ... 1 more field]


### Question 8
**Quels sont les quartiers de San Francisco qui ont connu le pire temps de réponse en 2018?**

In [77]:
//Reponse 8
/*
Ecrire ici votre code
*/
//On selectionne les quartier de sans francisco avec les temps de reponse en 2018 et on ordonne les temps de repone par ordre decroisssant 
fireTSDF.select("city","Address","AvailableDtTS","ResponseDelayedinMins")
.filter(year(fireTSDF("AvailableDtTS"))===2018).sort(desc("ResponseDelayedinMins")).show()

+-------------+--------------------+-------------------+---------------------+
|         city|             Address|      AvailableDtTS|ResponseDelayedinMins|
+-------------+--------------------+-------------------+---------------------+
|San Francisco|600 Block of UNIO...|2018-03-18 09:15:56|            491.26666|
|San Francisco|200 Block of MARK...|2018-01-21 04:51:33|            406.63333|
|San Francisco|300 Block of EDDY ST|2018-03-23 02:57:27|            340.48334|
|San Francisco|100 Block of CARL ST|2018-05-21 08:26:24|            175.86667|
|San Francisco|200 Block of WILL...|2018-04-17 18:31:32|                155.8|
|San Francisco|200 Block of THE ...|2018-06-08 15:10:52|            135.51666|
|San Francisco|1900 Block of CAL...|2018-05-03 22:02:46|            129.01666|
|San Francisco|1100 Block of 22N...|2018-03-05 09:58:11|                109.8|
|San Francisco|1500 Block of 7TH...|2018-01-12 20:05:28|            106.13333|
|San Francisco|500 Block of MINN...|2018-04-21 15:20

### Question 9

**Comment stocker les données du Dataframe sous format de fichiers Parquet?**

In [81]:
//Reponse 9
/*
Ecrire ici votre code
*/
fireTSDF.write.parquet("fire.parquet")


org.apache.spark.sql.AnalysisException:  path file:/E:/MASTER IA/programmation_fontionelle/fire.parquet already exists.;

### Question 10
**Comment relire les données stockée en format Parquet?**

In [80]:
//Reponse 10
/*
Ecrire ici votre code
*/
val fireparquetFileDF = spark.read.parquet("fire.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
//parquetFileDF.createOrReplaceTempView("parquetFile")

org.apache.spark.sql.AnalysisException:  Unable to infer schema for Parquet. It must be specified manually.;

## FIN