# ENGR-E 516 Assignment 2
## Srinivas Kini
## skini@iu.edu

In [66]:
# Import spark libraries
import math
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd 
import numpy as np 

In [2]:
# Create a spark session 
spark = SparkSession.builder.appName('skini-a2').getOrCreate()
context = spark.sparkContext

23/04/10 11:17:08 WARN Utils: Your hostname, Srinivass-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.229 instead (on interface en0)
23/04/10 11:17:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/10 11:17:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Part 1

In [3]:
# Read the parking violations file
ny_df = spark.read.options(header=True, inferschema=True).csv("ny_parking_violations.csv")

                                                                                

In [5]:
# Renaming columns for ease of use in Spark SQL
col_dict = {}
for column in ny_df.columns:
    new_col_name = '_'.join(column.lower().split(' '))
    ny_df = ny_df.withColumnRenamed(column, new_col_name)
ny_df.columns


['summons_number',
 'plate_id',
 'registration_state',
 'plate_type',
 'issue_date',
 'violation_code',
 'vehicle_body_type',
 'vehicle_make',
 'issuing_agency',
 'street_code1',
 'street_code2',
 'street_code3',
 'vehicle_expiration_date',
 'violation_location',
 'violation_precinct',
 'issuer_precinct',
 'issuer_code',
 'issuer_command',
 'issuer_squad',
 'violation_time',
 'time_first_observed',
 'violation_county',
 'violation_in_front_of_or_opposite',
 'house_number',
 'street_name',
 'intersecting_street',
 'date_first_observed',
 'law_section',
 'sub_division',
 'violation_legal_code',
 'days_parking_in_effect____',
 'from_hours_in_effect',
 'to_hours_in_effect',
 'vehicle_color',
 'unregistered_vehicle?',
 'vehicle_year',
 'meter_number',
 'feet_from_curb',
 'violation_post_code',
 'violation_description',
 'no_standing_or_stopping_violation',
 'hydrant_violation',
 'double_parking_violation']

In [6]:
# Create a temp. VIEW for Spark SQl
table_name = "parking_violations"
ny_df.createOrReplaceTempView(table_name)

## Q1: When are tickets most likely to be issued?

In [7]:
print("Based on Date\n")
q1_query = f'''
SELECT
  SUBSTR(issue_date, 1, 10) AS issued_on,
  COUNT(*) AS tickets
FROM {table_name}
GROUP BY issued_on
ORDER BY tickets DESC
LIMIT 5
'''.strip()

spark.sql(q1_query).show()

print("\nBased on Year\n")
q1_query = f'''
SELECT
  YEAR(TO_DATE(CAST(UNIX_TIMESTAMP(issue_date,'MM/dd/yyyy') AS timestamp))) AS year,
  COUNT(*) AS tickets
FROM {table_name}
GROUP BY year
ORDER BY tickets DESC
LIMIT 5
'''.strip()

spark.sql(q1_query).show()



Based on Date



                                                                                

+----------+-------+
| issued_on|tickets|
+----------+-------+
|08/04/2022|  66726|
|08/05/2022|  65393|
|08/02/2022|  64876|
|06/30/2022|  64846|
|07/19/2022|  64815|
+----------+-------+


Based on Year





+----+-------+
|year|tickets|
+----+-------+
|2022|9154317|
|2023|2380085|
|2021|    477|
|2024|    117|
|2020|     90|
+----+-------+



                                                                                

## Q2: What are the most common years and types of cars to be ticketed?


In [8]:
q2_query = f'''
SELECT
  vehicle_year AS year,
  vehicle_make AS make,
  COUNT(*) AS num_tickets
FROM {table_name}
WHERE vehicle_year IS NOT NULL AND vehicle_year <> 0 AND vehicle_make IS NOT NULL
GROUP BY vehicle_year, vehicle_make
ORDER BY num_tickets DESC
LIMIT 5
'''.strip()

spark.sql(q2_query).show()




+----+-----+-----------+
|year| make|num_tickets|
+----+-----+-----------+
|2021|TOYOT|     117999|
|2019|HONDA|     113890|
|2021|HONDA|     107202|
|2020|HONDA|     104349|
|2022|TOYOT|      95782|
+----+-----+-----------+



                                                                                

## Q3: Where are tickets most commonly issued?

