In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [10]:
df = spark.read.option("header", True).csv('data/vgsales.csv')
df.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|   15.75|   11.01|    3.28|       2.96|          33|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing|            Nintendo|   11.27|    8.89|   10.22|  

In [11]:
from pyspark.sql.functions import col, sum, round
import pyspark.sql.functions as func
from pyspark.sql.window import Window

In [12]:
df = df.withColumn('Sales_per_year', sum('Global_Sales').over(Window.partitionBy('Year')))
df = df.withColumn('Sales_per_year', df['Sales_per_year'].cast('float'))
df = df.select('*',round('Sales_per_year',2))
df = df.drop('Sales_per_year')
df.show()

+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+------------------------+
|Rank|                Name|Platform|Year|   Genre|   Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|round(Sales_per_year, 2)|
+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+------------------------+
| 259|           Asteroids|    2600|1980| Shooter|       Atari|       4|    0.26|       0|       0.05|        4.31|                   11.38|
| 545|     Missile Command|    2600|1980| Shooter|       Atari|    2.56|    0.17|       0|       0.03|        2.76|                   11.38|
|1768|             Kaboom!|    2600|1980|    Misc|  Activision|    1.07|    0.07|       0|       0.01|        1.15|                   11.38|
|1971|            Defender|    2600|1980|    Misc|       Atari|    0.99|    0.05|       0|       0.01|        1.05|                   11.38|
|2671|       

In [13]:
df = df.withColumn('Sales_year_platform_NA', sum('NA_Sales').over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Sales_year_platform_NA', df['Sales_year_platform_NA'].cast('float'))
df = df.select('*',round('Sales_year_platform_NA',2))
df = df.drop('Sales_year_platform_NA')

df.show()

+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+------------------------+--------------------------------+
|Rank|                Name|Platform|Year|   Genre|   Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|round(Sales_per_year, 2)|round(Sales_year_platform_NA, 2)|
+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+------------------------+--------------------------------+
| 259|           Asteroids|    2600|1980| Shooter|       Atari|       4|    0.26|       0|       0.05|        4.31|                   11.38|                           10.59|
| 545|     Missile Command|    2600|1980| Shooter|       Atari|    2.56|    0.17|       0|       0.03|        2.76|                   11.38|                           10.59|
|1768|             Kaboom!|    2600|1980|    Misc|  Activision|    1.07|    0.07|       0|       0.01|        1.15|               

In [26]:
df = df.withColumn('Sales_year_platform_EU', sum('EU_Sales').over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Sales_year_platform_EU', df['Sales_year_platform_EU'].cast('float'))
df = df.select('*',round('Sales_year_platform_EU',2))
df = df.drop('Sales_year_platform_EU')

+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+--------------+------------------------+----------------------+--------------------------------+----------------------+--------------------------------+
|Rank|                Name|Platform|Year|   Genre|   Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Sales_per_year|round(Sales_per_year, 2)|Sales_year_platform_NA|round(Sales_year_platform_NA, 2)|Sales_year_platform_EU|round(Sales_year_platform_EU, 2)|
+----+--------------------+--------+----+--------+------------+--------+--------+--------+-----------+------------+--------------+------------------------+----------------------+--------------------------------+----------------------+--------------------------------+
| 259|           Asteroids|    2600|1980| Shooter|       Atari|       4|    0.26|       0|       0.05|        4.31|         11.38|                   11.38|                 10.59|                  

In [14]:
df = df.withColumn('Sales_year_platform_JP', sum('JP_Sales').over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Sales_year_platform_JP', df['Sales_year_platform_JP'].cast('float'))
df = df.select('*',round('Sales_year_platform_JP',2))
df = df.drop('Sales_year_platform_JP')

In [15]:
df = df.withColumn('Sales_year_platform_Other', sum('Other_Sales').over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Sales_year_platform_Other', df['Sales_year_platform_Other'].cast('float'))
df = df.select('*',round('Sales_year_platform_Other',2))
df = df.drop('Sales_year_platform_Other')

In [16]:
df = df.withColumn('Percent_publisher_NA', col('NA_Sales')/sum(col('NA_Sales')).over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Percent_publisher_NA', df['Percent_publisher_NA'].cast('float'))
df = df.select('*',round('Percent_publisher_NA',2))
df = df.drop('Percent_publisher_NA')

In [17]:
df = df.withColumn('Percent_publisher_EU', col('EU_Sales')/sum(col('EU_Sales')).over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Percent_publisher_EU', df['Percent_publisher_EU'].cast('float'))
df = df.select('*',round('Percent_publisher_EU',2))
df = df.drop('Percent_publisher_EU')

In [18]:
df = df.withColumn('Percent_publisher_JP', col('JP_Sales')/sum(col('JP_Sales')).over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Percent_publisher_JP', df['Percent_publisher_JP'].cast('float'))
df = df.select('*',round('Percent_publisher_JP',2))
df = df.drop('Percent_publisher_JP')

In [19]:
df = df.withColumn('Percent_publisher_Other', col('Other_Sales')/sum(col('Other_Sales')).over(Window.partitionBy('Year','Platform')))
df = df.withColumn('Percent_publisher_Other', df['Percent_publisher_Other'].cast('float'))
df = df.select('*',round('Percent_publisher_Other',2))
df = df.drop('Percent_publisher_Other')

In [20]:
df = df.withColumn('Percent_publisher', col('Global_Sales')/sum(col('Global_Sales')).over(Window.partitionBy('Publisher')))
df = df.withColumn('Percent_publisher', df['Percent_publisher'].cast('float'))
df = df.select('*',round('Percent_publisher',2))
df = df.drop('Percent_publisher')

In [21]:
df = df.withColumn('Game_sales', sum('Global_Sales').over(Window.partitionBy('Year')))
df = df.withColumn('Game_sales', df['Game_sales'].cast('float'))
df = df.select('*',round('Game_sales',2))
df = df.drop('Game_sales')

In [22]:
df.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: string (nullable = true)
 |-- EU_Sales: string (nullable = true)
 |-- JP_Sales: string (nullable = true)
 |-- Other_Sales: string (nullable = true)
 |-- Global_Sales: string (nullable = true)
 |-- round(Sales_per_year, 2): float (nullable = true)
 |-- round(Sales_year_platform_NA, 2): float (nullable = true)
 |-- round(Sales_year_platform_JP, 2): float (nullable = true)
 |-- round(Sales_year_platform_Other, 2): float (nullable = true)
 |-- round(Percent_publisher_NA, 2): float (nullable = true)
 |-- round(Percent_publisher_EU, 2): float (nullable = true)
 |-- round(Percent_publisher_JP, 2): float (nullable = true)
 |-- round(Percent_publisher_Other, 2): float (nullable = true)
 |-- round(Percent_publisher, 2): float (nullable = true)
 |-- round

In [23]:
df.show()

+----+--------------------+--------+----+--------+--------------------+--------+--------+--------+-----------+------------+------------------------+--------------------------------+--------------------------------+-----------------------------------+------------------------------+------------------------------+------------------------------+---------------------------------+---------------------------+--------------------+
|Rank|                Name|Platform|Year|   Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|round(Sales_per_year, 2)|round(Sales_year_platform_NA, 2)|round(Sales_year_platform_JP, 2)|round(Sales_year_platform_Other, 2)|round(Percent_publisher_NA, 2)|round(Percent_publisher_EU, 2)|round(Percent_publisher_JP, 2)|round(Percent_publisher_Other, 2)|round(Percent_publisher, 2)|round(Game_sales, 2)|
+----+--------------------+--------+----+--------+--------------------+--------+--------+--------+-----------+------------+-----------------------