In [1]:
# Spark SQL
!pip install pyspark[sql]
# Spark Connect
!pip install pyspark[connect]

Collecting pyspark[sql]
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=9392787445764efe39b5a15e2ae5560d05ec569614de463cfc9c4427df306577
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting grpcio-status>=1.56.0 (from pyspark[connect])
  Downloading grpcio_status-1.60.0-py3-none-any.whl (14 kB)
Collecting protobuf!=3.20.0,!=3.20.1,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0.dev0,>=3.19.5 (from googleapis-common-protos>=1.56.4->pyspark[co

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [3]:
# Start a Spark session for this notebook, or reuse one that is already running.
myspark = (
    SparkSession.builder
    .appName("LinearRegressionAssignment")
    .getOrCreate()
)

In [4]:
# Location and format of the input data. The path is absolute, so the file must
# already exist at this location before the cell is run.
file_location = "/content/videogamesales.csv"
file_type = "csv"

# Read through the generic reader so that `file_type` is actually used.
# The first line of the file is treated as the header and column types are inferred.
df = (
    myspark.read.format(file_type)
    .option("header", True)
    .option("inferSchema", True)
    .load(file_location)
)

In [5]:
# Preview the loaded data; show() prints only the first rows and truncates long strings.
df.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing|            Nintendo|   11.27|    8.89|   10.22|  

In [6]:
# Print the inferred column types.
# NOTE(review): Year is inferred as string rather than integer - presumably because the
# column contains the 'N/A' placeholder that a later cell filters out; confirm.
df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [7]:
# List the column names of the raw DataFrame.
df.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [8]:
# Row count of the raw data, before any cleaning.
df.count()

16598

In [9]:
#Handling Categorical Feature

from pyspark.ml.feature import StringIndexer

In [10]:
from pyspark.sql.functions import col

# Drop rows whose Year or Publisher holds the dataset's 'N/A' placeholder.
# Comparing a null value with != yields null, which filter() treats as false, so rows with
# a null Year or Publisher are removed as well; no separate isNotNull() pass is needed.
# (The previous standalone isNotNull().count() ran a full Spark job whose result was never
# displayed, because only the last expression of a cell is shown.)
# Rebinding `df` is safe to re-run: filtering already-filtered rows changes nothing.
df = df.filter((col("Year") != 'N/A') & (col("Publisher") != 'N/A'))

# Row count after cleaning.
df.count()

16291

In [None]:
# df.groupBy('Name').count().show()

In [None]:
# df.groupBy('Publisher').count().show()

In [None]:
# df.groupBy('Platform').count().show()

In [None]:
# df.na.drop().show()

In [11]:
# Re-check the schema after filtering: removing rows does not change column types,
# so Year is still a string column here.
df.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [12]:
from pyspark.ml.feature import StringIndexer

# String columns to encode, and the names of the numeric index columns they produce.
categorical_cols = ["Name", "Platform", "Year", "Genre", "Publisher"]
indexed_cols = [f"{name}_indexed" for name in categorical_cols]

# A single StringIndexer encodes all listed columns in one pass.
# NOTE(review): Year is indexed as a category, so Year_indexed carries no chronological
# order (in the saved output 2006 -> 5.0 and 1985 -> 35.0). Name looks close to a unique
# key per row (indices above 10000), which makes Name_indexed of doubtful use as a
# regression feature - confirm both are intended.
indexer = StringIndexer(inputCols=categorical_cols, outputCols=indexed_cols)
indexer_model = indexer.fit(df)
df_r = indexer_model.transform(df)
df_r.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------------+----------------+------------+-------------+-----------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Name_indexed|Platform_indexed|Year_indexed|Genre_indexed|Publisher_indexed|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------------+----------------+------------+-------------+-----------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|     10954.0|             3.0|         5.0|          1.0|              6.0|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|      2449.0|            20.0|        35.0|          7.0|    

In [13]:
# Confirm that the five *_indexed columns were appended to the original columns.
df_r.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Name_indexed: double (nullable = false)
 |-- Platform_indexed: double (nullable = false)
 |-- Year_indexed: double (nullable = false)
 |-- Genre_indexed: double (nullable = false)
 |-- Publisher_indexed: double (nullable = false)



In [None]:
# from pyspark.sql.functions import col,isnan,when,count
# df2 = df.select([count(when(col(c).contains('None') | \
#                             col(c).contains('NULL') | \
#                             (col(c) == '' ) | \
#                             col(c).isNull() | \
#                             isnan(c), c
#                            )).alias(c)
#                     for c in df.columns])
# df2.show()

In [None]:
# df2=df.select([when(col(c)=="",0).otherwise(col(c)).alias(c) for c in df.columns])
# df2.show()

In [None]:
# from pyspark.sql.functions import col,isnan,when,count
# df3 = df2.select([count(when(col(c).contains('None') | \
#                             col(c).contains('NULL') | \
#                             (col(c) == '' ) | \
#                             col(c).isNull() | \
#                             isnan(c), c
#                            )).alias(c)
#                     for c in df2.columns])
# df3.show()

In [None]:
# indexer = StringIndexer(inputCols=["Name","Platfrom","Year","Genre","Publisher"], outputCols=["Name_indexed","Platfrom_indexed","Year_indexed","Genre_indexed","Publisher_indexed"])
# df3_r =indexer.fit(df3).transform(df3)
# df3_r.show()

In [None]:
# df1 = df.filter((df.Name != 'N/A') | (df.Publisher != 'N/A'))

In [None]:
# df1.show()

In [None]:
# from pyspark.sql.functions import col,isnan,when,count
# df2 = df1.select([count(when(col(c).contains('N/A') | \
#                             col(c).contains('NULL') | \
#                             (col(c) == '' ) | \
#                             col(c).isNull() | \
#                             isnan(c), c
#                            )).alias(c)
#                     for c in df1.columns])
# df2.show()

In [None]:
# from pyspark.sql.types import IntegerType
# df = df.withColumn("Year", df["Year"].cast(IntegerType()))

In [None]:
# df_new = df.select("Rank", "Platform", "Genre", "NA_Sales", "EU_Sales", "JP_Sales", "Other_Sales", "Global_Sales")

In [None]:
# df_new.show()

In [None]:
# from pyspark.sql.functions import col,isnan,when,count
# df_new1 = df_new.select([count(when(col(c).contains('N/A') | \
#                             col(c).contains('NULL') | \
#                             (col(c) == '' ) | \
#                             col(c).isNull() | \
#                             isnan(c), c
#                            )).alias(c)
#                     for c in df_new.columns])
# df_new1.show()

In [None]:
# # indexer = StringIndexer(inputCols=["Platfrom","Genre"], outputCols=["Platfrom_indexed","Genre_indexed"])
# # df_new_r =indexer.fit(df_new).transform(df_new)
# # df_new_r.show()


# from pyspark.ml.feature import OneHotEncoder

# encoder = OneHotEncoder(
#     inputCols=["Rank"],
#     outputCols=["Rank_Encoded"]
# )
# model = encoder.fit(df_new)
# encoded_df = model.transform(df_new)
# encoded_df.show()

In [None]:
# from pyspark.ml.feature import StringIndexer

# df_test = myspark.createDataFrame(
# [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
# ["id", "category"])

# indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
# indexed = indexer.fit(df_test).transform(df_test)
# indexed.show()

In [14]:
from pyspark.ml.feature import VectorAssembler

In [15]:
# List the columns available for feature assembly (original plus *_indexed columns).
df_r.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'Name_indexed',
 'Platform_indexed',
 'Year_indexed',
 'Genre_indexed',
 'Publisher_indexed']

In [16]:
# Columns packed, in this order, into the single vector column consumed by the regressor.
feature_cols = [
    "Rank",
    "Name_indexed",
    "Platform_indexed",
    "Year_indexed",
    "Genre_indexed",
    "Publisher_indexed",
    "NA_Sales",
    "EU_Sales",
    "JP_Sales",
    "Other_Sales",
]

# NOTE(review): possible target leakage. In the saved run the four regional sales columns
# receive coefficients of ~1.0 and every other feature ~0.0, with R^2 ~ 0.99999, which
# suggests Global_Sales is (almost exactly) NA + EU + JP + Other. Rank also appears to be
# ordered by Global_Sales. If the goal is to predict sales from game attributes, these
# columns should probably be excluded - confirm against the assignment's intent.
featureAssembler = VectorAssembler(inputCols=feature_cols, outputCol="Independent Features")

output = featureAssembler.transform(df_r)

In [17]:
# Inspect the assembled frame: all previous columns plus the new "Independent Features" vector column.
output.show()

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------------+----------------+------------+-------------+-----------------+--------------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Name_indexed|Platform_indexed|Year_indexed|Genre_indexed|Publisher_indexed|Independent Features|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+------------+----------------+------------+-------------+-----------------+--------------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|     10954.0|             3.0|         5.0|          1.0|              6.0|[1.0,10954.0,3.0,...|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|  

In [18]:
# Display only the assembled feature vectors.
feature_vectors = output.select("Independent Features")
feature_vectors.show()

+--------------------+
|Independent Features|
+--------------------+
|[1.0,10954.0,3.0,...|
|[2.0,2449.0,20.0,...|
|[3.0,6939.0,3.0,1...|
|[4.0,10956.0,3.0,...|
|[5.0,8272.0,21.0,...|
|[6.0,2497.0,21.0,...|
|[7.0,7758.0,0.0,5...|
|[8.0,10952.0,3.0,...|
|[9.0,7761.0,3.0,0...|
|[10.0,4625.0,20.0...|
|[11.0,7809.0,0.0,...|
|[12.0,6938.0,0.0,...|
|[13.0,8259.0,21.0...|
|[14.0,10946.0,3.0...|
|[15.0,10947.0,3.0...|
|[16.0,6459.0,4.0,...|
|[17.0,168.0,2.0,1...|
|[18.0,347.0,1.0,9...|
|[19.0,2451.0,15.0...|
|[20.0,3436.0,0.0,...|
+--------------------+
only showing top 20 rows



In [19]:
# Keep only what the regressor needs: the feature vector and the label column.
model_columns = ["Independent Features", "Global_Sales"]
finalized_data = output.select(*model_columns)

In [20]:
# Preview the modelling frame: feature vector next to the Global_Sales label.
finalized_data.show()

+--------------------+------------+
|Independent Features|Global_Sales|
+--------------------+------------+
|[1.0,10954.0,3.0,...|       82.74|
|[2.0,2449.0,20.0,...|       40.24|
|[3.0,6939.0,3.0,1...|       35.82|
|[4.0,10956.0,3.0,...|        33.0|
|[5.0,8272.0,21.0,...|       31.37|
|[6.0,2497.0,21.0,...|       30.26|
|[7.0,7758.0,0.0,5...|       30.01|
|[8.0,10952.0,3.0,...|       29.02|
|[9.0,7761.0,3.0,0...|       28.62|
|[10.0,4625.0,20.0...|       28.31|
|[11.0,7809.0,0.0,...|       24.76|
|[12.0,6938.0,0.0,...|       23.42|
|[13.0,8259.0,21.0...|        23.1|
|[14.0,10946.0,3.0...|       22.72|
|[15.0,10947.0,3.0...|        22.0|
|[16.0,6459.0,4.0,...|       21.82|
|[17.0,168.0,2.0,1...|        21.4|
|[18.0,347.0,1.0,9...|       20.81|
|[19.0,2451.0,15.0...|       20.61|
|[20.0,3436.0,0.0,...|       20.22|
+--------------------+------------+
only showing top 20 rows



In [21]:
from pyspark.ml.regression import LinearRegression
# Train/test split (80% / 20%). A fixed seed makes the split - and therefore the fitted
# coefficients and the reported metrics - repeatable across kernel restarts.
train_data, test_data = finalized_data.randomSplit([0.80, 0.20], seed=42)

# Keep the unfitted estimator and the fitted model under separate names. `regressor`
# stays bound to the fitted model because the following cells read its coefficients
# and intercept and call evaluate() on it.
lr_estimator = LinearRegression(featuresCol="Independent Features", labelCol="Global_Sales")
regressor = lr_estimator.fit(train_data)

In [22]:
# Fitted coefficients, one per assembled feature, in the order given to the VectorAssembler.
# NOTE(review): in the saved output the last four entries (the regional sales columns) are
# ~1.0 and the rest ~0.0, i.e. the model appears to simply add up the regional sales.
regressor.coefficients

DenseVector([0.0, -0.0, 0.0, -0.0, 0.0, -0.0, 1.0001, 1.0, 1.0, 0.9998])

In [23]:
# Fitted intercept (constant term) of the linear model.
regressor.intercept

-0.00042816333572960467

In [25]:
# Predictions: evaluate the fitted model on the held-out test split.
# The returned object exposes the per-row predictions and the error metrics read in the cells below.

pred_results = regressor.evaluate(test_data)

In [26]:
# Show test rows with the model's prediction next to the actual Global_Sales value.
pred_results.predictions.show()

+--------------------+------------+--------------------+
|Independent Features|Global_Sales|          prediction|
+--------------------+------------+--------------------+
|(10,[0,1,2,5,6],[...|        0.39| 0.39016949440124404|
|(10,[0,1,2,5,6],[...|        0.25|  0.2502710436889888|
|(10,[0,1,2,5,6],[...|        0.05| 0.05070355672617592|
|(10,[0,1,2,5,6],[...|        0.03|0.030707880660320742|
|(10,[0,1,2,5,7],[...|        0.02|0.020957614743175517|
|(10,[0,1,2,5,7],[...|        0.02|0.021077771872010907|
|(10,[0,1,2,5,8],[...|        0.24|  0.2400522533287207|
|(10,[0,1,2,5,8],[...|        0.06| 0.06075789036025825|
|(10,[0,1,2,5,8],[...|        0.03|0.030641104013684216|
|(10,[0,1,2,5,8],[...|        0.03|0.030815327277941833|
|(10,[0,1,2,7,9],[...|        0.03|0.040997221025678376|
|(10,[0,1,3,4,8],[...|        0.18| 0.18008777730494172|
|(10,[0,1,3,5,6],[...|        0.07|0.060479130813796605|
|(10,[0,1,3,5,6],[...|        0.07| 0.06040633913889547|
|(10,[0,1,3,5,6],[...|        0

In [27]:
# Performance metrics on the test split, displayed as a tuple: (R^2, mean absolute error, mean squared error).
# NOTE(review): a near-perfect R^2 on held-out data usually indicates that the features
# contain the label (here, presumably the regional sales columns) - verify before reporting.

pred_results.r2, pred_results.meanAbsoluteError, pred_results.meanSquaredError

(0.9999935323748185, 0.002960060100332109, 2.6947835280706155e-05)