# Parking Tickets Pre-Processing

Split the data from the large raw CSV file into several smaller, gzip-compressed CSV files (csv.gz format) that take up less storage space.

In [5]:
import sys

In [6]:
from pyspark.sql import SparkSession, types

# Create (or reuse) the Spark session for this notebook and keep Spark's logging at WARN.
spark = SparkSession.builder.appName("parking-ticket-preproc").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

assert sys.version_info >= (3, 5)  # make sure we have Python 3.5+

# Compare the Spark version numerically: spark.version is a string, and as plain strings
# "2.10" or "10.0" would sort before "2.4" and wrongly fail (or pass) the check.
spark_major_minor = tuple(int(part) for part in spark.version.split(".")[:2])
assert spark_major_minor >= (2, 4)  # make sure we have Spark 2.4+

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/04 05:10:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/04 05:10:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# schema of the raw csv file
# each entry is (column name, Spark type class); the order here is the schema's field order
_raw_columns = [
    ("block", types.IntegerType),
    ("street", types.StringType),
    ("entrydate", types.DateType),
    ("bylaw", types.IntegerType),
    ("section", types.StringType),
    ("status", types.StringType),
    ("infractiontext", types.StringType),
    ("year", types.StringType),
    ("bi_id", types.IntegerType),
]

schema = types.StructType(
    [types.StructField(col_name, col_type()) for col_name, col_type in _raw_columns]
)

In [8]:
# constants
in_file = "parking-tickets.csv"  # raw csv that gets read
out_dir = "../parking-tickets"  # directory the compressed parts are written to
num_partitions = 10  # number of partitions the data is split into

In [9]:
# read the ";"-separated csv with the explicit schema and drop rows containing null values
raw_df = spark.read.csv(in_file, schema=schema, sep=";").na.drop()

# split the cleaned rows into num_partitions partitions
original_df = raw_df.repartition(num_partitions)

# store the partitioned data as gzip-compressed csv, replacing any earlier output
write_options = {"compression": "gzip", "mode": "overwrite", "header": True, "sep": ";"}
original_df.write.csv(out_dir, **write_options)

                                                                                

In [10]:
# tidy the output directory: number the gzip parts and delete Spark's bookkeeping files
# NOTE(review): the parts contain ";"-separated csv data but are named ".json.gz" —
# confirm whether downstream readers rely on that name before changing it.
import os

next_index = 0
for folder, _subdirs, names in os.walk(out_dir):
    for name in names:
        source_path = os.path.join(folder, name)
        if name.endswith(".gz"):
            # data part: give it a sequential, predictable name directly under out_dir
            os.rename(source_path, f"{out_dir}/parking-ticket-{next_index}.json.gz")
            next_index += 1
        elif name == "_SUCCESS" or name.endswith(".crc"):
            # marker / checksum file: not needed once the write has finished
            os.remove(source_path)

## Validation

Ensure no data was lost during the partitioning and compression process.

In [11]:
print(f"Expected {num_partitions} json.gz files for parking tickets")

# report every gzip-compressed file found under out_dir together with its size in KB
for folder, _subdirs, names in os.walk(out_dir):
    gz_paths = (os.path.join(folder, name) for name in names if name.endswith(".gz"))
    for gz_path in gz_paths:
        size_in_bytes = os.path.getsize(gz_path)
        print(f"File: {gz_path}, Size: {size_in_bytes / 1024:.2f} KB")

Expected 10 json.gz files for parking tickets
File: ../parking-tickets/parking-ticket-0.json.gz, Size: 1504.56 KB
File: ../parking-tickets/parking-ticket-1.json.gz, Size: 1505.85 KB
File: ../parking-tickets/parking-ticket-2.json.gz, Size: 1504.48 KB
File: ../parking-tickets/parking-ticket-3.json.gz, Size: 1502.92 KB
File: ../parking-tickets/parking-ticket-4.json.gz, Size: 1504.64 KB
File: ../parking-tickets/parking-ticket-5.json.gz, Size: 1508.65 KB
File: ../parking-tickets/parking-ticket-6.json.gz, Size: 1502.42 KB
File: ../parking-tickets/parking-ticket-7.json.gz, Size: 1505.38 KB
File: ../parking-tickets/parking-ticket-8.json.gz, Size: 1501.93 KB
File: ../parking-tickets/parking-ticket-9.json.gz, Size: 1503.56 KB


