In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
# Read CSV file
df = spark.read.csv("NYC_Parking_Ticket_Analysis.csv", header=True, inferSchema=True, sep=',', quote='"', escape='"')

# Limit to first 10,000 rows
# sample_df = df.limit(10000)

# Show the limited DataFrame
# sample_df.show()

In [4]:
from pyspark.sql.functions import col

# Calculate the number of non-null values for each column
non_null_counts = {col_name: df.filter(col(col_name).isNotNull()).count() for col_name in df.columns}

# Print the number of non-null values for each column
# print("Non-null values for each column:")
# for col_name, non_null_count in non_null_counts.items():
#    print(f"{col_name}: {non_null_count}")

# Collect the result as a dictionary
# print("Non-null values as dictionary:")
# print(non_null_counts)

# Calculate the threshold for 80% non-null values
total_rows = df.count()
threshold = 0.8 * total_rows

# Select columns that meet the threshold
selected_columns = [col_name for col_name, non_null_count in non_null_counts.items() if non_null_count >= threshold]

# Create a new DataFrame with only the selected columns
df_selected = df.select(selected_columns)

# Print the selected columns
##print("Columns with at least 80% non-null values:")
# print(selected_columns)

# Show the resulting DataFrame
# df_selected.show()

In [5]:
len(df_selected.columns)

31

In [6]:
# Check for missing values
missing_values = {col_name: df_selected.filter(col(col_name).isNull()).count() for col_name in df_selected.columns}
print("Missing values for each column:")
for col_name, missing_count in missing_values.items():
    print(f"{col_name}: {missing_count}")

Missing values for each column:
Summons Number: 0
Plate ID: 5
Registration State: 0
Plate Type: 0
Issue Date: 0
Violation Code: 0
Vehicle Body Type: 195873
Vehicle Make: 202263
Issuing Agency: 0
Street Code1: 0
Street Code2: 0
Street Code3: 0
Vehicle Expiration Date: 1
Violation Location: 4338996
Violation Precinct: 2
Issuer Precinct: 1
Issuer Code: 2
Issuer Command: 4296253
Issuer Squad: 4296929
Violation Time: 8069
Violation County: 4259979
Violation In Front Of Or Opposite: 4593303
House Number: 4880498
Street Name: 19234
Date First Observed: 12
Law Section: 5
Sub Division: 4485
Vehicle Color: 335501
Vehicle Year: 36
Feet From Curb: 43
Violation Description: 3751349


In [7]:
# List of columns to keep
selected_columns = [
    'Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
    'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
    'Issuing Agency',
    'Violation Precinct', 'Violation Time', 'Violation County', 'Street Name',
    'Law Section', 'Sub Division', 'Vehicle Color', 'Feet From Curb', 'Violation Description'
]

# Create a new DataFrame with only the selected columns
df_clean_1 = df_selected.select(*selected_columns)

# Show the result
# df_clean_1.show()

In [8]:
df_no_duplicates = df_clean_1.dropDuplicates()
df_no_duplicates.count()

30459729

In [9]:
df_cleaned = df_no_duplicates.dropna()
df_cleaned.count()

23008035

In [10]:
# Check for missing values
missing_values = {col_name: df_cleaned.filter(col(col_name).isNull()).count() for col_name in df_cleaned.columns}
print("Missing values for each column:")
for col_name, missing_count in missing_values.items():
    print(f"{col_name}: {missing_count}")

Missing values for each column:
Summons Number: 0
Plate ID: 0
Registration State: 0
Plate Type: 0
Issue Date: 0
Violation Code: 0
Vehicle Body Type: 0
Vehicle Make: 0
Issuing Agency: 0
Violation Precinct: 0
Violation Time: 0
Violation County: 0
Street Name: 0
Law Section: 0
Sub Division: 0
Vehicle Color: 0
Feet From Curb: 0
Violation Description: 0


In [12]:
df_cleaned.select("Violation Time").show(10)

+--------------+
|Violation Time|
+--------------+
|         0001A|
|         0003A|
|         0020A|
|         0023A|
|         0027A|
|         0030A|
|         0042A|
|         0056A|
|         0100A|
|         0100A|
+--------------+
only showing top 10 rows



In [None]:
# Description of Summons Number Column
#df_cleaned.describe("Summons Number").show()

# Description of Plate ID Column
#df_cleaned.describe("Plate ID").show()

# Description of Registration State Column
#df_cleaned.describe("Registration State").show()

# Description of Violation Code Column
#df_cleaned.describe("Violation Code").show()

In [None]:
# df_cleaned.show(1,vertical=True)

In [13]:
df_cleaned.select("Violation Time").dtypes

[('Violation Time', 'string')]

In [14]:
from pyspark.sql.functions import col, to_date, to_timestamp, lpad, concat_ws
from pyspark.sql.types import IntegerType

