### NY Parking Violations

In [41]:
# Checking if pyspark is installed properly and its path is right

import pyspark
import findspark

findspark.init()

In [42]:
# Importing all the required libraries
import scipy
import scipy.spatial
import numpy as np
import pandas as pd
import sys
import pyspark 
from pyspark.sql import Row
from pyspark.sql import functions as pysf
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, when, count, col, sum, regexp_replace,expr, avg, first
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession 
from pyspark.sql import functions as Func
from pyspark.sql.types import IntegerType
from pyspark import SparkContext, SparkConf
from functools import reduce

### Creating a spark session to work with spark dataframes and spark sql.
#### Setting the configurations for the spark session.
- Using 4 cores
- Name of the application
- Setting the memory for the executor to 8g
- Setting the memory for the driver to 8g
- Setting the memory overhead for the executor to 4g
- Setting the default file system to hdfs

In [43]:
spark = SparkSession.builder\
    .master("local[4]")\
    .appName("Assignment2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

Let's load the data into a pandas dataframe and take a look at the data. This data is the latest one which was available on the NYC Open Data website. The data is from 2024.

In [44]:
parking_df = spark.read.csv("hdfs://localhost:9000/Parking_Violations_Issued_-_Fiscal_Year_2024_20240411.csv", header=True, inferSchema=True)

                                                                                

In [45]:
parking_df.show(10)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct|Issuer C

- `from pyspark.sql.functions import col, count, when`: This line imports necessary functions from the PySpark SQL module. `col()` function is used to reference DataFrame columns, `count()` function is used to count non-null values, and `when()` function is used to define conditional expressions.
- `from functools import reduce`: This line imports the `reduce()` function from the `functools` module. It's used later in the code for reducing the lists of DataFrames.

Here, `num_threads` is set to 6, indicating the number of parallel threads or processes that will be used for computation.

In [46]:
from pyspark.sql.functions import col, count, when
from functools import reduce

# Specify the number of threads
num_threads = 6

This line splits the columns of the DataFrame `parking_df` into `num_threads` groups. Each group will be processed by a separate thread. The slicing operation `parking_df.columns[i::num_threads]` divides the columns into groups based on the remainder of the division by `num_threads`.

In [47]:
# Split the columns into groups based on the number of threads
columns_per_thread = [parking_df.columns[i::num_threads] for i in range(num_threads)]

This loop iterates over each group of columns and counts the number of null values for each column in parallel using PySpark DataFrame API:
- `count(when(col(c).isNull(), c))`: This expression counts the number of null values in each column `c`.
- `.alias(c)`: This assigns a name to the count result corresponding to the original column name `c`.
- `parking_df.select(...)`: This selects the count of null values for each column in the current group of columns.
- `.collect()`: This triggers the computation and collects the results as a list of Row objects.
- The counts for each group of columns are appended to the `counts_per_thread` list.

The loop also prints a message indicating the processing progress for each thread.

This code is designed to efficiently count null values in DataFrame columns in parallel across multiple threads, which can improve performance for large datasets.

In [48]:
# Initialize an empty list to store counts per thread
counts_per_thread = []

# Iterate through the columns_per_thread list
for i, columns in enumerate(columns_per_thread):
    print(f"Processing thread {i + 1}/{num_threads}...")

    # Select columns with null counts, alias them with column names, and collect the counts
    counts = parking_df.select([count(when(col(c).isNull(), c)).alias(c) for c in columns]).collect()

    # Append the counts for the current thread to the counts_per_thread list
    counts_per_thread.append(counts)


Processing thread 1/6...


                                                                                

Processing thread 2/6...


                                                                                

Processing thread 3/6...


                                                                                

Processing thread 4/6...


                                                                                

Processing thread 5/6...


                                                                                

Processing thread 6/6...


                                                                                

The next part of the code uses the reduce() function from the functools module to combine the counts from each thread into a single total count.

In [49]:
total_counts = reduce(lambda x, y: x + y, counts_per_thread)

total_counts

[Row(Summons Number=0, Vehicle Body Type=28486, Vehicle Expiration Date=0, Issuer Squad=5292644, Street Name=1507, Days Parking In Effect    =5014718, Meter Number=9381186, Double Parking Violation=10717482),
 Row(Plate ID=1, Vehicle Make=10679, Violation Location=4923863, Violation Time=336, Intersecting Street=4980009, From Hours In Effect=7338955, Feet From Curb=0),
 Row(Registration State=0, Issuing Agency=0, Violation Precinct=0, Time First Observed=10087137, Date First Observed=0, To Hours In Effect=7338965, Violation Post Code=5519326),
 Row(Plate Type=0, Street Code1=0, Issuer Precinct=0, Violation County=102892, Law Section=0, Vehicle Color=1015121, Violation Description=227812),
 Row(Issue Date=0, Street Code2=0, Issuer Code=0, Violation In Front Of Or Opposite=4973482, Sub Division=1767, Unregistered Vehicle?=10490502, No Standing or Stopping Violation=10717482),
 Row(Violation Code=0, Street Code3=0, Issuer Command=4918591, House Number=5015875, Violation Legal Code=5799044

- `reduce(lambda x, y: x + y, counts_per_thread)`: This line uses the reduce() function to sum the counts from each thread (counts_per_thread). It takes a lambda function that adds two lists together (x + y) and applies it cumulatively to the list of counts.

#### Part 1

##### Question 1

When are tickets most likely to be issued?

In [50]:
# Creating a temporary view named "nyc_data" for easy querying of the dataset using SQL
parking_df.createTempView("nyc_data")

In [51]:
# Executing an SQL query to count the frequency of each violation time
# Selecting the "Violation Time" column and counting the occurrences of each distinct value
# Grouping the results by "Violation Time" and ordering them in descending order based on count
# Limiting the output to the top 30 records
question1 = spark.sql('''
    SELECT `Violation Time`, COUNT(*) AS count 
    FROM nyc_data 
    GROUP BY `Violation Time` 
    ORDER BY count DESC 
    LIMIT 30
''')

# Displaying the top 10 violation times along with their respective counts
question1.show(10)



+--------------+-----+
|Violation Time|count|
+--------------+-----+
|         0836A|20112|
|         0839A|19531|
|         0840A|19436|
|         0838A|19433|
|         0906A|19412|
|         1139A|19255|
|         1140A|19154|
|         1141A|19135|
|         0841A|18987|
|         1142A|18936|
+--------------+-----+
only showing top 10 rows



                                                                                

### Observations:
- The tickets are most likely to be issued from 08:36 AM to around 11:42 AM.

##### Question 2

What are the most common years and types of cars to be ticketed?

In [52]:
# Writing a Spark SQL query to count the number of violations for each vehicle body type and year
# Filtering out records where vehicle body type is not null and vehicle year is greater than 0
# Casting the "Vehicle Year" column to integer format and renaming it as "Vehicle Year"
# Grouping the results by "Vehicle Body Type" and "Vehicle Year"
# Ordering the results in descending order based on violation count
# Displaying the top 20 vehicle types along with their respective years and violation counts
question2 = spark.sql('''
    SELECT `Vehicle Body Type` AS `Vehicle Type`, 
           CAST(`Vehicle Year` AS INT) AS `Vehicle Year`, 
           COUNT(*) AS `Violation Count` 
    FROM nyc_data 
    WHERE `Vehicle Body Type` IS NOT NULL AND `Vehicle Year` > 0 
    GROUP BY `Vehicle Body Type`, `Vehicle Year` 
    ORDER BY `Violation Count` DESC
''')

# Displaying the top 10 vehicle types along with their respective years and violation counts
question2.show(10)



+------------+------------+---------------+
|Vehicle Type|Vehicle Year|Violation Count|
+------------+------------+---------------+
|        SUBN|        2021|         468827|
|        SUBN|        2022|         452375|
|        SUBN|        2023|         447136|
|        SUBN|        2019|         345019|
|        SUBN|        2020|         343284|
|        SUBN|        2018|         275701|
|        SUBN|        2017|         226830|
|        SUBN|        2016|         186232|
|        SUBN|        2015|         180923|
|        4DSD|        2017|         155318|
+------------+------------+---------------+
only showing top 10 rows



                                                                                

### Observations:
- The most common years of cars to be ticketed are 2023, 2022 and 2021.
- We can see that SUBN is the most common type of car to be ticketed and the second most common type is 4DSD.

##### Question 3

Where are tickets most commonly issued?

In [53]:
# Writing a Spark SQL query to count the number of violations for each violation location
# Grouping the dataset by the "Violation Location" column
# Counting the number of tickets for each violation location
# Ordering the results in descending order based on the number of tickets
# Displaying the top 15 violation locations along with their respective number of tickets
part3 = spark.sql('''
    SELECT COUNT(*) AS Number_of_tickets, `Violation Location` 
    FROM nyc_data 
    GROUP BY `Violation Location` 
    ORDER BY Number_of_tickets DESC
''')

# Displaying the top 15 violation locations along with their respective number of tickets
part3.show(15)



+-----------------+------------------+
|Number_of_tickets|Violation Location|
+-----------------+------------------+
|          4923863|              NULL|
|           276203|                19|
|           213205|               114|
|           207636|                 6|
|           189589|                13|
|           178348|                14|
|           153765|               109|
|           148286|                 1|
|           147809|                18|
|           142074|                 9|
|           135832|               115|
|           116439|                61|
|           115903|                66|
|           115747|                20|
|           109812|               112|
+-----------------+------------------+
only showing top 15 rows



                                                                                

##### Removing null values from the results and displaying the top 15 locations where tickets are most commonly issued.

In [54]:
# Wrote another Spark SQL query that takes care of the NULL location
part3_en = spark.sql('''SELECT COUNT(*) AS Number_of_tickets, `Violation Location` FROM nyc_data 
                    WHERE `Violation Location` IS NOT NULL GROUP BY `Violation Location` 
                    ORDER BY Number_of_tickets DESC''')
part3_en.show(15)# Displaying top 15 locations



+-----------------+------------------+
|Number_of_tickets|Violation Location|
+-----------------+------------------+
|           276203|                19|
|           213205|               114|
|           207636|                 6|
|           189589|                13|
|           178348|                14|
|           153765|               109|
|           148286|                 1|
|           147809|                18|
|           142074|                 9|
|           135832|               115|
|           116439|                61|
|           115903|                66|
|           115747|                20|
|           109812|               112|
|           107721|                70|
+-----------------+------------------+
only showing top 15 rows



                                                                                

### Observations:
- The tickets are most commonly issued on location 19 and second most commonly issued on location 114.

##### Question 4

Which color of the vehicle is most likely to get a ticket?

In [55]:
# Wrote a Spark SQL query to count the number of violations for each vehicle color
part4 = spark.sql('''SELECT `Vehicle Color`, COUNT(*) AS count FROM nyc_data 
                    GROUP BY `Vehicle Color`
                    ORDER BY count DESC''')
part4.show(10)



+-------------+-------+
|Vehicle Color|  count|
+-------------+-------+
|           GY|2086345|
|           WH|1924600|
|           BK|1821705|
|         NULL|1015121|
|           BL| 688919|
|        WHITE| 610936|
|        BLACK| 401993|
|           RD| 393388|
|         GREY| 303176|
|         BLUE| 140721|
+-------------+-------+
only showing top 10 rows



                                                                                

### Observations:
- The most common color of the vehicle to get a ticket is GY and the second most common color is WH.

#### Part 2: KNN

##### Question 5

Given a Black vehicle parking illegally at 34510, 10030, 34050 (street codes). What is the probability that it will get an ticket?

In [56]:
# Creating a function to find the nearest cluster center to a given data point
def nearest_cluster(data_point, cluster_centers):
    distances = scipy.spatial.distance.cdist([data_point], cluster_centers, metric="sqeuclidean")
    nearest_cluster_id = np.argmin(distances)
    return nearest_cluster_id

In [57]:
parking_df = parking_df.selectExpr("CAST(`Street Code1` AS FLOAT) AS Street_Code1",
                   "CAST(`Street Code2` AS FLOAT) AS Street_Code2",
                   "CAST(`Street Code3` AS FLOAT) AS Street_Code3",
                   "`Vehicle Color`")

vector_assembler = VectorAssembler(inputCols=["Street_Code1", "Street_Code2", "Street_Code3"],
                                   outputCol="features")

parking_df = vector_assembler.transform(parking_df)

In [58]:
kmeans_nyc = KMeans(k=5)
k_means_model = kmeans_nyc.fit(parking_df.select('features'))
park_clustered = k_means_model.transform(parking_df).cache()

                                                                                

In [59]:
park_clustered.show()



+------------+------------+------------+-------------+--------------------+----------+
|Street_Code1|Street_Code2|Street_Code3|Vehicle Color|            features|prediction|
+------------+------------+------------+-------------+--------------------+----------+
|         0.0|         0.0|         0.0|         BLUE|           (3,[],[])|         0|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         4|
|     17870.0|     25390.0|     32670.0|         GRAY|[17870.0,25390.0,...|         4|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         3|
|     12690.0|     41700.0|     61090.0|        WHITE|[12690.0,41700.0,...|         3|
|         0.0|         0.0|         0.0|           BK|           (3,[],[])|         0|
|      8690.0|     21690.0|     21740.0|         NULL|[8690.0,21690.0,2...|         4|
|         0.0|     61090.0|         0.0|          GRY|   [0.0,61090.0,0.0]|         4|
|         0.0|         0.0|         0.0|   

                                                                                

In [60]:
veh_black = ['Black', 'BLAC', 'B LAC', 'BLK', 'BK.', 'BCK', 'BK', 'BLK.', 'BC', 'BK/']
prob_veh_black = park_clustered.groupBy('prediction').agg(
    count(when(col('Vehicle Color').isin(veh_black), 1)).alias('Count'),
    count('Vehicle Color').alias('Total_Cars')
).orderBy('prediction')
prob_black = prob_veh_black.select(
    'prediction',
    'Count',
    'Total_Cars',
    (col('Count') / col('Total_Cars')).alias('Probability')
)

In [61]:
prob_black.show()

+----------+-------+----------+-------------------+
|prediction|  Count|Total_Cars|        Probability|
+----------+-------+----------+-------------------+
|         0|1120938|   4700662|0.23846385892029676|
|         1| 149758|    822634|0.18204693703396652|
|         2| 211642|   1413362|0.14974366085970897|
|         3| 220324|   1260829|0.17474534611751474|
|         4| 209949|   1504874|0.13951267680882254|
+----------+-------+----------+-------------------+



                                                                                

In [62]:
cluster_centers = np.array(k_means_model.clusterCenters()).astype(float)
street_codes = np.array([34510.0, 10030.0, 34050.0])
cluster_id = nearest_cluster(street_codes, cluster_centers)

print('Cluster id for Street Code (34510, 10030, 34050) is:', cluster_id)

Cluster id for Street Code (34510, 10030, 34050) is: 4


In [63]:
# Displaying the required probability
prob_black.filter(col('prediction') == cluster_id).show()

+----------+------+----------+-------------------+
|prediction| Count|Total_Cars|        Probability|
+----------+------+----------+-------------------+
|         4|209949|   1504874|0.13951267680882254|
+----------+------+----------+-------------------+



### Observations:
- The probability that a Black vehicle parking illegally will get a ticket is 0.139.

### NBA

#### Question 1

For each pair of the players (A, B), we define the fear sore of A when facing B is the hit rate, such that B is closet defender when A is shooting. Based on the fear sore, for each player, please find out who is his "most unwanted defender".

In [64]:
nba_df = spark.read.csv("hdfs://localhost:9000/shot_logs.csv", header=True, inferSchema=True)

nba_df.printSchema()

root
 |-- GAME_ID: integer (nullable = true)
 |-- MATCHUP: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- W: string (nullable = true)
 |-- FINAL_MARGIN: integer (nullable = true)
 |-- SHOT_NUMBER: integer (nullable = true)
 |-- PERIOD: integer (nullable = true)
 |-- GAME_CLOCK: timestamp (nullable = true)
 |-- SHOT_CLOCK: double (nullable = true)
 |-- DRIBBLES: integer (nullable = true)
 |-- TOUCH_TIME: double (nullable = true)
 |-- SHOT_DIST: double (nullable = true)
 |-- PTS_TYPE: integer (nullable = true)
 |-- SHOT_RESULT: string (nullable = true)
 |-- CLOSEST_DEFENDER: string (nullable = true)
 |-- CLOSEST_DEFENDER_PLAYER_ID: integer (nullable = true)
 |-- CLOSE_DEF_DIST: double (nullable = true)
 |-- FGM: integer (nullable = true)
 |-- PTS: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_id: integer (nullable = true)



In [65]:
nba_df.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-13 01:09:00|      10.8|       2|       1.9|      7.7|       2|       made|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [66]:
# Grouping the dataframe by player and defender & aggregating the shot statistics
nba_grouped_data_df = nba_df.groupBy(col("player_id").alias("PlayerID"), col("CLOSEST_DEFENDER_PLAYER_ID").alias("DefenderID")) \
             .agg(sum(when(col("SHOT_RESULT") == "made", 1).otherwise(0)).alias("count1s"), 
                  sum(when(col("SHOT_RESULT") == "missed", 1).otherwise(0)).alias("count0s"))
nba_grouped_data_df.show()

+--------+----------+-------+-------+
|PlayerID|DefenderID|count1s|count0s|
+--------+----------+-------+-------+
|  203148|    101179|      0|      1|
|  202687|    201980|      1|      0|
|    2744|      1717|      0|      2|
|  203469|    202329|      1|      1|
|  201945|    202322|      0|      3|
|  202689|    202699|      6|      8|
|  202689|    203924|      1|      0|
|  203077|      2730|      1|      0|
|  203077|    201584|      2|      0|
|  202362|    201188|      2|      0|
|  202330|    201978|      2|      1|
|  202324|    203135|      1|      2|
|  203957|      2617|      1|      0|
|    2430|    203092|      3|      2|
|    2430|    200770|      1|      0|
|  202391|    202328|      0|      1|
|  101179|    202390|      0|      1|
|  201961|    203200|      1|      0|
|  202325|    203498|      1|      4|
|  203135|    201951|      1|      0|
+--------+----------+-------+-------+
only showing top 20 rows



In [67]:
nba_df.createTempView("nba_data")

nba_grouped_data_df = spark.sql('''SELECT player_id AS PlayerID, CLOSEST_DEFENDER_PLAYER_ID AS DefenderID, 
                        SUM(CASE WHEN SHOT_RESULT = 'made' THEN 1 ELSE 0 END) AS count1s, 
                        SUM(CASE WHEN SHOT_RESULT = 'missed' THEN 1 ELSE 0 END) AS count0s FROM nba_data 
                        GROUP BY player_id, CLOSEST_DEFENDER_PLAYER_ID''')
nba_grouped_data_df.show()

+--------+----------+-------+-------+
|PlayerID|DefenderID|count1s|count0s|
+--------+----------+-------+-------+
|  203148|    101179|      0|      1|
|  202687|    201980|      1|      0|
|    2744|      1717|      0|      2|
|  203469|    202329|      1|      1|
|  201945|    202322|      0|      3|
|  202689|    202699|      6|      8|
|  202689|    203924|      1|      0|
|  203077|      2730|      1|      0|
|  203077|    201584|      2|      0|
|  202362|    201188|      2|      0|
|  202330|    201978|      2|      1|
|  202324|    203135|      1|      2|
|  203957|      2617|      1|      0|
|    2430|    203092|      3|      2|
|    2430|    200770|      1|      0|
|  202391|    202328|      0|      1|
|  101179|    202390|      0|      1|
|  201961|    203200|      1|      0|
|  202325|    203498|      1|      4|
|  203135|    201951|      1|      0|
+--------+----------+-------+-------+
only showing top 20 rows



In [68]:
# Calculating the hit rate for each combination (player and defender)
nba_grouped_data_df = nba_grouped_data_df.withColumn("HitRate", expr("count1s / (count1s + count0s)"))

# Droping duplicates & filtering out records with "null" hit rates
nba_grouped_data_df = nba_grouped_data_df.dropDuplicates(["PlayerID", "HitRate"]).filter("HitRate is not null")
nba_grouped_data_df.show()

+--------+----------+-------+-------+-------------------+
|PlayerID|DefenderID|count1s|count0s|            HitRate|
+--------+----------+-------+-------+-------------------+
|  201945|    202322|      0|      3|                0.0|
|  203082|    203490|      0|      1|                0.0|
|  201600|    202324|      1|      3|               0.25|
|  201143|    202700|      7|      2| 0.7777777777777778|
|  202330|    201967|      2|      7| 0.2222222222222222|
|  201600|    201579|      2|      4| 0.3333333333333333|
|  202322|    201565|     13|     16| 0.4482758620689655|
|  101162|    201149|      8|      5| 0.6153846153846154|
|  201939|    201935|     10|      4| 0.7142857142857143|
|     977|    202718|      3|     11|0.21428571428571427|
|  202685|    201600|      6|      5| 0.5454545454545454|
|  201155|    101145|      1|      4|                0.2|
|  203458|    201941|      2|      3|                0.4|
|  202681|    202689|     11|     17|0.39285714285714285|
|  201228|    

In [69]:
# Grouping & selecting the min hit rate (each player)
nba_df_final = nba_grouped_data_df.groupBy("PlayerID").agg({"HitRate": "min"}).withColumn("HitRate", col("min(HitRate)"))

nba_grouped_data_df = nba_grouped_data_df.join(nba_df_final, ["PlayerID", "HitRate"]).drop("HitRate").select("PlayerID", "DefenderID")

nba_df_merged = nba_grouped_data_df.join(nba_df, (nba_grouped_data_df["PlayerID"] == nba_df["player_id"]) & (nba_grouped_data_df["DefenderID"] == nba_df["CLOSEST_DEFENDER_PLAYER_ID"]))

In [70]:
# Grouping & selecting relevant columns
nba_df_final = nba_df_merged.groupBy("PlayerID", "DefenderID") \
                    .agg(first("player_name").alias("Player Name"), first("CLOSEST_DEFENDER").alias("Most Unwanted Defender")) \
                    .orderBy("PlayerID") \
                    .limit(10)

nba_df_final.show()

+--------+----------+--------------+----------------------+
|PlayerID|DefenderID|   Player Name|Most Unwanted Defender|
+--------+----------+--------------+----------------------+
|     708|    203957| kevin garnett|           Exum, Dante|
|     977|    203937|   kobe bryant|        Anderson, Kyle|
|    1495|    203148|    tim duncan|        Roberts, Brian|
|    1713|      2037|  vince carter|       Crawford, Jamal|
|    1717|    201581|dirk nowtizski|           Hickson, JJ|
|    1718|    203079|   paul pierce|         Waiters, Dion|
|    1889|    201168|  andre miller|       Splitter, Tiago|
|    1890|    201229|  shawn marion|     Tolliver, Anthony|
|    1891|    201572|   jason terry|          Lopez, Brook|
|    1938|    203461| manu ginobili|      Bennett, Anthony|
+--------+----------+--------------+----------------------+



### Observations:
- Here we can see Top 10 players and their most unwanted defenders.
- For Example, the most unwanted defender for player 'Kevin Garnett' is 'Dante Exum'.

In [71]:
# Creating a copy of the original dataframe just to be safe
nba_df_copy = nba_df.select("*")

#### Question 2

For each player, we define the comfortable zone of shooting is a matrix of,

{SHOT DIST, CLOSE DEF DIST, SHOT CLOCK}

Please develop a Spark-based algorithm to classify each player's records into 4 comfortable zones. Considering the hit rate, which zone is the best for James Harden, Chris Paul, Stephen Curry, and Lebron James.

In [72]:
# Filtering out rows with null values and converting the shot result to binary
nba_df = nba_df.na.drop()
nba_df = nba_df.withColumn("SHOT_RESULT", when(col("SHOT_RESULT") == "made", 1).otherwise(0))

In [73]:
nba_df.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-13 01:09:00|      10.8|       2|       1.9|      7.7|       2|          1|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [74]:
# Selecting the required features
features = ["SHOT_DIST", "CLOSE_DEF_DIST", "SHOT_CLOCK"]

# Casting the selected features to float
nba_df = reduce(lambda df, col: df.withColumn(col, df[col].cast("float")), features, nba_df)
nba_df.show()

+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
| GAME_ID|             MATCHUP|LOCATION|  W|FINAL_MARGIN|SHOT_NUMBER|PERIOD|         GAME_CLOCK|SHOT_CLOCK|DRIBBLES|TOUCH_TIME|SHOT_DIST|PTS_TYPE|SHOT_RESULT| CLOSEST_DEFENDER|CLOSEST_DEFENDER_PLAYER_ID|CLOSE_DEF_DIST|FGM|PTS|  player_name|player_id|
+--------+--------------------+--------+---+------------+-----------+------+-------------------+----------+--------+----------+---------+--------+-----------+-----------------+--------------------------+--------------+---+---+-------------+---------+
|21400899|MAR 04, 2015 - CH...|       A|  W|          24|          1|     1|2024-04-13 01:09:00|      10.8|       2|       1.9|      7.7|       2|          1|   Anderson, Alan|                    101187|           1.3|  1|  2|brian roberts|   2031

In [75]:
# Creating a vector assembler
vector_Asmblr = VectorAssembler(inputCols=features, outputCol="features")

# Transforming the dataframe
nba_df_vector = vector_Asmblr.transform(nba_df).select('player_name', 'features', col('SHOT_RESULT').cast('float').alias('SHOT_RESULT'))
nba_df_vector.show()

+-------------+--------------------+-----------+
|  player_name|            features|SHOT_RESULT|
+-------------+--------------------+-----------+
|brian roberts|[7.69999980926513...|        1.0|
|brian roberts|[28.2000007629394...|        0.0|
|brian roberts|[17.2000007629394...|        0.0|
|brian roberts|[3.70000004768371...|        0.0|
|brian roberts|[18.3999996185302...|        0.0|
|brian roberts|[20.7000007629394...|        0.0|
|brian roberts|[3.5,2.0999999046...|        1.0|
|brian roberts|[24.6000003814697...|        0.0|
|brian roberts|[22.3999996185302...|        0.0|
|brian roberts|[24.5,4.699999809...|        0.0|
|brian roberts|[14.6000003814697...|        1.0|
|brian roberts|[5.90000009536743...|        1.0|
|brian roberts|[26.3999996185302...|        0.0|
|brian roberts|[22.7999992370605...|        0.0|
|brian roberts|[24.7000007629394...|        1.0|
|brian roberts|[25.0,5.400000095...|        0.0|
|brian roberts|[25.6000003814697...|        0.0|
|brian roberts|[24.2

- Above output just shows 'Brian Robers', although it has been run for all the players. (Just to show the output I had displated top 20 rows)

In [76]:
# Having four clusters since we have 4 comfort zones to determine
nba_K_means = KMeans(k=4, seed=1, featuresCol="features")
nba_K_model = nba_K_means.fit(nba_df_vector)
# players which are mentioned in the question
players = ['james harden', 'chris paul', 'stephen curry', 'lebron james']
nba_df_vector = nba_df_vector[nba_df_vector['player_name'].isin(players)]

                                                                                

In [77]:
nba_df_pred = nba_K_model.transform(nba_df_vector).select('player_name', 'prediction', 'SHOT_RESULT')

In [78]:
# Converting Spark dataframe to Pandas dataframe
# This is done so that local computations could be done quite easily
nba_df_pred_pd = nba_df_pred.toPandas()
nba_df_pred_pd = nba_df_pred_pd[nba_df_pred_pd['player_name'].isin(players)]

In [79]:
# Grouping the dataframe by player_name and prediction, calculating the mean of SHOT_RESULT for each group,
# and sorting the result by player_name and prediction
nba_result = nba_df_pred.groupBy("player_name", "prediction").mean("SHOT_RESULT").sort("player_name", "prediction")

# Adding a new column 'best_zone' that contains the maximum average SHOT_RESULT for each player_name
# using a Window function to partition by player_name
best_zone = nba_result.withColumn('best_zone', pysf.max('avg(SHOT_RESULT)').over(Window.partitionBy("player_name"))) \
                .where(pysf.col('avg(SHOT_RESULT)')==pysf.col('best_zone')).drop('best_zone').withColumnRenamed('prediction', 'best_zone')

# Displaying the result with 'prediction' column renamed to 'best_zone'
best_zone.show()

+-------------+---------+------------------+
|  player_name|best_zone|  avg(SHOT_RESULT)|
+-------------+---------+------------------+
|   chris paul|        0|0.5563380281690141|
| james harden|        3|0.5604395604395604|
| lebron james|        3|0.6613545816733067|
|stephen curry|        3|0.6350710900473934|
+-------------+---------+------------------+



### Observation:
- The best zone for Chris Paul is Zone 0.
- The best zone for James Harden, Lebron James, and Stephen Curry is Zone 3.

In [80]:
spark.stop()