# Engineering Cloud Computing (ENGR-E 516)
## Assignment 2

### NY Parking Violations

In [1]:
# Checking if pyspark is installed properly and its path is right

import pyspark
import findspark

findspark.init()

In [2]:
# Importing all the required libraries
import scipy
from scipy import spatial
import numpy as np
import pandas as pd
import sys
import pyspark 
from pyspark.sql import Row
from pyspark.sql import functions as pysf
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, when, count, col, sum, regexp_replace,expr, avg, first
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession 
from pyspark.sql import functions as Func
from pyspark.sql.types import IntegerType
from pyspark import SparkContext, SparkConf
from functools import reduce



In [3]:
import warnings
warnings.filterwarnings("ignore")


In [4]:
# Creating Spark Session with the name 'ECC Assignment 2'
# Allocating for 4 cores
spark = SparkSession.builder\
        .master("local[4]")\
        .appName("ECC_Assignment_2")\
        .config("spark.executor.memory", "4g") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/13 00:38:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
file_path='file:////home/hadoop/assignment2/nyc_data/nyc_data.csv'

parking_df = spark.read.csv(file_path, header=True, inferSchema=True)

                                                                                

In [6]:
parking_df.head()

24/04/13 00:39:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Row(summons_number=1159637337, plate_id='KZH2758', registration_state='NY', plate_type='PAS', issue_date=datetime.datetime(2023, 6, 9, 0, 0), violation_code=67, vehicle_body_type='VAN', vehicle_make='HONDA', issuing_agency='P', street_code1=0, street_code2=0, street_code3=0, vehicle_expiration_date=20250201.0, violation_location=43, violation_precinct=43, issuer_precinct=43, issuer_code=972773, issuer_command='0043', issuer_squad='0000', violation_time='0911A', time_first_observed=None, violation_county='BX', violation_in_front_of_or_opposite=None, house_number=None, street_name='I/O TAYLOR AVE', intersecting_street='GUERLAIN', date_first_observed=0, law_section=408, sub_division='E5', violation_legal_code=None, days_parking_in_effect='BBBBBBB', from_hours_in_effect='ALL', to_hours_in_effect='ALL', vehicle_color='BLUE', unregistered_vehicle=0, vehicle_year=2006, meter_number='-', feet_from_curb=0, violation_post_code=None, violation_description=None, no_standing_or_stopping_violation=N

In [7]:
num_rows = parking_df.count()
print("Number of rows:", num_rows)

num_cols = len(parking_df.columns)
print("Number of columns:", num_cols)




Number of rows: 10717482
Number of columns: 43


                                                                                

In [8]:
parking_df.describe()

DataFrame[summary: string, summons_number: string, plate_id: string, registration_state: string, plate_type: string, violation_code: string, vehicle_body_type: string, vehicle_make: string, issuing_agency: string, street_code1: string, street_code2: string, street_code3: string, vehicle_expiration_date: string, violation_location: string, violation_precinct: string, issuer_precinct: string, issuer_code: string, issuer_command: string, issuer_squad: string, violation_time: string, time_first_observed: string, violation_county: string, violation_in_front_of_or_opposite: string, house_number: string, street_name: string, intersecting_street: string, date_first_observed: string, law_section: string, sub_division: string, violation_legal_code: string, days_parking_in_effect: string, from_hours_in_effect: string, to_hours_in_effect: string, vehicle_color: string, unregistered_vehicle: string, vehicle_year: string, meter_number: string, feet_from_curb: string, violation_post_code: string, vio

### Counting the number of null values in each column

In [9]:
from pyspark.sql.functions import col, count, when
from functools import reduce

# Specify the number of threads
num_threads = 4

# Split columns into groups
columns_per_thread = [parking_df.columns[i::num_threads] for i in range(num_threads)]

# Count nulls in parallel
counts_per_thread = []

for i, columns in enumerate(columns_per_thread):
    print(f"Processing thread {i + 1}/{num_threads}...")

    # Count nulls for the current set of columns
    counts = parking_df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns]).collect()

    # Append the result to the list
    counts_per_thread.append(counts)

