In [None]:
import csv

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Application configuration: app name "ParkingViolations", master "yarn".
conf = SparkConf().setAppName("ParkingViolations").setMaster("yarn")
# getOrCreate() returns the context that is already running in this kernel, if
# any, so re-executing this cell no longer fails with "Cannot run multiple
# SparkContexts at once". `conf` is only applied when a new context is created.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Path of the parking-violations CSV input file.
VIOLATIONS_CSV_PATH = "/input/Parking_Violations_Issued_-_Fiscal_Year_2023.csv"

# Load the file as an RDD of strings, one element per line of text.
data = sc.textFile(VIOLATIONS_CSV_PATH)


#### 1. When are tickets most likely to be issued?

In [None]:
# Number of leading CSV columns kept in each record tuple.
NUM_KEPT_FIELDS = 17

# The first line of the file is the column header rather than a ticket (the
# DataFrame load of the same file in section 5 reads it with header=True).
header_line = data.first()

# Build an RDD of 17-field tuples, one per ticket line:
#   1. skip the header line,
#   2. parse each line with the csv module so that commas inside quoted
#      fields do not shift the columns (a plain split(",") would),
#   3. drop rows that are too short to hold the first NUM_KEPT_FIELDS
#      columns instead of failing with IndexError,
#   4. keep the first NUM_KEPT_FIELDS columns as a tuple.
data_tuple = (
    data.filter(lambda line: line != header_line)
    .map(lambda line: next(csv.reader([line]), []))
    .filter(lambda fields: len(fields) >= NUM_KEPT_FIELDS)
    .map(lambda fields: tuple(fields[:NUM_KEPT_FIELDS]))
)

In [None]:
# Key every record by the first seven characters of column 1 and count the
# records per key.
# NOTE(review): this assumes column 1 holds the issue date in a form whose
# first 7 characters are the year and month (e.g. "YYYY-MM") -- confirm
# against the CSV header and the date format actually used in the file.
month_year_count = (
    data_tuple
    .map(lambda record: (record[1][:7], 1))
    .reduceByKey(lambda left, right: left + right)
)

# The (key, count) pair with the largest count.
max_count = month_year_count.max(key=lambda pair: pair[1])
print("Month and year with the most number of tickets issued:", max_count[0])


#### 2. What are the most common years and types of cars to be ticketed?

In [None]:
# Column positions used for this question.
# NOTE(review): indices 3 and 5 are taken on trust as "vehicle year" and
# "car type" -- confirm against the CSV header.
YEAR_COL, CAR_TYPE_COL = 3, 5
MIN_FIELDS_YEAR_CAR = max(YEAR_COL, CAR_TYPE_COL) + 1

# The first line of the file is the column header rather than a ticket (the
# DataFrame load of the same file in section 5 reads it with header=True).
header_line = data.first()

# Parse each line exactly once (the csv module keeps quoted commas inside
# their field), skip the header and rows too short to contain both columns,
# and emit a (year, car type) pair per ticket.
years_and_cars = (
    data.filter(lambda line: line != header_line)
    .map(lambda line: next(csv.reader([line]), []))
    .filter(lambda fields: len(fields) >= MIN_FIELDS_YEAR_CAR)
    .map(lambda fields: (fields[YEAR_COL], fields[CAR_TYPE_COL]))
)

# Count tickets per (year, car type) pair.
tickets_by_year_and_car = years_and_cars.map(lambda year_and_car: (year_and_car, 1)).reduceByKey(lambda a, b: a + b)

# Print the top 10 results (negated count => descending order).
for (year, car_type), count in tickets_by_year_and_car.takeOrdered(10, key=lambda x: -x[1]):
    print("{} {}: {} tickets".format(year, car_type, count))

#### 3. Where are tickets most commonly issued?

In [None]:
# Column position used as the ticket location.
# NOTE(review): index 24 is taken on trust as the location column -- confirm
# against the CSV header.
LOCATION_COL = 24

# The first line of the file is the column header rather than a ticket (the
# DataFrame load of the same file in section 5 reads it with header=True).
header_line = data.first()

# Parse each line with the csv module (so quoted commas do not shift the
# columns), skip the header and rows that are too short to contain the
# location column, and keep only the location value.
locations = (
    data.filter(lambda line: line != header_line)
    .map(lambda line: next(csv.reader([line]), []))
    .filter(lambda fields: len(fields) > LOCATION_COL)
    .map(lambda fields: fields[LOCATION_COL])
)

# Count tickets per location.
ticket_locations = locations.map(lambda loc: (loc, 1)).reduceByKey(lambda a, b: a + b)

