In [1]:
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.functions import count
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType

In [2]:
spark_csv = SparkSession.builder.getOrCreate()
speed_camera_df = spark_csv.read.csv("/home/azureuser/Desktop/Datasources/Speed_Camera_Violations_20240226.csv", 
                                     header=True)
print("Speed Camera Violations Data:")
speed_camera_df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/29 03:42:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Speed Camera Violations Data:
+---------------+---------+--------------+----------+------------+------------+--------+---------+--------+
|        ADDRESS|CAMERA ID|VIOLATION DATE|VIOLATIONS|X COORDINATE|Y COORDINATE|LATITUDE|LONGITUDE|LOCATION|
+---------------+---------+--------------+----------+------------+------------+--------+---------+--------+
|1111 N HUMBOLDT|   CHI010|    04/10/2015|        67|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    04/25/2015|        71|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    04/14/2015|        38|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    04/16/2015|        55|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    04/24/2015|        54|        NULL|        NULL|    NULL|     NULL|    NULL|
| 5520 S WESTERN|   CHI069|    04/26/2015|        35|        NULL|        NULL|    NULL|     NULL|    NULL

In [3]:
# Check the number of rows
num_rows = speed_camera_df.count()
print("Number of rows in Speed Camera Violations Data:", num_rows)

# Check the number of columns
num_columns = len(speed_camera_df.columns)
print("Number of columns in Speed Camera Violations Data:", num_columns)

Number of rows in Speed Camera Violations Data: 372708
Number of columns in Speed Camera Violations Data: 9


In [4]:
speed_camera_df.dtypes

[('ADDRESS', 'string'),
 ('CAMERA ID', 'string'),
 ('VIOLATION DATE', 'string'),
 ('VIOLATIONS', 'string'),
 ('X COORDINATE', 'string'),
 ('Y COORDINATE', 'string'),
 ('LATITUDE', 'string'),
 ('LONGITUDE', 'string'),
 ('LOCATION', 'string')]

In [5]:
speed_camera_df = speed_camera_df.withColumn("VIOLATION DATE", to_date("VIOLATION DATE", "MM/dd/yyyy"))
speed_camera_df = speed_camera_df.withColumn("VIOLATIONS", speed_camera_df["VIOLATIONS"].cast(IntegerType()))
speed_camera_df = speed_camera_df.withColumn("X COORDINATE", col("X COORDINATE").cast("double"))
speed_camera_df = speed_camera_df.withColumn("Y COORDINATE", col("Y COORDINATE").cast("double"))
speed_camera_df = speed_camera_df.withColumn("LATITUDE", col("LATITUDE").cast("double"))
speed_camera_df = speed_camera_df.withColumn("LONGITUDE", col("LONGITUDE").cast("double"))
speed_camera_df.printSchema()

root
 |-- ADDRESS: string (nullable = true)
 |-- CAMERA ID: string (nullable = true)
 |-- VIOLATION DATE: date (nullable = true)
 |-- VIOLATIONS: integer (nullable = true)
 |-- X COORDINATE: double (nullable = true)
 |-- Y COORDINATE: double (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)



In [6]:
# Display the number of NA values in each column
speedcamera_na_counts = speed_camera_df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) 
                                                for c in speed_camera_df.columns))
print("Number of NA values in each column in Speed Camera Violations:")
speedcamera_na_counts.show()

Number of NA values in each column in Speed Camera Violations:




+-------+---------+--------------+----------+------------+------------+--------+---------+--------+
|ADDRESS|CAMERA ID|VIOLATION DATE|VIOLATIONS|X COORDINATE|Y COORDINATE|LATITUDE|LONGITUDE|LOCATION|
+-------+---------+--------------+----------+------------+------------+--------+---------+--------+
|      0|        5|             0|         0|       13932|       13932|   13932|    13932|   13932|
+-------+---------+--------------+----------+------------+------------+--------+---------+--------+



                                                                                

In [7]:
speed_camera_df = speed_camera_df.withColumnRenamed("VIOLATIONS", "SPEED CAMERA VIOLATIONS")
speed_camera_df.show()

+---------------+---------+--------------+-----------------------+------------+------------+--------+---------+--------+
|        ADDRESS|CAMERA ID|VIOLATION DATE|SPEED CAMERA VIOLATIONS|X COORDINATE|Y COORDINATE|LATITUDE|LONGITUDE|LOCATION|
+---------------+---------+--------------+-----------------------+------------+------------+--------+---------+--------+
|1111 N HUMBOLDT|   CHI010|    2015-04-10|                     67|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    2015-04-25|                     71|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    2015-04-14|                     38|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    2015-04-16|                     55|        NULL|        NULL|    NULL|     NULL|    NULL|
|1111 N HUMBOLDT|   CHI010|    2015-04-24|                     54|        NULL|        NULL|    NULL|     NULL|    NULL|
| 5520 S WESTERN|   CHI069|    2

