In [16]:
import os 
os.environ["SPARK_HOME"] = ""

import findspark
findspark.init()

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import col, avg

spark = SparkSession.builder\
    .appName("capstone_project")\
    .enableHiveSupport()\
    .getOrCreate()

In [18]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [19]:
from schemas import redlight_cam_schema, speed_cam_schema, streets_schema, traffic_hist_schema

tables = [
    'crashes', 
    'redlight_cam', 
    'speed_cam', 
    'streets', 
    'traffic_hist'
]

paths = {
    table: f'./data/raw/traffic/{table}/{table}.csv'
    for table in tables
} 

schemas = {
    # 'crashes'
    'redlight_cam': redlight_cam_schema,
    'speed_cam': speed_cam_schema,
    'streets': streets_schema,
    'traffic_hist': traffic_hist_schema
}

def read_table_to_view(spark: SparkSession):
    for table in tables:
        read_obj = spark.read.format("csv").option("header", "true")

        if table in schemas:
            read_obj = read_obj.schema(schemas[table])
        else:
            read_obj = read_obj.option("inferSchema", "true")
 
        df = read_obj.load(paths[table])
        df.createOrReplaceTempView(table)

In [20]:
read_table_to_view(spark)

                                                                                

In [21]:
spark.sql("SHOW TABLES").show()

+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|         |         crashes|       true|
|         |d_camera_staging|       true|
|         |       d_streets|       true|
|         |    redlight_cam|       true|
|         |       speed_cam|       true|
|         |         streets|       true|
|         |    traffic_hist|       true|
+---------+----------------+-----------+



In [145]:
spark.sql("SELECT COUNT(DISTINCT camera_id) from speed_cam").show()
spark.sql("SELECT COUNT(DISTINCT camera_id) from redlight_cam").show()

23/05/23 21:41:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CAMERA ID
 Schema: camera_id
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv


+-------------------------+
|count(DISTINCT camera_id)|
+-------------------------+
|                      177|
+-------------------------+



23/05/23 21:41:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CAMERA ID
 Schema: camera_id
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv


+-------------------------+
|count(DISTINCT camera_id)|
+-------------------------+
|                      365|
+-------------------------+



                                                                                

In [22]:
from pyspark.sql.functions import regexp_replace, when

dcam = spark.sql("""
with d_cam_speed_upper
as (
select distinct
    camera_id,
    "speed" as type,
    NULL as red_light_intersection,
    UPPER(address) as address,
    latitude,
    longitude
from speed_cam
where camera_id is not null),

d_cam_redlight_upper
as (
select distinct
    camera_id,
    "redlight" as type,
    intersection as red_light_intersection,
    UPPER(address) as address,
    latitude,
    longitude
from redlight_cam
where camera_id is not null),

d_cam_merged
as (
    select * from d_cam_speed_upper
    union 
    select * from d_cam_redlight_upper
)

select 
    *
from d_cam_merged
""")


dcam = dcam.withColumn("address",
                       when(dcam.address.endswith("STREET"), regexp_replace(dcam.address, "STREET", "ST")) \
                       .when(dcam.address.endswith("STREE"), regexp_replace(dcam.address, "STREE", "ST")) \
                       .when(dcam.address.endswith("AVENUE"), regexp_replace(dcam.address, "AVENUE", "AVE")) \
                       .when(dcam.address.endswith("AVENU"), regexp_replace(dcam.address, "AVENU", "AVE")) \
                       .when(dcam.address.endswith("AVEN"), regexp_replace(dcam.address, "AVEN", "AVE")) \
                       .when(dcam.address.endswith("PARKWAY"), regexp_replace(dcam.address, "PARKWAY", "PKWY")) \
                       .when(dcam.address.endswith("PARKWA"), regexp_replace(dcam.address, "PARKWA", "PKWY")) \
                       .when(dcam.address.endswith("ROAD"), regexp_replace(dcam.address, "ROAD", "RD")) \
                       .when(dcam.address.endswith("ROA"), regexp_replace(dcam.address, "ROA", "RD")) \
                       .when(dcam.address.endswith("BOULEVARD"), regexp_replace(dcam.address, "BOULEVARD", "BLVD")) \
                       .when(dcam.address.endswith("BOUL"), regexp_replace(dcam.address, "BOUL", "BLVD")) \
                       .when(dcam.address.endswith("DRIVE"), regexp_replace(dcam.address, "DRIVE", "DR")) \
                       .when(dcam.address.endswith("DRIV"), regexp_replace(dcam.address, "DRIV", "DR")) \
                       .when(dcam.address.endswith("MARTIN LUTHER KING"), regexp_replace(dcam.address, "MARTIN LUTHER KING", "DR MARTIN LUTHER KING")) \
                       .when(dcam.address.endswith("DR MARTIN L KING"), regexp_replace(dcam.address, "DR MARTIN L KING", "DR MARTIN LUTHER KING")) \
                       .when(dcam.address.endswith("MARTIN L KING"), regexp_replace(dcam.address, "MARTIN L KING", "DR MARTIN LUTHER KING")) \
                       .otherwise(dcam.address)
                       )