Processing thread 1/4...


                                                                                

Processing thread 2/4...


                                                                                

Processing thread 3/4...


                                                                                

Processing thread 4/4...


                                                                                

In [10]:
total_counts = reduce(lambda x, y: x + y, counts_per_thread)

total_counts

[Row(summons_number=0, issue_date=0, issuing_agency=0, vehicle_expiration_date=0, issuer_code=0, time_first_observed=10087137, street_name=1507, sub_division=1767, to_hours_in_effect=7338965, meter_number=9381186, no_standing_or_stopping_violation=10717482),
 Row(plate_id=1, violation_code=0, street_code1=0, violation_location=4923863, issuer_command=4918591, violation_county=102892, intersecting_street=4980009, violation_legal_code=5799044, vehicle_color=1015118, feet_from_curb=0, hydrant_violation=10717482),
 Row(registration_state=0, vehicle_body_type=28485, street_code2=0, violation_precinct=0, issuer_squad=5292644, violation_in_front_of_or_opposite=4973482, date_first_observed=0, days_parking_in_effect=5014718, unregistered_vehicle=10490502, violation_post_code=5519326, double_parking_violation=10717482),
 Row(plate_type=0, vehicle_make=10679, street_code3=0, issuer_precinct=0, violation_time=336, house_number=5015875, law_section=0, from_hours_in_effect=7338955, vehicle_year=0, v

#### time_first_observed has a count of 10087137 null values, street_name has 1507, and so on.

### Question 1

When are tickets most likely to be issued?

In [11]:
# Creating a view which will come handy in querying the dataset
parking_df.createTempView("parking_view")

In [12]:
# SQL query to count the occurrences of each unique time of violation. 
#It then groups the data by violation_time and orders the result set by the count of violations in descending order, 
#revealing the most common times for violations.

violation_time = spark.sql('''SELECT `violation_time`, COUNT(*) AS count FROM parking_view 
            GROUP BY `violation_time` 
            ORDER BY count DESC''')

violation_time.show(30)



+--------------+-----+
|violation_time|count|
+--------------+-----+
|         0836A|20112|
|         0839A|19531|
|         0840A|19436|
|         0838A|19433|
|         0906A|19412|
|         1139A|19255|
|         1140A|19154|
|         1141A|19135|
|         0841A|18987|
|         1142A|18936|
|         1145A|18917|
|         0837A|18893|
|         0842A|18806|
|         1143A|18738|
|         0910A|18698|
|         1138A|18679|
|         0845A|18648|
|         0908A|18648|
|         0909A|18604|
|         1136A|18541|
|         1144A|18524|
|         0907A|18464|
|         0843A|18363|
|         1146A|18324|
|         1147A|18313|
|         0844A|18185|
|         1137A|17977|
|         0846A|17964|
|         1148A|17963|
|         0912A|17929|
+--------------+-----+
only showing top 30 rows



                                                                                

#### Observations:
The highest count of violations issued at a specific time is 20112, which corresponds to "0836A", indicating that 8:36 AM has the highest number of violations issued among the top 30 times.

### Question 2

What are the most common years and types of cars to be ticketed?

In [13]:
# SQL query to count the number of occurrences for each unique combination of vehicle type and year. 
#It filters out null values and invalid years, then groups the data by vehicle type and year, 
#finally ordering the results by the count of violations in descending order.

common_year_car = spark.sql('''SELECT `vehicle_body_type` AS `vehicle_type`, CAST(`vehicle_year` AS INT) AS `vehicle_year`, COUNT(*) AS `violation_count` FROM parking_view 
                        WHERE `vehicle_body_type` IS NOT NULL AND `vehicle_year` > 0 
                        GROUP BY `vehicle_body_type`, `vehicle_year` 
                        ORDER BY `violation_count` DESC''')
common_year_car.show(30)



+------------+------------+---------------+
|vehicle_type|vehicle_year|violation_count|
+------------+------------+---------------+
|        SUBN|        2021|         468828|
|        SUBN|        2022|         452377|
|        SUBN|        2023|         447136|
|        SUBN|        2019|         345021|
|        SUBN|        2020|         343283|
|        SUBN|        2018|         275702|
|        SUBN|        2017|         226828|
|        SUBN|        2016|         186233|
|        SUBN|        2015|         180925|
|        4DSD|        2017|         155318|
|        4DSD|        2019|         152339|
|        4DSD|        2018|         146010|
|        SUBN|        2014|         142950|
|        4DSD|        2020|         138040|
|        4DSD|        2023|         134263|
|        4DSD|        2021|         134002|
|        4DSD|        2022|         131561|
|        SUBN|        2013|         130515|
|        4DSD|        2015|         129270|
|        4DSD|        2016|     

                                                                                

#### Observations:
The top combinations include "SUBN" vehicles from recent years such as 2021, 2022, and 2023, followed by "4DSD" vehicles from similar years. The counts decrease as the vehicle years go further back in time, suggesting that newer vehicles are more commonly ticketed.

### Question 3

Where are tickets most commonly issued?

In [14]:
#SQL query to counts the number of tickets issued at each unique violation location  from parking_view. 
#It filters out null values in the Violation_Location column, groups the data by location, 
#and orders the result set by the count of tickets issued at each location in descending order, highlighting the most common violation locations.

common_loc = spark.sql('''SELECT COUNT(*) AS Number_of_tickets, `Violation_Location` FROM parking_view 
                    WHERE `Violation_Location` IS NOT NULL GROUP BY `Violation_Location` 
                    ORDER BY Number_of_tickets DESC''')
common_loc.show(30)



+-----------------+------------------+
|Number_of_tickets|Violation_Location|
+-----------------+------------------+
|           276203|                19|
|           213205|               114|
|           207636|                 6|
|           189589|                13|
|           178348|                14|
|           153765|               109|
|           148286|                 1|
|           147809|                18|
|           142074|                 9|
|           135832|               115|
|           116439|                61|
|           115903|                66|
|           115747|                20|
|           109812|               112|
|           107721|                70|
|           104404|                84|
|           104246|               103|
|           103097|                52|
|           102733|               108|
|            98620|                46|
|            94933|                10|
|            88065|               110|
|            84947|      

                                                                                

#### Observations:
Locations with violation codes "19", "114", "6", "13", and "14" have the highest number of tickets issued, with counts ranging from 189,589 to 276,203 tickets.

### Question 4

Which color of the vehicle is most likely to get a ticket?

In [15]:
# SQl query to count the number of vehicles for each unique vehicle color from parking_view.
#It groups the data by vehicle color and orders the result set by the count of vehicles in descending order

vehicle_color = spark.sql('''SELECT `Vehicle_Color`, COUNT(*) AS count FROM parking_view 
                    GROUP BY `Vehicle_Color`
                    ORDER BY count DESC''')
vehicle_color.show(30)



+-------------+-------+
|Vehicle_Color|  count|
+-------------+-------+
|           GY|2086349|
|           WH|1924604|
|           BK|1821703|
|         NULL|1015118|
|           BL| 688918|
|        WHITE| 610935|
|        BLACK| 401993|
|           RD| 393388|
|         GREY| 303176|
|         BLUE| 140721|
|           GR| 134699|
|        SILVE| 134375|
|        BROWN| 129885|
|          RED| 116436|
|          BLK|  89406|
|           TN|  70852|
|           BR|  68615|
|           YW|  65868|
|          GRY|  64505|
|          WHI|  54907|
|           GL|  39134|
|        OTHER|  33184|
|           MR|  30204|
|        GREEN|  26247|
|           OR|  24657|
|          SIL|  22166|
|         GRAY|  21722|
|          BLU|  20578|
|        YELLO|  15621|
|         LTGY|  12880|
+-------------+-------+
only showing top 30 rows



                                                                                

#### Observations:
The most common vehicle colors are "GY" (presumably gray), "WH" (white), and "BK" (black), with counts exceeding millions of vehicles each.

### Question 5

Given a Black vehicle parking illegally at 34510, 10030, 34050 (street codes). What is the probability that it will get an ticket?

In [16]:
def find_nearest_cluster(point, cluster_centers):
    # euclidean distances between point and all cluster centers
    distances = np.sqrt(np.sum((point - cluster_centers) ** 2, axis=1))

    # nearest cluster
    nearest_cluster_idx = np.argmin(distances)

    return nearest_cluster_idx

In [17]:
#Converting to float
parking_df = parking_df.withColumn("Street_Code1", col("Street_Code1").cast("float"))
parking_df = parking_df.withColumn("Street_Code2", col("Street_Code2").cast("float"))
parking_df = parking_df.withColumn("Street_Code3", col("Street_Code3").cast("float"))

In [18]:
# preparing data for clustering by combining the street codes into a vector
assembler = VectorAssembler(inputCols=["Street_Code1", "Street_Code2", "Street_Code3"], outputCol="features")
kmeans_data = assembler.transform(parking_df.select("Street_Code1", "Street_Code2", "Street_Code3", "Vehicle_Color"))

In [19]:
# train the K-Means model with 3 clusters
kmeans = KMeans(featuresCol="features", k=3)
model = kmeans.fit(kmeans_data.select("features"))

24/04/13 00:42:37 WARN MemoryStore: Not enough space to cache rdd_104_4 in memory! (computed 44.2 MiB so far)
24/04/13 00:42:37 WARN BlockManager: Persisting block rdd_104_4 to disk instead.
24/04/13 00:42:37 WARN MemoryStore: Not enough space to cache rdd_104_6 in memory! (computed 44.2 MiB so far)
24/04/13 00:42:37 WARN BlockManager: Persisting block rdd_104_6 to disk instead.
24/04/13 00:42:37 WARN MemoryStore: Not enough space to cache rdd_104_7 in memory! (computed 44.2 MiB so far)
24/04/13 00:42:37 WARN BlockManager: Persisting block rdd_104_7 to disk instead.
24/04/13 00:42:39 WARN MemoryStore: Not enough space to cache rdd_104_6 in memory! (computed 44.2 MiB so far)
24/04/13 00:42:39 WARN MemoryStore: Not enough space to cache rdd_104_4 in memory! (computed 44.2 MiB so far)
24/04/13 00:42:40 WARN MemoryStore: Not enough space to cache rdd_104_8 in memory! (computed 29.5 MiB so far)
24/04/13 00:42:40 WARN BlockManager: Persisting block rdd_104_8 to disk instead.
24/04/13 00:42:4

In [20]:
# predict the cluster for the specified location
prediction = model.transform(kmeans_data)
print("Showing the clustered data:")
prediction.show()

Showing the clustered data:
+------------+------------+------------+-------------+--------------------+----------+
|Street_Code1|Street_Code2|Street_Code3|Vehicle_Color|            features|prediction|
+------------+------------+------------+-------------+--------------------+----------+
|         0.0|         0.0|         0.0|         BLUE|           (3,[],[])|         1|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         2|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         2|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         0|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         0|
|         0.0|         0.0|         0.0|           BK|           (3,[],[])|         1|
|      8690.0|     21690.0|     21740.0|         NULL|[8690.0,21690.0,2...|         2|
|         0.0|     61090.0|         0.0|          GRY|   [0.0,61090.0,0.0]|         2|
|         0.0| 

In [21]:
# different ways the color black is in data
color = ['BLK', 'Black', 'BC', 'BLAC', 'BCK', 'BLK.', 'BK', 'BK.', 'BK/', 'B LAC'] 
prediction_black = prediction.groupBy('prediction').agg(count(when(col('Vehicle_Color').isin(color), 1)).alias('Count'),\
     count('Vehicle_Color').alias('Total_Cars')).orderBy('prediction')

prediction_black = prediction_black.withColumn("probability", col("Count") / col("Total_Cars"))
print("Probability of black vehicle getting ticket in different clusters:")
prediction_black.show()

Probability of black vehicle getting ticket in different clusters:




+----------+-------+----------+-------------------+
|prediction|  Count|Total_Cars|        probability|
+----------+-------+----------+-------------------+
|         0| 237841|   1318337|0.18040986485246185|
|         1|1140955|   4808821|0.23726293825451186|
|         2| 533813|   3575206| 0.1493097180973628|
+----------+-------+----------+-------------------+



                                                                                

#### Observations:
Cluster 2: Has the lowest probability of a black vehicle getting a ticket, at about 14.93%. This suggests that, within this cluster, black vehicles are ticketed less frequently relative to the total number of cars ticketed.
Cluster 1: Shows a significantly higher probability of 23.72%, which is the highest among the three clusters. This could indicate that this cluster represents areas where black vehicles are more prone to getting tickets or are more heavily monitored.
Cluster 0: Has a moderate probability of 18.04%, falling between the other two clusters in terms of ticketing probability for black vehicles.

In [23]:
# find the cluster centers
cluster_centers = np.array(model.clusterCenters()).astype(float)

# find the cluster for the street codes
nearest_cluser = find_nearest_cluster([34510, 10030, 34050], cluster_centers)

In [24]:
print("Cluster for Street Code (34510, 10030, 34050) is:", nearest_cluser)

Cluster for Street Code (34510, 10030, 34050) is: 2


#### Observations:
The result indicates that the closest cluster to the street codes (34510, 10030, 34050) is Cluster 2.

In [25]:
# filter for this current cluster index
current_prediction = prediction_black.filter(col("prediction") == int(nearest_cluser))

print("Probability of Black vehicle parking illegally at 34510, 10030, 34050 getting ticket:")
current_prediction.show()

Probability of Black vehicle parking illegally at 34510, 10030, 34050 getting ticket:




+----------+------+----------+------------------+
|prediction| Count|Total_Cars|       probability|
+----------+------+----------+------------------+
|         2|533813|   3575206|0.1493097180973628|
+----------+------+----------+------------------+



                                                                                

#### Observations:
With a ticketing probability of about 0.142490231472495 (or 14.25%), Cluster 2 shows a relatively lower risk for black vehicles getting ticketed compared to other clusters

### NBA

### Question 1

For each pair of the players (A, B), we define the fear sore of A when facing B is the hit rate, such that B is closet defender when A is shooting. Based on the fear sore, for each player, please find out who is his "most unwanted defender".

In [26]:
file_path='file:////home/hadoop/assignment2/shot_logs/shot_logs.csv'

nba_df = spark.read.csv(file_path, header=True, inferSchema=True)

nba_df.printSchema()

root
 |-- GAME_ID: integer (nullable = true)
 |-- MATCHUP: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- W: string (nullable = true)
 |-- FINAL_MARGIN: integer (nullable = true)
 |-- SHOT_NUMBER: integer (nullable = true)
 |-- PERIOD: integer (nullable = true)
 |-- GAME_CLOCK: timestamp (nullable = true)
 |-- SHOT_CLOCK: double (nullable = true)
 |-- DRIBBLES: integer (nullable = true)
 |-- TOUCH_TIME: double (nullable = true)
 |-- SHOT_DIST: double (nullable = true)
 |-- PTS_TYPE: integer (nullable = true)
 |-- SHOT_RESULT: string (nullable = true)
 |-- CLOSEST_DEFENDER: string (nullable = true)
 |-- CLOSEST_DEFENDER_PLAYER_ID: integer (nullable = true)
 |-- CLOSE_DEF_DIST: double (nullable = true)
 |-- FGM: integer (nullable = true)
 |-- PTS: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_id: integer (nullable = true)



In [27]:
nba_df.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-13 01:09:00|      10.8|       2|       1.9|      7.7|       2|       made|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [28]:
num_rows = nba_df.count()
print("Number of rows:", num_rows)

num_cols = len(nba_df.columns)
print("Number of columns:", num_cols)


Number of rows: 128069
Number of columns: 21


### Grouping the dataframe by player and defender & aggregating the shot statistics

### Question 1
For each pair of the players (A, B), we define the fear sore of A when facing B is the hitrate, such that B is closet defender when A is shooting. Based on the fear sore, for each player, please find out who is his ”most unwanted defender”.

In [29]:
# Group by player and defender, compute shots made and total shots
shots_player_defender = nba_df.groupBy("player_name", "CLOSEST_DEFENDER").agg(
    sum(when(col("SHOT_RESULT") == "made", 1).otherwise(0)).alias("shot_result"),
    count("*").alias("total_shots")
)

In [30]:
# Calculate hit rate
shots_hitrate = shots_player_defender.withColumn("hit_rate", col("shot_result") / col("total_shots"))
shots_hitrate = shots_hitrate.dropDuplicates(["player_name", "CLOSEST_DEFENDER"])


In [31]:
from pyspark.sql.functions import col, sum, count, when, min, mean, max

# Find the most unwanted defender for each player
min_hitrate = shots_hitrate.groupBy("player_name").agg(min("hit_rate").alias("min_hit_rate"))
most_unwanted_defender = shots_hitrate.join(min_hitrate, ["player_name"]) \
    .where(col("hit_rate") == col("min_hit_rate")) \
    .select("player_name", "CLOSEST_DEFENDER", "hit_rate") \
    .orderBy("player_name")

most_unwanted_defender.show()

+------------+--------------------+--------+
| player_name|    CLOSEST_DEFENDER|hit_rate|
+------------+--------------------+--------+
|aaron brooks|       Nurkic, Jusuf|     0.0|
|aaron brooks|      Frye, Channing|     0.0|
|aaron brooks|       Harris, Devin|     0.0|
|aaron brooks|         Scola, Luis|     0.0|
|aaron brooks|         Exum, Dante|     0.0|
|aaron brooks|          Lawson, Ty|     0.0|
|aaron brooks|     Crawford, Jamal|     0.0|
|aaron brooks|      Fournier, Evan|     0.0|
|aaron brooks|       O'Quinn, Kyle|     0.0|
|aaron brooks|        Wear, Travis|     0.0|
|aaron brooks|   Dos Santos, Atila|     0.0|
|aaron brooks|        Hairston, PJ|     0.0|
|aaron brooks|     Johnson, Wesley|     0.0|
|aaron brooks|      Powell, Dwight|     0.0|
|aaron brooks|      Rivers, Austin|     0.0|
|aaron brooks|Antetokounmpo, Gi...|     0.0|
|aaron brooks|     Splitter, Tiago|     0.0|
|aaron brooks|      Johnson, Chris|     0.0|
|aaron brooks|    Schroder, Dennis|     0.0|
|aaron bro

In [32]:
# Adding a tie-breaking condition based on the total shots attempted
from pyspark.sql.functions import first

most_unwanted_defender = (
    shots_hitrate
    .join(min_hitrate, ["player_name"])
    .where(col("hit_rate") == col("min_hit_rate"))
    .groupBy("player_name")
    .agg(first("CLOSEST_DEFENDER").alias("CLOSEST_DEFENDER"), 
         first("hit_rate").alias("hit_rate"), 
         max("total_shots").alias("total_shots"))
    .orderBy("player_name")
)

most_unwanted_defender.show()

+----------------+-----------------+--------+-----------+
|     player_name| CLOSEST_DEFENDER|hit_rate|total_shots|
+----------------+-----------------+--------+-----------+
|    aaron brooks|    Nurkic, Jusuf|     0.0|          5|
|    aaron gordon|   Rivers, Austin|     0.0|          2|
| al farouq aminu|   Johnson, James|     0.0|          3|
|      al horford|      Diaw, Boris|     0.0|          4|
|    al jefferson|Hardaway Jr., Tim|     0.0|          3|
|   alan anderson|       Leuer, Jon|     0.0|          5|
|     alan crabbe| Sefolosha, Thabo|     0.0|          3|
|        alex len|  Knight, Brandon|     0.0|          4|
|   alexis ajinca|     Meeks, Jodie|     0.0|          3|
|      alonzo gee|     Korver, Kyle|     0.0|          3|
|amare stoudemire|       Deng, Luol|     0.0|          3|
|    amir johnson|       Tucker, PJ|     0.0|          3|
|  andre drummond|    James, LeBron|     0.0|          5|
|  andre iguodala|      Lowry, Kyle|     0.0|          3|
|    andre mil

#### Observations:
All listed defenders have a hit_rate of 0.0. This indicates that for these specific matchups, the offensive player failed to make any shots. 
The number of shot attempts (total_shots) against each defender varies. For example, Andrew Wiggins attempted 9 shots against Brian Roberts without success, which suggests a significant defensive impact by Roberts during those instances.
On the other hand, some players, like Andre Miller and Andre Roberson, had only 2 attempts against their respective defenders.

### Question 2

For each player, we define the comfortable zone of shooting is a matrix of, {SHOT DIST, CLOSE DEF DIST, SHOT CLOCK}
Please develop a Spark-based algorithm to classify each player’s records into 4 comfortable zones. Considering the hit rate, which zone is the best for James Harden, Chris Paul, Stephen Curry, and Lebron James. 

In [33]:
# Prepare data for clustering
nba_data = nba_df.na.drop()
nba_data = nba_data.withColumn('SHOT_RESULT', when(col('SHOT_RESULT') == 'made', 1).otherwise(0))
nba_data = nba_data.withColumn("SHOT_DIST", col("SHOT_DIST").cast("float"))
nba_data = nba_data.withColumn("CLOSE_DEF_DIST", col("CLOSE_DEF_DIST").cast("float"))
nba_data = nba_data.withColumn("SHOT_CLOCK", col("SHOT_CLOCK").cast("float"))

In [34]:
assembler = VectorAssembler(inputCols=["SHOT_DIST", "CLOSE_DEF_DIST", "SHOT_CLOCK"], outputCol="features")
kmeans_nba_data = assembler.transform(nba_data).select('player_name', 'features', 'SHOT_RESULT')


In [35]:
# K-means clustering
kmeans = KMeans(featuresCol="features", k=4)
model = kmeans.fit(kmeans_nba_data)

In [36]:
# Filter for specific players and make predictions
players = ['james harden', 'chris paul', 'stephen curry', 'lebron james']
kmeans_nba_data = kmeans_nba_data.filter(col('player_name').isin(players))
predictions = model.transform(kmeans_nba_data)

In [39]:
predictions = predictions.select('player_name', 'prediction', 'SHOT_RESULT')

In [40]:
# Calculate average scoring probability per zone
player_predictions = predictions.groupBy("player_name", "prediction").agg(mean("SHOT_RESULT").alias("Best_zone_res"))
best_zone = player_predictions.withColumn('best_avg', max('Best_zone_res').over(Window.partitionBy("player_name"))) \
    .filter(col('Best_zone_res') == col('best_avg')) \
    .select(col("player_name"), col("prediction").alias("prediction (zone)"), col("Best_zone_res"))

best_zone.show()

+-------------+-----------------+------------------+
|  player_name|prediction (zone)|     Best_zone_res|
+-------------+-----------------+------------------+
|   chris paul|                2|0.5531914893617021|
| james harden|                0|0.5604395604395604|
| lebron james|                0|0.6613545816733067|
|stephen curry|                0|0.6350710900473934|
+-------------+-----------------+------------------+



#### Observations:
Each player has a specific zone that yields the highest scoring probability, with LeBron James showing the highest efficiency at approximately 0.661 in his optimal zone.
Zones are identified by prediction numbers, where zone 0 is notably effective for James Harden, LeBron James, and Stephen Curry.

In [41]:
# Stop the SparkSession
spark.stop()