In [8]:
speedcamera_aggregated_df = speed_camera_df.groupBy("VIOLATION DATE").agg(count("*").alias("SPEED_CAMERA_VIOLATION_COUNT"))
print("Aggregated Speed Camera Violations:")
speedcamera_aggregated_df.show()

num_rows = speedcamera_aggregated_df.count()
print("Number of rows in Aggregated Speed Camera Violations Data:", num_rows)

Aggregated Speed Camera Violations:


                                                                                

+--------------+----------------------------+
|VIOLATION DATE|SPEED_CAMERA_VIOLATION_COUNT|
+--------------+----------------------------+
|    2014-11-12|                         111|
|    2014-09-26|                         117|
|    2015-03-09|                         135|
|    2021-06-22|                         141|
|    2017-08-11|                          71|
|    2019-06-04|                         145|
|    2017-09-11|                         144|
|    2020-08-24|                          70|
|    2018-05-28|                          75|
|    2021-10-11|                          83|
|    2015-05-19|                         142|
|    2021-01-27|                          73|
|    2016-03-01|                         132|
|    2019-05-08|                         154|
|    2018-08-10|                          72|
|    2021-11-13|                          77|
|    2021-08-27|                          93|
|    2015-03-06|                         136|
|    2016-04-25|                  

In [9]:
spark_maria = SparkSession.builder \
    .config("spark.driver.extraClassPath", "mariadb-java-client-3.3.2.jar") \
    .getOrCreate()

server = "localhost"
port = 3306
database = "redlight_violations"
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"
jdbc_driver = "org.mariadb.jdbc.Driver"
properties = {
    "user": "root",
    "password": "root",
    "driver": jdbc_driver
}

red_light_df = spark_maria.read.jdbc(jdbc_url, "(select * from redlight_views) tab", properties=properties)
print("RedLight Camera Violations Data:")
red_light_df.show()

24/02/29 03:42:20 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


RedLight Camera Violations Data:


[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+---------+--------------+----------+-----------------+-----------------+------------+-------------+--------------------+
|             ADDRESS|CAMERA ID|VIOLATION DATE|VIOLATIONS|     X COORDINATE|     Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------------------+---------+--------------+----------+-----------------+-----------------+------------+-------------+--------------------+
|4700 W IRVING PAR...|     2763|    04/09/2015|         4|             NULL|             NULL|        NULL|         NULL|                    |
|2400 W VAN BUREN ...|     2054|    04/14/2015|         5|             NULL|             NULL|        NULL|         NULL|                    |
|11500 S HALSTED S...|     2552|    04/08/2015|         5|             NULL|             NULL|        NULL|         NULL|                    |
|4700 W IRVING PAR...|     2764|    04/19/2015|         4|             NULL|             NULL|        NULL|         NULL|                    |

                                                                                

In [10]:
# Check the number of rows
num_rows = red_light_df.count()
print("Number of rows in RedLight Camera Violations Data:", num_rows)

# Check the number of columns
num_columns = len(red_light_df.columns)
print("Number of columns in RedLight Camera Violations Data:", num_columns)

Number of rows in RedLight Camera Violations Data: 922510
Number of columns in RedLight Camera Violations Data: 9


In [11]:
red_light_df.dtypes

[('ADDRESS', 'string'),
 ('CAMERA ID', 'int'),
 ('VIOLATION DATE', 'string'),
 ('VIOLATIONS', 'int'),
 ('X COORDINATE', 'double'),
 ('Y COORDINATE', 'double'),
 ('LATITUDE', 'double'),
 ('LONGITUDE', 'double'),
 ('LOCATION', 'string')]

In [12]:
red_light_df = red_light_df.withColumn("VIOLATION DATE", to_date("VIOLATION DATE", "MM/dd/yyyy"))
red_light_df.printSchema()

root
 |-- ADDRESS: string (nullable = true)
 |-- CAMERA ID: integer (nullable = true)
 |-- VIOLATION DATE: date (nullable = true)
 |-- VIOLATIONS: integer (nullable = true)
 |-- X COORDINATE: double (nullable = true)
 |-- Y COORDINATE: double (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)



In [13]:
# Display the number of NA values in each column
redlight_na_counts = red_light_df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) 
                                                for c in red_light_df.columns))
print("Number of NA values in each column in RedLight Camera Violations:")
redlight_na_counts.show()

Number of NA values in each column in RedLight Camera Violations:


[Stage 22:>                                                         (0 + 1) / 1]

