In [91]:
from pyspark.sql import *

spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [92]:
sales_ranking_df = spark.read.csv("../datasets/raw_data/general/vgsales.csv", header=True)
sales_ranking_df.createOrReplaceTempView("sales_ranking")

In [283]:
sales_ranking_df.select(sales_ranking_df.Rank.cast("int"),
                        sales_ranking_df.Name,
                        sales_ranking_df.Platform,
                        sales_ranking_df.Year.cast("int"),
                        sales_ranking_df.Genre,
                        sales_ranking_df.Publisher,
                        sales_ranking_df.NA_Sales.cast("float"),
                        sales_ranking_df.EU_Sales.cast("float"),
                        sales_ranking_df.JP_Sales.cast("float"),
                        sales_ranking_df.Other_Sales.cast("float"),
                        sales_ranking_df.Global_Sales.cast("float"),
                       ).write\
                        .mode("overwrite")\
                        .partitionBy('Year')\
                        .parquet("../datasets/refined_data/general/vgsales_datatype_fixed")

In [284]:
sales_ranking_from_parquet_df = spark.read.parquet("../datasets/refined_data/general/vgsales_datatype_fixed")
sales_ranking_from_parquet_df.createOrReplaceTempView("sales_ranking_parquet")

In [202]:
sales_ranking_df.count()

16598

In [188]:
sales_ranking_from_parquet_df.count()

16598

In [195]:
spark.sql("select count(*) from sales_ranking").show()

+--------+
|count(1)|
+--------+
|   16598|
+--------+



In [196]:
spark.sql("select count(*) from sales_ranking_parquet").show()

+--------+
|count(1)|
+--------+
|   16598|
+--------+



In [285]:
# sales_ranking_df.show(10,True)
sales_ranking_from_parquet_df.sort("Rank").show(10)

+----+--------------------+--------+------------+---------+--------+--------+--------+-----------+------------+----+
|Rank|                Name|Platform|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Year|
+----+--------------------+--------+------------+---------+--------+--------+--------+-----------+------------+----+
|   1|          Wii Sports|     Wii|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|2006|
|   2|   Super Mario Bros.|     NES|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|1985|
|   3|      Mario Kart Wii|     Wii|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|2008|
|   4|   Wii Sports Resort|     Wii|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|2009|
|   5|Pokemon Red/Pokem...|      GB|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|1996|
|   6|              Tetris|      GB|      Puzzle| Nintendo|    2

In [158]:
q = """
    select Platform,
            sum(cast(NA_Sales as decimal(18,2))) as sum_NA_sales,
            sum(cast(EU_Sales as decimal(18,2))) as sum_EU_sales,
            sum(cast(JP_Sales as decimal(18,2))) as sum_JP_sales,
            sum(cast(Other_Sales as decimal(18,2))) as sum_Other_sales,
            sum(cast(Global_Sales as decimal(18,2))) as sum_Global_sales
    from sales_ranking
    group by Platform
    order by sum_Global_sales desc
    """
sales_per_platform = spark.sql(q).show(100)

+--------+------------+------------+------------+---------------+----------------+
|Platform|sum_NA_sales|sum_EU_sales|sum_JP_sales|sum_Other_sales|sum_Global_sales|
+--------+------------+------------+------------+---------------+----------------+
|     PS2|      583.84|      339.29|      139.20|         193.44|         1255.64|
|    X360|      601.05|      280.58|       12.43|          85.54|          979.96|
|     PS3|      392.26|      343.71|       79.99|         141.93|          957.84|
|     Wii|      507.71|      268.38|       69.35|          80.61|          926.71|
|      DS|      390.71|      194.65|      175.57|          60.53|          822.49|
|      PS|      336.51|      213.60|      139.82|          40.91|          730.66|
|     GBA|      187.54|       75.25|       47.33|           7.73|          318.50|
|     PSP|      108.99|       68.25|       76.79|          42.19|          296.28|
|     PS4|       96.80|      123.70|       14.30|          43.36|          278.10|
|   

