# Installing Pyspark

In [1]:
!pip install pyspark



# Importing Pyspark

In [2]:
import pyspark

In [3]:
import chardet
with open('metal_bands_2017.csv', 'rb') as rawdata:
    result = chardet.detect(rawdata.read(100000))
result

{'encoding': 'ISO-8859-1', 'confidence': 0.7293639268080162, 'language': ''}

In [4]:
import pandas as pd 
pd.read_csv('metal_bands_2017.csv', encoding='ISO-8859-1')

  return f(*args, **kwds)
  return f(*args, **kwds)


Unnamed: 0,band_name,fans,formed,origin,split,style
0,Iron Maiden,4195,1975.0,United Kingdom,,"New wave of british heavy,Heavy"
1,Opeth,4147,1990.0,Sweden,1990.0,"Extreme progressive,Progressive rock,Progressive"
2,Metallica,3712,1981.0,USA,,"Heavy,Bay area thrash"
3,Megadeth,3105,1983.0,USA,1983.0,"Thrash,Heavy,Hard rock"
4,Amon Amarth,3054,1988.0,Sweden,,Melodic death
5,Slayer,2955,1981.0,USA,1981.0,Thrash
6,Death,2690,1983.0,USA,2001.0,"Progressive death,Death,Progressive thrash"
7,Dream Theater,2329,1985.0,USA,1985.0,Progressive
8,Black Sabbath,2307,1968.0,United Kingdom,,"Doom,Heavy,Hard rock"
9,Nightwish,2183,1996.0,Finland,1996.0,"Symphonic power,Gothic,Symphonic"


# Creating a Spark session

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName('Test').getOrCreate()

In [7]:
spark

# Reading the dataframe

In [8]:
df = spark.read.csv('metal_bands_2017.csv')

In [9]:
df.show()

+-----------------+----+------+--------------+-----+--------------------+
|              _c0| _c1|   _c2|           _c3|  _c4|                 _c5|
+-----------------+----+------+--------------+-----+--------------------+
|        band_name|fans|formed|        origin|split|               style|
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|
|            Death|2690|  1983|           USA| 2001|Progressive death...|
|    Dream Theater|2329|  1985|           USA| 1985|         Progressive|
|    Black Sabbath|2307|  1968|United Kingdom| null|Doom,Heavy,Hard rock|
|        Nightwish|2183|  1996|       

In [10]:
df_headers = spark.read.csv('metal_bands_2017.csv', inferSchema=True, header=True)

# Methods to view and analyse the dataframe - show, printSchema, columns, head, dtypes

In [11]:
df_headers.show()

+-----------------+----+------+--------------+-----+--------------------+
|        band_name|fans|formed|        origin|split|               style|
+-----------------+----+------+--------------+-----+--------------------+
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|
|            Death|2690|  1983|           USA| 2001|Progressive death...|
|    Dream Theater|2329|  1985|           USA| 1985|         Progressive|
|    Black Sabbath|2307|  1968|United Kingdom| null|Doom,Heavy,Hard rock|
|        Nightwish|2183|  1996|       Finland| 1996|Symphonic power,G...|
|Children Of Bodom|2153|  1993|       

In [12]:
type(df_headers)

pyspark.sql.dataframe.DataFrame

In [13]:
df_headers.printSchema()

root
 |-- band_name: string (nullable = true)
 |-- fans: integer (nullable = true)
 |-- formed: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- split: integer (nullable = true)
 |-- style: string (nullable = true)



In [14]:
df_headers.columns

['band_name', 'fans', 'formed', 'origin', 'split', 'style']

In [15]:
df_headers.head(4)

[Row(band_name='Iron Maiden', fans=4195, formed=1975, origin='United Kingdom', split=None, style='New wave of british heavy,Heavy'),
 Row(band_name='Opeth', fans=4147, formed=1990, origin='Sweden', split=1990, style='Extreme progressive,Progressive rock,Progressive'),
 Row(band_name='Metallica', fans=3712, formed=1981, origin='USA', split=None, style='Heavy,Bay area thrash'),
 Row(band_name='Megadeth', fans=3105, formed=1983, origin='USA', split=1983, style='Thrash,Heavy,Hard rock')]

