In [None]:
!pip install pyspark

In [None]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date

import matplotlib.pyplot as plt
from matplotlib import figure
import folium


import re
import os
from folium.plugins import FloatImage
from folium.plugins import HeatMap

from datetime import datetime

from IPython.display import display
import matplotlib
import matplotlib.pyplot as plt

In [None]:
# Start (or reuse) a Spark session on the "local" master for this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Project Mathieu Lepoutre Data Mining")
    .getOrCreate()
)

# Keep a handle on the session's SparkContext; the bare name shows it as cell output.
sc = spark.sparkContext
sc

In [None]:
# Explicit schema for the fires CSV, written as (column name, Spark type) pairs.
# Every column is declared nullable.
_fire_columns = [
    ("OBJECTID", IntegerType()),
    ("FIRE_NAME", StringType()),
    ("FIRE_YEAR", IntegerType()),
    ("DISCOVERY_DATE", StringType()),
    ("DISCOVERY_TIME", StringType()),
    ("STAT_CAUSE_DESCR", StringType()),
    ("FIRE_SIZE", StringType()),
    ("FIRE_SIZE_CLASS", StringType()),
    ("LATITUDE", FloatType()),
    ("LONGITUDE", FloatType()),
    ("STATE", StringType()),
    ("COUNTY", StringType()),
]
firesschema = StructType(
    [StructField(name, dtype, True) for name, dtype in _fire_columns]
)



In [None]:
# Read the fires CSV input with the explicit schema; the "header" option is on,
# so the first line of each file is treated as column names, not data.
firesDF = (
    spark.read
    .option("header", "true")
    .schema(firesschema)
    .csv("../input/firesdata")
)

In [None]:
# Print a preview of the loaded fires DataFrame.
firesDF.show()

In [None]:
# Count the rows of firesDF; the result is displayed as the cell output.
firesDF.count()

In [None]:
# Fires per year: drop rows without a FIRE_YEAR, count the rows for each year,
# and order the result from the highest count to the lowest.
fireyear = (
    firesDF
    .where(col("FIRE_YEAR").isNotNull())
    .groupBy("FIRE_YEAR")
    .count()
    .orderBy(col("count").desc())
)
fireyear.show()

In [None]:
# Pull the (year, count) pairs to the driver with a SINGLE action, then split
# them into the x/y lists used by the bar chart.
# Collecting each column with its own filter and its own collect() runs two
# Spark jobs and only works if both return the same rows in the same order;
# reading both values from the same Row keeps every year paired with its count.
_year_rows = (
    fireyear
    .filter(col("FIRE_YEAR").isNotNull() & col("count").isNotNull())
    .select("FIRE_YEAR", "count")
    .collect()
)
xas1 = [row["FIRE_YEAR"] for row in _year_rows]
yas1 = [row["count"] for row in _year_rows]

In [None]:
# Bar chart of the number of fires (y) for each fire year (x).
fig1, ax = plt.subplots(figsize=(10, 7))
fig1.suptitle('Fire Year', fontsize=14)
ax.set(xlabel='Fire Year', ylabel='Count')
ax.bar(xas1, yas1, fc='darksalmon', align='center')
plt.show()

In [None]:
# Number of fire records for each distinct FIRE_SIZE value, most frequent first.
# NOTE(review): rows are filtered on FIRE_YEAR while the grouping key is
# FIRE_SIZE (read as a string); confirm this is intended and that
# FIRE_SIZE_CLASS was not meant instead.
firesize = (
    firesDF
    .where(col("FIRE_YEAR").isNotNull())
    .groupBy("FIRE_SIZE")
    .count()
    .orderBy(col("count").desc())
)
firesize.show()

In [None]:
# Number of fire records per STATE, most affected state first.
# Only rows without a FIRE_YEAR are dropped, so a null STATE can still
# appear as its own group.
firestate = (
    firesDF
    .where(col("FIRE_YEAR").isNotNull())
    .groupBy("STATE")
    .count()
    .orderBy(col("count").desc())
)
firestate.show()

In [None]:
# Pull the (state, count) pairs to the driver with a SINGLE action and one
# shared filter, then split them into the x/y lists for the bar chart.
# Filtering x on STATE and y on count separately leaves the two lists with
# different lengths whenever firestate contains a null-STATE group, which
# breaks (or silently misaligns) the plot.
_state_rows = (
    firestate
    .filter(col("STATE").isNotNull() & col("count").isNotNull())
    .select("STATE", "count")
    .collect()
)
xas2 = [row["STATE"] for row in _state_rows]
yas2 = [row["count"] for row in _state_rows]

In [None]:
# Bar chart of the number of fires (y) for each state (x).
fig1, ax = plt.subplots(figsize=(25, 14))
fig1.suptitle('In which state do the most fires occur?', fontsize=14)
ax.set(xlabel='State', ylabel='Count')
ax.bar(xas2, yas2, fc='darksalmon', align='center')
plt.show()

In [None]:
# Number of fire records per STAT_CAUSE_DESCR, most common cause first.
# Only rows without a FIRE_YEAR are dropped, so a null cause can still
# appear as its own group.
firecause = (
    firesDF
    .where(col("FIRE_YEAR").isNotNull())
    .groupBy("STAT_CAUSE_DESCR")
    .count()
    .orderBy(col("count").desc())
)
firecause.show()

