<center>
    
# ENGR-E:516 Engineering Cloud Computing
## Assignment - 2
### Rohit Goud Kalakuntla
### email: rokala@iu.edu

</center>

In [144]:
# importing pyspark and other required libraries
import pyspark
# Importing all the required libraries
import scipy
import numpy as np
import pandas as pd

In [145]:
# findspark is a python library: Provides findspark.init() to make pyspark importable as a regular library.
!pip install findspark



In [146]:
# importing findspark and initializing it for usage of pyspark
import findspark
findspark.init()

In [147]:
# importing other pyspark modules
# pyspark sqp modules
from pyspark.sql import functions as pysf, SparkSession, Row
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, when, count, col, sum,expr, avg, first
from pyspark.sql.types import IntegerType

# pyspark ML modules
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

from functools import reduce

In [148]:
# Creating Spark Session with the name 'ECC Assignment 2'
# Allocating for 3 cores
# using 3 cores to run the application using parallel processing which can be utilized by multinode cluster
spark = SparkSession.builder.master("local[3]").appName("rokala - ECC Assignment 2").getOrCreate()

In [149]:
# Loading the Parking_Violations_Issued_-_Fiscal_Year_2024_20240411.csv
# Renamed file: NYC_Parking_Violations.csv
df = spark.read.csv("NYC_Parking_Violations.csv", header=True, inferSchema= True)

In [150]:
# printing schems details
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [151]:
# dispalying first 20 rows of dta from the dataset including the headers
df.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct|Issu

In [152]:
# Get the number of rows and columns
num_rows, num_cols = df.count(), len(df.columns)

# Print the shape (number of rows, number of columns)
print("Shape of the dataset:", (num_rows, num_cols))

Shape of the dataset: (10717482, 43)


In [153]:
# Enter the number of nodes
# 3: manager, worker1 and worker2
nodes = 3 
node_dict ={0:"rokala-manager", 1:"rokala-worker1", 2:"rokala-worker2"}
data_in_node = [df.columns[i::nodes] for i in range(nodes)]

nulls_in_node = []
for i, columns in enumerate(data_in_node):
    print(f"Cleaning & Processing the data in the node {node_dict.get(i)}...")
    # selecting all the null values and making a count in each node
    nulls = df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns]).collect()
    nulls_in_node.append(nulls)

Cleaning & Processing the data in the node rokala-manager...
Cleaning & Processing the data in the node rokala-worker1...
Cleaning & Processing the data in the node rokala-worker2...


# NY Parking Violations

## Question 1: 

### When are tickets most likely to be issued?

In [154]:
# View to use it for solving the questions
df.createOrReplaceTempView("ny_data_view")

In [155]:
# Query to count number of violations at a particular time and arrange them in descending order
q1 = spark.sql('''SELECT `Violation Time`, COUNT(*) AS Violation_Count FROM ny_data_view GROUP BY `Violation Time` ORDER BY Violation_Count DESC''')

# showing only top 15 rows, because the data is huge
q1.show(15)

+--------------+---------------+
|Violation Time|Violation_Count|
+--------------+---------------+
|         0836A|          20112|
|         0839A|          19531|
|         0840A|          19436|
|         0838A|          19433|
|         0906A|          19412|
|         1139A|          19255|
|         1140A|          19154|
|         1141A|          19135|
|         0841A|          18987|
|         1142A|          18936|
|         1145A|          18917|
|         0837A|          18893|
|         0842A|          18806|
|         1143A|          18738|
|         0910A|          18698|
+--------------+---------------+
only showing top 15 rows



## Question 2: 

### What are the most common years and types of cars to be ticketed?

In [156]:
# Spark Query to count the number of violations for different vehicle types in each vehicle year 

df1= spark.table("ny_data_view")

# Filter out records where Vehicle Body Type is NULL or Vehicle Year <= 0
filtered_df = df1.filter((col("Vehicle Body Type").isNotNull()) & (col("Vehicle Year") > 0))

# Perform the grouping and counting as per the SQL query
q2 = filtered_df.groupBy(col("Vehicle Body Type").alias("Vehicle Type"), col("Vehicle Year").cast("int").alias("Vehicle Year")) \
                       .agg(count("*").alias("Violation Count")) \
                       .orderBy(col("Violation Count").desc())

# Dispalying only top 15 records; whole ouput is huge
q2.show(15)