+-------+---------+--------------+----------+------------+------------+--------+---------+--------+
|ADDRESS|CAMERA ID|VIOLATION DATE|VIOLATIONS|X COORDINATE|Y COORDINATE|LATITUDE|LONGITUDE|LOCATION|
+-------+---------+--------------+----------+------------+------------+--------+---------+--------+
|      0|      289|             0|         0|       48102|       48102|   48102|    48102|       0|
+-------+---------+--------------+----------+------------+------------+--------+---------+--------+



                                                                                

In [14]:
red_light_df = red_light_df.withColumnRenamed("VIOLATIONS", "REDLIGHT CAMERA VIOLATIONS")
red_light_df.show()

[Stage 25:>                                                         (0 + 1) / 1]

+--------------------+---------+--------------+--------------------------+-----------------+-----------------+------------+-------------+--------------------+
|             ADDRESS|CAMERA ID|VIOLATION DATE|REDLIGHT CAMERA VIOLATIONS|     X COORDINATE|     Y COORDINATE|    LATITUDE|    LONGITUDE|            LOCATION|
+--------------------+---------+--------------+--------------------------+-----------------+-----------------+------------+-------------+--------------------+
|4700 W IRVING PAR...|     2763|    2015-04-09|                         4|             NULL|             NULL|        NULL|         NULL|                    |
|2400 W VAN BUREN ...|     2054|    2015-04-14|                         5|             NULL|             NULL|        NULL|         NULL|                    |
|11500 S HALSTED S...|     2552|    2015-04-08|                         5|             NULL|             NULL|        NULL|         NULL|                    |
|4700 W IRVING PAR...|     2764|    2015-04-19

                                                                                

In [15]:
redlightcamera_aggregated_df = red_light_df.groupBy("VIOLATION DATE").agg(count("*").alias("REDLIGHT_CAMERA_VIOLATION_COUNT"))
print("Aggregated Speed Camera Violations:")
redlightcamera_aggregated_df.show()

num_rows = redlightcamera_aggregated_df.count()
print("Number of rows in Aggregated RedLight Camera Violations Data:", num_rows)

Aggregated Speed Camera Violations:


                                                                                

+--------------+-------------------------------+
|VIOLATION DATE|REDLIGHT_CAMERA_VIOLATION_COUNT|
+--------------+-------------------------------+
|    2014-11-12|                            270|
|    2014-09-26|                            315|
|    2019-05-08|                            260|
|    2017-08-11|                            260|
|    2018-05-28|                            262|
|    2015-05-19|                            261|
|    2016-03-01|                            246|
|    2015-03-09|                            254|
|    2019-06-04|                            262|
|    2018-08-10|                            280|
|    2020-08-24|                            259|
|    2017-09-11|                            255|
|    2021-06-22|                            273|
|    2021-11-13|                            264|
|    2021-01-27|                            231|
|    2021-10-11|                            247|
|    2021-08-27|                            276|
|    2021-12-18|    

[Stage 29:>                                                         (0 + 1) / 1]

Number of rows in Aggregated RedLight Camera Violations Data: 3513


                                                                                

In [16]:
# Joined Violation Count DataFrame
violations_count_df = redlightcamera_aggregated_df.join(speedcamera_aggregated_df, "VIOLATION DATE", "inner")
print("Joined Violations Count DataFrame:")
violations_count_df = violations_count_df.orderBy("VIOLATION DATE")
violations_count_df.show()

num_rows = violations_count_df.count()
print("Number of rows in Total Aggregated Violations Count Data:", num_rows)

Joined Violations Count DataFrame:


                                                                                

+--------------+-------------------------------+----------------------------+
|VIOLATION DATE|REDLIGHT_CAMERA_VIOLATION_COUNT|SPEED_CAMERA_VIOLATION_COUNT|
+--------------+-------------------------------+----------------------------+
|    2014-07-01|                            286|                          95|
|    2014-07-02|                            279|                          92|
|    2014-07-03|                            300|                          94|
|    2014-07-04|                            293|                          62|
|    2014-07-05|                            287|                          63|
|    2014-07-06|                            284|                          60|
|    2014-07-07|                            285|                          97|
|    2014-07-08|                            289|                          98|
|    2014-07-09|                            284|                          97|
|    2014-07-10|                            298|                

[Stage 41:>                                                         (0 + 1) / 1]

Number of rows in Total Aggregated Violations Count Data: 3513


                                                                                

In [17]:
spark_mongo = SparkSession.builder \
    .appName("MongoDBMflixAnalysis") \
    .config("spark.mongodb.input.uri", f"mongodb+srv://admin:admin@cluster0.xn5enkc.mongodb.net/dailytraffic.avgcount") \
    .config("spark.mongodb.output.uri", f"mongodb+srv://admin:admin@cluster0.xn5enkc.mongodb.net/dailytraffic.avgcount") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