# Selecting specific columns from the dataframe

In [16]:
df_band = df_headers.select(['band_name', 'fans'])

In [17]:
df_band.show()

+-----------------+----+
|        band_name|fans|
+-----------------+----+
|      Iron Maiden|4195|
|            Opeth|4147|
|        Metallica|3712|
|         Megadeth|3105|
|      Amon Amarth|3054|
|           Slayer|2955|
|            Death|2690|
|    Dream Theater|2329|
|    Black Sabbath|2307|
|        Nightwish|2183|
|Children Of Bodom|2153|
|     Judas Priest|2094|
|   Blind Guardian|2040|
|        In Flames|1932|
|          Pantera|1920|
|Dark Tranquillity|1898|
|         Agalloch|1881|
|        Ensiferum|1879|
|       Arch Enemy|1750|
|        Katatonia|1735|
+-----------------+----+
only showing top 20 rows



In [18]:
df_headers.dtypes

[('band_name', 'string'),
 ('fans', 'int'),
 ('formed', 'int'),
 ('origin', 'string'),
 ('split', 'int'),
 ('style', 'string')]

In [19]:
df_headers.describe().show()

+-------+----------+-----------------+------------------+---------+------------------+--------------------+
|summary| band_name|             fans|            formed|   origin|             split|               style|
+-------+----------+-----------------+------------------+---------+------------------+--------------------+
|  count|      5000|             5000|              4996|     4992|              2785|                5000|
|   mean|    1349.0|          87.8058|2000.4165332265814|     null|2001.2394973070018|                null|
| stddev|      null|296.1375234125429| 8.884580724252693|     null| 8.940119588290948|                null|
|    min|!T.O.O.H.!|                0|              1964|  Albania|              1965|Aggrotech,Melodic...|
|    max| �xx� X��x|             4195|              2016|Venezuela|              2016|Viking folk,Punk ...|
+-------+----------+-----------------+------------------+---------+------------------+--------------------+



# Adding new columns 

In [20]:
df_new = df_headers.withColumn('Age', 2021 - df_headers['formed'])

In [21]:
df_new.show()

+-----------------+----+------+--------------+-----+--------------------+---+
|        band_name|fans|formed|        origin|split|               style|Age|
+-----------------+----+------+--------------+-----+--------------------+---+
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...| 46|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...| 31|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...| 40|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...| 38|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death| 33|
|           Slayer|2955|  1981|           USA| 1981|              Thrash| 40|
|            Death|2690|  1983|           USA| 2001|Progressive death...| 38|
|    Dream Theater|2329|  1985|           USA| 1985|         Progressive| 36|
|    Black Sabbath|2307|  1968|United Kingdom| null|Doom,Heavy,Hard rock| 53|
|        Nightwish|2183|  1996|       Finland| 1996|Symphonic po

# Renaming columns

In [22]:
df_new = df_new.withColumnRenamed('Age', 'band_age')

In [23]:
df_new.show()

+-----------------+----+------+--------------+-----+--------------------+--------+
|        band_name|fans|formed|        origin|split|               style|band_age|
+-----------------+----+------+--------------+-----+--------------------+--------+
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...|      46|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|      31|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...|      40|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|      38|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death|      33|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|      40|
|            Death|2690|  1983|           USA| 2001|Progressive death...|      38|
|    Dream Theater|2329|  1985|           USA| 1985|         Progressive|      36|
|    Black Sabbath|2307|  1968|United Kingdom| null|Doom,Heavy,Hard rock|      53|
|   

In [24]:
df_new = df_new.withColumn('band_age', df_new.band_age.cast('Integer'))

In [25]:
df_new.printSchema()

root
 |-- band_name: string (nullable = true)
 |-- fans: integer (nullable = true)
 |-- formed: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- split: integer (nullable = true)
 |-- style: string (nullable = true)
 |-- band_age: integer (nullable = true)



# Handling missing data with na.drop, na.fill methods

In [26]:
df_new.na.drop().describe().show()