In [12]:
# compare the raw and compressed dataset for equality

def compare_dfs(df1, df2):
    """Check whether two Spark DataFrames have the same schema and the same rows.

    The schemas are compared first, then the row counts, and finally the rows
    themselves via ``exceptAll`` in both directions. A diagnostic message is
    printed for whichever check fails; differing rows are shown in full.

    Args:
        df1: First DataFrame.
        df2: Second DataFrame.

    Returns:
        True if the schemas, row counts and rows all match, otherwise False.
    """
    # Compare schemas
    if df1.schema != df2.schema:
        print("Schemas are different!")
        print(f"df1 schema: {df1.schema}")
        print(f"df2 schema: {df2.schema}")
        return False

    # Compare data.
    # Every count() is a Spark action that re-runs the query, so evaluate each one
    # exactly once and reuse the result instead of calling it again for the message.
    rows_in_df1 = df1.count()
    rows_in_df2 = df2.count()
    if rows_in_df1 != rows_in_df2:
        print(f"Row counts are different: {rows_in_df1} | {rows_in_df2}")
        return False

    diff1 = df1.exceptAll(df2)
    diff2 = df2.exceptAll(df1)

    # Print out the rows that are different
    extra_in_df1 = diff1.count()
    if extra_in_df1 > 0:
        print("Rows in df1 but not in df2:")
        diff1.show(truncate=False)

    extra_in_df2 = diff2.count()
    if extra_in_df2 > 0:
        print("Rows in df2 but not in df1:")
        diff2.show(truncate=False)

    if extra_in_df1 == 0 and extra_in_df2 == 0:
        print("DataFrames are equal!")
        return True
    return False

# re-read the compressed parts with the same schema, then check them against the partitioned data
compressed_df = spark.read.csv(out_dir, schema=schema, header=True, sep=";")
is_lossless = compare_dfs(original_df, compressed_df)
print(f"Split and compression successful: {is_lossless}")

25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/04 05:11:01 WARN RowBasedKeyValueBatch: Calling spill() on

DataFrames are equal!
Split and compression successful: True


                                                                                

In [13]:
# print a tabular preview of the data read back from the compressed files
compressed_df.show()

+-----+----------------+----------+-----+-----------+------+--------------------+----+-------+
|block|          street| entrydate|bylaw|    section|status|      infractiontext|year|  bi_id|
+-----+----------------+----------+-----+-----------+------+--------------------+----+-------+
|  100|       W 8TH AVE|2024-10-19| 2952|    5(4)(B)|    IS|PARK IN A METERED...|2024|4547115|
| 1100|      ALBERNI ST|2024-11-02| 2952|    5(4)(B)|    IS|PARK IN A METERED...|2024|4555064|
| 1100|      SEYMOUR ST|2024-12-29| 2952|5(4)(A)(ii)|    IS|PARK IN A METERED...|2024|4716613|
|  100|       W 3RD AVE|2023-05-07| 2952|5(4)(A)(ii)|    IS|PARK IN A METERED...|2023|4557889|
| 1700|      E 11TH AVE|2023-07-21| 2849|    17.6(B)|    IS|PARK ON A STREET ...|2023|4563078|
|  200|       ROBSON ST|2024-11-29| 2952|    5(4)(B)|    IS|PARK IN A METERED...|2024|4701248|
| 2400|          ASH ST|2024-11-04| 2952|5(4)(A)(ii)|    IS|PARK IN A METERED...|2024|4696759|
|  800|        DRAKE ST|2023-03-10| 2952|5(4)(A)(i