In [None]:
# Pull the (cause, count) pairs to the driver with a SINGLE action and one
# shared filter, then split them into the x/y lists for the bar chart.
# Filtering x on STAT_CAUSE_DESCR and y on count separately leaves the lists
# with different lengths whenever firecause contains a null-cause group.
_cause_rows = (
    firecause
    .filter(col("STAT_CAUSE_DESCR").isNotNull() & col("count").isNotNull())
    .select("STAT_CAUSE_DESCR", "count")
    .collect()
)
xas3 = [row["STAT_CAUSE_DESCR"] for row in _cause_rows]
yas3 = [row["count"] for row in _cause_rows]

In [None]:
# Bar chart of the number of fires (y) for each recorded cause (x).
fig1, ax = plt.subplots(figsize=(22, 15))
fig1.suptitle('Cause of the fire', fontsize=14)
ax.set(xlabel='Cause', ylabel='Count')
ax.bar(xas3, yas3, fc='darksalmon', align='center')
plt.show()

In [None]:
# Keep only the coordinate columns of the fires data for the map cells below.
latLongDF = firesDF.select("LATITUDE", "LONGITUDE")

In [None]:
# Print the first 10 coordinate rows as a sanity check.
latLongDF.show(10)

In [None]:
# Empty base map, centred on the [latitude, longitude] below at zoom level 5.
map_center = [37.796661, -102.678676]
firesmap = folium.Map(location=map_center, zoom_start=5)

In [None]:
# Display the (still marker-free) base map as the cell output.
firesmap

In [None]:
def convert_row(row):
    """Return the coordinates of ``row`` as a ``[latitude, longitude]`` list.

    Args:
        row: Any object exposing ``LATITUDE`` and ``LONGITUDE`` attributes
            (in this notebook, a row of ``latLongDF``).

    Returns:
        ``[row.LATITUDE, row.LONGITUDE]``, or ``None`` when the row does not
        have one of the two attributes.
    """
    try:
        return [row.LATITUDE, row.LONGITUDE]
    except AttributeError:
        # A missing field is the only failure expected from attribute access;
        # any other error should surface rather than be hidden by a bare
        # ``except``. The None result is now explicit instead of implicit.
        return None

# Convert every coordinate row with convert_row and bring the whole result
# back to the driver as a Python list.
# NOTE(review): in the cells visible here only the first 1000 entries are used
# afterwards; collecting everything may be more than is needed.
mapResult = latLongDF.rdd.map(convert_row).collect()

In [None]:
# Keep only the first 1000 coordinate pairs for plotting and show them.
marker_limit = 1000
totals = mapResult[:marker_limit]
totals

In [None]:
# Add one circle marker to the map for every usable coordinate pair.
for total in totals:
    # convert_row can produce None, and a row's latitude/longitude may itself
    # be None; skip those explicitly instead of relying on a blanket except.
    if total is None or total[0] is None or total[1] is None:
        continue
    try:
        folium.CircleMarker(location=[total[0], total[1]], radius=10).add_to(firesmap)
    except (TypeError, ValueError):
        # Best-effort plotting: drop a coordinate pair that is rejected as an
        # invalid location, but let unrelated errors (and KeyboardInterrupt)
        # propagate instead of hiding them.
        continue

# Display the map with its markers as the cell output.
firesmap

In [None]:
# Read the firefighter-fatalities CSV input; the "header" option is on, so
# column names come from the first line. No schema is supplied here.
databaseDF = (
    spark.read
    .option("header", "true")
    .csv("../input/firefighter-fatalities")
)

In [None]:
# Print a preview of the firefighter-fatalities DataFrame.
databaseDF.show()

In [None]:
import pyspark.sql.functions as sql

# Fatalities per year.
# 'Date of Death' is split on ', ' and the second piece (index 1) is used as
# the year -- presumably the value looks like "<month> <day>, <year>"; verify
# against the data.
# The piece is read directly with getItem(1): there is no need to first run a
# separate Spark job to find the longest split and then expand every element
# into its own column, and the lookup no longer fails when the longest split
# has fewer than two pieces.
# The output column names 'year[1]' and 'numberOfDeath' are kept unchanged
# because the join cell below refers to them.
death_year = sql.split(sql.col('Date of Death'), ', ').getItem(1).alias('year[1]')
numberofdeaths = (
    databaseDF
    .select(death_year)
    .groupBy('year[1]')
    .count()
    .select('year[1]', sql.col('count').alias('numberOfDeath'))
    .sort(sql.col('numberOfDeath').desc())
)
numberofdeaths.show()

In [None]:
# Join the per-year fire counts with the per-year fatality counts, matching
# FIRE_YEAR against the year extracted from 'Date of Death'.
_same_year = fireyear["FIRE_YEAR"] == numberofdeaths["year[1]"]
combine = fireyear.join(numberofdeaths, on=_same_year)

In [None]:
# Print a preview of the joined fires / fatalities table.
combine.show()

In [None]:
# Pull the (fatalities, fire count) pairs to the driver with a SINGLE action,
# then split them into the x/y lists for the chart.
# `combine` is an unsorted join result, so two separate collect() calls are
# not guaranteed to return rows in the same order; reading both values from
# the same Row keeps each year's fatalities paired with that year's fires.
_combined_rows = (
    combine
    .filter(col("count").isNotNull() & col("numberOfDeath").isNotNull())
    .select("numberOfDeath", "count")
    .collect()
)
yas4 = [row["count"] for row in _combined_rows]
xas4 = [row["numberOfDeath"] for row in _combined_rows]

In [None]:
# Bar chart relating yearly fatalities (x) to the yearly number of fires (y).
fig1, ax = plt.subplots(figsize=(30, 5))
fig1.suptitle('Number of Fatalities Per Count of Fires in a Year', fontsize=14)
ax.set(xlabel='Fatalities', ylabel='Count of Fires')
ax.bar(xas4, yas4, fc='darksalmon', align='center')
plt.show()