+-------+----------+------------------+------------------+-------------------+------------------+--------------------+-----------------+
|summary| band_name|              fans|            formed|             origin|             split|               style|         band_age|
+-------+----------+------------------+------------------+-------------------+------------------+--------------------+-----------------+
|  count|      2782|              2782|              2782|               2782|              2782|                2782|             2782|
|   mean|    1349.0| 87.72825305535586|2000.1376707404745|               null|2001.2264557872033|                null|20.86232925952552|
| stddev|      null|294.33227630393714| 8.835267177670781|               null| 8.936093635078352|                null|8.835267177670767|
|    min|!T.O.O.H.!|                 0|              1965|            Albania|              1965|Aggrotech,Melodic...|                5|
|    max|�vangelist|              4147|  

In [27]:
df_new.na.fill(0, ['split']).show()

+-----------------+----+------+--------------+-----+--------------------+--------+
|        band_name|fans|formed|        origin|split|               style|band_age|
+-----------------+----+------+--------------+-----+--------------------+--------+
|      Iron Maiden|4195|  1975|United Kingdom|    0|New wave of briti...|      46|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|      31|
|        Metallica|3712|  1981|           USA|    0|Heavy,Bay area th...|      40|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|      38|
|      Amon Amarth|3054|  1988|        Sweden|    0|       Melodic death|      33|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|      40|
|            Death|2690|  1983|           USA| 2001|Progressive death...|      38|
|    Dream Theater|2329|  1985|           USA| 1985|         Progressive|      36|
|    Black Sabbath|2307|  1968|United Kingdom|    0|Doom,Heavy,Hard rock|      53|
|   

# Handling missing data using imputer functions

In [28]:
from pyspark.ml.feature import Imputer 

imputer = Imputer(
    inputCols= ['split'],
    outputCols= ['split_imputed']).setStrategy('mean')

In [29]:
imputer.fit(df_new).transform(df_new).show()

+-----------------+----+------+--------------+-----+--------------------+--------+-------------+
|        band_name|fans|formed|        origin|split|               style|band_age|split_imputed|
+-----------------+----+------+--------------+-----+--------------------+--------+-------------+
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...|      46|         2001|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|      31|         1990|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...|      40|         2001|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|      38|         1983|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death|      33|         2001|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|      40|         1981|
|            Death|2690|  1983|           USA| 2001|Progressive death...|      38|         2001|
|    Dream Theater|2329|  1985

# Filtering data with &, |, ~ operators

In [30]:
df_new.filter((df_new['fans'] >= 2000) & (df_new['formed'] >= 1990)).show()

+-----------------+----+------+-------+-----+--------------------+--------+
|        band_name|fans|formed| origin|split|               style|band_age|
+-----------------+----+------+-------+-----+--------------------+--------+
|            Opeth|4147|  1990| Sweden| 1990|Extreme progressi...|      31|
|        Nightwish|2183|  1996|Finland| 1996|Symphonic power,G...|      25|
|Children Of Bodom|2153|  1993|Finland| null|       Extreme power|      28|
|            Opeth|4147|  1990| Sweden| 1990|Extreme progressi...|      31|
|        Nightwish|2183|  1996|Finland| 1996|Symphonic power,G...|      25|
|Children Of Bodom|2153|  1993|Finland| null|       Extreme power|      28|
+-----------------+----+------+-------+-----+--------------------+--------+



# Aggregate and GroupBy functions

In [31]:
df_new.groupBy('formed').sum('fans').show()

+------+---------+
|formed|sum(fans)|
+------+---------+
|  1990|    31924|
|  1975|     9807|
|  1977|      511|
|  2003|     9642|
|  2007|     6593|
|  1974|       74|
|  2015|      378|
|  2006|     6032|
|  1978|     4319|
|  2013|     1406|
|  null|        5|
|  1988|    17924|
|  1997|    11635|
|  1994|    13907|
|  1968|     7763|
|  2014|      696|
|  1973|      985|
|  1979|     2241|
|  1971|      124|
|  2004|    13418|
+------+---------+
only showing top 20 rows