In [9]:
q3_query = '''
SELECT
  {} AS {}, COUNT(*) AS tickets
FROM parking_violations
WHERE {} IS NOT NULL
GROUP BY {}
ORDER BY tickets DESC
LIMIT 5
'''.strip()

print("By violation_county\n")
spark.sql(q3_query.format(*(["violation_county"] * 4))).show()

print("By violation_precinct\n")
spark.sql(q3_query.format(*(["violation_precinct"] * 4))).show()

print("By violation_location\n")
spark.sql(q3_query.format(*(["violation_location"] * 4))).show()

By violation_county



                                                                                

+----------------+-------+
|violation_county|tickets|
+----------------+-------+
|              NY|2450153|
|              QN|1858441|
|              BK|1732079|
|              BX|1497854|
|               K|1365103|
+----------------+-------+

By violation_precinct



                                                                                

+------------------+-------+
|violation_precinct|tickets|
+------------------+-------+
|                 0|5349526|
|                19| 282466|
|                13| 254057|
|                 6| 224686|
|               114| 221523|
+------------------+-------+

By violation_location





+------------------+-------+
|violation_location|tickets|
+------------------+-------+
|                19| 282466|
|                13| 254057|
|                 6| 224686|
|               114| 221523|
|                14| 190012|
+------------------+-------+



                                                                                

## Which color of the vehicle is most likely to get a ticket?

In [10]:
q4_query = f'''
SELECT vehicle_color AS color,
  COUNT(*) AS num_tickets
FROM parking_violations
WHERE vehicle_color IS NOT NULL
GROUP BY vehicle_color
ORDER BY num_tickets DESC
LIMIT 5
'''.strip()
spark.sql(q4_query).show()



+-----+-----------+
|color|num_tickets|
+-----+-----------+
|   GY|    2275457|
|   WH|    2055818|
|   BK|    1992788|
|   BL|     760235|
|WHITE|     671757|
+-----+-----------+



                                                                                

## K-Means

In [11]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# Filter out null rows
ny_df = ny_df.filter((ny_df.vehicle_color.isNotNull()) & (ny_df.street_code1.isNotNull()))

# Convert `vehicle_color` to numerical feature using One-Hot Encoding
indexer = StringIndexer(inputCol="vehicle_color", outputCol="vehicle_color_index")
encoder = OneHotEncoder(inputCol="vehicle_color_index", outputCol="vehicle_color_vec")
pipeline = Pipeline(stages=[indexer, encoder])
ny_df_kmeans = pipeline.fit(ny_df).transform(ny_df).select('vehicle_color_vec', 'street_code1')
ny_df_kmeans = VectorAssembler(inputCols=['vehicle_color_vec', 'street_code1'], outputCol="features").transform(ny_df_kmeans)

# Train K-Means model
kmeans = KMeans(featuresCol='features', k=3)
model = kmeans.fit(ny_df_kmeans)

                                                                                

In [12]:
from pyspark.ml.linalg import Vectors

# Preprocess the input DataFrame
input_df = spark.createDataFrame([(Vectors.dense([1.0, 0.0, 0.0]), 34510), (Vectors.dense([1.0, 0.0, 0.0]), 10030), (Vectors.dense([1.0, 0.0, 0.0]), 34050)], ["vehicle_color_vec", "street_code1"])

# Preprocess the input DataFrame
indexer = StringIndexer(inputCol="vehicle_color_vec", outputCol="vehicle_color_index")
indexer = StringIndexer(inputCol="vehicle_color_vec", outputCol="vehicle_color_index")
encoder = OneHotEncoder(inputCol="vehicle_color_index", outputCol="vehicle_color_vec_encoded")
pipeline = Pipeline(stages=[indexer, encoder])
input_df = pipeline.fit(input_df).transform(input_df)

# Assemble feature vector and predict using K-Means model
input_data = VectorAssembler(inputCols=['vehicle_color_vec_encoded', 'street_code1'], outputCol="features").transform(input_df)
predictions = model.transform(input_data)

# Calculate the probability of getting a ticket based on the cluster assignments of the input data
cluster_assignments = predictions.select('prediction').rdd.flatMap(lambda x: x).collect()
probabilities = model.summary.clusterSizes / sum(model.summary.clusterSizes)
ticket_probability = probabilities[cluster_assignments[0]]

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-pac

PicklingError: Could not serialize object: IndexError: tuple index out of range

