In [6]:
# STEP 1: Load the raw video-game sales CSV from the Lakehouse "Files" area.

# Lakehouse-relative location of the source file.
file_path = "Files/video_games_sales.csv"

# Reader settings: the first line of the file holds the column names, and
# Spark scans the data to infer a type for every column.
csv_read_options = {"header": "true", "inferSchema": "true"}

df_raw = spark.read.format("csv").options(**csv_read_options).load(file_path)

# Preview the rows in the notebook's display widget.
display(df_raw)

# Print the column names together with the inferred types.
df_raw.printSchema()


StatementMeta(, c5676fa2-cb30-4bdf-9dfc-2ffe992678f9, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b4a2a6ec-afb6-4651-950c-235b4fe191c1)

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [7]:
from pyspark.sql import functions as F

# Total number of records in the raw frame.
raw_row_total = df_raw.count()
print("Row count:", raw_row_total)

# Per-column null tally: flag each null cell as 1 (0 otherwise) and add the
# flags up. Only true nulls are counted; text placeholders are not.
null_flag_sums = []
for column_name in df_raw.columns:
    is_missing = F.col(column_name).isNull().cast("int")
    null_flag_sums.append(F.sum(is_missing).alias(column_name))

null_counts = df_raw.select(*null_flag_sums)
display(null_counts)


StatementMeta(, c5676fa2-cb30-4bdf-9dfc-2ffe992678f9, 9, Finished, Available, Finished)

Row count: 16598


SynapseWidget(Synapse.DataFrame, d489cea9-c3a9-4420-85e5-3d500aad9922)

In [8]:
from pyspark.sql import functions as F

from pyspark.sql import Window

# Column groups used by the cleaning steps below.
sales_columns = ["NA_Sales", "EU_Sales", "JP_Sales", "Other_Sales", "Global_Sales"]
text_columns = ["Name", "Platform", "Genre", "Publisher"]
required_columns = ["Name", "Platform", "Year", "Global_Sales"]
dedup_keys = ["Name", "Platform", "Year"]

# 1) Make sure numeric columns have numeric types.
#    Year is read as a string; under Spark's default (non-ANSI) cast rules any
#    value that is not a valid integer becomes null and is removed in step 3.
df_clean = df_raw.withColumn("Year", F.col("Year").cast("int"))
for sales_col in sales_columns:
    df_clean = df_clean.withColumn(sales_col, F.col(sales_col).cast("double"))

# 2) Trim leading/trailing whitespace from the text columns
for text_col in text_columns:
    df_clean = df_clean.withColumn(text_col, F.trim(F.col(text_col)))

# 3) Drop rows where critical columns are missing
df_clean = df_clean.dropna(subset=required_columns)

# 4) Remove duplicate games (same name + platform + year).
#    dropDuplicates() keeps an arbitrary row per key, so the surviving Rank and
#    sales figures could differ between actions or runs. Instead, rank the rows
#    inside each key group and keep the best-ranked one (lowest Rank, then
#    highest Global_Sales) so the result is reproducible.
#    NOTE(review): assumes Rank is unique within a duplicate group -- confirm.
dedup_window = Window.partitionBy(*dedup_keys).orderBy(
    F.col("Rank").asc_nulls_last(),
    F.col("Global_Sales").desc(),
)
df_clean = (
    df_clean
    .withColumn("_dedup_rn", F.row_number().over(dedup_window))
    .filter(F.col("_dedup_rn") == 1)
    .drop("_dedup_rn")  # helper column must not leak into the saved schema
)

# 5) Add a Decade column (e.g. 1990, 2000, 2010)
df_clean = df_clean.withColumn(
    "Decade",
    (F.col("Year") / 10).cast("int") * 10
)

# 6) Check result
print("Cleaned row count:", df_clean.count())
df_clean.printSchema()
display(df_clean.limit(20))


StatementMeta(, c5676fa2-cb30-4bdf-9dfc-2ffe992678f9, 10, Finished, Available, Finished)

Cleaned row count: 16326
root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Decade: integer (nullable = true)



SynapseWidget(Synapse.DataFrame, 5d3b8e34-6800-48d6-a012-b27424e04543)

In [9]:
# STEP: Persist the cleaned dataframe as a Delta table in this Lakehouse.

# Name under which the table is registered.
table_name = "video_game_sales_clean"

# "overwrite" replaces the contents of any existing table with this name.
delta_writer = df_clean.write.format("delta").mode("overwrite")
delta_writer.saveAsTable(table_name)

print("Table saved as:", table_name)


StatementMeta(, c5676fa2-cb30-4bdf-9dfc-2ffe992678f9, 11, Finished, Available, Finished)

Table saved as: video_game_sales_clean
