<a href="https://colab.research.google.com/github/yashjtorreto/Cloud-Computing-Assignment/blob/main/CC_assignment_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [43]:
import os
from requests import Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark import SparkConf

In [44]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("NBA ETL Pipeline") \
    .config("spark.jars", "/path/to/postgresql-connector.jar") \
    .getOrCreate()

In [45]:
# Define file URLs
data_sources = {
    "player_metrics_miss": "https://www.kaggle.com/datasets/matthewjohnson14/nba-player-shooting-motions?select=player_metrics_miss.csv",
    "path_detail": "https://www.dropbox.com/scl/fi/9llp2cy1jl3bx7zuqyorp/path_detail.csv?rlkey=pf7dzvj86na5exuh5eytwg7h0&e=1&dl=0",
    "player_metrics_open": "https://www.dropbox.com/scl/fi/ty12tbqi19w5i6drnykv7/player_metrics_open.csv?rlkey=1qj278pasngogkmj7lqh3kujb&e=1&dl=0",
    "path_detail_open": "https://www.dropbox.com/scl/fi/szz04ps9j0kqgclx1oaqa/path_detail_open.csv?rlkey=v9vm9knrl4ktckm7wkf2cyoue&e=1&dl=0",
    "player_metrics_defended": "https://www.dropbox.com/scl/fi/awzgtgnrnfzstpuvh15cc/player_metrics_defended.csv?rlkey=62nhrd1ffw03y34u4fftdqfes&e=1&dl=0",
    "path_detail_defended": "https://www.dropbox.com/scl/fi/0fw5xqf8t64t6cr0lpum2/path_detail_defended.csv?rlkey=77v9cr8gzt4masvy3unnv8vnr&e=1&dl=0",
    "player_metrics_made": "https://www.dropbox.com/scl/fi/ld7gnfh8aok3yxk86qf9c/player_metrics_made.csv?rlkey=3jq0cu1mkolp2fnqsm91475x1&e=1&dl=0",
    "path_detail_made": "https://www.dropbox.com/scl/fi/csv28z6jd3n4z4ddotp46/path_detail_made.csv?rlkey=fdb0esikdsj2eta2ls608p9lh&e=1&dl=0",
    "player_metrics_miss_2": "https://www.dropbox.com/scl/fi/s9dz3v8tltf2k4ch3q2pe/player_metrics_miss.csv?rlkey=6e68dh4i99lvh073ut0hs0omq&e=1&dl=0",
    "path_detail_miss": "https://www.dropbox.com/scl/fi/tyioo0zxwf1x1stqijbcr/path_detail_miss.csv?rlkey=pfm57a7qz67t7o6gcnpcwv7il&e=1&dl=0"
}


In [46]:

# Directory to store downloaded files
DOWNLOAD_DIR = "nba_data"  # Change this to your desired directory

# Create the download directory if it doesn't exist
if not os.path.exists(DOWNLOAD_DIR):
    os.makedirs(DOWNLOAD_DIR)

# Download files using requests
session = Session()
for filename, url in data_sources.items():
    filepath = os.path.join(DOWNLOAD_DIR, filename + ".csv")  # Create full file path
    print(f"Downloading {filename} from {url} to {filepath}")
    try:
        response = session.get(url, stream=True)
        response.raise_for_status()  # Raise an exception for bad status codes

        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Downloaded {filename} successfully.")
    except Exception as e:
        print(f"Error downloading {filename}: {e}")

print("All files downloaded (or skipped due to errors).")

Downloading player_metrics_miss from https://www.kaggle.com/datasets/matthewjohnson14/nba-player-shooting-motions?select=player_metrics_miss.csv to nba_data/player_metrics_miss.csv
Downloaded player_metrics_miss successfully.
Downloading path_detail from https://www.dropbox.com/scl/fi/9llp2cy1jl3bx7zuqyorp/path_detail.csv?rlkey=pf7dzvj86na5exuh5eytwg7h0&e=1&dl=0 to nba_data/path_detail.csv
Downloaded path_detail successfully.
Downloading player_metrics_open from https://www.dropbox.com/scl/fi/ty12tbqi19w5i6drnykv7/player_metrics_open.csv?rlkey=1qj278pasngogkmj7lqh3kujb&e=1&dl=0 to nba_data/player_metrics_open.csv
Downloaded player_metrics_open successfully.
Downloading path_detail_open from https://www.dropbox.com/scl/fi/szz04ps9j0kqgclx1oaqa/path_detail_open.csv?rlkey=v9vm9knrl4ktckm7wkf2cyoue&e=1&dl=0 to nba_data/path_detail_open.csv
Downloaded path_detail_open successfully.
Downloading player_metrics_defended from https://www.dropbox.com/scl/fi/awzgtgnrnfzstpuvh15cc/player_metrics_d

