In [1]:
import findspark 
findspark.init() 

import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

sc = SparkSession.builder.master("spark://192.168.1.109:7077").appName("NYC Traffic").getOrCreate()

sc

In [2]:
accidents = sc.read.option("header", "true").csv('file:///home/jorge/NYPD_Motor_Vehicle_Collisions.csv')


In [3]:
accidents.printSchema()

root
 |-- DATE: string (nullable = true)
 |-- TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: string (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: string (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nullable = tru

Query #1

In [4]:
accidents.createOrReplaceTempView("accidents")

#sqlAccidents = sc.sql("SELECT SUM(`NUMBER OF PERSONS KILLED`) as KILLS FROM accidents")
#sqlAccidents.show()

accidentsFormatDate = accidents.withColumn("DATE", to_timestamp(col("DATE"), "MM/dd/yyyy"))
accidentsAddNumWeek = accidentsFormatDate.withColumn("week of year", date_format(col("DATE"), "w"))
accidentsQ1 = accidentsAddNumWeek.withColumn("week of year", col("week of year").cast(IntegerType()))
#accidentsQ1.printSchema()

accidentsQ1.createOrReplaceTempView("accidentsQ1")
query1 = sc.sql("SELECT SUM(`NUMBER OF PERSONS KILLED`) AS KILLS, YEAR(DATE) AS YEAR, `week of year` AS WEEK FROM accidentsQ1 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query1.show(query1.count())

+-----+----+----+
|KILLS|YEAR|WEEK|
+-----+----+----+
|  2.0|2012|   1|
|  3.0|2012|  27|
|  6.0|2012|  28|
|  4.0|2012|  29|
|  7.0|2012|  30|
|  5.0|2012|  31|
|  7.0|2012|  32|
|  1.0|2012|  33|
|  9.0|2012|  34|
|  7.0|2012|  35|
|  2.0|2012|  36|
|  6.0|2012|  37|
|  9.0|2012|  38|
|  7.0|2012|  39|
|  4.0|2012|  40|
|  2.0|2012|  41|
|  8.0|2012|  42|
|  8.0|2012|  43|
|  3.0|2012|  44|
|  2.0|2012|  45|
|  3.0|2012|  46|
|  4.0|2012|  47|
|  3.0|2012|  48|
|  1.0|2012|  49|
|  5.0|2012|  50|
|  9.0|2012|  51|
| 10.0|2012|  52|
| 11.0|2013|   1|
|  4.0|2013|   2|
|  4.0|2013|   3|
|  5.0|2013|   4|
|  9.0|2013|   5|
|  4.0|2013|   6|
|  5.0|2013|   7|
|  5.0|2013|   8|
|  6.0|2013|   9|
|  9.0|2013|  10|
|  5.0|2013|  11|
|  5.0|2013|  12|
|  3.0|2013|  13|
|  7.0|2013|  14|
|  1.0|2013|  15|
|  3.0|2013|  16|
|  3.0|2013|  17|
|  3.0|2013|  18|
|  3.0|2013|  19|
|  6.0|2013|  20|
|  1.0|2013|  21|
|  5.0|2013|  22|
|  4.0|2013|  23|
|  9.0|2013|  24|
|  3.0|2013|  25|
|  7.0|201

In [4]:
query1.createOrReplaceTempView("query1")
total = sc.sql("SELECT SUM(KILLS) FROM query1")
total.show()

+----------+
|sum(KILLS)|
+----------+
|    1173.0|
+----------+



Query #2

In [5]:
accidents.createOrReplaceTempView("accidentsQ2")
f1 = "(SELECT `CONTRIBUTING FACTOR VEHICLE 1` as FACTOR FROM accidentsQ2 WHERE (`CONTRIBUTING FACTOR VEHICLE 1` IS NOT NULL) AND (`CONTRIBUTING FACTOR VEHICLE 1` != 'Unspecified'))"
f2 = "(SELECT `CONTRIBUTING FACTOR VEHICLE 2` as FACTOR FROM accidentsQ2 WHERE (`CONTRIBUTING FACTOR VEHICLE 2` IS NOT NULL) AND (`CONTRIBUTING FACTOR VEHICLE 2` != 'Unspecified'))"
f3 = "(SELECT `CONTRIBUTING FACTOR VEHICLE 3` as FACTOR FROM accidentsQ2 WHERE (`CONTRIBUTING FACTOR VEHICLE 3` IS NOT NULL) AND (`CONTRIBUTING FACTOR VEHICLE 3` != 'Unspecified'))"
f4 = "(SELECT `CONTRIBUTING FACTOR VEHICLE 4` as FACTOR FROM accidentsQ2 WHERE (`CONTRIBUTING FACTOR VEHICLE 4` IS NOT NULL) AND (`CONTRIBUTING FACTOR VEHICLE 4` != 'Unspecified'))"
f5 = "(SELECT `CONTRIBUTING FACTOR VEHICLE 5` as FACTOR FROM accidentsQ2 WHERE (`CONTRIBUTING FACTOR VEHICLE 5` IS NOT NULL) AND (`CONTRIBUTING FACTOR VEHICLE 5` != 'Unspecified'))"

factors = sc.sql(f1 + "UNION"+ f2 + "UNION"+ f3 + "UNION" + f4 + "UNION"+ f5)

factors.createOrReplaceTempView("factorsTable")

u1 = "(SELECT factorsTable.FACTOR, accidentsQ2.`NUMBER OF PERSONS KILLED` FROM factorsTable JOIN accidentsQ2 ON factorsTable.FACTOR=accidentsQ2.`CONTRIBUTING FACTOR VEHICLE 1`)"
u2 = "(SELECT factorsTable.FACTOR, accidentsQ2.`NUMBER OF PERSONS KILLED` FROM factorsTable JOIN accidentsQ2 ON factorsTable.FACTOR=accidentsQ2.`CONTRIBUTING FACTOR VEHICLE 2`)"
u3 = "(SELECT factorsTable.FACTOR, accidentsQ2.`NUMBER OF PERSONS KILLED` FROM factorsTable JOIN accidentsQ2 ON factorsTable.FACTOR=accidentsQ2.`CONTRIBUTING FACTOR VEHICLE 3`)"
u4 = "(SELECT factorsTable.FACTOR, accidentsQ2.`NUMBER OF PERSONS KILLED` FROM factorsTable JOIN accidentsQ2 ON factorsTable.FACTOR=accidentsQ2.`CONTRIBUTING FACTOR VEHICLE 4`)"
u5 = "(SELECT factorsTable.FACTOR, accidentsQ2.`NUMBER OF PERSONS KILLED` FROM factorsTable JOIN accidentsQ2 ON factorsTable.FACTOR=accidentsQ2.`CONTRIBUTING FACTOR VEHICLE 5`)"

factorsAndKills = sc.sql(u1 + "UNION ALL" + u2 + "UNION ALL" + u3 + "UNION ALL" + u4 + "UNION ALL" + u5)

factorsAndKills.createOrReplaceTempView("query2")


query2 = sc.sql("SELECT FACTOR, COUNT(*) as ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100, 5) AS PERCENTAGE FROM query2 GROUP BY FACTOR ORDER BY ACCIDENTS")
query2.show(query2.count())


+--------------------+---------+----------+
|              FACTOR|ACCIDENTS|PERCENTAGE|
+--------------------+---------+----------+
|Shoulders Defecti...|       49|       0.0|
|Windshield Inadeq...|       51|       0.0|
|Driverless/Runawa...|       72|       0.0|
|Headlights Defective|       81|       0.0|
|Other Lighting De...|       90|       0.0|
| Tow Hitch Defective|       97|   1.03093|
|Cell Phone (hand-...|      133|       0.0|
|Cell Phone (hands...|      245|       0.0|
|Accelerator Defec...|      468|       0.0|
|      Animals Action|      554|       0.0|
|     Drugs (Illegal)|      625|      0.48|
|Traffic Control D...|      657|       0.0|
|Lane Marking Impr...|      699|       0.0|
|Pedestrian/Bicycl...|      834|   0.83933|
|    Steering Failure|      927|       0.0|
|Tire Failure/Inad...|      942|   0.10616|
|  Pavement Defective|     1170|   0.42735|
|         Fell Asleep|     1775|    0.4507|
|  Obstruction/Debris|     2113|   0.14198|
|               Glare|     2259|

Query #3

In [6]:
accidentsFormatDate = accidents.withColumn("DATE", to_timestamp(col("DATE"), "MM/dd/yyyy"))
accidentsAddNumWeek = accidentsFormatDate.withColumn("week of year", date_format(col("DATE"), "w"))
accidentsQ3 = accidentsAddNumWeek.withColumn("week of year", col("week of year").cast(IntegerType()))
accidentsQ3.createOrReplaceTempView("accidentsQ3")

boroughs = sc.sql("SELECT DISTINCT BOROUGH from accidentsQ3 WHERE BOROUGH!='0' ORDER BY BOROUGH")

boroughs.show()

+-------------+
|      BOROUGH|
+-------------+
|        BRONX|
|     BROOKLYN|
|    MANHATTAN|
|       QUEENS|
|STATEN ISLAND|
+-------------+



Stats for Bronx

In [7]:
query3A = sc.sql("SELECT YEAR(DATE) AS YEAR, `week of year` AS WEEK, COUNT(*) AS ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100,5) AS `% KILLS` FROM accidentsQ3 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) AND (BOROUGH = 'BRONX') GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query3A.show(query3A.count())

+----+----+---------+-------+
|YEAR|WEEK|ACCIDENTS|% KILLS|
+----+----+---------+-------+
|2012|   1|       96|    0.0|
|2012|  27|      402|    0.0|
|2012|  28|      366|0.81967|
|2012|  29|      351| 0.2849|
|2012|  30|      381|    0.0|
|2012|  31|      391|    0.0|
|2012|  32|      398|0.25126|
|2012|  33|      343|    0.0|
|2012|  34|      352|0.85227|
|2012|  35|      373|0.53619|
|2012|  36|      341|    0.0|
|2012|  37|      397|    0.0|
|2012|  38|      362|0.27624|
|2012|  39|      360|0.27778|
|2012|  40|      374|    0.0|
|2012|  41|      363|    0.0|
|2012|  42|      366|0.27322|
|2012|  43|      375|0.53333|
|2012|  44|      266|    0.0|
|2012|  45|      374|    0.0|
|2012|  46|      349|    0.0|
|2012|  47|      330|    0.0|
|2012|  48|      337|    0.0|
|2012|  49|      343|    0.0|
|2012|  50|      386|0.25907|
|2012|  51|      390|    0.0|
|2012|  52|      343|0.29155|
|2013|   1|      398|0.75377|
|2013|   2|      353|    0.0|
|2013|   3|      324|    0.0|
|2013|   4

Stats for Brooklyn

In [8]:
query3B = sc.sql("SELECT YEAR(DATE) AS YEAR, `week of year` AS WEEK, COUNT(*) AS ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100,5) AS `% KILLS` FROM accidentsQ3 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) AND (BOROUGH = 'BROOKLYN') GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query3B.show(query3B.count())

+----+----+---------+-------+
|YEAR|WEEK|ACCIDENTS|% KILLS|
+----+----+---------+-------+
|2012|   1|      218|0.45872|
|2012|  27|      931|0.21482|
|2012|  28|      910|0.10989|
|2012|  29|      832|0.12019|
|2012|  30|      899|0.11123|
|2012|  31|      795|0.12579|
|2012|  32|      923|0.32503|
|2012|  33|      886|    0.0|
|2012|  34|      815| 0.1227|
|2012|  35|      945|0.10582|
|2012|  36|      963|0.10384|
|2012|  37|      931|0.10741|
|2012|  38|      798|0.25063|
|2012|  39|      869|0.23015|
|2012|  40|      814|0.12285|
|2012|  41|      888|    0.0|
|2012|  42|      958|    0.0|
|2012|  43|      891|0.22447|
|2012|  44|      684|    0.0|
|2012|  45|      915|    0.0|
|2012|  46|      980|0.30612|
|2012|  47|      855|    0.0|
|2012|  48|      855|0.23392|
|2012|  49|      984|0.10163|
|2012|  50|      974|0.10267|
|2012|  51|      975|0.41026|
|2012|  52|      810|0.12346|
|2013|   1|      898|0.55679|
|2013|   2|      871|0.11481|
|2013|   3|      794|    0.0|
|2013|   4

Stats for Manhattan

In [9]:
query3C = sc.sql("SELECT YEAR(DATE) AS YEAR, `week of year` AS WEEK, COUNT(*) AS ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100,5) AS `% KILLS` FROM accidentsQ3 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) AND (BOROUGH = 'MANHATTAN') GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query3C.show(query3C.count())

+----+----+---------+-------+
|YEAR|WEEK|ACCIDENTS|% KILLS|
+----+----+---------+-------+
|2012|   1|      184|    0.0|
|2012|  27|      799|    0.0|
|2012|  28|      776|0.12887|
|2012|  29|      852|    0.0|
|2012|  30|      897|    0.0|
|2012|  31|      832|0.24038|
|2012|  32|      828|0.24155|
|2012|  33|      830|    0.0|
|2012|  34|      757| 0.1321|
|2012|  35|      769|    0.0|
|2012|  36|      793|    0.0|
|2012|  37|      849|0.23557|
|2012|  38|      862|0.34803|
|2012|  39|      814|0.12285|
|2012|  40|      821| 0.1218|
|2012|  41|      799|0.12516|
|2012|  42|      902|    0.0|
|2012|  43|      749|0.13351|
|2012|  44|      504|0.19841|
|2012|  45|      702|0.14245|
|2012|  46|      843|    0.0|
|2012|  47|      672|0.14881|
|2012|  48|      769|    0.0|
|2012|  49|      864|    0.0|
|2012|  50|      830|0.12048|
|2012|  51|      850|0.11765|
|2012|  52|      569|0.17575|
|2013|   1|      679|0.14728|
|2013|   2|      666|    0.0|
|2013|   3|      730|0.13699|
|2013|   4

Stats for Queens

In [10]:
query3D = sc.sql("SELECT YEAR(DATE) AS YEAR, `week of year` AS WEEK, COUNT(*) AS ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100,5) AS `% KILLS` FROM accidentsQ3 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) AND (BOROUGH = 'QUEENS') GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query3D.show(query3D.count())

+----+----+---------+-------+
|YEAR|WEEK|ACCIDENTS|% KILLS|
+----+----+---------+-------+
|2012|   1|      179|    0.0|
|2012|  27|      771| 0.1297|
|2012|  28|      670|    0.0|
|2012|  29|      752|0.26596|
|2012|  30|      742|0.67385|
|2012|  31|      704|    0.0|
|2012|  32|      729|    0.0|
|2012|  33|      701|    0.0|
|2012|  34|      643|0.31104|
|2012|  35|      733|    0.0|
|2012|  36|      709|    0.0|
|2012|  37|      792|    0.0|
|2012|  38|      711|0.28129|
|2012|  39|      756|0.13228|
|2012|  40|      738| 0.1355|
|2012|  41|      751|0.13316|
|2012|  42|      804|0.24876|
|2012|  43|      641|0.31201|
|2012|  44|      602|0.16611|
|2012|  45|      768|    0.0|
|2012|  46|      757|    0.0|
|2012|  47|      767|0.26076|
|2012|  48|      711|    0.0|
|2012|  49|      815|    0.0|
|2012|  50|      727|    0.0|
|2012|  51|      813|  0.123|
|2012|  52|      649|0.77042|
|2013|   1|      684| 0.1462|
|2013|   2|      695|0.43165|
|2013|   3|      695|0.28777|
|2013|   4

Stats for Staten Island

In [11]:
query3E = sc.sql("SELECT YEAR(DATE) AS YEAR, `week of year` AS WEEK, COUNT(*) AS ACCIDENTS, ROUND(AVG(`NUMBER OF PERSONS KILLED`)*100,5) AS `% KILLS` FROM accidentsQ3 WHERE (`NUMBER OF PERSONS KILLED` IS NOT NULL) AND (BOROUGH = 'STATEN ISLAND') GROUP BY YEAR(DATE),`week of year` ORDER BY YEAR(DATE),`week of year`")

query3E.show(query3E.count())

+----+----+---------+-------+
|YEAR|WEEK|ACCIDENTS|% KILLS|
+----+----+---------+-------+
|2012|   1|       38|    0.0|
|2012|  27|      180|    0.0|
|2012|  28|      166|    0.0|
|2012|  29|      164|    0.0|
|2012|  30|      181|0.55249|
|2012|  31|      184|0.54348|
|2012|  32|      189|    0.0|
|2012|  33|      191|    0.0|
|2012|  34|      185|    0.0|
|2012|  35|      198| 1.0101|
|2012|  36|      181|    0.0|
|2012|  37|      186|    0.0|
|2012|  38|      201|    0.0|
|2012|  39|      191|    0.0|
|2012|  40|      216|0.46296|
|2012|  41|      197|    0.0|
|2012|  42|      240|0.41667|
|2012|  43|      154|    0.0|
|2012|  44|      106| 0.9434|
|2012|  45|      187|0.53476|
|2012|  46|      210|    0.0|
|2012|  47|      190|    0.0|
|2012|  48|      181|    0.0|
|2012|  49|      202|    0.0|
|2012|  50|      178|    0.0|
|2012|  51|      225|0.88889|
|2012|  52|      181|0.55249|
|2013|   1|      141|    0.0|
|2013|   2|      188|    0.0|
|2013|   3|      171|    0.0|
|2013|   4