avg_traffic_count_df = spark_mongo.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb+srv://admin:admin@cluster0.xn5enkc.mongodb.net/dailytraffic.avgcount") \
    .load()
print("Avg Daily Traffic Counts Data:")
avg_traffic_count_df.select("ID", "Date of Count", "Street", "Total Passing Vehicle Volume", "Traffic Volume Count Location  Address").show()

24/02/29 03:42:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Avg Daily Traffic Counts Data:
+----+-------------+--------------------+----------------------------+--------------------------------------+
|  ID|Date of Count|              Street|Total Passing Vehicle Volume|Traffic Volume Count Location  Address|
+----+-------------+--------------------+----------------------------+--------------------------------------+
| 691|   08/15/2006|         Kimball Ave|                       15600|                            6067 North|
| 518|   10/03/2006|         Chicago Ave|                       18100|                              161 East|
|1367|   08/24/2006|             57th Dr|                       53500|                             1730 East|
| 960|   08/22/2006|         Ashland Ave|                       26700|                            5116 North|
|  85|   05/02/2006|            State St|                       19300|                            6416 South|
|1050|   09/20/2006|            Adams St|                       11700|                   

In [18]:
avg_traffic_count_df.printSchema()

root
 |-- Date of Count: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Street: string (nullable = true)
 |-- Total Passing Vehicle Volume: long (nullable = true)
 |-- Traffic Volume Count Location  Address: string (nullable = true)
 |-- Vehicle Volume By Each Direction of Traffic: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)



In [19]:
# Display the number of NA values in each column
avg_traffic_na_counts = avg_traffic_count_df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) 
                                                for c in avg_traffic_count_df.columns))
print("Number of NA values in each column of Avg Daily Traffic Count Data:")
avg_traffic_na_counts.show()

Number of NA values in each column of Avg Daily Traffic Count Data:
+-------------+---+--------+--------+---------+------+----------------------------+--------------------------------------+-------------------------------------------+---+
|Date of Count| ID|Latitude|Location|Longitude|Street|Total Passing Vehicle Volume|Traffic Volume Count Location  Address|Vehicle Volume By Each Direction of Traffic|_id|
+-------------+---+--------+--------+---------+------+----------------------------+--------------------------------------+-------------------------------------------+---+
|            0|  0|       0|       0|        0|     0|                           0|                                     0|                                          0|  0|
+-------------+---+--------+--------+---------+------+----------------------------+--------------------------------------+-------------------------------------------+---+



In [20]:
spark_csv2 = SparkSession.builder.getOrCreate()
traffic_people_df = spark_csv2.read.csv("/home/azureuser/Desktop/Datasources/Traffic_Crashes_-_People_20240226.csv", 
                                     header=True)
print("Traffic Crashes - People Data:")
traffic_people_df.printSchema()

Traffic Crashes - People Data:
root
 |-- PERSON_ID: string (nullable = true)
 |-- PERSON_TYPE: string (nullable = true)
 |-- CRASH_RECORD_ID: string (nullable = true)
 |-- VEHICLE_ID: string (nullable = true)
 |-- CRASH_DATE: string (nullable = true)
 |-- SEAT_NO: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- ZIPCODE: string (nullable = true)
 |-- SEX: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- DRIVERS_LICENSE_STATE: string (nullable = true)
 |-- DRIVERS_LICENSE_CLASS: string (nullable = true)
 |-- SAFETY_EQUIPMENT: string (nullable = true)
 |-- AIRBAG_DEPLOYED: string (nullable = true)
 |-- EJECTION: string (nullable = true)
 |-- INJURY_CLASSIFICATION: string (nullable = true)
 |-- HOSPITAL: string (nullable = true)
 |-- EMS_AGENCY: string (nullable = true)
 |-- EMS_RUN_NO: string (nullable = true)
 |-- DRIVER_ACTION: string (nullable = true)
 |-- DRIVER_VISION: string (nullable = true)
 |-- PHYSICAL_CONDIT

In [21]:
spark_csv3 = SparkSession.builder.getOrCreate()
traffic_fatalities_df = spark_csv3.read.csv("/home/azureuser/Desktop/Datasources/Traffic_Crashes_-_Vision_Zero_Chicago_Traffic_Fatalities_20240226.csv", 
                                     header=True)
print("Traffic Crashes - Fatalities Data:")
traffic_fatalities_df.printSchema()

Traffic Crashes - Fatalities Data:
root
 |-- Person_ID: string (nullable = true)
 |-- Crash_Date: string (nullable = true)
 |-- Crash_Location: string (nullable = true)
 |-- Victim: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Location: string (nullable = true)

