# 1. Import libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, mean, expr

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# data visualization
import plotly.graph_objects as go

# 2. Spark Session Creation

In [2]:
spark = SparkSession.builder.appName("VideoGameSalesAnalysis").getOrCreate()

# 3. Load data

In [3]:
# path to data
data_path = "dataset/Video_Games_Sales_as_at_22_Dec_2016.csv"

In [4]:
df_raw_data = spark.read.csv(data_path, header=True, inferSchema=True)

# 4. Data Exloration

## 4.1 Show top 5 rows

In [5]:
df_raw_data.show(5)

+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|      Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|   Super Mario Bros.|     NES|           1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        NULL|        NULL|      NULL|      NULL|     NULL|  NULL|
|      Mario Kart Wii|     Wii|           2008|      Racing|

## 4.2 Print schema

In [6]:
df_raw_data.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



## 4.3 Summary of the data, like count, mean, min, max

In [7]:
df_raw_data.describe().show()

+-------+--------------------+--------+------------------+--------+---------------+------------------+-------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+---------------+------+
|summary|                Name|Platform|   Year_of_Release|   Genre|      Publisher|          NA_Sales|           EU_Sales|           JP_Sales|        Other_Sales|      Global_Sales|     Critic_Score|      Critic_Count|        User_Score|        User_Count|      Developer|Rating|
+-------+--------------------+--------+------------------+--------+---------------+------------------+-------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+---------------+------+
|  count|               16717|   16719|             16719|   16717|          16719|             16719|              16719|              16719|              1671

## 4.4 Distinct count of each columns

In [8]:
for column in df_raw_data.columns:
    col_cnt = df_raw_data.select(column).distinct().count()
    print(f"Count Distinct {column}: {col_cnt}")

Count Distinct Name: 11563
Count Distinct Platform: 31
Count Distinct Year_of_Release: 40
Count Distinct Genre: 13
Count Distinct Publisher: 582
Count Distinct NA_Sales: 402
Count Distinct EU_Sales: 307
Count Distinct JP_Sales: 244
Count Distinct Other_Sales: 155
Count Distinct Global_Sales: 629
Count Distinct Critic_Score: 83
Count Distinct Critic_Count: 107
Count Distinct User_Score: 97
Count Distinct User_Count: 889
Count Distinct Developer: 1697
Count Distinct Rating: 9


## 4.4 Count of each columns without null values

In [9]:
not_null_counts = df_raw_data.select([sum((~col(c).isNull()).cast("int")).alias(c) for c in df_raw_data.columns])
not_null_counts.show()

+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
| Name|Platform|Year_of_Release|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|16717|   16719|          16719|16717|    16719|   16719|   16719|   16719|      16719|       16719|        8137|        8137|     10015|      7590|    10096|  9950|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+



### 4.4.1 Plotting above ditribution in bar graph

## 

In [10]:
import plotly.express as px
# Convert the PySpark DataFrame to Pandas DataFrame and transpose it
not_null_counts_pandas = not_null_counts.toPandas().transpose()

# Create the bar plot using Plotly
fig = go.Figure()

# Adding bar traces for each column in the transposed DataFrame
for column in not_null_counts_pandas.columns:
    fig.add_trace(go.Bar(
        x=not_null_counts_pandas.index,
        y=not_null_counts_pandas[column],
        name=column
    ))

# Customizing the layout
fig.update_layout(
    barmode='stack',
    title="Product Value Counts in Columns",
    xaxis_title="Columns",
    yaxis_title="Number of Null Values",
    template='plotly',
    showlegend=False,
    colorway=px.colors.sequential.Greens_r
)

# Adding the upper limit line
upper_limit = df_raw_data.count()
fig.add_hline(y=upper_limit, line_dash="dash", line_color="darkgreen", annotation_text="Upper Limit", annotation_position="top right")

# Show the plot
fig.show()


# 5. Data Cleaning and Preprocessing

## 5.1 Missing Values:

### 5.1.1 Identify Missing Values:

In [11]:
not_null_counts = df_raw_data.select([sum((~col(c).isNull()).cast("int")).alias(c) for c in df_raw_data.columns])
not_null_counts.show()

+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
| Name|Platform|Year_of_Release|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|16717|   16719|          16719|16717|    16719|   16719|   16719|   16719|      16719|       16719|        8137|        8137|     10015|      7590|    10096|  9950|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+



### 5.1.2 Handling Missing Values (Example - Mean Imputation for Numerical Features):

In [12]:

# Fill Critic_Score and User_Score with their respective means
mean_critic_score = df_raw_data.select(mean(col('Critic_Score'))).collect()[0][0]
mean_user_score = df_raw_data.select(mean(col('User_Score'))).collect()[0][0]
df_filled_scores = df_raw_data.fillna({'Critic_Score': mean_critic_score, 'User_Score': mean_user_score})