In [47]:
# prompt: # Load datasets into DataFrames

# Load the datasets into Spark DataFrames
dfs = {}
for filename, _ in data_sources.items():
    filepath = os.path.join(DOWNLOAD_DIR, filename + ".csv")
    try:
        dfs[filename] = spark.read.csv(filepath, header=True, inferSchema=True)
        print(f"Loaded {filename} into DataFrame.")
    except Exception as e:
        print(f"Error loading {filename}: {e}")

# Example usage of the loaded DataFrames
# Assuming 'player_metrics_made' is loaded successfully:
# dfs['player_metrics_made'].show(5)
# dfs['path_detail'].printSchema()

Loaded player_metrics_miss into DataFrame.
Loaded path_detail into DataFrame.
Loaded player_metrics_open into DataFrame.
Loaded path_detail_open into DataFrame.
Loaded player_metrics_defended into DataFrame.
Loaded path_detail_defended into DataFrame.
Loaded player_metrics_made into DataFrame.
Loaded path_detail_made into DataFrame.
Loaded player_metrics_miss_2 into DataFrame.
Loaded path_detail_miss into DataFrame.


In [48]:
# prompt: # Transformation: Add outcome columns

# Add outcome column to each DataFrame
for df_name, df in dfs.items():
    if "made" in df_name:
        dfs[df_name] = df.withColumn("outcome", lit("made"))
    elif "miss" in df_name:
        dfs[df_name] = df.withColumn("outcome", lit("miss"))
    elif "open" in df_name:
        dfs[df_name] = df.withColumn("outcome", lit("open"))
    elif "defended" in df_name:
        dfs[df_name] = df.withColumn("outcome", lit("defended"))
    else:
        dfs[df_name] = df.withColumn("outcome", lit("unknown")) # Handle other cases

# Verify the outcome column addition
# Example: Show the first few rows of player_metrics_made with the new column
# dfs['player_metrics_made'].show(5)

In [49]:
# prompt:  Verify the outcome column addition

# Verify the outcome column addition for all dataframes
for df_name, df in dfs.items():
    print(f"Verifying outcome column for: {df_name}")
    df.select("outcome").distinct().show()

Verifying outcome column for: player_metrics_miss
+-------+
|outcome|
+-------+
|   miss|
+-------+

Verifying outcome column for: path_detail
+-------+
|outcome|
+-------+
|unknown|
+-------+

Verifying outcome column for: player_metrics_open
+-------+
|outcome|
+-------+
|   open|
+-------+

Verifying outcome column for: path_detail_open
+-------+
|outcome|
+-------+
|   open|
+-------+

Verifying outcome column for: player_metrics_defended
+--------+
| outcome|
+--------+
|defended|
+--------+

Verifying outcome column for: path_detail_defended
+--------+
| outcome|
+--------+
|defended|
+--------+

Verifying outcome column for: player_metrics_made
+-------+
|outcome|
+-------+
|   made|
+-------+

Verifying outcome column for: path_detail_made
+-------+
|outcome|
+-------+
|   made|
+-------+

Verifying outcome column for: player_metrics_miss_2
+-------+
|outcome|
+-------+
|   miss|
+-------+

Verifying outcome column for: path_detail_miss
+-------+
|outcome|
+-------+
|   miss|
+

In [50]:
dfs['player_metrics_made'].show(5)


+--------------------+-------+
|     <!DOCTYPE html>|outcome|
+--------------------+-------+
|<html class="maes...|   made|
|<head><meta chars...|   made|
|<meta content="no...|   made|
|<meta content="wi...|   made|
|<meta content="ht...|   made|
+--------------------+-------+
only showing top 5 rows



In [51]:
# prompt: # Combine datasets into a single DataFrame

from functools import reduce
from pyspark.sql import DataFrame

# Combine all DataFrames into a single DataFrame
combined_df = reduce(DataFrame.unionAll, dfs.values())

# Show the combined DataFrame (optional)
combined_df.show(5)

# You can now perform operations on the combined_df

+--------------------+-------+
|     <!DOCTYPE html>|outcome|
+--------------------+-------+
|    <html lang="en">|   miss|
|              <head>|   miss|
|  <title>NBA Play...|   miss|
|  <meta charset="...|   miss|
|    <meta name="r...|   miss|
+--------------------+-------+
only showing top 5 rows



In [53]:
# Print the shape of the combined dataset
print((combined_df.count(), len(combined_df.columns)))

(4134, 2)


In [52]:
import pandas as pd

# Assuming 'combined_df' from the previous code is available

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = combined_df.toPandas()

# Save the Pandas DataFrame to a CSV file
pandas_df.to_csv("combined_nba_data.csv", index=False)

In [54]:
from google.colab import files
files.download('combined_nba_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>