In [0]:
%pyspark

# Import necessary libraries
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session. Hadoop's default filesystem is pointed
# at the namenode and the SQL catalog is kept in memory.
spark = (
    SparkSession.builder
    .appName("NBA Analysis with Spark")
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

# Names applied positionally to the 27 columns of the file, which is read
# with header=False below.
column_names = [
    # Event-level fields
    "event_id", "event_num", "game_id",
    "home_description", "time", "period",
    # First player involved in the event
    "player1_id", "player1_name", "player1_team_abbr",
    "player1_team_city", "player1_team_id", "player1_team_name",
    # Second player involved in the event
    "player2_id", "player2_name", "player2_team_abbr",
    "player2_team_city", "player2_team_id", "player2_team_name",
    # Third player involved in the event
    "player3_id", "player3_name", "player3_team_abbr",
    "player3_team_city", "player3_team_id", "player3_team_name",
    # Score fields and the visitor-side description
    "score", "score_margin", "visitor_description",
]

# Location of the cleaned dataset on HDFS.
hdfs_path = "hdfs://namenode:9000/user/test/output/cleanedData"

# Read the CSV without a header row and without schema inference, then
# rename the positional columns in the same expression.
df = spark.read.csv(hdfs_path, header=False, inferSchema=False).toDF(*column_names)

In [1]:
%pyspark

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, regexp_extract
from pyspark.sql.functions import when, sum as _sum

# A player must reach at least this many points in one game to be counted.
POINTS_THRESHOLD = 40

# Extract home and visitor scores from the 'score' column in the format 'X - Y'.
# NOTE(review): this keeps the original reading of the first number as the
# home score; confirm the order against the data source.
df = df.withColumn("home_score", regexp_extract(col("score"), r"(\d+)\s*-\s*\d+", 1).cast("int")) \
       .withColumn("visitor_score", regexp_extract(col("score"), r"\d+\s*-\s*(\d+)", 1).cast("int"))

# Verify the extracted home_score and visitor_score
df.select("score", "home_score", "visitor_score").show(5, truncate=False)

# Only rows that carry a parsed score can contribute points.
scored_events = df.filter(col("home_score").isNotNull() & col("visitor_score").isNotNull())

# Order each game's scored rows by the combined running score, using
# event_num (cast from string) to break ties.
# Assumes the running score never decreases within a game, so this ordering
# places every score change directly after the state it changed.
game_timeline = Window.partitionBy("game_id").orderBy(
    col("home_score") + col("visitor_score"),
    col("event_num").cast("int"),
)

# Points added by an event = running score minus the previous running score.
# The lag default of 0 makes the first scored row of a game count from 0 - 0.
# (The previous version summed the running *team* score itself, which is not
# a player's point total.)
scored_events = scored_events \
    .withColumn("home_points", col("home_score") - lag("home_score", 1, 0).over(game_timeline)) \
    .withColumn("visitor_points", col("visitor_score") - lag("visitor_score", 1, 0).over(game_timeline))

# Total points per player per game, split by which side of the score moved.
# NOTE(review): points are credited to player1 of the row where the score
# changes, matching the original grouping by player1_id; confirm player1 is
# always the scorer on such rows.
player_game_scores = scored_events \
    .filter(col("player1_id").isNotNull()) \
    .groupBy("player1_id", "game_id") \
    .agg(
        _sum("home_points").alias("total_home_points"),
        _sum("visitor_points").alias("total_visitor_points"),
    )

# Calculate the total points for each player by summing both sides
player_game_scores = player_game_scores.withColumn(
    "total_points", col("total_home_points") + col("total_visitor_points")
)

# Players who reached the threshold in at least one game
players_40_plus = player_game_scores \
    .filter(col("total_points") >= POINTS_THRESHOLD) \
    .select("player1_id") \
    .distinct()

# Number of distinct players; rows without a player id are not a player.
total_players = df.filter(col("player1_id").isNotNull()).select("player1_id").distinct().count()

# Percentage of players with a 40+ point game; an empty dataset yields 0.0
# instead of raising ZeroDivisionError.
if total_players:
    percentage_40_plus = (players_40_plus.count() / total_players) * 100
else:
    percentage_40_plus = 0.0

print(f"Percentage of players scoring 40+ points: {percentage_40_plus:.2f}%")

In [2]:
%pyspark
from pyspark.sql.functions import when, countDistinct
from pyspark.sql.functions import col, trim, max as _max


def _has_text(column_name):
    """Return a boolean Column that is true when the column is neither null nor blank."""
    return col(column_name).isNotNull() & (trim(col(column_name)) != "")


# Final score of every game.
# Assumes the running score only grows during a game, so the per-game maximum
# of each side is its final score. (The previous version compared the score
# on every event row, and rows with a null or tied score fell into
# otherwise(), marking player2's team as both winner and loser.)
final_scores = df.groupBy("game_id").agg(
    _max("home_score").alias("final_home_score"),
    _max("visitor_score").alias("final_visitor_score"),
)

# Identify the home and visitor team of every game.
# NOTE(review): the data has no explicit home/visitor team column, so this is
# inferred from scored rows: when only home_description has text, player1's
# team is taken as the home team, and likewise for the visitor side. Confirm
# this matches how the descriptions are populated.
only_home_text = _has_text("home_description") & ~_has_text("visitor_description")
only_visitor_text = _has_text("visitor_description") & ~_has_text("home_description")
game_teams = df \
    .filter(col("home_score").isNotNull() & col("visitor_score").isNotNull()) \
    .groupBy("game_id") \
    .agg(
        # max() ignores the nulls produced by non-matching rows and gives a
        # deterministic pick if a game ever carries more than one name.
        _max(when(only_home_text, col("player1_team_name"))).alias("home_team"),
        _max(when(only_visitor_text, col("player1_team_name"))).alias("visitor_team"),
    )

# One winner/loser per game; both stay null when the final score is tied or
# missing, or when a team could not be identified.
home_won = col("final_home_score") > col("final_visitor_score")
visitor_won = col("final_home_score") < col("final_visitor_score")
game_results = final_scores \
    .join(game_teams, on="game_id", how="left") \
    .withColumn("winner_team", when(home_won, col("home_team")).when(visitor_won, col("visitor_team"))) \
    .withColumn("loser_team", when(home_won, col("visitor_team")).when(visitor_won, col("home_team"))) \
    .select("game_id", "winner_team", "loser_team")

# Attach the per-game result to every event row under the same column names
# as before. Existing winner/loser columns are dropped first so the cell can
# be re-run, and the original column order is restored after the join.
base_columns = [name for name in df.columns if name not in ("winner_team", "loser_team")]
df = df.select(*base_columns) \
    .join(game_results, on="game_id", how="left") \
    .select(*base_columns, "winner_team", "loser_team")

# Group by loser_team and count distinct game IDs
matches_lost = game_results \
    .filter(col("loser_team").isNotNull()) \
    .groupBy("loser_team") \
    .agg(countDistinct("game_id").alias("matches_lost"))
matches_lost.show()