dcam.createOrReplaceTempView("d_camera_staging")

dcam = spark.sql("""
    SELECT
        camera_id,
        type,
        red_light_intersection,
        address,
        split(UPPER(address), ' ') as addr_toks, 
        CAST(addr_toks[0] AS INT) as addr_no, 
        concat_ws(' ', filter(addr_toks , (x,i) -> i > 0)) as street_name,
        latitude,
        longitude
    FROM d_camera_staging
""")

streets = spark.sql("""
    select 
        row_number() over (order by street asc, suffix asc, direction asc) as street_key,
        UPPER(full_street_name) as full_street_name,
        UPPER(direction) as direction,
        UPPER(street) as street,
        UPPER(suffix) as suffix,
        UPPER(suffix_direction) as suffix_direction,
        min_address as min_address,
        max_address as max_address
    from streets
""")

In [9]:
dcam.show()

23/05/25 23:54:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ADDRESS, CAMERA ID, LATITUDE, LONGITUDE
 Schema: address, camera_id, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv
23/05/25 23:54:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: INTERSECTION, CAMERA ID, ADDRESS, LATITUDE, LONGITUDE
 Schema: intersection, camera_id, address, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv

+---------+-----+----------------------+--------------------+--------------------+-------+-------------------+------------+-------------+
|camera_id| type|red_light_intersection|             address|           addr_toks|addr_no|        street_name|    latitude|    longitude|
+---------+-----+----------------------+--------------------+--------------------+-------+-------------------+------------+-------------+
|   CHI188|speed|                  null|   6935 W ADDISON ST|[6935, W, ADDISON...|   6935|       W ADDISON ST|41.945293896|-87.799816069|
|   CHI128|speed|                  null|  1226 N WESTERN AVE|[1226, N, WESTERN...|   1226|      N WESTERN AVE|41.903817103|-87.687194998|
|   CHI010|speed|                  null|     1111 N HUMBOLDT| [1111, N, HUMBOLDT]|   1111|         N HUMBOLDT|        null|         null|
|   CHI154|speed|                  null|     2416 W 103RD ST|[2416, W, 103RD, ST]|   2416|         W 103RD ST|41.706487201|-87.682391059|
|   CHI180|speed|                 

                                                                                

In [26]:
streets.createOrReplaceTempView("d_streets")
dcam.createOrReplaceTempView("d_camera_staging")

dcam2 = spark.sql("""
    SELECT 
        camera_id, 
        type, 
        red_light_intersection, 
        address, 
        street_key, 
        latitude, 
        longitude
    FROM 
        d_camera_staging dcs
    LEFT JOIN
        d_streets ds
    ON ds.full_street_name LIKE CONCAT(dcs.street_name, '%')
""")

In [None]:
spark.sql("select * from d_street where full_street_name like ")

In [24]:
dcam2.count()

23/05/26 00:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ADDRESS, CAMERA ID, LATITUDE, LONGITUDE
 Schema: address, camera_id, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv
23/05/26 00:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Full Street Name
 Schema: full_street_name
Expected: full_street_name but found: Full Street Name
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/streets/streets.csv
23/05/26 00:01:00 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: INTERSECTION, CAMERA ID, ADDRESS, LATITUDE, LONGITUDE
 Schema: intersection, camera_id, address, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv
                                                                     

547

In [27]:
dcam2.show()

23/05/26 00:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 00:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/26 00:01:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ADDRESS, CAMERA ID, LATITUDE, LONGITUDE
 Schema: address, camera_id, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv
23/05/26 00:01:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: INTERSECTION, CAMERA ID, ADDRESS, LATITUDE, LONGITUDE
 Schema: intersection, camera_id, address, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv
23/05/26 00:01

+---------+-----+----------------------+--------------------+----------+------------+-------------+
|camera_id| type|red_light_intersection|             address|street_key|    latitude|    longitude|
+---------+-----+----------------------+--------------------+----------+------------+-------------+
|   CHI188|speed|                  null|   6935 W ADDISON ST|       470|41.945293896|-87.799816069|
|   CHI128|speed|                  null|  1226 N WESTERN AVE|      2533|41.903817103|-87.687194998|
|   CHI128|speed|                  null|  1226 N WESTERN AVE|      2534|41.903817103|-87.687194998|
|   CHI010|speed|                  null|     1111 N HUMBOLDT|      1312|        null|         null|
|   CHI010|speed|                  null|     1111 N HUMBOLDT|      1313|        null|         null|
|   CHI010|speed|                  null|     1111 N HUMBOLDT|      1314|        null|         null|
|   CHI010|speed|                  null|     1111 N HUMBOLDT|      1315|        null|         null|


In [156]:
dcam2.createOrReplaceTempView("d_camera")

spark.sql("""
    select * from d_camera
    where camera_id in
    (select 
        camera_id
    from d_camera
    group by camera_id
    having count(camera_id) > 1)
""").show()

23/05/23 21:48:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 21:48:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 21:48:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ADDRESS, CAMERA ID, LATITUDE, LONGITUDE
 Schema: address, camera_id, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv
23/05/23 21:48:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Full Street Name
 Schema: full_street_name
Expected: full_street_name but found: Full Street Name
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/streets/streets.csv
23/05/23 21:48:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.


+---------+--------+----------------------+--------------------+----------+------------+-------------+
|camera_id|    type|red_light_intersection|             address|street_key|    latitude|    longitude|
+---------+--------+----------------------+--------------------+----------+------------+-------------+
|   CHI133|   speed|                  null|    19 W CHICAGO AVE|      null|41.896556107|-87.629025904|
|   CHI126|   speed|                  null|   319 E ILLINOIS ST|      null|41.890910096|-87.619347521|
|   CHI126|   speed|                  null|   324 E ILLINOIS ST|      null|41.891134484| -87.61908944|
|   CHI133|   speed|                  null|    11 E CHICAGO AVE|      null|41.896574581|-87.627695378|
|     1421|redlight|    DAMEN AND DIVERSEY|2000 W DIVERSEY PKWY|      null|41.932394164|-87.678173031|
|     1421|redlight|  LARAMIE AND FULLE...|  2400 N LARAMIE AVE|      null|41.924151874|-87.756294662|
|     2462|redlight|  STONEY ISLAND AND...|7900 S STONY ISLA...|      nul

In [159]:
spark.sql("select distinct address from redlight_cam where camera_id = 2054").show()

23/05/23 21:51:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CAMERA ID, ADDRESS
 Schema: camera_id, address
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv


+--------------------+
|             address|
+--------------------+
|2400 W VAN BUREN ...|
|2400 W VAN BUREN ...|
+--------------------+



In [140]:
streets.filter("full_street_name LIKE '%PULASKI%'").show()

+----------+--------------------+---------+------------------+------+----------------+-----------+-----------+
|street_key|    full_street_name|direction|            street|suffix|suffix_direction|min_address|max_address|
+----------+--------------------+---------+------------------+------+----------------+-----------+-----------+
|      1334| W I55 PULASKI RD ER|        W|    I55 PULASKI RD|    ER|                |       3700|       4499|
|      1335| W I55 PULASKI RD XR|        W|    I55 PULASKI RD|    XR|                |       3725|       4399|
|      1511|N KENNEDY PULASKI...|        N|KENNEDY PULASKI RD|    ER|                |       3800|       3899|
|      1512|N KENNEDY PULASKI...|        N|KENNEDY PULASKI RD|    XR|                |       3828|       3999|
|      2177|        N PULASKI RD|        N|           PULASKI|    RD|                |          1|       6399|
|      2178|        S PULASKI RD|        S|           PULASKI|    RD|                |          1|      11499|
+

23/05/23 21:32:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 21:32:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 21:32:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 21:32:59 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Full Street Name, Direction, Street , Suffix, Suffix Direction, Min Address, Max Address
 Schema: full_street_name, direction, street, suffix, suffix_direction, min_address, max_address
Expected: full_street_name but found: Full Street Name
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/streets/streets.csv
23/05/23 21:32:59 WARN WindowExec: No Partition Defined for Window operation! Moving al

In [32]:
dcam2.filter("street_key IS NULL").selectExpr("camera_id", "type", "red_light_intersection", "address", "street_key", "latitude", "longitude").show()

23/05/23 20:40:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 20:40:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/05/23 20:40:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ADDRESS, CAMERA ID, LATITUDE, LONGITUDE
 Schema: address, camera_id, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv
23/05/23 20:40:14 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: INTERSECTION, CAMERA ID, ADDRESS, LATITUDE, LONGITUDE
 Schema: intersection, camera_id, address, latitude, longitude
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv
23/05/23 20:40

+---------+-----+----------------------+--------------------+----------+------------+-------------+
|camera_id| type|red_light_intersection|             address|street_key|    latitude|    longitude|
+---------+-----+----------------------+--------------------+----------+------------+-------------+
|   CHI014|speed|                  null|       6909 S KEDZIE|      null|41.767731869|-87.702737907|
|   CHI070|speed|                  null|         2513 W 55TH|      null|41.793702249|-87.687244272|
|   CHI022|speed|                  null|   11153 S VINCENNES|      null|41.690701951|-87.664122385|
|   CHI079|speed|                  null|  2705 W IRVING PARK|      null|41.953875265|-87.696204089|
|   CHI045|speed|                  null|         445 W 127TH|      null|41.663174059|-87.633520452|
|   CHI059|speed|                  null|      5433 S PULASKI|      null|41.794045046|-87.723036363|
|   CHI071|speed|                  null|      7833 S PULASKI|      null| 41.75038453|-87.721794539|


In [166]:
spark.sql("""
with f_violation_redlight_staging 
as (
    select 
        camera_id,
        violation_date_mmddyyyy,
        to_date(violation_date_mmddyyyy, "MM/dd/yyyy") as violation_date,
        date_format(violation_date, "yyyyMMdd") as violation_date_key,
        "redlight" as violation_type,
        violations
    from redlight_cam
),

f_violation_speed_staging 
as (
    select 
        camera_id,
        violation_date_mmddyyyy,
        to_date(violation_date_mmddyyyy, "MM/dd/yyyy") as violation_date,
        date_format(violation_date, "yyyyMMdd") as violation_date_key,
        "speed" as violation_type,
        violations
    from speed_cam
),

f_violation_staging_merged
as (
    select * from f_violation_redlight_staging
    union
    select * from f_violation_speed_staging 
)

select 
    camera_id,
    violation_date_key,
    violation_type,
    violations
from f_violation_staging_merged

""").show()

23/05/23 22:09:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CAMERA ID, VIOLATION DATE, VIOLATIONS
 Schema: camera_id, violation_date_mmddyyyy, violations
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/redlight_cam/redlight_cam.csv
23/05/23 22:09:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CAMERA ID, VIOLATION DATE, VIOLATIONS
 Schema: camera_id, violation_date_mmddyyyy, violations
Expected: camera_id but found: CAMERA ID
CSV file: file:///home/pcminh/bigdata-capstone/data/raw/traffic/speed_cam/speed_cam.csv

+---------+------------------+--------------+----------+
|camera_id|violation_date_key|violation_type|violations|
+---------+------------------+--------------+----------+
|     1533|          20140715|      redlight|         2|
|     1234|          20140802|      redlight|        17|
|     1623|          20140723|      redlight|         3|
|     2054|          20140726|      redlight|         7|
|     1503|          20140731|      redlight|         2|
|     2054|          20141012|      redlight|         5|
|     2552|          20141211|      redlight|        12|
|     1581|          20140818|      redlight|         2|
|     2764|          20190222|      redlight|         2|
|     2662|          20140821|      redlight|         1|
|     2764|          20140828|      redlight|         3|
|     1153|          20140901|      redlight|         3|
|     1234|          20150410|      redlight|         5|
|     1701|          20190606|      redlight|         1|
|     1934|          20141209| 

                                                                                