In [204]:
# sales_ranking_df.groupBy("Platform").sum("Global_Sales").sort(desc("sum(Global_Sales)")).collect()
# sales_ranking_df.select(sales_ranking_df.Platform, sales_ranking_df.Global_Sales.cast("float")).show()
# grouped_global_sales = sales_ranking_df\
# .select(sales_ranking_df.Platform, sales_ranking_df.Global_Sales.cast("float"))\
# .groupBy(sales_ranking_df.Platform)\
# .sum("Global_Sales")

# grouped_global_sales.select(grouped_global_sales.Platform, grouped_global_sales.Column("sum(Global_Sales)")).show()

# def printRows(rows):
#     for row in rows:
#             print(row.Platform)

            
# def printRow(row):
#     print(row.Platform)

# platform_df.repartition(1).foreachPartition(printRows)
# platform_df.repartition(1).foreach(printRow)

# platform_df.show(100)
# spark.sql(q).show(100)

#### Challenge - Find the Top 3 Game per Platform and build a DF with this data

In [301]:
# SparkSQL
q_platforms = "select distinct Platform from sales_ranking_parquet"
platform_df = spark.sql(q_platforms).cache()
sales_ranking_from_parquet_df.cache()
schema = sales_ranking_from_parquet_df.schema
new_df = spark.createDataFrame(sc.emptyRDD(), schema)


for platform in platform_df.collect():
    q = f"""
        select *
        from sales_ranking_parquet
        where Platform = '{platform.Platform}'
        order by Rank
        limit 3;
        """
    filtered_df = spark.sql(q)
    new_df = new_df.union(filtered_df)    

new_df.show(100)

+-----+--------------------+--------+------------+--------------------+--------+--------+--------+-----------+------------+----+
| Rank|                Name|Platform|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Year|
+-----+--------------------+--------+------------+--------------------+--------+--------+--------+-----------+------------+----+
|12637|         Policenauts|     3DO|   Adventure|Konami Digital En...|     0.0|     0.0|    0.06|        0.0|        0.06|1995|
|14999|         Bust-A-Move|     3DO|      Puzzle|         Micro Cabin|     0.0|     0.0|    0.02|        0.0|        0.02|1994|
|15482|Sotsugyou II: Neo...|     3DO|  Simulation|          Imageworks|     0.0|     0.0|    0.02|        0.0|        0.02|1995|
|   84|          The Sims 3|      PC|  Simulation|     Electronic Arts|    0.98|    6.42|     0.0|       0.71|        8.11|2009|
|  138|   World of Warcraft|      PC|Role-Playing|          Activision|    0.07|    6.21|     0.0

In [302]:
# Vanilla Spark
platform_df = sales_ranking_from_parquet_df.select("Platform").dropDuplicates().cache()
sales_ranking_from_parquet_df.cache()
schema = sales_ranking_from_parquet_df.schema
new_df = spark.createDataFrame(sc.emptyRDD(), schema)

for platform in platform_df.collect():
    filtered_df = sales_ranking_from_parquet_df.filter(sales_ranking_from_parquet_df.Platform == platform.Platform).limit(3)
    new_df = new_df.union(filtered_df)    

new_df.show(100)

+-----+--------------------+--------+------------+--------------------+--------+--------+--------+-----------+------------+----+
| Rank|                Name|Platform|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Year|
+-----+--------------------+--------+------------+--------------------+--------+--------+--------+-----------+------------+----+
|12637|         Policenauts|     3DO|   Adventure|Konami Digital En...|     0.0|     0.0|    0.06|        0.0|        0.06|1995|
|15482|Sotsugyou II: Neo...|     3DO|  Simulation|          Imageworks|     0.0|     0.0|    0.02|        0.0|        0.02|1995|
|14999|         Bust-A-Move|     3DO|      Puzzle|         Micro Cabin|     0.0|     0.0|    0.02|        0.0|        0.02|1994|
| 1488|World of Warcraft...|      PC|Role-Playing|          Activision|    0.01|    0.13|     0.0|       1.18|        1.32|2008|
| 1719|               Spore|      PC|    Strategy|     Electronic Arts|    0.03|    1.06|     0.0