In [15]:
from pyspark.sql.functions import col, lpad, substring, concat_ws, when

# Convert 'Violation Time' from 'HHMMX' to 'HH:MM AM/PM'
df_cleaned = df_cleaned.withColumn(
    "Violation Time",
    concat_ws(' ',
        concat_ws(':',
            lpad(substring(col("Violation Time"), 1, 2), 2, '0'),  # Hour part
            lpad(substring(col("Violation Time"), 3, 2), 2, '0')   # Minute part
        ),
        when(substring(col("Violation Time"), 5, 1) == 'A', 'AM')  # AM/PM part
        .otherwise('PM')
    )
)

# Show the resulting DataFrame
df_cleaned.select("Violation Time").show(30, truncate=False)


+--------------+
|Violation Time|
+--------------+
|00:01 AM      |
|00:03 AM      |
|00:20 AM      |
|00:23 AM      |
|00:27 AM      |
|00:30 AM      |
|00:42 AM      |
|00:56 AM      |
|01:00 AM      |
|01:00 AM      |
|01:00 PM      |
|01:00 PM      |
|01:00 PM      |
|01:00 PM      |
|01:01 PM      |
|01:01 PM      |
|01:01 PM      |
|01:02 PM      |
|01:02 PM      |
|01:02 PM      |
|01:02 PM      |
|01:03 PM      |
|01:04 PM      |
|01:04 PM      |
|01:05 AM      |
|01:05 PM      |
|01:05 PM      |
|01:06 PM      |
|01:06 PM      |
|01:06 PM      |
+--------------+
only showing top 30 rows



In [16]:
df_cleaned.select("Violation Time").dtypes

[('Violation Time', 'string')]

In [17]:
from pyspark.sql.functions import to_timestamp, col

# Convert 'Violation Time' from 'HH:MM AM/PM' string to timestamp
df_cleaned = df_cleaned.withColumn(
    "Violation Time",
    to_timestamp(col("Violation Time"), "hh:mm a")
)

# Show the resulting DataFrame
df_cleaned.select("Violation Time").show(20, truncate=False)


+-------------------+
|Violation Time     |
+-------------------+
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|null               |
|1970-01-01 01:00:00|
|1970-01-01 01:00:00|
|1970-01-01 13:00:00|
|1970-01-01 13:00:00|
|1970-01-01 13:00:00|
|1970-01-01 13:00:00|
|1970-01-01 13:01:00|
|1970-01-01 13:01:00|
|1970-01-01 13:01:00|
|1970-01-01 13:02:00|
|1970-01-01 13:02:00|
|1970-01-01 13:02:00|
+-------------------+
only showing top 20 rows



In [18]:
df_cleaned = df_cleaned.withColumn("Issue Date", to_date(col("Issue Date"), "MM/dd/yyyy"))


In [19]:
df_cleaned.dtypes

