# San Francisco Fire Calls

Ce cahier est un exemple de comment utiliser DataFrame et Spark SQL pour les  analyse de données courants.




 Dataset : [San Francisco Fire Department Calls ](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3) 

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

Définir notre schéma car le fichier comporte 4 millions d'enregistrements. Inférer le schéma est coûteux pour les fichiers volumineux.


In [0]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [0]:
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

Mettre en cache le DataFrame car nous allons effectuer certaines opérations dessus.

In [0]:
fire_df.cache()

In [0]:
fire_df.count()

In [0]:
fire_df.printSchema()

In [0]:
display(fire_df.limit(5))

Filtrer les types d'appels "Incident médical".

Notez que les méthodes filter() et where() sur le DataFrame sont similaires. Consultez la documentation pertinente pour connaître les types d'arguments respectifs.

In [0]:
few_fire_df = (fire_df
               .select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

**Q-1) Combien de types d'appels distincts ont été passés au service des pompiers ?**

Pour être sûr, ne comptons pas les chaînes "null" dans cette colonne.

In [0]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().count()

**Q-2) Quels sont les types d'appels distincts passés au service des pompiers ?**

Voici tous les types d'appels distincts passés au département des pompiers de San Francisco.

In [0]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(10, False)

**Q-3) Trouvez tous les délais de réponse ou retards supérieurs à 5 minutes :**

1. Renommez la colonne "Delay" en "ReponseDelayedinMins".
2. Renvoie un nouveau DataFrame.
3. Trouvez tous les appels où le temps de réponse sur le site d'incendie a été retardé de plus de 5 minutes.

In [0]:
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
new_fire_df.select("ResponseDelayedinMins").where(col("ResponseDelayedinMins") > 5).show(5, False)

Effectuons quelques opérations ETL (Extract, Transform, Load) :

Transformez les dates au format chaîne de caractères en type de données Spark Timestamp afin de pouvoir effectuer ultérieurement des requêtes basées sur le temps.
Renvoyez une requête transformée.
Mettez en cache le nouveau DataFrame.

In [0]:
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
              .withColumn("OnWatchDate",   to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm"))          

In [0]:
fire_ts_df.cache()
fire_ts_df.columns

Vérifiez les colonnes transformées avec le type de données Spark Timestamp.

In [0]:
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

**Q-4) Quels étaient les types d'appels les plus fréquents ?**

Énumérez-les par ordre décroissant.

In [0]:
(fire_ts_df
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

**Q-4a) Quels codes postaux étaient responsables des appels les plus fréquents ?**

Investigons quels codes postaux de San Francisco ont généré le plus d'appels d'incendie et quel type d'appels était associé.

1. Filtrer par CallType.
2. Regrouper par CallType et code postal.
3. Les compter et les afficher par ordre décroissant.

Il semble que les appels les plus fréquents étaient tous liés à des incidents médicaux, et les deux codes postaux sont 94102 et 94103.

In [0]:
(fire_ts_df
 .select("CallType", "ZipCode")
 .where(col("CallType").isNotNull())
 .groupBy("CallType", "Zipcode")
 .count()
 .orderBy("count", ascending=False)
 .show(10, truncate=False))

**Q-4b) Quels quartiers de San Francisco sont associés aux codes postaux 94102 et 94103 ?**

Découvrons les quartiers associés à ces deux codes postaux. Il est fort probable que ce soient des quartiers contestés avec un nombre élevé de crimes signalés.

In [0]:
fire_ts_df.select("Neighborhood", "Zipcode").where((col("Zipcode") == 94102) | (col("Zipcode") == 94103)).distinct().show(10, truncate=False)

**Q-5) Quelle était la somme de tous les appels, la moyenne, le minimum et le maximum des temps de réponse pour les appels ?**

Utilisons les fonctions intégrées de Spark SQL pour calculer la somme, la moyenne, le minimum et le maximum de quelques colonnes :

* Nombre total d'alarmes
* Quels étaient le minimum et le maximum du retard dans le temps de réponse avant l'arrivée du service des incendies sur le lieu de l'appel

In [0]:
fire_ts_df.select(sum("NumAlarms"), avg("ResponseDelayedinMins"), min("ResponseDelayedinMins"), max("ResponseDelayedinMins")).show()

**Q-6a) Combien d'années distinctes de données sont présentes dans le fichier CSV ?**

Nous pouvons utiliser la fonction SQL Spark `year()` sur le type de données de colonne de type Timestamp, IncidentDate.

En tout, nous avons des appels d'incendie des années 2000 à 2018.

In [0]:
fire_ts_df.select(year("IncidentDate")).distinct().orderBy(year("IncidentDate")).show()

**Q-6b) Quelle semaine de l'année 2018 a enregistré le plus d'appels d'incendie ?**

**Note** : La semaine 1 est la semaine du Nouvel An et la semaine 25 est la semaine du 4 juillet. Il y a beaucoup de feux d'artifice, il est donc logique qu'il y ait un plus grand nombre d'appels.

In [0]:
fire_ts_df.filter(year("IncidentDate") == 2018).groupBy(weekofyear("IncidentDate")).count().orderBy("count", ascending=False).show()

**Q-7) Quels quartiers de San Francisco ont eu les pires temps de réponse en 2018 ?**

Il semble que si vous habitez à Presidio Heights, le service des incendies est arrivé en moins de 3 minutes, tandis que Mission Bay a pris plus de 6 minutes.

In [0]:
fire_ts_df.select("Neighborhood", "ResponseDelayedinMins").filter(year("IncidentDate") == 2018).show(10, False)

**Q-8a) Comment pouvons-nous utiliser des fichiers Parquet ou une table SQL pour stocker des données et les relire ?**

In [0]:
fire_ts_df.write.format("parquet").mode("overwrite").save("/tmp/fireServiceParquet/")

In [0]:
%fs ls /tmp/fireServiceParquet/

**Q-8b) Comment pouvons-nous utiliser une table SQL Parquet pour stocker des données et les relire ?**

In [0]:
fire_ts_df.write.format("parquet").mode("overwrite").saveAsTable("FireServiceCalls")

In [0]:
%sql
CACHE TABLE FireServiceCalls

In [0]:
%sql
SELECT * FROM FireServiceCalls LIMIT 10

**Q-8c) Comment lire des données depuis un fichier Parquet ?**

Notez que nous n'avons pas besoin de spécifier le schéma ici car il est stocké en tant que métadonnée Parquet.

In [0]:
file_parquet_df = spark.read.format("parquet").load("/tmp/fireServiceParquet/")

In [0]:
display(file_parquet_df.limit(10))