In [2]:
# Setup Spark Env
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apachemirror.wuchna.com/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
findspark.init()

In [3]:
# Create (or reuse) the SparkSession that all later cells run against.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Learning_Spark")
    .master("local[*]")
    .getOrCreate()
)

In [6]:
# Download the dataset (vgsales.csv) from https://www.kaggle.com/gregorut/videogamesales and place it in the working directory

In [7]:
# Load the CSV into a DataFrame: the first row supplies the column names and
# Spark infers each column's type from the data.
data = spark.read.options(header=True, inferSchema=True).csv('vgsales.csv')

**Data Exploration**

In [8]:
# (row count, column count) of the loaded DataFrame
(data.count(), len(data.columns))

(16598, 11)

In [9]:
# Preview the first five rows.
data.show(n=5)

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|
+----+--------------------+--------+----+------------+---------+

In [10]:
# Print the column names and the types Spark inferred when reading the CSV.
data.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [12]:
# Show four key columns for the first 15 rows without truncating long names.
(
    data
    .select("Name", "Platform", "Year", "Global_Sales")
    .show(n=15, truncate=False)
)

+---------------------------+--------+----+------------+
|Name                       |Platform|Year|Global_Sales|
+---------------------------+--------+----+------------+
|Wii Sports                 |Wii     |2006|82.74       |
|Super Mario Bros.          |NES     |1985|40.24       |
|Mario Kart Wii             |Wii     |2008|35.82       |
|Wii Sports Resort          |Wii     |2009|33.0        |
|Pokemon Red/Pokemon Blue   |GB      |1996|31.37       |
|Tetris                     |GB      |1989|30.26       |
|New Super Mario Bros.      |DS      |2006|30.01       |
|Wii Play                   |Wii     |2006|29.02       |
|New Super Mario Bros. Wii  |Wii     |2009|28.62       |
|Duck Hunt                  |NES     |1984|28.31       |
|Nintendogs                 |DS      |2005|24.76       |
|Mario Kart DS              |DS      |2005|23.42       |
|Pokemon Gold/Pokemon Silver|GB      |1999|23.1        |
|Wii Fit                    |Wii     |2007|22.72       |
|Wii Fit Plus               |Wi

In [13]:
# Summary statistics (count, mean, stddev, min, max) for two columns.
data.describe("Platform", "Global_Sales").show()

+-------+--------+------------------+
|summary|Platform|      Global_Sales|
+-------+--------+------------------+
|  count|   16598|             16598|
|   mean|  2600.0|  0.53744065550074|
| stddev|     0.0|1.5550279355699066|
|    min|    2600|              0.01|
|    max|    XOne|             82.74|
+-------+--------+------------------+



In [14]:
# Number of titles per platform, most frequent first (top 10).
(
    data
    .groupBy("Platform")
    .count()
    .sort("count", ascending=False)
    .show(n=10)
)

+--------+-----+
|Platform|count|
+--------+-----+
|      DS| 2163|
|     PS2| 2161|
|     PS3| 1329|
|     Wii| 1325|
|    X360| 1265|
|     PSP| 1213|
|      PS| 1196|
|      PC|  960|
|      XB|  824|
|     GBA|  822|
+--------+-----+
only showing top 10 rows



In [15]:
from pyspark.sql.types import DoubleType

In [31]:
# Cast the columns used for modelling to double, replacing each column in place
# and in the same order as before.
for column_name in ("Year", "Global_Sales", "Rank"):
    data = data.withColumn(column_name, data[column_name].cast(DoubleType()))

In [32]:
from pyspark.ml.feature import VectorAssembler


In [33]:
# Columns to pack into a single feature-vector column for Spark ML.
# NOTE(review): in the rows displayed above, Rank follows Global_Sales order, so
# using it to predict Global_Sales probably leaks the target — confirm before modelling.
inputcols = ["Year", "Rank", "Other_Sales"]
# Year is read as a string column, so casting it to a number turns every
# non-numeric entry into null. VectorAssembler's default handleInvalid="error"
# raises as soon as such a row is evaluated; "skip" drops those rows instead.
assembler = VectorAssembler(
    inputCols=inputcols,
    outputCol="predictors",
    handleInvalid="skip",
)

In [34]:
# Append the assembled "predictors" vector column, then list the resulting columns.
predictors = assembler.transform(dataset=data)
predictors.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'predictors']

In [35]:
# Keep only the feature vector and the Global_Sales column, then preview five rows.
model_data = predictors.select(["predictors", "Global_Sales"])
model_data.show(n=5, truncate=False)

+-----------------+------------+
|predictors       |Global_Sales|
+-----------------+------------+
|[2006.0,1.0,8.46]|82.74       |
|[1985.0,2.0,0.77]|40.24       |
|[2008.0,3.0,3.31]|35.82       |
|[2009.0,4.0,2.96]|33.0        |
|[1996.0,5.0,1.0] |31.37       |
+-----------------+------------+
only showing top 5 rows



In [54]:
# 80/20 train/test split. A fixed seed keeps the split the same across re-runs;
# without it, randomSplit draws a new random seed on every call.
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

In [59]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors

In [60]:
# Toy training set: each row is a (label, features) pair.
training = spark.createDataFrame(
    data=[
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    schema=["label", "features"],
)

In [62]:
# Linear regression estimator: at most 10 iterations, regularisation parameter 0.01.
lr = LinearRegression(regParam=0.01, maxIter=10)

In [63]:
# Fit the estimator on the toy training DataFrame.
model1 = lr.fit(dataset=training)

In [70]:
# Toy test set: each row is a (label, features) pair.
test = spark.createDataFrame(
    data=[
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    schema=["label", "features"],
)

In [71]:
# Apply the fitted model to the test DataFrame.
prediction = model1.transform(dataset=test)

In [73]:
# Bring the three columns of interest back to the driver as a list of Rows.
result = prediction.select(["features", "label", "prediction"]).collect()

In [74]:
# Print one line per collected row: features, true label and predicted value.
for record in result:
    fields = (record.features, record.label, record.prediction)
    print("features=%s, label=%s -> prediction=%s" % fields)

features=[0.0,1.1,0.1], label=1.0 -> prediction=0.9874228695233253
features=[2.0,1.0,-1.0], label=0.0 -> prediction=0.010808847444715441
features=[2.0,1.3,1.0], label=0.0 -> prediction=0.008779670194922096
features=[0.0,1.2,-0.5], label=1.0 -> prediction=0.9929886128370375