# Print the top 10 locations (negated count => descending order).
for location, count in ticket_locations.takeOrdered(10, key=lambda x: -x[1]):
    print("{}: {} tickets".format(location, count))

#### 4. Which color of the vehicle is most likely to get a ticket?

In [None]:
# Name of the color column; section 5 selects this same column from the same
# file, so it is looked up in the header instead of trusting a fixed position.
COLOR_COLUMN = "Vehicle Color"
# Position used when the header does not contain COLOR_COLUMN (original value).
FALLBACK_COLOR_COL = 19

# The first line of the file is the column header rather than a ticket (the
# DataFrame load of the same file in section 5 reads it with header=True).
header_line = data.first()
header_fields = [name.strip() for name in next(csv.reader([header_line]), [])]
color_col = header_fields.index(COLOR_COLUMN) if COLOR_COLUMN in header_fields else FALLBACK_COLOR_COL

# Parse each line with the csv module (so quoted commas do not shift the
# columns), skip the header and rows too short to contain the color column,
# and keep only the color value.
colors = (
    data.filter(lambda line: line != header_line)
    .map(lambda line: next(csv.reader([line]), []))
    .filter(lambda fields: len(fields) > color_col)
    .map(lambda fields: fields[color_col])
)

# Count tickets per color value.
ticket_colors = colors.map(lambda color: (color, 1)).reduceByKey(lambda a, b: a + b)

# Print the top 10 colors (negated count => descending order).
for color, count in ticket_colors.takeOrdered(10, key=lambda x: -x[1]):
    print("{}: {} tickets".format(color, count))

#### 5. Given a black vehicle parked illegally at street codes 34510, 10030 and 34050, what is the probability that it will get a ticket?

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# No earlier cell creates `spark` (only `sc`), so obtain the session
# explicitly; getOrCreate() reuses the SparkContext that is already running.
spark = SparkSession.builder.getOrCreate()

# NOTE: this rebinds `data` from the text RDD used in sections 1-4 to a
# DataFrame; re-running those earlier cells afterwards requires reloading it.
data = spark.read.csv('/input/Parking_Violations_Issued_-_Fiscal_Year_2023.csv', header=True, inferSchema=True)
selected_data = data.select('Plate ID', 'Registration State', 'Street Code1', 'Street Code2', 'Street Code3', 'Vehicle Color')

# The three street-code columns are the clustering features.
STREET_CODE_COLS = ['Street Code1', 'Street Code2', 'Street Code3']

# Cast the street codes to int first and drop incomplete rows afterwards, so
# values that fail the cast (and become null) are removed as well instead of
# reaching VectorAssembler.
cleaned_data = selected_data
for street_col in STREET_CODE_COLS:
    cleaned_data = cleaned_data.withColumn(street_col, cleaned_data[street_col].cast('int'))
cleaned_data = cleaned_data.na.drop()

# Pack the street codes into the single 'features' vector column.
assembler = VectorAssembler(inputCols=STREET_CODE_COLS, outputCol='features')
vector_data = assembler.transform(cleaned_data)

# k = 5 clusters with a fixed seed for reproducible results.
kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(vector_data)

# Assign the queried street codes to one of the fitted clusters.
test_data = [(34510, 10030, 34050)]
test_df = spark.createDataFrame(test_data, STREET_CODE_COLS)
test_vector_data = assembler.transform(test_df)
# Read the cluster id by column name: positional index 3 of this row is the
# 'features' vector, not the prediction.
predicted_cluster = model.transform(test_vector_data).head()['prediction']

# Calculate the probability of getting a ticket in the predicted cluster.
# The training rows only get a 'prediction' column once the model is applied
# to them, so transform vector_data before filtering on it.
clustered_data = model.transform(vector_data)
cluster_data = clustered_data.filter(clustered_data['prediction'] == predicted_cluster)
# NOTE(review): only the exact value 'BLACK' is matched; other spellings or
# abbreviations of the color, if the data contains any, are not counted.
black_cars = cluster_data.filter(cluster_data['Vehicle Color'] == 'BLACK').count()
total_cars = cluster_data.count()
# This ratio is the share of tickets in the cluster that were issued to
# 'BLACK' vehicles; the data contains ticketed vehicles only, so it is a
# proxy rather than a true probability of being ticketed.
# Guard against an empty cluster to avoid ZeroDivisionError.
probability = black_cars / total_cars if total_cars else 0.0
print("Probability of getting a ticket: ", probability)


##### Note: the number of clusters is set to 5, but this value would need to be tuned experimentally.