# Fill Critic_Count and User_Count with their respective medians
median_critic_count = df_filled_scores.select(expr('percentile_approx(Critic_Count, 0.5)').alias('median')).collect()[0][0]
median_user_count = df_filled_scores.select(expr('percentile_approx(User_Count, 0.5)').alias('median')).collect()[0][0]
df_filled_counts = df_filled_scores.fillna({'Critic_Count': median_critic_count, 'User_Count': median_user_count})

# Fill Developer with the mode or "Unknown"
mode_developer_row = df_filled_counts.groupBy('Developer').agg(count('*').alias('count')).orderBy('count', ascending=False).first()
mode_developer = mode_developer_row[0] if mode_developer_row and mode_developer_row[0] is not None else 'Unknown'
print(f"Mode Developer: {mode_developer}")
df_filled_developer = df_filled_counts.fillna({'Developer': mode_developer})

# Fill Rating with the mode or "Unknown"
mode_rating_row = df_filled_developer.groupBy('Rating').agg(count('*').alias('count')).orderBy('count', ascending=False).first()
mode_rating = mode_rating_row[0] if mode_rating_row and mode_rating_row[0] is not None else 'Unknown'
print(f"Mode Rating: {mode_rating}")
df_cleaned = df_filled_developer.fillna({'Rating': mode_rating})

# Show the cleaned DataFrame
df_cleaned.show()


Mode Developer: Unknown
Mode Rating: Unknown
+--------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+------------------+----------+-------------------+-------+
|                Name|Platform|Year_of_Release|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|        User_Score|User_Count|          Developer| Rating|
+--------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+------------------+----------+-------------------+-------+
|          Wii Sports|     Wii|           2006|      Sports|            Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|                 8|       322|           Nintendo|      E|
|   Super Mario Bros.|     NES|           1985|    Platform|            Nintendo|  

In [13]:
df_cleaned_not_null_counts = df_cleaned.select([sum((~col(c).isNull()).cast("int")).alias(c) for c in df_cleaned.columns])
df_cleaned_not_null_counts.show()

+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
| Name|Platform|Year_of_Release|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|16717|   16719|          16719|16717|    16719|   16719|   16719|   16719|      16719|       16719|       16719|       16719|     16719|     16719|    16719| 16719|
+-----+--------+---------------+-----+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+



In [14]:
df_cleaned.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = false)
 |-- User_Score: string (nullable = false)
 |-- User_Count: integer (nullable = false)
 |-- Developer: string (nullable = false)
 |-- Rating: string (nullable = false)



# 6. Regression Analysis

In [17]:
from pyspark.sql.functions import col, when

# Convert User_Score and Year_of_Release to numerical data types
df_cleaned = df_cleaned.withColumn('User_Score', when(col('User_Score').cast('double').isNotNull(), col('User_Score').cast('double')).otherwise(0.0))
df_cleaned = df_cleaned.withColumn('Year_of_Release', when(col('Year_of_Release').cast('int').isNotNull(), col('Year_of_Release').cast('int')).otherwise(0))

# Verify the data types
# df_cleaned.printSchema()

# Prepare features and label
feature_columns = ['Critic_Score', 'User_Score', 'Year_of_Release']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data_prepared = assembler.transform(df_cleaned).select('features', 'Global_Sales')

# Show prepared data to ensure it's correct
# data_prepared.show()

# Split the data into training and test sets
train_data, test_data = data_prepared.randomSplit([0.8, 0.2], seed=42)

# Initialize and train the model
lr = LinearRegression(featuresCol='features', labelCol='Global_Sales', predictionCol="predicted_sales")
lr_model = lr.fit(train_data)

# # Make predictions
predictions = lr_model.transform(test_data)

# evaluator = RegressionEvaluator(labelCol='Global_Sales', predictionCol='prediction')
rmse = evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})

# print(f'Root Mean Squared Error (RMSE): {rmse}')
print(f'R-squared (R2): {r2}')


+--------------------+------------+
|            features|Global_Sales|
+--------------------+------------+
|   [76.0,8.0,2006.0]|       82.53|
|[68.0,7.125046113...|       40.24|
|   [82.0,8.3,2008.0]|       35.52|
|   [80.0,8.0,2009.0]|       32.77|
|[68.0,7.125046113...|       31.37|
|[68.0,7.125046113...|       30.26|
|   [89.0,8.5,2006.0]|        29.8|
|   [58.0,6.6,2006.0]|       28.92|
|   [87.0,8.4,2009.0]|       28.32|
|[68.0,7.125046113...|       28.31|
|[68.0,7.125046113...|       24.67|
|   [91.0,8.6,2005.0]|       23.21|
|[68.0,7.125046113...|        23.1|
|   [80.0,7.7,2007.0]|        22.7|
|   [61.0,6.3,2010.0]|       21.81|
|   [80.0,7.4,2009.0]|       21.79|
|   [97.0,8.2,2013.0]|       21.04|
|   [95.0,9.0,2004.0]|       20.81|
|[68.0,7.125046113...|       20.61|
|   [77.0,7.9,2005.0]|       20.15|
+--------------------+------------+
only showing top 20 rows



NameError: name 'evaluator' is not defined

# Stop Spark Session

In [None]:
# spark.stop()