## Part 2

In [69]:
nba = spark.read.options(header=True, inferschema=True).csv("shot_logs.csv")
nba.printSchema()

root
 |-- GAME_ID: integer (nullable = true)
 |-- MATCHUP: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- W: string (nullable = true)
 |-- FINAL_MARGIN: integer (nullable = true)
 |-- SHOT_NUMBER: integer (nullable = true)
 |-- PERIOD: integer (nullable = true)
 |-- GAME_CLOCK: timestamp (nullable = true)
 |-- SHOT_CLOCK: double (nullable = true)
 |-- DRIBBLES: integer (nullable = true)
 |-- TOUCH_TIME: double (nullable = true)
 |-- SHOT_DIST: double (nullable = true)
 |-- PTS_TYPE: integer (nullable = true)
 |-- SHOT_RESULT: string (nullable = true)
 |-- CLOSEST_DEFENDER: string (nullable = true)
 |-- CLOSEST_DEFENDER_PLAYER_ID: integer (nullable = true)
 |-- CLOSE_DEF_DIST: double (nullable = true)
 |-- FGM: integer (nullable = true)
 |-- PTS: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_id: integer (nullable = true)



## _For each pair of the players (A, B), we define the fear sore of A when facing B is the hit_
## _rate, such that B is closet defender when A is shooting. Based on the fear sore, for each_
## _player, please find out who is his ”most unwanted defender”_

In [24]:
# Create a temp. VIEW for Spark SQl
table_name = "shots"
nba.createOrReplaceTempView(table_name)

### First, we will group the data by the player and the closest defender and aggregate the shots that they have missed and hit for that defender

In [88]:
# Calculate hits and misses based on SHOT_RESULT
agg = nba.groupBy("player_name","closest_defender")\
    .agg(count(when(nba["SHOT_RESULT"]=="made", True))\
    .alias('hits'),count("SHOT_RESULT").alias('misses'))
agg.show()

+----------------+------------------+----+------+
|     player_name|  closest_defender|hits|misses|
+----------------+------------------+----+------+
|   brian roberts|        Gasol, Pau|   0|     1|
|    al jefferson| Hardaway Jr., Tim|   0|     1|
|     cody zeller|     Price, Ronnie|   0|     1|
|       gary neal|     Beal, Bradley|   3|     3|
|       gary neal|     Smart, Marcus|   0|     4|
|gerald henderson|    Bazemore, Kent|   0|     2|
|    kemba walker|     Williams, Lou|   1|     2|
|lance stephenson|    Fournier, Evan|   0|     2|
| marvin williams| Early, Cleanthony|   1|     1|
|  gordon hayward|Aldridge, LaMarcus|   2|     5|
|  gordon hayward|    Bazemore, Kent|   1|     1|
|   trevor booker|   Thompson, Jason|   3|     3|
|   trevor booker|    Frye, Channing|   1|     2|
|     enes kanter|   Chandler, Tyson|   1|     6|
|      dante exum|      Williams, Mo|   0|     3|
|      jon ingles|     Jack, Jarrett|   1|     1|
|      jon ingles|     Williams, Lou|   1|     5|


## To calculate the hit rate, we can use the following equation
### hit_rate = hits / (hits + misses)

In [89]:
agg = agg.withColumn('hit_rate', round(agg['hits']/(agg['hits'] + agg['misses']), 2))
agg.orderBy(agg['player_name']).show()

+------------+-----------------+----+------+--------+
| player_name| closest_defender|hits|misses|hit_rate|
+------------+-----------------+----+------+--------+
|aaron brooks|   Thompson, Klay|   1|     3|    0.25|
|aaron brooks|Livingston, Shaun|   1|     2|    0.33|
|aaron brooks|     Smith, Jason|   1|     2|    0.33|
|aaron brooks|    Lee, Courtney|   1|     3|    0.25|
|aaron brooks| Carroll, DeMarre|   1|     1|     0.5|
|aaron brooks|    Nurkic, Jusuf|   0|     2|     0.0|
|aaron brooks|     Lopez, Robin|   2|     3|     0.4|
|aaron brooks|    Harris, Devin|   0|     1|     0.0|
|aaron brooks|      Green, Jeff|   0|     1|     0.0|
|aaron brooks|     LaVine, Zach|   1|     3|    0.25|
|aaron brooks|    Ariza, Trevor|   1|     1|     0.5|
|aaron brooks|   Hayes, Charles|   0|     2|     0.0|
|aaron brooks| Williams, Marvin|   1|     2|    0.33|
|aaron brooks|  Napier, Shabazz|   4|     6|     0.4|
|aaron brooks|  Lillard, Damian|   3|     6|    0.33|
|aaron brooks|      Scola, L

