## TASK: Implementing a Machine Learning Model Using Apache Spark MLlib

Implementation of linear regression: predicting a person's weight from gender and height

Documentation: https://spark.apache.org/mllib/

- Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. 
- You create a dataset from external data, then apply parallel operations to it. 
- The building block of the Spark API is its RDD API. 
- In the RDD API, there are two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster.
- On top of Spark’s RDD API, high-level APIs are provided, e.g. the DataFrame API and the Machine Learning API.
- These high-level APIs provide a concise way to conduct certain data operations.

In [3]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Spark-Tutorial").getOrCreate()
spark

In [61]:
df = spark.read.csv("weight-height.csv", header=True, inferSchema=True)
df.show(4)

+------+----------------+----------------+
|Gender|          Height|          Weight|
+------+----------------+----------------+
|  Male| 73.847017017515|241.893563180437|
|  Male|68.7819040458903|  162.3104725213|
|  Male|74.1101053917849|  212.7408555565|
|  Male|71.7309784033377|220.042470303077|
+------+----------------+----------------+
only showing top 4 rows



In [62]:
df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)



In [63]:
type(df["Gender"])

pyspark.sql.column.Column

#### Encoding

Spark MLlib expects every feature value to be numeric, so the string column Gender must be encoded before training.

In [65]:
# Replace the string labels with numeric codes (Male -> 1, Female -> 2).
# Import only what is needed; a wildcard import would shadow Python built-ins such as sum and max.
from pyspark.sql.functions import regexp_replace

df = df.withColumn("Gender", regexp_replace("Gender", "Male", "1"))
df = df.withColumn("Gender", regexp_replace("Gender", "Female", "2"))

# Cast the column from string to integer.
df = df.withColumn("Gender", df.Gender.cast("int"))
df.show(4)

+------+----------------+----------------+
|Gender|          Height|          Weight|
+------+----------------+----------------+
|     1| 73.847017017515|241.893563180437|
|     1|68.7819040458903|  162.3104725213|
|     1|74.1101053917849|  212.7408555565|
|     1|71.7309784033377|220.042470303077|
+------+----------------+----------------+
only showing top 4 rows



#### Another Way to encode:

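StringIndexer does not typecast the column. It maps each distinct string label to a numeric index (by default, the most frequent label gets index 0.0) and writes the result to a new column.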


In [93]:
# from pyspark.ml.feature import StringIndexer
# indexer = StringIndexer(inputCol="Gender", outputCol="Gender_n")
# indexed = indexer.fit(df).transform(df)

In [66]:
df.printSchema()

root
 |-- Gender: integer (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)



The Gender column is now an integer column.

#### Defining Features for ML Training

VectorAssembler combines a group of feature columns into a single vector column, which MLlib then treats as the independent (input) features. The group here is ["Gender", "Height"].

In [78]:
from pyspark.ml.feature import VectorAssembler

feature_assembler = VectorAssembler(inputCols=["Gender", "Height"], outputCol="independent feature")

output = feature_assembler.transform(df)

output.show(4)

+------+----------------+----------------+--------------------+
|Gender|          Height|          Weight| independent feature|
+------+----------------+----------------+--------------------+
|     1| 73.847017017515|241.893563180437|[1.0,73.847017017...|
|     1|68.7819040458903|  162.3104725213|[1.0,68.781904045...|
|     1|74.1101053917849|  212.7408555565|[1.0,74.110105391...|
|     1|71.7309784033377|220.042470303077|[1.0,71.730978403...|
+------+----------------+----------------+--------------------+
only showing top 4 rows



In [79]:
output.columns

['Gender', 'Height', 'Weight', 'independent feature']

#### Columns to use: independent feature (X) and Weight (y)

In [80]:
data = output.select(["independent feature","Weight"])
data.show(2)

+--------------------+----------------+
| independent feature|          Weight|
+--------------------+----------------+
|[1.0,73.847017017...|241.893563180437|
|[1.0,68.781904045...|  162.3104725213|
+--------------------+----------------+
only showing top 2 rows



#### Train-test Split: Using Random Split

In [81]:
# 80/20 split; the split is random, so pass a seed argument if it needs to be reproducible.
train, test = data.randomSplit([0.8, 0.2])

#### Model:

In [82]:
from pyspark.ml.regression import LinearRegression

regressor = LinearRegression(featuresCol="independent feature",labelCol="Weight").fit(train)


#### Coefficients:

In [83]:
regressor.coefficients

DenseVector([-19.3176, 5.9941])

#### Intercept:

In [84]:
regressor.intercept

-207.4142980684588

#### Calculating Predictions

In [88]:
# evaluate() returns a summary object that holds both the predictions and the metrics.
prediction = regressor.evaluate(test)
prediction.predictions.show(4)

+--------------------+----------------+------------------+
| independent feature|          Weight|        prediction|
+--------------------+----------------+------------------+
|[1.0,60.243718102...|153.831429216947| 134.3731069530535|
|[1.0,60.935739701...|140.151715704819|138.52113162353376|
|[1.0,61.074487103...|122.680111752611|139.35279301993788|
|[1.0,61.226828660...|153.520978630761|140.26593870524735|
+--------------------+----------------+------------------+
only showing top 4 rows
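
As a sanity check, the first prediction can be reproduced by hand from the fitted model: prediction = intercept + coefficient_gender * Gender + coefficient_height * Height. The sketch below copies the values from the outputs above; the coefficients are rounded and the height is truncated as displayed, so only approximate agreement is expected.

```python
# Values copied from the notebook output (coefficients are rounded as displayed).
intercept = -207.4142980684588
coef_gender, coef_height = -19.3176, 5.9941

# First test row shown above; the height is truncated in the display.
gender, height = 1.0, 60.243718102

# Linear model: intercept plus the weighted sum of the features.
manual_prediction = intercept + coef_gender * gender + coef_height * height
reported_prediction = 134.3731069530535

print(round(manual_prediction, 2), round(reported_prediction, 2))
```

Both values round to 134.37. With this encoding (Male = 1, Female = 2), the coefficients read as: each additional unit of height adds about 6 units of weight, and moving from code 1 to code 2 lowers the predicted weight by about 19 units at the same height.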



#### MAE, MSE

In [91]:
prediction.meanAbsoluteError, prediction.meanSquaredError

(8.301929134584867, 108.05439782844422)
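
For reference, the two metrics can be written out in plain Python. This is a sketch of the standard definitions, not Spark's implementation; the helper names are hypothetical. It is applied only to the four rows displayed earlier, so the sample values will not match the full test-set figures above.

```python
import math

def mean_absolute_error(actual, predicted):
    """Average of |actual - predicted|."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

def mean_squared_error(actual, predicted):
    """Average of (actual - predicted) squared."""
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)

# The four (Weight, prediction) pairs displayed earlier; NOT the full test set.
actual = [153.831429216947, 140.151715704819, 122.680111752611, 153.520978630761]
predicted = [134.3731069530535, 138.52113162353376, 139.35279301993788, 140.26593870524735]

sample_mae = mean_absolute_error(actual, predicted)
sample_mse = mean_squared_error(actual, predicted)

# RMSE puts the reported test-set MSE back into the units of Weight.
reported_mse = 108.05439782844422
rmse = math.sqrt(reported_mse)

print(sample_mae, sample_mse, rmse)
```

The square root of the reported MSE is roughly 10.4, which is easier to compare with the MAE of about 8.3 because both are in the same units as Weight.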

#### R-Squared (Coefficient of Determination)

In [92]:
prediction.r2

0.8988267510829301
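
R-squared compares the model's squared error with that of a baseline that always predicts the mean: R² = 1 - SS_res / SS_tot. The sketch below shows the standard formula on hypothetical toy values (invented for illustration); the helper name is an assumption, not part of the Spark API.

```python
def r_squared(actual, predicted):
    """1 - (residual sum of squares) / (total sum of squares)."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot

# Hypothetical toy values, not taken from the dataset.
actual = [1.0, 2.0, 3.0, 4.0]
predicted = [1.1, 1.9, 3.2, 3.8]

r2 = r_squared(actual, predicted)
print(round(r2, 2))
```

A value of 1.0 means the predictions match the labels exactly, while 0.0 means the model does no better than predicting the mean.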

##### That is, our model explains about 90% of the variance in weight on the test set. (R-squared measures explained variance, not accuracy.)