# Spark Example

## Check if Spark Installed

In [1]:
try:
    import pyspark
    print ("pyspark installed, version", pyspark.__version__)
except ImportError:
    print ("pyspark is not installed!")

pyspark installed, version 3.2.1


## Starting Spark

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('SparkExample').getOrCreate()

## Reading in Data
* header: If the CSV has a header
* inferSchema: Tell Spark to infer the shcema, otherwise have to specify schema myself. If no schema specified, then everything is set as a string
* na.strings: If there are empty or null strings, then it is helpful to specify them

In [5]:
# Data
# data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
# columns = ["language","count"]
# df = spark.createDataFrame(data).toDF(*columns)

data = spark.read.option("header",True).option("inferSchema", True).option("na.strings", "null").csv("file:///Users/Sean4/Desktop/COMP-261/Fire_Department_Calls_for_Service.csv")

## Showing Data
* df.show(): Shows the first few lines of the dataset
* df.printSchema(): Prints out the schema of the dataset

In [6]:
# Print DataFrame
data.show(1)

+-----------+-------+---------------+------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+----------------------+--------------------+-----------------+-------------+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+------------------------------------+-------------+--------------------+----------------------+
|Call Number|Unit ID|Incident Number|   Call Type| Call Date|Watch Date|       Received DtTm|          Entry DtTm|       Dispatch DtTm|       Response DtTm|       On Scene DtTm|Transport DtTm|Hospital DtTm|Call Final Disposition|      Available DtTm|          Address|         City|Zipcode of Incident|Battalion|Station Area| Box|Original Priority|Priority|Final Priority|ALS Unit|Call Type Group|Number 

In [7]:
data.printSchema()

root
 |-- Call Number: integer (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable

## Exploratory Data Analysis

In [8]:
data.count(), len(data.columns)

(6098235, 35)

In [9]:
data.describe(["City","Number of Alarms"]).show()

+-------+-----------+-------------------+
|summary|       City|   Number of Alarms|
+-------+-----------+-------------------+
|  count|    6089196|            6098235|
|   mean|       null|  1.004956680088583|
| stddev|       null|0.09595203175420591|
|    min|         AI|                  1|
|    max|Yerba Buena|                  5|
+-------+-----------+-------------------+



In [10]:
data.groupBy("City").count().orderBy("count", ascending=False).show(10)

+-------------+-------+
|         City|  count|
+-------------+-------+
|           SF|3351970|
|San Francisco|2622094|
|SAN FRANCISCO|  42631|
|     Presidio|  14091|
|           TI|  13783|
|Treasure Isla|  12840|
|           PR|   9949|
|          SFO|   9399|
|         null|   9039|
|  Yerba Buena|   2261|
+-------------+-------+
only showing top 10 rows



## Filtering Data

In [86]:
print(data.count())
condition1 = (data.Priority.isNotNull()) | (data["Final Priority"].isNotNull())
condition2 = data.City != "SAN FRANCISCO"
data = data.filter(condition1).filter(condition2)
data = data.na.drop("any")
print(data.count())

6046565
832949


## Machine Learning

In [11]:
# Prime the data
from pyspark.sql.types import DoubleType
data2 = data.withColumn("Incident Number", data["Incident Number"].cast(DoubleType()))
data2 = data.withColumn("Number of Alarms", data["Number of Alarms"].cast(DoubleType()))
data2 = data.withColumn("Unit sequence in call dispatch", data["Unit sequence in call dispatch"].cast(DoubleType()))
data2 = data.withColumn("Zipcode of Incident", data["Zipcode of Incident"].cast(DoubleType()))
condition = (data2["Incident Number"].isNotNull()) | (data2["Zipcode of Incident"].isNotNull()) | (data2["Number of Alarms"].isNotNull()) | (data2["Unit sequence in call dispatch"].isNotNull())
data2 = data2.na.drop("any")
data2 = data2.filter(condition).filter(condition2)

NameError: name 'condition2' is not defined

In [None]:
from pyspark.ml.feature import VectorAssembler
inputcols = ["Incident Number",  "Number of Alarms", "Unit sequence in call dispatch"]
assembler = VectorAssembler(inputCols= inputcols,
                            outputCol = "predictors")

In [12]:
predictors = assembler.transform(data2)
predictors.columns

NameError: name 'assembler' is not defined

In [13]:
model_data = predictors.select("predictors", "Zipcode of Incident")
model_data.show(5,truncate=False)
train_data,test_data = model_data.randomSplit([0.8,0.2])

NameError: name 'predictors' is not defined

In [92]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    featuresCol = 'predictors', 
    labelCol = 'Zipcode of Incident')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)

In [93]:
pred.predictions.show(5)



+--------------------+-------------------+-----------------+
|          predictors|Zipcode of Incident|       prediction|
+--------------------+-------------------+-----------------+
|[2.0100387E7,1.0,...|            94109.0|94112.16053474919|
|[2.0100397E7,1.0,...|            94110.0|94113.83330224053|
|[2.0100414E7,1.0,...|            94133.0|94113.83330321994|
|[2.0100432E7,1.0,...|            94124.0|94112.16053734176|
|[2.010044E7,1.0,2.0]|            94114.0|94113.83330471787|
+--------------------+-------------------+-----------------+
only showing top 5 rows



## Writing Data to a file

In [None]:
# data.write.option("header","true").csv("/tmp/spark_output/Fire_Department_Calls_for_Service.csv")