### To remove duplicate values for the hit_rate we will rank the data based on player_name

In [90]:
agg = agg.withColumn("rank",row_number().over(Window.partitionBy("player_name")\
    .orderBy(agg["hit_rate"].asc())))
agg.show()

+------------+--------------------+----+------+--------+----+
| player_name|    closest_defender|hits|misses|hit_rate|rank|
+------------+--------------------+----+------+--------+----+
|aaron brooks|       Nurkic, Jusuf|   0|     2|     0.0|   1|
|aaron brooks|       Harris, Devin|   0|     1|     0.0|   2|
|aaron brooks|         Green, Jeff|   0|     1|     0.0|   3|
|aaron brooks|      Hayes, Charles|   0|     2|     0.0|   4|
|aaron brooks|         Scola, Luis|   0|     1|     0.0|   5|
|aaron brooks|         Exum, Dante|   0|     3|     0.0|   6|
|aaron brooks|          Lawson, Ty|   0|     5|     0.0|   7|
|aaron brooks|     Crawford, Jamal|   0|     1|     0.0|   8|
|aaron brooks|      Fournier, Evan|   0|     1|     0.0|   9|
|aaron brooks|       O'Quinn, Kyle|   0|     1|     0.0|  10|
|aaron brooks|        Wear, Travis|   0|     1|     0.0|  11|
|aaron brooks|   Dos Santos, Atila|   0|     1|     0.0|  12|
|aaron brooks|        Hairston, PJ|   0|     1|     0.0|  13|
|aaron b

### Next, we will only get those ranks which are 1 to get the 'Most Unwanted Defender'

In [91]:
mud = agg.where(agg['rank'] == 1).withColumnRenamed("closest_defender", "most_unwanted_defender")\
    .select("player_name", "most_unwanted_defender", "rank")
mud.show()

+----------------+----------------------+----+
|     player_name|most_unwanted_defender|rank|
+----------------+----------------------+----+
|    aaron brooks|         Nurkic, Jusuf|   1|
|    aaron gordon|        Rivers, Austin|   1|
| al farouq aminu|        Johnson, James|   1|
|      al horford|           Diaw, Boris|   1|
|    al jefferson|     Hardaway Jr., Tim|   1|
|   alan anderson|            Leuer, Jon|   1|
|     alan crabbe|      Sefolosha, Thabo|   1|
|        alex len|       Knight, Brandon|   1|
|   alexis ajinca|          Meeks, Jodie|   1|
|      alonzo gee|          Korver, Kyle|   1|
|amare stoudemire|            Deng, Luol|   1|
|    amir johnson|         Grant, Jerami|   1|
|  andre drummond|         James, LeBron|   1|
|  andre iguodala|           Lowry, Kyle|   1|
|    andre miller|          Turner, Evan|   1|
|  andre roberson|        Ginobili, Manu|   1|
|    andrew bogut|          Ibaka, Serge|   1|
|  andrew wiggins|        Sanders, Larry|   1|
| anthony ben

## _For each player, we define the comfortable zone of shooting is a matrix of,_
## _{SHOT DIST, CLOSE DEF DIST, SHOT CLOCK}_
## _Please develop a Spark-based algorithm to classify each player’s records into 4 comfort-_
## _able zones. Considering the hit rate, which zone is the best for James Harden, Chris Paul, Stephen Curry, and Lebron James._

### To calculate comfortable zones for these players, we can make use of K-Means clustering where each cluster maps to a zone. Since we need 4 zones, there will be 4 clusters

In [92]:
zones = nba.select('player_name','shot_result','shot_dist','close_def_dist','shot_clock').dropna().dropDuplicates()
zones.show()