In [32]:
df_new.agg({'fans':'sum'}).show()

+---------+
|sum(fans)|
+---------+
|   439029|
+---------+



# Introduction to mlib

# Using VectorAssembler to form a separate feature which contains all independent features

In [42]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = ['formed'], outputCol = 'indep_feature').setHandleInvalid("skip").transform(df_new)
assembler.show()

+-----------------+----+------+--------------+-----+--------------------+--------+-------------+
|        band_name|fans|formed|        origin|split|               style|band_age|indep_feature|
+-----------------+----+------+--------------+-----+--------------------+--------+-------------+
|      Iron Maiden|4195|  1975|United Kingdom| null|New wave of briti...|      46|     [1975.0]|
|            Opeth|4147|  1990|        Sweden| 1990|Extreme progressi...|      31|     [1990.0]|
|        Metallica|3712|  1981|           USA| null|Heavy,Bay area th...|      40|     [1981.0]|
|         Megadeth|3105|  1983|           USA| 1983|Thrash,Heavy,Hard...|      38|     [1983.0]|
|      Amon Amarth|3054|  1988|        Sweden| null|       Melodic death|      33|     [1988.0]|
|           Slayer|2955|  1981|           USA| 1981|              Thrash|      40|     [1981.0]|
|            Death|2690|  1983|           USA| 2001|Progressive death...|      38|     [1983.0]|
|    Dream Theater|2329|  1985

In [43]:
final_df = assembler.select(['indep_feature', 'fans'])
final_df.show()

+-------------+----+
|indep_feature|fans|
+-------------+----+
|     [1975.0]|4195|
|     [1990.0]|4147|
|     [1981.0]|3712|
|     [1983.0]|3105|
|     [1988.0]|3054|
|     [1981.0]|2955|
|     [1983.0]|2690|
|     [1985.0]|2329|
|     [1968.0]|2307|
|     [1996.0]|2183|
|     [1993.0]|2153|
|     [1969.0]|2094|
|     [1984.0]|2040|
|     [1990.0]|1932|
|     [1981.0]|1920|
|     [1989.0]|1898|
|     [1995.0]|1881|
|     [1995.0]|1879|
|     [1996.0]|1750|
|     [1991.0]|1735|
+-------------+----+
only showing top 20 rows



# Performing Linear Regression

In [49]:
from pyspark.ml.regression import LinearRegression
train_data, test_data = final_df.randomSplit([0.75, 0.25])
model = LinearRegression(featuresCol = 'indep_feature', labelCol = 'fans').fit(train_data)

In [50]:
model.coefficients

DenseVector([-10.2388])

In [51]:
model.intercept

20572.78936656438

In [52]:
preds = model.evaluate(test_data)

In [53]:
preds.predictions.show()

+-------------+----+------------------+
|indep_feature|fans|        prediction|
+-------------+----+------------------+
|     [1964.0]| 362|463.70094977358895|
|     [1965.0]| 519|453.46210638418415|
|     [1967.0]|  26|432.98441960537457|
|     [1968.0]|  13| 422.7455762159734|
|     [1968.0]| 714| 422.7455762159734|
|     [1969.0]| 162| 412.5067328265686|
|     [1970.0]|  64|402.26788943716383|
|     [1971.0]| 124|392.02904604775904|
|     [1972.0]| 262|381.79020265835425|
|     [1973.0]|   7|371.55135926894945|
|     [1973.0]|  53|371.55135926894945|
|     [1974.0]|   4|361.31251587954466|
|     [1974.0]|  11|361.31251587954466|
|     [1975.0]|   1|351.07367249013987|
|     [1975.0]| 497|351.07367249013987|
|     [1975.0]| 875|351.07367249013987|
|     [1975.0]|4195|351.07367249013987|
|     [1976.0]|   5| 340.8348291007351|
|     [1976.0]|  15| 340.8348291007351|
|     [1976.0]|  17| 340.8348291007351|
+-------------+----+------------------+
only showing top 20 rows



In [54]:
preds.meanAbsoluteError

113.84944126052812

In [55]:
preds.meanSquaredError

62010.75748265654