+------------+------------+---------------+
|Vehicle Type|Vehicle Year|Violation Count|
+------------+------------+---------------+
|        SUBN|        2021|         468827|
|        SUBN|        2022|         452375|
|        SUBN|        2023|         447136|
|        SUBN|        2019|         345019|
|        SUBN|        2020|         343284|
|        SUBN|        2018|         275701|
|        SUBN|        2017|         226830|
|        SUBN|        2016|         186232|
|        SUBN|        2015|         180923|
|        4DSD|        2017|         155318|
|        4DSD|        2019|         152338|
|        4DSD|        2018|         146010|
|        SUBN|        2014|         142949|
|        4DSD|        2020|         138040|
|        4DSD|        2023|         134262|
+------------+------------+---------------+
only showing top 15 rows



## Question 3:

### Where are tickets most commonly issued?

In [157]:
# Filter out records where Violation Location is null
df_filtered = df1.filter(col("Violation Location").isNotNull())

# Group by Violation Location, count the violations, and order by count in descending order
q3 = df_filtered.groupBy("Violation Location") \
                              .count() \
                              .withColumnRenamed("count", "Number Of Tickets") \
                              .orderBy(col("Number Of Tickets").desc())

# Show the top 15 violation locations
q3.show(15)

+------------------+-----------------+
|Violation Location|Number Of Tickets|
+------------------+-----------------+
|                19|           276203|
|               114|           213205|
|                 6|           207636|
|                13|           189589|
|                14|           178348|
|               109|           153765|
|                 1|           148286|
|                18|           147809|
|                 9|           142074|
|               115|           135832|
|                61|           116439|
|                66|           115903|
|                20|           115747|
|               112|           109812|
|                70|           107721|
+------------------+-----------------+
only showing top 15 rows



In [158]:
# Filter out records where Violation Location is null
df_filtered = df1.filter(col("Violation Location").isNotNull())

# Group by Violation Location, count the violations, and order by count in descending order
q3 = df_filtered.groupBy("Violation Location", "Street Name") \
                              .count() \
                              .withColumnRenamed("count", "Number Of Tickets") \
                              .orderBy(col("Number Of Tickets").desc())

# Show the top 15 violation locations
q3.show(15)

+------------------+-------------+-----------------+
|Violation Location|  Street Name|Number Of Tickets|
+------------------+-------------+-----------------+
|                19|      3rd Ave|            32731|
|                19|Lexington Ave|            24743|
|               112|  Queens Blvd|            22994|
|                19|  Madison Ave|            21856|
|               114|  Steinway St|            21197|
|               115|     37th Ave|            18517|
|                19|      1st Ave|            17536|
|                19|      2nd Ave|            16144|
|               112|    Austin St|            15162|
|                24|     Broadway|            14443|
|               109|      Main St|            14125|
|                20|     Broadway|            13894|
|               102|  Jamaica Ave|            13250|
|                 6|  Bleecker St|            12820|
|               106|  Liberty Ave|            12639|
+------------------+-------------+------------

## Question 4:

### **Which color of the vehicle is most likely to get a ticket?**

#### *From below analysis, it is observed that the GY colored vehicle is most likely to be issued a ticket*

In [24]:
# Group the data by `Vehicle Color`, count the occurrences, and sort the results by count in descending order
q4 = df.groupBy("Vehicle Color") \
                          .count() \
                          .withColumnRenamed("count", "Number Of Violations") \
                          .orderBy(col("Number Of Violations").desc())

# Display the top 20 vehicle colors by number of violations
q4.show(20)

print(f"The color of vehicle that is most likely to get a ticket is: ", q4.select("Vehicle Color").collect()[0]['Vehicle Color'])

+-------------+--------------------+
|Vehicle Color|Number Of Violations|
+-------------+--------------------+
|           GY|             2086345|
|           WH|             1924600|
|           BK|             1821705|
|         NULL|             1015121|
|           BL|              688919|
|        WHITE|              610936|
|        BLACK|              401993|
|           RD|              393388|
|         GREY|              303176|
|         BLUE|              140721|
|           GR|              134696|
|        SILVE|              134375|
|        BROWN|              129885|
|          RED|              116436|
|          BLK|               89406|
|           TN|               70852|
|           BR|               68616|
|           YW|               65868|
|          GRY|               64505|
|          WHI|               54907|
+-------------+--------------------+
only showing top 20 rows

