**Install JDK & Spark**

In [None]:
# Install Java and PySpark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark

# Set environment variables for Java
import os
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-11-openjdk-amd64"


**Setting Environment Variables**


In [None]:
import os

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

**Starting Spark Session**

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Loading Dataset**

In [None]:
# Load CSV file into Spark DataFrame
df = spark.read.csv('/vgchartz-2024.csv', header=True, inferSchema=True)

# Show the first few rows of the DataFrame
df.show(5)

+--------------------+--------------------+-------+-------+--------------+--------------+------------+-----------+--------+--------+---------+-----------+-------------------+-------------------+
|                 img|               title|console|  genre|     publisher|     developer|critic_score|total_sales|na_sales|jp_sales|pal_sales|other_sales|       release_date|        last_update|
+--------------------+--------------------+-------+-------+--------------+--------------+------------+-----------+--------+--------+---------+-----------+-------------------+-------------------+
|/games/boxart/ful...|  Grand Theft Auto V|    PS3| Action|Rockstar Games|Rockstar North|         9.4|      20.32|    6.37|    0.99|     9.85|       3.12|2013-09-17 00:00:00|               null|
|/games/boxart/ful...|  Grand Theft Auto V|    PS4| Action|Rockstar Games|Rockstar North|         9.7|      19.39|    6.06|     0.6|     9.71|       3.02|2014-11-18 00:00:00|2018-01-03 00:00:00|
|/games/boxart/827...|Gra

In [4]:
# Load the dataset into an RDD
file_path = "/vgchartz-2024.csv"  # Update to match your file path in Colab
rdd = spark.sparkContext.textFile(file_path)


In [5]:
# Extract header and data
header = rdd.first()
rdd = rdd.filter(lambda row: row != header)  # Remove header row


 **Mapper**

In [6]:
# Mapper function: Parse the CSV row and calculate sales disparity
def mapper(row):
    try:
        fields = row.split(',')
        console = fields[2]
        na_sales = float(fields[8]) if fields[8] else 0.0
        jp_sales = float(fields[9]) if fields[9] else 0.0
        sales_disparity = abs(na_sales - jp_sales)
        return (console, (sales_disparity, na_sales, jp_sales))
    except ValueError:
        # Handle rows with invalid data
        return (None, (0.0, 0.0, 0.0))  # Return a default value for invalid rows
# Apply mapper
mapped_rdd = rdd.map(mapper)

# Filter out invalid rows (where console is None)
filtered_rdd = mapped_rdd.filter(lambda x: x[0] is not None)


**Reducer**

In [7]:
# Reducer function: Aggregate sales disparity, NA sales, and JP sales for each console
def reducer(a, b):
    total_sales_disparity = a[0] + b[0]
    total_na_sales = a[1] + b[1]
    total_jp_sales = a[2] + b[2]
    return (total_sales_disparity, total_na_sales, total_jp_sales)

# Apply reducer
reduced_rdd = filtered_rdd.reduceByKey(reducer)


**Post Processing**

In [8]:
# Convert RDD to DataFrame with explicit schema
def rdd_to_row(record):
    console, values = record
    sales_disparity, na_sales, jp_sales = values
    return Row(console=console, total_sales_disparity=sales_disparity, total_na_sales=na_sales, total_jp_sales=jp_sales)

rows_rdd = reduced_rdd.map(rdd_to_row)
disparity_by_console_df = spark.createDataFrame(rows_rdd)

# Create a window to rank consoles by their sales disparity
windowSpec = Window.orderBy(F.desc("total_sales_disparity"))

# Apply ranking to find consoles with the highest sales disparity
disparity_ranked = disparity_by_console_df.withColumn("rank", F.row_number().over(windowSpec))

# Display the top consoles with the highest sales disparity
top_disparity_consoles = disparity_ranked.filter(disparity_ranked["rank"] <= 10)  # Show top 10 for example
top_disparity_consoles.show(10, truncate=False)


+-------+---------------------+------------------+------------------+----+
|console|total_sales_disparity|total_na_sales    |total_jp_sales    |rank|
+-------+---------------------+------------------+------------------+----+
|PS2    |533.9999999999843    |493.38999999999714|92.11000000000024 |1   |
|X360   |518.8899999999962    |529.3100000000005 |14.139999999999917|2   |
|PS3    |343.30999999999784   |352.4699999999998 |68.90000000000023 |3   |
|PS     |313.2199999999978    |265.8299999999987 |92.97000000000004 |4   |
|DS     |293.4899999999938    |267.06999999999874|70.70000000000064 |5   |
|Wii    |283.4199999999978    |289.0099999999983 |25.410000000000053|6   |
|PS4    |181.34000000000066   |192.65000000000092|31.21000000000012 |7   |
|XB     |164.4100000000008    |167.58000000000064|3.349999999999998 |8   |
|XOne   |163.91000000000057   |164.6500000000006 |0.7800000000000002|9   |
|GBA    |148.7300000000006    |139.66000000000034|26.570000000000007|10  |
+-------+----------------

**Sales trends by Genre**

In [12]:
spark = SparkSession.builder.appName("VideoGameSalesAnalysis").getOrCreate()

# Load the dataframe
df = spark.read.csv('/vgchartz-2024.csv', header=True, inferSchema=True)

# Convert release_date to DateType and extract year
df = df.withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))
df = df.withColumn("release_year", F.year(F.col("release_date")))

# Group by genre and year, then calculate total sales disparity
sales_trends_df = df.groupBy("genre", "release_year").agg(
    F.sum("na_sales").alias("total_na_sales"),
    F.sum("jp_sales").alias("total_jp_sales")
).withColumn(
    "sales_disparity", F.abs(F.col("total_na_sales") - F.col("total_jp_sales"))
)

# Show trends for top 5 genres by total sales disparity
sales_trends_df.orderBy(F.desc("sales_disparity")).show(10, truncate=False)

+-------+------------+------------------+------------------+------------------+
|genre  |release_year|total_na_sales    |total_jp_sales    |sales_disparity   |
+-------+------------+------------------+------------------+------------------+
|Misc   |2008        |48.600000000000016|3.019999999999997 |45.58000000000002 |
|Action |2009        |51.25000000000002 |6.629999999999998 |44.620000000000026|
|Shooter|2011        |45.3              |2.09              |43.209999999999994|
|Action |2007        |46.04             |3.5199999999999965|42.52             |
|Shooter|2010        |43.660000000000004|2.22              |41.440000000000005|
|Sports |2009        |44.96000000000001 |3.9099999999999984|41.05000000000001 |
|Sports |2010        |45.200000000000045|4.299999999999998 |40.90000000000005 |
|Sports |2008        |45.849999999999994|6.599999999999997 |39.25             |
|Action |2008        |42.58999999999996 |4.019999999999999 |38.569999999999965|
|Action |2010        |44.68000000000006 