+--------------------+-----------+---------+--------------+----------+
|         player_name|shot_result|shot_dist|close_def_dist|shot_clock|
+--------------------+-----------+---------+--------------+----------+
|       brian roberts|     missed|     19.6|           4.6|      16.7|
|       brian roberts|     missed|     23.8|           5.5|      17.6|
|       brian roberts|       made|     20.5|           5.3|      14.4|
|       brian roberts|     missed|     24.5|           5.2|      15.3|
|        al jefferson|       made|     19.3|           7.6|      10.0|
|        al jefferson|       made|      7.7|           2.2|       3.8|
|         cody zeller|     missed|     18.1|           4.9|      12.1|
|         cody zeller|     missed|      3.1|           2.4|      24.0|
|           gary neal|       made|     17.4|           3.1|      11.0|
|           gary neal|     missed|     25.5|           5.5|      11.8|
|           gary neal|     missed|      6.5|           0.2|      14.8|
|    g

In [93]:
assembler = VectorAssembler(inputCols=['shot_dist','close_def_dist','shot_clock'], outputCol="features",handleInvalid="skip")
kmeans = KMeans(featuresCol='features', k=4, seed=10)    
t_data = assembler.transform(zones)
op_fit = kmeans.fit(t_data)
zones = op_fit.transform(t_data)

                                                                                

In [94]:
zones.show()

+--------------------+-----------+---------+--------------+----------+---------------+----------+
|         player_name|shot_result|shot_dist|close_def_dist|shot_clock|       features|prediction|
+--------------------+-----------+---------+--------------+----------+---------------+----------+
|       brian roberts|     missed|     19.6|           4.6|      16.7|[19.6,4.6,16.7]|         1|
|       brian roberts|     missed|     23.8|           5.5|      17.6|[23.8,5.5,17.6]|         1|
|       brian roberts|       made|     20.5|           5.3|      14.4|[20.5,5.3,14.4]|         1|
|       brian roberts|     missed|     24.5|           5.2|      15.3|[24.5,5.2,15.3]|         1|
|        al jefferson|       made|     19.3|           7.6|      10.0|[19.3,7.6,10.0]|         2|
|        al jefferson|       made|      7.7|           2.2|       3.8|  [7.7,2.2,3.8]|         0|
|         cody zeller|     missed|     18.1|           4.9|      12.1|[18.1,4.9,12.1]|         1|
|         cody zelle

### Next, we can calculate the hit_rate like we did for the previous question

In [95]:
zones = zones.groupBy("prediction")\
    .agg(count(when(zones["SHOT_RESULT"]=="made", True))\
    .alias('hits'),count("SHOT_RESULT").alias('misses'))
zones = zones.withColumn('hit_rate', round(zones['hits']/(zones['hits'] + zones['misses']), 2))
zones.show()



+----------+-----+------+--------+
|prediction| hits|misses|hit_rate|
+----------+-----+------+--------+
|         1|12692| 32309|    0.28|
|         3|15779| 26674|    0.37|
|         2|10589| 28652|    0.27|
|         0|16801| 34821|    0.33|
+----------+-----+------+--------+



                                                                                

### From the previous dataframe, let us fetch the hit rates for James Harden, Chris Paul, Stephen Curry, and Lebron James

In [96]:
agg = agg.where(agg['player_name'].isin(['james harden','chris paul','stephen curry','lebron james']))
agg.show()

+-----------+--------------------+----+------+--------+----+
|player_name|    closest_defender|hits|misses|hit_rate|rank|
+-----------+--------------------+----+------+--------+----+
| chris paul|       Brewer, Corey|   0|     1|     0.0|   1|
| chris paul|    Prince, Tayshaun|   0|     1|     0.0|   2|
| chris paul|        Ennis, James|   0|     1|     0.0|   3|
| chris paul|       Marion, Shawn|   0|     1|     0.0|   4|
| chris paul|      Shumpert, Iman|   0|     1|     0.0|   5|
| chris paul|   Livingston, Shaun|   0|     1|     0.0|   6|
| chris paul|     Arthur, Darrell|   0|     1|     0.0|   7|
| chris paul|     Leonard, Meyers|   0|     2|     0.0|   8|
| chris paul|     Williams, Deron|   0|     1|     0.0|   9|
| chris paul|Dellavedova, Matthew|   0|     1|     0.0|  10|
| chris paul|        Dorsey, Joey|   0|     1|     0.0|  11|
| chris paul|       Waiters, Dion|   0|     1|     0.0|  12|
| chris paul|    Carroll, DeMarre|   0|     2|     0.0|  13|
| chris paul|   Williams