[('Summons Number', 'bigint'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Plate Type', 'string'),
 ('Issue Date', 'date'),
 ('Violation Code', 'int'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Issuing Agency', 'string'),
 ('Violation Precinct', 'int'),
 ('Violation Time', 'timestamp'),
 ('Violation County', 'string'),
 ('Street Name', 'string'),
 ('Law Section', 'string'),
 ('Sub Division', 'string'),
 ('Vehicle Color', 'string'),
 ('Feet From Curb', 'string'),
 ('Violation Description', 'string')]

In [20]:
df_cleaned.select("Issue Date").show(10)

+----------+
|Issue Date|
+----------+
|2013-08-23|
|2013-08-09|
|2013-08-23|
|2013-08-17|
|2013-08-02|
|2013-08-21|
|2013-08-24|
|2013-07-30|
|2013-08-08|
|2013-08-22|
+----------+
only showing top 10 rows



In [21]:
df_cleaned.coalesce(1).write.format("csv") \
  .option("header", "true") \
  .mode("overwrite") \
  .save("output_final_NYC.csv")


In [22]:
num_rows = df_cleaned.count()
num_columns = len(df_cleaned.columns)
print(f"Shape of DataFrame: ({num_rows}, {num_columns})")

Shape of DataFrame: (23008035, 18)


In [23]:
df_cleaned.show(10,vertical=True)

-RECORD 0-------------------------------------
 Summons Number        | 7440710630           
 Plate ID              | 84790MA              
 Registration State    | NY                   
 Plate Type            | COM                  
 Issue Date            | 2013-08-23           
 Violation Code        | 78                   
 Vehicle Body Type     | PICK                 
 Vehicle Make          | FORD                 
 Issuing Agency        | T                    
 Violation Precinct    | 114                  
 Violation Time        | null                 
 Violation County      | Q                    
 Street Name           | 37th St              
 Law Section           | 408                  
 Sub Division          | k6                   
 Vehicle Color         | RD                   
 Feet From Curb        | 0                    
 Violation Description | 78-Nighttime PKG ... 
-RECORD 1-------------------------------------
 Summons Number        | 7997176457           
 Plate ID    

In [24]:
# Check for missing values
missing_values = {col_name: df_cleaned.filter(col(col_name).isNull()).count() for col_name in df_cleaned.columns}
print("Missing values for each column:")
for col_name, missing_count in missing_values.items():
    print(f"{col_name}: {missing_count}")

Missing values for each column:
Summons Number: 0
Plate ID: 0
Registration State: 0
Plate Type: 0
Issue Date: 0
Violation Code: 0
Vehicle Body Type: 0
Vehicle Make: 0
Issuing Agency: 0
Violation Precinct: 0
Violation Time: 157248
Violation County: 0
Street Name: 0
Law Section: 0
Sub Division: 0
Vehicle Color: 0
Feet From Curb: 0
Violation Description: 0


In [25]:
df_cleaned = df_cleaned.dropna()
df_cleaned.count()

22850787

In [29]:
from pyspark.sql.functions import date_format

# Assuming df is your DataFrame and 'timestamp_col' is your timestamp column
df_cleaned = df_cleaned.withColumn('Time', date_format('Violation Time', 'HH:mm:ss'))
df_cleaned.show(vertical=True)


-RECORD 0-------------------------------------
 Summons Number        | 7032646177           
 Plate ID              | XASP83               
 Registration State    | NJ                   
 Plate Type            | PAS                  
 Issue Date            | 2013-08-08           
 Violation Code        | 85                   
 Vehicle Body Type     | 4DSD                 
 Vehicle Make          | CHEVR                
 Issuing Agency        | T                    
 Violation Precinct    | 52                   
 Violation Time        | 1970-01-01 01:00:00  
 Violation County      | BX                   
 Street Name           | W Kingsbridge Rd     
 Law Section           | 408                  
 Sub Division          | k5                   
 Vehicle Color         | WHITE                
 Feet From Curb        | 0                    
 Violation Description | 85-Storage-3 hour... 
 Time                  | 01:00:00             
-RECORD 1-------------------------------------
 Summons Numb

In [30]:
df_cleaned.dtypes

[('Summons Number', 'bigint'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Plate Type', 'string'),
 ('Issue Date', 'date'),
 ('Violation Code', 'int'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Issuing Agency', 'string'),
 ('Violation Precinct', 'int'),
 ('Violation Time', 'timestamp'),
 ('Violation County', 'string'),
 ('Street Name', 'string'),
 ('Law Section', 'string'),
 ('Sub Division', 'string'),
 ('Vehicle Color', 'string'),
 ('Feet From Curb', 'string'),
 ('Violation Description', 'string'),
 ('Time', 'string')]

In [31]:
df_cleaned = df_cleaned.drop('Violation Time')

In [32]:
df_cleaned = df_cleaned.withColumnRenamed('Time', 'Violation Time')

In [33]:
df_cleaned.show(10,vertical=True)

-RECORD 0-------------------------------------
 Summons Number        | 7032646177           
 Plate ID              | XASP83               
 Registration State    | NJ                   
 Plate Type            | PAS                  
 Issue Date            | 2013-08-08           
 Violation Code        | 85                   
 Vehicle Body Type     | 4DSD                 
 Vehicle Make          | CHEVR                
 Issuing Agency        | T                    
 Violation Precinct    | 52                   
 Violation County      | BX                   
 Street Name           | W Kingsbridge Rd     
 Law Section           | 408                  
 Sub Division          | k5                   
 Vehicle Color         | WHITE                
 Feet From Curb        | 0                    
 Violation Description | 85-Storage-3 hour... 
 Violation Time        | 01:00:00             
-RECORD 1-------------------------------------
 Summons Number        | 7453597730           
 Plate ID    

In [34]:
df_cleaned.dtypes

[('Summons Number', 'bigint'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Plate Type', 'string'),
 ('Issue Date', 'date'),
 ('Violation Code', 'int'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Issuing Agency', 'string'),
 ('Violation Precinct', 'int'),
 ('Violation County', 'string'),
 ('Street Name', 'string'),
 ('Law Section', 'string'),
 ('Sub Division', 'string'),
 ('Vehicle Color', 'string'),
 ('Feet From Curb', 'string'),
 ('Violation Description', 'string'),
 ('Violation Time', 'string')]

In [35]:
df_cleaned.coalesce(1).write.format("csv") \
  .option("header", "true") \
  .mode("overwrite") \
  .save("output_final1_NYC.csv")


In [None]:
num_rows = df_cleaned.count()
num_columns = len(df_cleaned.columns)
print(f"Shape of DataFrame: ({num_rows}, {num_columns})")