The color of vehicle that is most likely to get a ticket is:  GY


# KNN

## Given a Black vehicle parking illegally at 34510, 10030, 34050 (street codes). What is the probability that it will get an ticket? (very rough prediction)


In [26]:
parking_df = df.selectExpr("CAST(`Street Code1` AS FLOAT) AS Street_Code1",
                           "CAST(`Street Code2` AS FLOAT) AS Street_Code2",
                           "CAST(`Street Code3` AS FLOAT) AS Street_Code3",
                           "`Vehicle Color`")

# Initialize VectorAssembler
vector_assembler = VectorAssembler(inputCols=["Street_Code1", "Street_Code2", "Street_Code3"],
                                   outputCol="shooting_target_matrix")

parking_df = vector_assembler.transform(parking_df)

In [27]:
# Define the K-means model
# Set the number of clusters to 3 and a seed for reproducibility
# Tried with Cluster 3 but results were not good, so I am using cluster 5
kmeans = KMeans().setK(5).setSeed(1)  

# Fit the model
model = kmeans.fit(parking_df)

# Make predictions
predictions = model.transform(parking_df)

# Show the results
predictions.show()

+------------+------------+------------+-------------+--------------------+----------+
|Street_Code1|Street_Code2|Street_Code3|Vehicle Color|            features|prediction|
+------------+------------+------------+-------------+--------------------+----------+
|         0.0|         0.0|         0.0|         BLUE|           (3,[],[])|         1|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         0|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         0|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         4|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         4|
|         0.0|         0.0|         0.0|           BK|           (3,[],[])|         1|
|      8690.0|     21690.0|     21740.0|         NULL|[8690.0,21690.0,2...|         0|
|         0.0|     61090.0|         0.0|          GRY|   [0.0,61090.0,0.0]|         0|
|         0.0|         0.0|         0.0|   

In [36]:
# list of all Black colors
veh_black = ['BLK', 'BK.', 'BCK', 'BK', 'BLK.', 'Black', 'BC', 'BLAC', 'BK/', 'B LAC']

# Group by 'prediction', calculate the count of black vehicles, total cars, and directly calculate the probability
prob_black = predictions.groupBy('prediction').agg(
    count(when(col('Vehicle Color').isin(veh_black), True)).alias('Count_Black'),
    count('Vehicle Color').alias('Total_Cars'),
    (count(when(col('Vehicle Color').isin(veh_black), True)) / count('Vehicle Color')).alias('Probability_Black')
).orderBy('prediction')

# Display the result
prob_black.show()

+----------+-----------+----------+-------------------+
|prediction|Count_Black|Total_Cars|  Probability_Black|
+----------+-----------+----------+-------------------+
|         0|     394032|   2723473|0.14467997296099502|
|         1|    1153169|   4924934|0.23414912768374155|
|         2|     132451|    760911| 0.1740689778436637|
|         3|     125919|    692084|0.18194178741308858|
|         4|     107040|    600959| 0.1781153123590794|
+----------+-----------+----------+-------------------+



In [37]:
from scipy.spatial import distance

# find the nearest cluster
def nearest_cluster(data_point, cluster_centers):
    # Calculate distances using Euclidean distance squared
    distances = distance.cdist([data_point], cluster_centers, 'sqeuclidean')[0]
    # Return the index (ID) of the nearest cluster center
    return np.argmin(distances)


In [38]:
cluster_centers = np.array(model.clusterCenters()).astype(float)
street_codes = np.array([34510.0, 10030.0, 34050.0])
cluster_id = nearest_cluster(street_codes, cluster_centers)

print('Cluster id for Street Code (34510, 10030, 34050) is:', cluster_id)

Cluster id for Street Code (34510, 10030, 34050) is: 0


In [39]:
# Displaying the required probability
prob_black.filter(col('prediction') == cluster_id).show()
probability_black = prob_black.filter(col('prediction') == cluster_id).select("Probability_Black").collect()
print(f"The Probability of Black color vehicle issued a ticket parking illegally at 34510, 10030, 34050 (street codes) is ", probability_black[0]["Probability_Black"])

+----------+-----------+----------+-------------------+
|prediction|Count_Black|Total_Cars|  Probability_Black|
+----------+-----------+----------+-------------------+
|         0|     394032|   2723473|0.14467997296099502|
+----------+-----------+----------+-------------------+

The Probability of Black color vehicle issued a ticket parking illegally at 34510, 10030, 34050 (street codes) is  0.14467997296099502


# NBA SHOT LOGS

## Question 1:

### For each pair of the players (A, B), we define the fear sore of A when facing B is the hit rate, such that B is closet defender when A is shooting. Based on the fear sore, for each player, please find out who is his ?most unwanted defender?

In [159]:
nba_shots_log = spark.read.csv("shot_logs.csv", header=True, inferSchema=True)

# printing schema details
nba_shots_log.printSchema()

root
 |-- GAME_ID: integer (nullable = true)
 |-- MATCHUP: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- W: string (nullable = true)
 |-- FINAL_MARGIN: integer (nullable = true)
 |-- SHOT_NUMBER: integer (nullable = true)
 |-- PERIOD: integer (nullable = true)
 |-- GAME_CLOCK: timestamp (nullable = true)
 |-- SHOT_CLOCK: double (nullable = true)
 |-- DRIBBLES: integer (nullable = true)
 |-- TOUCH_TIME: double (nullable = true)
 |-- SHOT_DIST: double (nullable = true)
 |-- PTS_TYPE: integer (nullable = true)
 |-- SHOT_RESULT: string (nullable = true)
 |-- CLOSEST_DEFENDER: string (nullable = true)
 |-- CLOSEST_DEFENDER_PLAYER_ID: integer (nullable = true)
 |-- CLOSE_DEF_DIST: double (nullable = true)
 |-- FGM: integer (nullable = true)
 |-- PTS: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_id: integer (nullable = true)



In [160]:
nba_shots_log.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-12 01:09:00|      10.8|       2|       1.9|      7.7|       2|       made|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [161]:
# Creating a view for the nba dataset
nba_shots_log.createOrReplaceTempView("nba_shot_view")


In [168]:
# Grouping the dataframe by player and defender & displaying the shot stats i.e, shot made or shot missed
from pyspark.sql import functions as F

nba_grouped_df = nba_shots_log.groupBy("player_id", "CLOSEST_DEFENDER_PLAYER_ID").agg(
    F.sum(F.when(F.col("SHOT_RESULT") == "made", 1).otherwise(0)).alias("Made_Shots"),
    F.sum(F.when(F.col("SHOT_RESULT") == "missed", 1).otherwise(0)).alias("Missed_Shots")
).select(
    F.col("player_id").alias("_Player_ID"),
    F.col("CLOSEST_DEFENDER_PLAYER_ID").alias("Closest_Defender_ID"),
    F.col("Made_Shots"),
    F.col("Missed_Shots")
)

nba_grouped_df.show()

+----------+-------------------+----------+------------+
|_Player_ID|Closest_Defender_ID|Made_Shots|Missed_Shots|
+----------+-------------------+----------+------------+
|    203148|             101179|         0|           1|
|    202687|             201980|         1|           0|
|      2744|               1717|         0|           2|
|    203469|             202329|         1|           1|
|    201945|             202322|         0|           3|
|    202689|             202699|         6|           8|
|    202689|             203924|         1|           0|
|    203077|               2730|         1|           0|
|    203077|             201584|         2|           0|
|    202362|             201188|         2|           0|
|    202330|             201978|         2|           1|
|    202324|             203135|         1|           2|
|    203957|               2617|         1|           0|
|      2430|             203092|         3|           2|
|      2430|             200770

In [169]:
 # Calculating the hit rate for each combination (player and defender)
av = expr("Made_Shots / (Made_Shots + Missed_Shots)")
nba_grouped_df = nba_grouped_df.withColumn("HitRate", av)

FILTER = "HitRate is NOT NULL"
# Droping duplicates & filtering out records with "null" hit rates
nba_grouped_df = nba_grouped_df.dropDuplicates(["_Player_ID", "HitRate"]).filter(FILTER)
nba_grouped_df.show(10)

+----------+-------------------+----------+------------+-------------------+
|_Player_ID|Closest_Defender_ID|Made_Shots|Missed_Shots|            HitRate|
+----------+-------------------+----------+------------+-------------------+
|    201945|             202322|         0|           3|                0.0|
|    203082|             203490|         0|           1|                0.0|
|    201600|             202324|         1|           3|               0.25|
|    201155|             101145|         1|           4|                0.2|
|    201143|             202700|         7|           2| 0.7777777777777778|
|    202330|             201967|         2|           7| 0.2222222222222222|
|    202681|             202689|        11|          17|0.39285714285714285|
|    201600|             201579|         2|           4| 0.3333333333333333|
|    202322|             201565|        13|          16| 0.4482758620689655|
|    101162|             201149|         8|           5| 0.6153846153846154|

In [170]:
# Group by "_Player_ID"
nba_grouped_by_player = nba_grouped_df.groupBy("_Player_ID")

# Aggregate to find the minimum "HitRate" for each group
nba_min_hit_rate = nba_grouped_by_player.agg({"HitRate": "min"})

# Rename the column "min(HitRate)" to "HitRate"
nba_shots_log_final = nba_min_hit_rate.withColumnRenamed("min(HitRate)", "HitRate")


# Joining nba_grouped_df with nba_shots_log_final on columns "_Player_ID" and "HitRate"
nba_joined_df = nba_grouped_df.join(nba_shots_log_final, ["_Player_ID", "HitRate"])

# Dropping the "HitRate" column
nba_grouped_df = nba_joined_df.drop("HitRate")

# Selecting only "_Player_ID" and "Closest_Defender_ID" columns
nba_grouped_df = nba_grouped_df.select("_Player_ID", "Closest_Defender_ID")


# Joining nba_grouped_df with nba_shots_log based on matching player IDs and defender IDs
nba_shots_log_merged = (
    nba_grouped_df
    .join(
        nba_shots_log,
        (nba_grouped_df["_Player_ID"] == nba_shots_log["player_id"]) & 
        (nba_grouped_df["Closest_Defender_ID"] == nba_shots_log["CLOSEST_DEFENDER_PLAYER_ID"])
    )
)

# Show the merged DataFrame
nba_shots_log_merged.show()

+----------+-------------------+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+------------------+--------------------------+--------------+---+---+--------------------+---------+
|_Player_ID|Closest_Defender_ID| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT|  CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|         player_name|player_id|
+----------+-------------------+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+------------------+--------------------------+--------------+---+---+--------------------+---------+
|    203148|             101179|21400097|NOV 09, 2014 - CH...|       A|  L|         -15|          3|     4|2024-04-12 08:04:00|   

In [100]:
# Grouping & selecting the columns that would answer the question
nba_shots_log_final = nba_shots_log_merged.groupBy("_Player_ID", "Closest_Defender_ID").agg(first("player_name").alias("Player Name"), first("CLOSEST_DEFENDER").alias("Most Unwanted Defender")) \
                    .orderBy("_Player_ID") \
                    .limit(10).select(
                        F.col("_Player_ID").alias("Player ID"),
                        F.col("Closest_Defender_ID").alias("Closest Defender ID"),
                        F.col("Player Name"),
                        F.col("Most Unwanted Defender")
                    )

nba_shots_log_final.show()

+---------+-------------------+--------------+----------------------+
|Player ID|Closest Defender ID|   Player Name|Most Unwanted Defender|
+---------+-------------------+--------------+----------------------+
|      708|             203957| kevin garnett|           Exum, Dante|
|      977|             203937|   kobe bryant|        Anderson, Kyle|
|     1495|             203148|    tim duncan|        Roberts, Brian|
|     1713|               2037|  vince carter|       Crawford, Jamal|
|     1717|             201581|dirk nowtizski|           Hickson, JJ|
|     1718|             203079|   paul pierce|         Waiters, Dion|
|     1889|             201168|  andre miller|       Splitter, Tiago|
|     1890|             201229|  shawn marion|     Tolliver, Anthony|
|     1891|             201572|   jason terry|          Lopez, Brook|
|     1938|             203461| manu ginobili|      Bennett, Anthony|
+---------+-------------------+--------------+----------------------+



## Question 2:

### For each player, we define the comfortable zone of shooting is a matrix of, 
### {SHOT DIST, CLOSE DEF DIST, SHOT CLOCK} 
### Please develop a Spark-based algorithm to classify each player?s records into 4 comfortable zones. Considering the hit rate, which zone is the best for **James Harden, Chris Paul, Stephen Curry, and Lebron James**

In [126]:
# Drop rows with null values
# convert the shot result values
# made = 1
# missed = 0
nba_shots_log = nba_shots_log.na.drop().withColumn("SHOT_RESULT", when(col("SHOT_RESULT") == "made", 1).otherwise(0))

In [127]:
nba_shots_log.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-12 01:09:00|      10.8|       2|       1.9|      7.7|       2|          1|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [128]:
# Assignment of shooting_target_matrix as given in the question
shooting_target_matrix = ["SHOT_DIST",
                          "CLOSE_DEF_DIST",
                          "SHOT_CLOCK"]

# Convert columns to float data type
for col_name in shooting_target_matrix:
    nba_shots_log = nba_shots_log.withColumn(col_name, col(col_name).cast("float"))

# Show the DataFrame (10 rows)
nba_shots_log.show(10)

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-12 01:09:00|      10.8|       2|       1.9|      7.7|       2|          1|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [129]:

# Create VectorAssembler to assemble the features into a vector
# InputCols: List of column names to be assembled into a vector
# OutputCol: Name of the output column containing the assembled vector

# Transform the DataFrame using VectorAssembler
# Select the 'player_name' column, the assembled 'shooting_target_matrix' column,
# and cast 'SHOT_RESULT' column to float, alias it as 'SHOT FLAG'

nba_shots_log_vec = VectorAssembler(inputCols=shooting_target_matrix, outputCol="shooting_target_matrix") \
                    .transform(nba_shots_log) \
                    .select('player_name', 'shooting_target_matrix', col('SHOT_RESULT') \
                    .cast('float') \
                    .alias('SHOT FLAG'))

# displayin the 10 rows
nba_shots_log_vec.show(10)

+-------------+----------------------+---------+
|  player_name|shooting_target_matrix|SHOT FLAG|
+-------------+----------------------+---------+
|brian roberts|  [7.69999980926513...|      1.0|
|brian roberts|  [28.2000007629394...|      0.0|
|brian roberts|  [17.2000007629394...|      0.0|
|brian roberts|  [3.70000004768371...|      0.0|
|brian roberts|  [18.3999996185302...|      0.0|
|brian roberts|  [20.7000007629394...|      0.0|
|brian roberts|  [3.5,2.0999999046...|      1.0|
|brian roberts|  [24.6000003814697...|      0.0|
|brian roberts|  [22.3999996185302...|      0.0|
|brian roberts|  [24.5,4.699999809...|      0.0|
+-------------+----------------------+---------+
only showing top 10 rows



In [130]:
# Initialize and fit the KMeans model with four clusters
nba_K_means = KMeans(k=4, seed=1, featuresCol="shooting_target_matrix")

# Fitting the model
nba_K_model = nba_K_means.fit(nba_shots_log_vec)

# Filter the DataFrame for specific players
players = ['james harden', 'chris paul', 'stephen curry', 'lebron james']
nba_shots_log_vec_filtered = nba_shots_log_vec.filter(col('player_name').isin(players))

In [131]:
nba_shots_log_k_means = nba_K_model.transform(nba_shots_log_vec_filtered).select('player_name', 'prediction', 'SHOT FLAG')

In [142]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a window specification to partition by player_name
windowSpec = Window.partitionBy("player_name")

# Group by player_name and prediction, calculate the mean of SHOT FLAG, and sort the result
players_zones = (
    nba_shots_log_k_means
    .groupBy("player_name", F.col("prediction").alias("Zone"))
    .agg(F.mean("SHOT FLAG").alias("AVERAGE SHOT FLAG"))
    .withColumn('maxavg', F.max('AVERAGE SHOT FLAG').over(windowSpec))
    .where(F.col('AVERAGE SHOT FLAG') == F.col('maxavg'))
    .drop('maxavg')
    .withColumnRenamed("player_name", "Player Name")
    .withColumnRenamed("AVERAGE SHOT FLAG", "SHOT RESULT AVERAGE") 
)

# Show the DataFrame
players_zones.show()


+-------------+----+-------------------+
|  Player Name|Zone|SHOT RESULT AVERAGE|
+-------------+----+-------------------+
|   chris paul|   2| 0.5531914893617021|
| james harden|   1| 0.5604395604395604|
| lebron james|   1| 0.6613545816733067|
|stephen curry|   1| 0.6350710900473934|
+-------------+----+-------------------+



In [143]:
# Stopping the Spark Cluster
spark.stop()
