# 🎮 Video Game Industry EDA

This project aims to explore and analyze video game data to uncover key trends, patterns, and insights in the gaming industry. Using Exploratory Data Analysis (EDA) techniques, we will examine various aspects such as:

- Global and regional sales performance
- Popular genres and platforms over time
- Trends in game releases by year and region

The goal is to answer questions like:
- Which genres are the most successful globally?
- Do higher review scores correlate with higher sales?
- How do regional preferences differ across markets?

The analysis will help us better understand the factors that contribute to a game's success and how the video game industry has evolved over the years.

## Setting up the environment

In [1]:
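from pyspark.sql import SparkSession
# NOTE: this wildcard import rebinds Python built-ins such as sum and round
# to the Spark column functions of the same name for the rest of the notebook.
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.types import NumericType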

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Video Games Sales Analysis") \
    .getOrCreate()

## Loading the dataset

### Dataset description
**Source:** [Kaggle - Video Game Sales](https://www.kaggle.com/datasets/anandshaw2001/video-game-sales)

This dataset contains sales data for video games across platforms, genres and regions.

**Dataset information:**
- Number of rows: 16598
- Number of columns: 11

**Columns:**

- **Rank** - Ranking by total (global) sales
- **Name** - Name of the game
- **Platform** - Platform on which the game was released (e.g. PC, PS4)
- **Year** - Year the game was released
- **Genre** - Genre of the game
- **Publisher** - Publisher of the game
- **NA_Sales** - Sales in North America (in millions)
- **EU_Sales** - Sales in Europe (in millions)
- **JP_Sales** - Sales in Japan (in millions)
- **Other_Sales** - Sales in the rest of the world (in millions)
- **Global_Sales** - Total sales worldwide (in millions)

In [2]:
df = spark.read.csv('vgsales.csv', header=True, inferSchema=True)

In [3]:
df.printSchema()
print('Number of rows:', df.count())
print('First 10 rows: \n')
df.show(10)

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)

Number of rows: 16598
First 10 rows: 

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform

In [4]:
df = df.withColumn('Year', df.Year.cast('int'))

In [5]:
numeric_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

df.select(numeric_columns).describe().show()

+-------+-----------------+------------------+-------------------+------------------+-------------------+--------------------+------------------+
|summary|             Rank|              Year|           NA_Sales|          EU_Sales|           JP_Sales|         Other_Sales|      Global_Sales|
+-------+-----------------+------------------+-------------------+------------------+-------------------+--------------------+------------------+
|  count|            16598|             16327|              16598|             16598|              16598|               16598|             16598|
|   mean|8300.605253645017|2006.4064433147546|0.26466742981084057|0.1466520062658483|0.07778166044101108|0.048063019640913515|  0.53744065550074|
| stddev|  4791.8539328964| 5.828981114713253| 0.8166830292988798|0.5053512312869136| 0.3092906480822022| 0.18858840291271395|1.5550279355699066|
|    min|                1|              1980|                0.0|               0.0|                0.0|                 0.

## First observations

### North American market dominates

- Highest mean sales per title (about 0.26 million, against 0.15 in Europe and 0.08 in Japan) and the highest peak values

- This makes NA the largest market in this dataset

### Europe is also a significant market

- Second only to NA in both mean and maximum sales

### Japan's low share

- Despite Japan's importance in the history of video games, both mean and maximum sales there are much lower than in NA or Europe

### High variation in popularity

- The standard deviation is several times the mean in every region (for example 0.82 against a mean of 0.26 in NA, and 1.56 against 0.54 globally), which suggests that only a small number of games achieve huge sales while most sell at low levels
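
To put the regional comparison in numbers, the mean sales per title from the `describe()` output above can be expressed as shares of the global mean. This is a minimal, Spark-free sketch: the dictionary and variable names are illustrative, and the values are copied by hand from that output.

```python
# Mean sales per title (in millions), copied from the describe() output above.
mean_sales = {
    "NA_Sales": 0.26466742981084057,
    "EU_Sales": 0.1466520062658483,
    "JP_Sales": 0.07778166044101108,
    "Other_Sales": 0.048063019640913515,
}
global_mean = 0.53744065550074

# Share of the global mean contributed by each region.
shares = {region: value / global_mean for region, value in mean_sales.items()}

# Print regions from largest to smallest share.
for region, share in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{region:<12} {share:6.1%}")
```

With these means, North America accounts for roughly 49% of the global average, Europe for about 27%, Japan for about 14% and the rest of the world for about 9%.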

In [6]:
df.groupBy('Year').count().sort('count').show()

+----+-----+
|Year|count|
+----+-----+
|2020|    1|
|2017|    3|
|1980|    9|
|1985|   14|
|1984|   14|
|1988|   15|
|1990|   16|
|1987|   16|
|1989|   17|
|1983|   17|
|1986|   21|
|1982|   36|
|1991|   41|
|1992|   43|
|1981|   46|
|1993|   60|
|1994|  121|
|1995|  219|
|1996|  263|
|NULL|  271|
+----+-----+
only showing top 20 rows



## Data Cleaning & Preprocessing

In [7]:
# Missing values are stored as the literal string "N/A", which isNull() does not detect
# (it would report 0 missing values), so convert them to real nulls first.
df = df.na.replace("N/A", None)
missing_values = df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])
print('Number of missing values:\n')
missing_values.show()


total_rows = df.count()
missing_percent = missing_values.select([
    round((col(c) / total_rows * 100), 2).alias(c)
    for c in missing_values.columns
])
print('% of missing values:\n')
missing_percent.show()

Number of missing values:

+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Rank|Name|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|   0|   0|       0| 271|    0|       58|       0|       0|       0|          0|           0|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+

% of missing values:

+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Rank|Name|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
| 0.0| 0.0|     0.0|1.63|  0.0|     0.35|     0.0|     0.0|     0.0|        0.0|         0.0|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
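
The cell above first converts the `"N/A"` placeholder into real nulls, because a null check alone would miss it. The same idea can be shown in plain Python, where `None` plays the role of a Spark null. The publisher list is made up for illustration; the two counts in the second half are taken from the output above.

```python
# Hypothetical Publisher values as they might arrive from the CSV.
raw_publishers = ["Nintendo", "N/A", "Electronic Arts", "N/A", "Atari"]

# A null check alone finds nothing, because "N/A" is an ordinary string.
missing_before = len([p for p in raw_publishers if p is None])

# After mapping the placeholder to None, the same check finds both gaps.
cleaned_publishers = [None if p == "N/A" else p for p in raw_publishers]
missing_after = len([p for p in cleaned_publishers if p is None])

print(missing_before, missing_after)

# Percentages in the second table: missing count / total rows * 100.
total_rows_raw = 16598
for column, n_missing in {"Year": 271, "Publisher": 58}.items():
    print(f"{column}: {n_missing / total_rows_raw * 100:.2f}%")
```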



In [8]:
df.where(col('Rank').between(179,180)).show() # quick check

+----+---------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+
|Rank|           Name|Platform|Year| Genre|        Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+---------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+
| 179| Tomb Raider II|      PS|1997|Action|Eidos Interactive|     2.3|    2.46|     0.2|       0.28|        5.24|
| 180|Madden NFL 2004|     PS2|NULL|Sports|  Electronic Arts|    4.26|    0.26|    0.01|       0.71|        5.23|
+----+---------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+



## Checking for duplicated rows

In [9]:
duplicate_rows = df.groupBy(df.columns).count().filter("count > 1")
print("Number of duplicated rows:", duplicate_rows.count())

Number of duplicated rows: 0


In [10]:
df.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing|            Nintendo|   11.27|    8.89|   10.22|  

In [11]:
df.where(col('Name') == 'Madden NFL 2004').show() # quick check

+----+---------------+--------+----+------+---------------+--------+--------+--------+-----------+------------+
|Rank|           Name|Platform|Year| Genre|      Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+---------------+--------+----+------+---------------+--------+--------+--------+-----------+------------+
| 180|Madden NFL 2004|     PS2|NULL|Sports|Electronic Arts|    4.26|    0.26|    0.01|       0.71|        5.23|
|1883|Madden NFL 2004|      XB|2003|Sports|Electronic Arts|    1.02|    0.02|     0.0|       0.05|        1.09|
|3898|Madden NFL 2004|      GC|2003|Sports|Electronic Arts|     0.4|     0.1|     0.0|       0.01|        0.51|
|5714|Madden NFL 2004|     GBA|2003|Sports|Electronic Arts|    0.22|    0.08|     0.0|       0.01|        0.31|
+----+---------------+--------+----+------+---------------+--------+--------+--------+-----------+------------+



In [12]:
df.groupby('Name').agg(round(avg(col('Year'))).cast('int').alias('Avg_Year')).show(truncate=False)

+-----------------------------------------+--------+
|Name                                     |Avg_Year|
+-----------------------------------------+--------+
|The Elder Scrolls V: Skyrim              |2011    |
|The Legend of Zelda: Oracle of Ages      |2001    |
|Call of Duty Black Ops: Declassified     |2012    |
|Joust                                    |1982    |
|Legacy of Kain: Soul Reaver              |NULL    |
|All-Star Baseball 2003                   |2002    |
|J-League Soccer: Prime Goal              |1993    |
|RIFT                                     |2011    |
|Ninokuni: Shikkoku no Madoushi           |2010    |
|Paws & Claws: Dogs & Cats Best Friends   |2007    |
|Barbie: Jet, Set & Style!                |2011    |
|Barnstorming                             |1981    |
|Yu-Gi-Oh! GX: Tag Force 2                |2007    |
|Sherlock Holmes: The Mystery of the Mummy|2009    |
|ESPN X Games Skateboarding               |2001    |
|Virtua Cop 2                             |199

In [13]:
df.where(col('Name') == 'Legacy of Kain: Soul Reaver').show() # quick check

+----+--------------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year| Genre|        Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+
|1992|Legacy of Kain: S...|      PS|NULL|Action|Eidos Interactive|    0.58|     0.4|     0.0|       0.07|        1.04|
+----+--------------------+--------+----+------+-----------------+--------+--------+--------+-----------+------------+



In [14]:
avg_year_df = df.filter(col("Year").isNotNull()) \
    .groupBy("Name") \
    .agg(round(avg("Year")).cast("int").alias("Avg_Year"))

In [15]:
# Count how many rows each (Name, Publisher) pair has, ignoring rows without a publisher.
window_spec = Window.partitionBy("Name", "Publisher")
publisher_counts = df.filter(col("Publisher").isNotNull()) \
    .withColumn("pub_count", count("*").over(window_spec))

In [16]:
publisher_counts.show(truncate=False)

+-----+-------------------------------------------+--------+----+------------+------------------+--------+--------+--------+-----------+------------+---------+
|Rank |Name                                       |Platform|Year|Genre       |Publisher         |NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|pub_count|
+-----+-------------------------------------------+--------+----+------------+------------------+--------+--------+--------+-----------+------------+---------+
|4756 |'98 Koshien                                |PS      |1998|Sports      |Magical Company   |0.15    |0.1     |0.12    |0.03       |0.41        |1        |
|8359 |.hack//G.U. Vol.1//Rebirth                 |PS2     |2006|Role-Playing|Namco Bandai Games|0.0     |0.0     |0.17    |0.0        |0.17        |1        |
|7109 |.hack//G.U. Vol.2//Reminisce               |PS2     |2006|Role-Playing|Namco Bandai Games|0.11    |0.09    |0.0     |0.03       |0.23        |1        |
|8604 |.hack//G.U. Vol.2//Reminisce (jp 

In [17]:
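# Keep one publisher per title: the one with the most rows.
# row_number() breaks ties arbitrarily, so a title with equally frequent
# publishers may be assigned a different one between runs.
ranked_publishers = publisher_counts \
    .withColumn("rank", row_number().over(Window.partitionBy("Name").orderBy(col("pub_count").desc()))) \
    .filter(col("rank") == 1) \
    .select("Name", col("Publisher").alias("Most_Common_Publisher"))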

In [18]:
ranked_publishers.show(truncate=False)

+-------------------------------------------+---------------------+
|Name                                       |Most_Common_Publisher|
+-------------------------------------------+---------------------+
|'98 Koshien                                |Magical Company      |
|.hack//G.U. Vol.1//Rebirth                 |Namco Bandai Games   |
|.hack//G.U. Vol.2//Reminisce               |Namco Bandai Games   |
|.hack//G.U. Vol.2//Reminisce (jp sales)    |Namco Bandai Games   |
|.hack//G.U. Vol.3//Redemption              |Namco Bandai Games   |
|.hack//Infection Part 1                    |Atari                |
|.hack//Link                                |Namco Bandai Games   |
|.hack//Mutation Part 2                     |Atari                |
|.hack//Outbreak Part 3                     |Atari                |
|.hack//Quarantine Part 4: The Final Chapter|Atari                |
|.hack: Sekai no Mukou ni + Versus          |Namco Bandai Games   |
|007 Racing                                 |Ele

In [19]:
fill_values_df = avg_year_df.join(ranked_publishers, on="Name", how="outer")

In [20]:
fill_values_df.show()

+--------------------+--------+---------------------+
|                Name|Avg_Year|Most_Common_Publisher|
+--------------------+--------+---------------------+
|         '98 Koshien|    1998|      Magical Company|
|.hack//G.U. Vol.1...|    2006|   Namco Bandai Games|
|.hack//G.U. Vol.2...|    2006|   Namco Bandai Games|
|.hack//G.U. Vol.2...|    2006|   Namco Bandai Games|
|.hack//G.U. Vol.3...|    2007|   Namco Bandai Games|
|.hack//Infection ...|    2002|                Atari|
|         .hack//Link|    2010|   Namco Bandai Games|
|.hack//Mutation P...|    2002|                Atari|
|.hack//Outbreak P...|    2002|                Atari|
|.hack//Quarantine...|    2003|                Atari|
|.hack: Sekai no M...|    2012|   Namco Bandai Games|
|          007 Racing|    2000|      Electronic Arts|
|007: Quantum of S...|    2008|           Activision|
|007: The World is...|    2000|      Electronic Arts|
|007: Tomorrow Nev...|    1999|      Electronic Arts|
|           1 vs. 100|    20

In [21]:
df_clean = df.join(fill_values_df, on="Name", how="left")

In [22]:
df_clean = df_clean.withColumn(
    "Year", coalesce(col("Year"), col("Avg_Year"))
).withColumn(
    "Publisher", coalesce(col("Publisher"), col("Most_Common_Publisher"))
).drop("Avg_Year", "Most_Common_Publisher")
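
The fill logic in the last few cells (per-title average year, then `coalesce`) can be traced by hand on the rows shown earlier for Madden NFL 2004 and Legacy of Kain: Soul Reaver. This is a plain-Python sketch of the same idea, not the Spark code itself; the variable names are illustrative.

```python
import math

# (Name, Platform, Year) copied from the two "quick check" outputs above;
# None marks a missing year.
rows = [
    ("Madden NFL 2004", "PS2", None),
    ("Madden NFL 2004", "XB", 2003),
    ("Madden NFL 2004", "GC", 2003),
    ("Madden NFL 2004", "GBA", 2003),
    ("Legacy of Kain: Soul Reaver", "PS", None),
]

# Step 1: collect the known years per title (mirrors the isNotNull filter + groupBy).
known_years = {}
for name, _platform, year in rows:
    if year is not None:
        known_years.setdefault(name, []).append(year)

# Step 2: rounded average year per title (half-up rounding for positive values).
# math.fsum is used because the notebook's wildcard import rebinds sum and round.
avg_year = {
    name: int(math.fsum(years) / len(years) + 0.5)
    for name, years in known_years.items()
}

# Step 3: keep the original year if present, else fall back to the average
# (mirrors coalesce(Year, Avg_Year)); titles with no known year stay None.
filled = [
    (name, platform, year if year is not None else avg_year.get(name))
    for name, platform, year in rows
]
for row in filled:
    print(row)
```

The PS2 release of Madden NFL 2004 picks up 2003 from its sibling platforms, while Legacy of Kain: Soul Reaver has no other row to borrow from and stays empty. Rows like the latter are the ones removed afterwards by `na.drop()`.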

In [23]:
missing_values = df_clean.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_clean.columns])
print('Number of missing values:\n')
missing_values.show()


total_rows = df_clean.count()
missing_percent = missing_values.select([
    round((col(c) / total_rows * 100), 2).alias(c)
    for c in missing_values.columns
])
print('% of missing values:\n')
missing_percent.show()

Number of missing values:

+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Name|Rank|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|   0|   0|       0| 147|    0|       51|       0|       0|       0|          0|           0|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+

% of missing values:

+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Name|Rank|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
| 0.0| 0.0|     0.0|0.89|  0.0|     0.31|     0.0|     0.0|     0.0|        0.0|         0.0|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+



In [24]:
df_clean = df_clean.na.drop()

In [25]:
df_clean.filter(
    col("Year").isNull() | col("Publisher").isNull()
).count()

0

## Checking for inconsistencies in the data

In [26]:
# Note: these checks run on the original df, not on df_clean.
if "Year" in df.columns:
    invalid_years = df.filter(col("Year") > 2025)
    print("Future-dated entries (Year > 2025):", invalid_years.count())
else:
    print("The 'Year' column does not exist in the dataset.")

numeric_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

negative_value_counts = {}

# Check for negative values in numeric columns
for col_name in numeric_columns:
    negative_count = df.filter(col(col_name) < 0).count()
    if negative_count > 0:
        negative_value_counts[col_name] = negative_count

if negative_value_counts:
    print("Negative values found in the following numeric columns:")
    # Named n_negative rather than count, so the Spark count() function
    # imported at the top of the notebook is not overwritten.
    for col_name, n_negative in negative_value_counts.items():
        print(f" - {col_name}: {n_negative}")
else:
    print("No negative values found in numeric columns")

Future-dated entries (Year > 2025): 0
No negative values found in numeric columns


In [27]:
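# 2020 and 2017 appear only 1 and 3 times in the raw data (see the per-year counts above),
# too few to support any per-year trend, so these rows are dropped.
df_clean = df_clean.filter(~col("Year").isin([2020, 2017]))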

In [28]:
df_clean.groupBy('Year').count().sort('count').show()

+----+-----+
|Year|count|
+----+-----+
|1980|    9|
|1985|   14|
|1984|   14|
|1988|   15|
|1990|   16|
|1987|   16|
|1989|   17|
|1983|   17|
|1986|   21|
|1982|   36|
|1991|   41|
|1992|   43|
|1981|   46|
|1993|   60|
|1994|  121|
|1995|  219|
|1996|  263|
|1997|  289|
|1999|  339|
|2016|  342|
+----+-----+
only showing top 20 rows



## Identification of outliers using IQR

In [29]:
import numpy as np

def check_outliers(data, quantile_resolution=0.01):
    """Print, per numeric column, how many rows fall outside the 1.5 * IQR fences."""
    outlier_counts = {}
    numeric_columns = [f.name for f in data.schema.fields if isinstance(f.dataType, NumericType)]
    # Use the row count of the DataFrame being checked, not the global total_rows,
    # which was computed before rows were dropped.
    n_rows = data.count()

    for col_name in numeric_columns:
        try:
            # approxQuantile trades accuracy for speed; quantile_resolution is its relative error
            quantiles = data.approxQuantile(col_name, [0.25, 0.75], quantile_resolution)
            if len(quantiles) < 2:
                continue
            Q1, Q3 = quantiles
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            outliers = data.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound))
            count_outliers = outliers.count()

            # np.round, because round now refers to the Spark column function
            outlier_percent = np.round(count_outliers / n_rows * 100, 2)

            outlier_counts[col_name] = count_outliers, outlier_percent
        except Exception as e:
            print(f"Error for column '{col_name}': {e}")

    for col_name, (n_outliers, percent) in outlier_counts.items():
        print(f"Number of outliers '{col_name}': {n_outliers} ({percent} %)")

In [30]:
check_outliers(df_clean)

Number of outliers 'Rank': 0 (0.0 %)
Number of outliers 'Year': 305 (1.86 %)
Number of outliers 'NA_Sales': 1769 (10.78 %)
Number of outliers 'EU_Sales': 2071 (12.62 %)
Number of outliers 'JP_Sales': 2926 (17.83 %)
Number of outliers 'Other_Sales': 2256 (13.75 %)
Number of outliers 'Global_Sales': 1883 (11.47 %)
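
The 1.5 × IQR rule behind these counts can be followed on a handful of values. The sketch below is plain Python with exact quartiles from the standard library (the notebook itself uses Spark's approximate `approxQuantile`), and the ten `Global_Sales` values are copied from rows shown in earlier outputs, so it illustrates the rule rather than reproducing the counts above.

```python
import statistics

# Global_Sales values (in millions) taken from rows printed earlier in the notebook.
global_sales = [82.74, 5.24, 5.23, 1.09, 1.04, 0.51, 0.41, 0.31, 0.23, 0.17]

# Exact quartiles; statistics.quantiles sorts the data internally.
q1, _median, q3 = statistics.quantiles(global_sales, n=4, method="inclusive")
iqr = q3 - q1

# Tukey fences: anything outside them is flagged as an outlier.
lower_fence = q1 - 1.5 * iqr
upper_fence = q3 + 1.5 * iqr

flagged = [v for v in global_sales if v < lower_fence or v > upper_fence]
print(f"Q1={q1:.3f} Q3={q3:.3f} fences=({lower_fence:.3f}, {upper_fence:.3f})")
print("flagged:", flagged)
```

Only the 82.74 entry (Wii Sports) ends up outside the fences here. Because sales cannot be negative and the lower fence falls below zero, every flagged sales value in this sample lies on the high side.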


In [31]:
num_cols = [f.name for f in df_clean.schema.fields if isinstance(f.dataType, NumericType)]

In [32]:
df_clean.select(
    [skewness(c).alias(f'skew_{c}') for c in num_cols] +
    [kurtosis(c).alias(f'kurt_{c}') for c in num_cols]
).show()

+--------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+----------------+-----------------+------------------+-----------------+
|           skew_Rank|          skew_Year|     skew_NA_Sales|     skew_EU_Sales|     skew_JP_Sales|  skew_Other_Sales| skew_Global_Sales|          kurt_Rank|         kurt_Year|    kurt_NA_Sales|   kurt_EU_Sales|    kurt_JP_Sales|  kurt_Other_Sales|kurt_Global_Sales|
+--------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+----------------+-----------------+------------------+-----------------+
|0.001847214914020...|-1.0059727103739404|18.738133243760107|18.788464506292332|11.155980827141203|24.127973038428316|17.326939409408688|-1.2006314255968686|1.8580677857938142|643.8892591522499|748.5
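
For reference, the two statistics above can be written out directly. The sketch below uses population moment formulas and reports kurtosis as excess kurtosis (a normal distribution scores 0). That this matches Spark's `skewness` and `kurtosis` exactly is an assumption here, although the `kurt_Rank` value of about -1.2 is what excess kurtosis gives for an evenly spread column such as `Rank`. The function names are illustrative and chosen so that they do not clash with the Spark functions.

```python
import math

def central_moment(values, order):
    """Population central moment of the given order."""
    n = len(values)
    mu = math.fsum(values) / n
    return math.fsum((v - mu) ** order for v in values) / n

def population_skewness(values):
    """Third central moment scaled by the variance to the power 1.5."""
    return central_moment(values, 3) / central_moment(values, 2) ** 1.5

def excess_kurtosis(values):
    """Fourth central moment scaled by the squared variance, minus 3."""
    return central_moment(values, 4) / central_moment(values, 2) ** 2 - 3.0

# Evenly spread values, similar in shape to the Rank column.
ranks = list(range(1, 1001))
# Made-up right-skewed sample: many small values and one very large one.
long_tail = [0.1] * 95 + [5.0] * 4 + [80.0]

print(population_skewness(ranks), excess_kurtosis(ranks))
print(population_skewness(long_tail), excess_kurtosis(long_tail))
```

The evenly spread sample gives a skewness near 0 and an excess kurtosis near -1.2, while the long-tailed sample gives large positive values for both, which is the pattern seen in the sales columns.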

In [33]:
df_clean.describe(num_cols[1:]).show()

+-------+------------------+------------------+-------------------+-------------------+-------------------+------------------+
|summary|              Year|          NA_Sales|           EU_Sales|           JP_Sales|        Other_Sales|      Global_Sales|
+-------+------------------+------------------+-------------------+-------------------+-------------------+------------------+
|  count|             16412|             16412|              16412|              16412|              16412|             16412|
|   mean|2006.4079332195954|0.2656300268096706|0.14764501584209988|0.07836582988056927|0.04840604435777989|0.5403180599561945|
| stddev| 5.817685332012175|0.8205505754914889| 0.5079653797237415| 0.3108534125628473| 0.1895532403780277|1.5628664150657214|
|    min|              1980|               0.0|                0.0|                0.0|                0.0|              0.01|
|    max|              2016|             41.49|              29.02|              10.22|              10.57|    

- NA_Sales - extremely right-skewed (mean of about 0.27 against a maximum of 41.49)
- EU_Sales - similarly strongly right-skewed
- JP_Sales - values concentrated near zero
- Other_Sales - small values throughout, dominated by near-zero entries
- Global_Sales - also strongly right-skewed

Conclusions:
- All sales columns are strongly right-skewed (skewness between roughly 11 and 24): most games sell little, and only a few reach very high numbers.

- The summary statistics point the same way: means are low relative to the maxima, which is why the pipeline below adds log-transformed sales columns.

- The low sales of most games are consistent with a Pareto-like pattern, in which a small fraction of titles generates most of the sales. The exact 80/20 split is not verified in this notebook.
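
Whether an 80/20 split actually holds can be checked by sorting titles by sales and measuring the share contributed by the top fifth. The helper below is a hypothetical plain-Python sketch (`top_share` is not part of the notebook), and the ten values are hand-picked from rows shown in earlier outputs, so the printed figure illustrates the calculation only and says nothing about the full dataset.

```python
import math

def top_share(values, fraction=0.2):
    """Share of the total contributed by the largest `fraction` of values."""
    ordered = sorted(values, reverse=True)
    k = int(len(ordered) * fraction)
    if k < 1:
        k = 1  # always include at least one value
    return math.fsum(ordered[:k]) / math.fsum(ordered)

# Global_Sales values (in millions) taken from rows printed earlier in the notebook.
sample_global_sales = [82.74, 5.24, 5.23, 1.09, 1.04, 0.51, 0.41, 0.31, 0.23, 0.17]

print(f"top 20% share: {top_share(sample_global_sales):.1%}")
```

On the full data, the same function could be fed the collected column, for example `[r[0] for r in df_clean.select("Global_Sales").collect()]`, to see how close the dataset comes to 80%.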

## Full Pipeline

In [34]:
from pyspark.ml import Transformer
from pyspark.sql.functions import (
    col, when, log1p, round as spark_round, desc,
    count, avg, row_number, coalesce
)
from pyspark.sql.types import NumericType
from pyspark.sql import DataFrame, Window

class Preprocessing(Transformer):
    """Custom Spark ML Transformer that repeats the cleaning steps above in a single pass."""

    def _transform(self, df: DataFrame) -> DataFrame:

        df = df.na.replace("N/A", None)

        df = df.dropDuplicates()

        if "Year" in df.columns:
            df = df.withColumn("Year", col("Year").cast("integer"))

        avg_year_df = df.filter(col("Year").isNotNull()) \
            .groupBy("Name") \
            .agg(spark_round(avg("Year")).cast("int").alias("Avg_Year"))

        pub_window = Window.partitionBy("Name", "Publisher")
        pub_counts = df.filter(col("Publisher").isNotNull()) \
            .withColumn("pub_count", count("*").over(pub_window))

        ranked_publishers = pub_counts \
            .withColumn("rank", row_number().over(Window.partitionBy("Name").orderBy(desc("pub_count")))) \
            .filter(col("rank") == 1) \
            .select("Name", col("Publisher").alias("Most_Common_Publisher"))

        fill_df = avg_year_df.join(ranked_publishers, on="Name", how="outer")
        df = df.join(fill_df, on="Name", how="left")

        df = df.withColumn("Year", coalesce(col("Year"), col("Avg_Year"))) \
               .withColumn("Publisher", coalesce(col("Publisher"), col("Most_Common_Publisher"))) \
               .drop("Avg_Year", "Most_Common_Publisher")

        df = df.filter(~col("Year").isin([2017, 2020]))

        df = df.na.drop()

        sales_columns = ['NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales', 'Global_Sales']
        for col_name in sales_columns:
            if col_name in df.columns:
                df = df.withColumn(f'{col_name}_log', log1p(col(col_name)))


        # Binary flag: 1 if the title sold more than 1.0 (million) in Japan, else 0.
        df = df.withColumn("Is_Japan_Hit", when(col("JP_Sales") > 1.0, 1).otherwise(0))

        return df

In [35]:
from pyspark.ml import Pipeline

preprocessing = Preprocessing()

pipeline = Pipeline(stages=[preprocessing])

pipeline_model = pipeline.fit(df)
prepared_df = pipeline_model.transform(df)

prepared_df.show(5, truncate=False)

+--------------------------------------+----+--------+----+------------+---------------------------+--------+--------+--------+-----------+------------+-------------------+------------------+--------------------+-------------------+------------------+------------+
|Name                                  |Rank|Platform|Year|Genre       |Publisher                  |NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|NA_Sales_log       |EU_Sales_log      |JP_Sales_log        |Other_Sales_log    |Global_Sales_log  |Is_Japan_Hit|
+--------------------------------------+----+--------+----+------------+---------------------------+--------+--------+--------+-----------+------------+-------------------+------------------+--------------------+-------------------+------------------+------------+
|FIFA Soccer 11                        |200 |PS3     |2010|Sports      |Electronic Arts            |0.6     |3.29    |0.06    |1.13       |5.08        |0.4700036292457356 |1.4562867329399256|0.058268908123
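
The `_log` columns come from `log1p`, that is log(1 + x), which stays defined at zero sales and compresses the long right tail. The values in the FIFA Soccer 11 row above can be reproduced with Python's `math` module; `math.expm1` is shown as the inverse for converting back to millions.

```python
import math

# (raw sales in millions, logged value) pairs copied from the FIFA Soccer 11 row:
# NA_Sales / NA_Sales_log and EU_Sales / EU_Sales_log.
pairs = [(0.6, 0.4700036292457356), (3.29, 1.4562867329399256)]

for raw, logged_in_output in pairs:
    logged = math.log1p(raw)       # log(1 + raw)
    restored = math.expm1(logged)  # inverse transform, back to millions
    print(f"{raw:>5} -> {logged:.6f} (output: {logged_in_output:.6f}) -> {restored:.2f}")

# Zero sales map to exactly 0.0, whereas a plain logarithm would be undefined.
zero_logged = math.log1p(0.0)
print(zero_logged)
```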

In [38]:
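# Spark writes a directory named "cleaned_data.csv" containing one part file per partition,
# not a single CSV file.
prepared_df.write.csv("cleaned_data.csv", header=True, mode="overwrite")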