# Apache Spark for Machine Learning - Linear Regression

## Machine Learning (Spark over scikit-learn)

- Spark machine learning runs in parallel on a cluster
- For example, PCA on Apache Spark scales linearly with the data size, so it can be applied to very large data sets and to data streams by adding cluster resources, instead of being limited to what fits on one machine

Note:
- Apache SparkML has its own implementations of clustering, classification and regression algorithms, since they need to use the parallel Apache Spark APIs. Therefore you may find an algorithm in the scikit-learn package that is not available in Apache Spark.


## Supervised ML - Linear Regression 

- Use hypothetical sensor data from a production machine
- Variables: Max Temp, Min Temp, Max Vibration, Asperity (all continuous)
- Goal: predict asperity, which is highly correlated with whether a part is healthy or faulty, from the observed sensor values

In [1]:
dp1 = {'partno': 100, 'maxtemp': 35, 'mintemp': 35, 'maxvibration':12, 'asperity': 0.32}
dp2 = {'partno': 101, 'maxtemp': 46, 'mintemp': 35, 'maxvibration':21, 'asperity': 0.34}
dp3 = {'partno': 132, 'maxtemp': 56, 'mintemp': 46, 'maxvibration':4313, 'asperity': 12.2}
dp4 = {'partno': 130, 'maxtemp': 48, 'mintemp': 48, 'maxvibration':3511, 'asperity': 13.4}
dp5 = {'partno': 120, 'maxtemp': 58, 'mintemp': 38, 'maxvibration': 30, 'asperity': 3.32}

#### hardcode a rule

In [2]:
# hardcode a rule: predict asperity from the vibration sensor value
def predict(dp):
    if dp['maxvibration'] > 100:
        return 13
    else: 
        return 0.33
    
predict(dp1)

0.33

In [3]:
# hardcode a linear regression formula; change the weights w1..w4 to get a better fit
w1 = 0
w2 = 0
w3 = 0
w4 = 12/4313 # give w4 a non-zero value; with all weights 0 the model always predicts 0

def mlpredict(dp):
    return w1 + w2*dp['maxtemp'] + w3*dp['mintemp'] + w4*dp['maxvibration']

# w1 is the intercept (bias): the weight of a constant input x0 = 1, so in vector form
# prediction = dot([w1, w2, w3, w4], [1, maxtemp, mintemp, maxvibration])

In [4]:
mlpredict(dp3)

12.0
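Instead of tuning the weights by hand, they can be fitted by least squares, which is what linear regression does. The sketch below is an illustration using NumPy (an assumption, not part of the original notebook): it fits the intercept w1 and the weights w2..w4 to the five hypothetical data points above and compares the sum of squared errors with the hand-picked weights. With only five points for four weights this is not a usable model; it only shows what "fitting" means.

```python
import numpy as np

# the five hypothetical data points from the cell above
points = [
    {'partno': 100, 'maxtemp': 35, 'mintemp': 35, 'maxvibration': 12, 'asperity': 0.32},
    {'partno': 101, 'maxtemp': 46, 'mintemp': 35, 'maxvibration': 21, 'asperity': 0.34},
    {'partno': 132, 'maxtemp': 56, 'mintemp': 46, 'maxvibration': 4313, 'asperity': 12.2},
    {'partno': 130, 'maxtemp': 48, 'mintemp': 48, 'maxvibration': 3511, 'asperity': 13.4},
    {'partno': 120, 'maxtemp': 58, 'mintemp': 38, 'maxvibration': 30, 'asperity': 3.32},
]

# design matrix: a constant column x0 = 1 for the intercept w1, then the three sensor values
X = np.array([[1.0, p['maxtemp'], p['mintemp'], p['maxvibration']] for p in points])
y = np.array([p['asperity'] for p in points])

# least-squares solution: the weights [w1, w2, w3, w4] minimising the sum of squared errors
w_fit, _, _, _ = np.linalg.lstsq(X, y, rcond=None)

# the hand-picked weights from the cell above, for comparison
w_hand = np.array([0.0, 0.0, 0.0, 12 / 4313])

def sse(w):
    """Sum of squared errors of the linear model with weight vector w."""
    residuals = X @ w - y
    return float(residuals @ residuals)

print("fitted weights:", w_fit)
print("SSE hand-picked:", sse(w_hand), "SSE fitted:", sse(w_fit))
```

Because least squares minimises exactly this error, the fitted SSE can never be larger than the SSE of any hand-picked weights.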

## ML - Logistic Regression (Classification)

### Import libraries and data

In [18]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql import Row

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession.builder.getOrCreate()

In [19]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-05-11 19:23:44--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-05-11 19:23:44--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-05-11 19:23:44--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Savin

In [10]:
# check data
df.show()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

In [20]:
df.count()

446529

In [21]:
# check classes
df.groupBy('class').count().show()

+--------------+-----+
|         class|count|
+--------------+-----+
| Use_telephone|15225|
| Standup_chair|25417|
|      Eat_meat|31236|
|     Getup_bed|45801|
|   Drink_glass|42792|
|    Pour_water|41673|
|     Comb_hair|23504|
|          Walk|92254|
|  Climb_stairs|40258|
| Sitdown_chair|25036|
|   Liedown_bed|11446|
|Descend_stairs|15375|
|   Brush_teeth|29829|
|      Eat_soup| 6683|
+--------------+-----+



In [26]:
# check missing values
df_nomissing = spark.sql("select * from df where class is not null and x is not null and y is not null and z is not null")
df_nomissing.show()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

In [29]:
df.count() - df_nomissing.count()

# the data frame has no missing values

0

### Data and Pipeline Preparation

In [30]:
# split data into 80-20
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
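`randomSplit([0.8, 0.2])` does not cut the data at exactly 80%: conceptually, each row is assigned to a split independently by a random draw, so the sizes are only approximately 80/20, and without a seed they change from run to run. The pure-Python sketch below illustrates that idea only; it is not Spark's implementation, and the function name `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = sum(weights)
    # cumulative upper bounds, e.g. weights [0.8, 0.2] -> bounds [0.8, 1.0]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        # first split whose upper bound exceeds u; the last split catches rounding leftovers
        index = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[index].append(row)
    return splits

train, test = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 8000 and 2000
```

In PySpark the split can be made reproducible the same way, by passing a seed: `df.randomSplit([0.8, 0.2], seed=42)`.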

In [32]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
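Roughly, the two feature stages above behave like the pure-Python sketch below (an illustration only, not Spark's implementation). StringIndexer with its default ordering (`frequencyDesc`) gives the most frequent class index 0.0, and Normalizer with `p=1.0` divides each vector by its L1 norm, the sum of the absolute values of its entries. The function names, the alphabetical tie-breaking and the handling of an all-zero vector are choices made for this sketch.

```python
from collections import Counter

def string_index(labels):
    """Map each distinct label to a float index, most frequent first (ties: alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def l1_normalize(vector):
    """Scale a vector so that the absolute values of its entries sum to 1."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # this sketch leaves an all-zero vector unchanged
    return [v / norm for v in vector]

print(string_index(["Walk", "Brush_teeth", "Walk", "Eat_soup"]))
# {'Walk': 0.0, 'Brush_teeth': 1.0, 'Eat_soup': 2.0}
print(l1_normalize([22, 49, 35]))  # first accelerometer row of the data set
```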

### fit the model

In [34]:
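from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# regParam=0.3 sets the penalty strength; elasticNetParam=0.8 makes it 80% L1 and 20% L2.
# note: LogisticRegression reads featuresCol="features" by default, so it is trained on the
# raw x, y, z vector; the Normalizer output 'features_norm' is computed but not used.
# Set featuresCol="features_norm" to train on the normalized features instead.
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)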

#### check the schema of the prediction df

In [35]:
prediction.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_norm: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
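The last three columns come from the classifier. For multinomial logistic regression (used here, since there are 14 classes), `rawPrediction` holds one score per class, `probability` is the softmax of those scores, and `prediction` is the index of the most probable class when no custom thresholds are set. A minimal pure-Python sketch of that last step, with made-up scores for three classes (the function names are hypothetical):

```python
import math

def softmax(scores):
    """Convert raw per-class scores into probabilities that sum to 1."""
    top = max(scores)  # subtracting the maximum avoids overflow in exp()
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def predicted_label(scores):
    """Index of the most probable class, returned as a float like the 'prediction' column."""
    probs = softmax(scores)
    return float(max(range(len(probs)), key=lambda i: probs[i]))

raw_prediction = [0.5, 2.0, -1.0]  # hypothetical scores, not taken from the model above
print(softmax(raw_prediction))
print(predicted_label(raw_prediction))  # 1.0
```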



### model evaluation

In [36]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction)

0.20346760148260618

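- 20% accuracy is low: always predicting the most frequent class (Walk, 92254 of 446529 rows) would already give about 21%, so this model (regParam=0.3, mostly L1 penalty) is at roughly the level of that trivial baseline and is only useful as a starting point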
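A useful reference point for any accuracy figure is the majority-class baseline: the accuracy of always predicting the most frequent class. The sketch below computes it from the class counts printed by `groupBy('class').count()` earlier (so on the full data set, not on the test split), together with the plain definition of accuracy that `MulticlassClassificationEvaluator` reports for `metricName="accuracy"`. The helper `accuracy` is hypothetical.

```python
# class counts copied from the groupBy('class').count() output above
class_counts = {
    "Use_telephone": 15225, "Standup_chair": 25417, "Eat_meat": 31236,
    "Getup_bed": 45801, "Drink_glass": 42792, "Pour_water": 41673,
    "Comb_hair": 23504, "Walk": 92254, "Climb_stairs": 40258,
    "Sitdown_chair": 25036, "Liedown_bed": 11446, "Descend_stairs": 15375,
    "Brush_teeth": 29829, "Eat_soup": 6683,
}

def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

total = sum(class_counts.values())
majority_class = max(class_counts, key=class_counts.get)
baseline = class_counts[majority_class] / total
print(majority_class, total, round(baseline, 4))  # Walk 446529 0.2066

# predicting one class for every row scores exactly that class's share of the rows
print(accuracy(["Walk", "Walk", "Eat_soup"], ["Walk"] * 3))  # 2 of 3 correct
```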

## Random Forest

In [39]:
# fit RF and predict
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

rf = RandomForestClassifier(labelCol="label", numTrees=10)

pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)

In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction)

0.44353146072433347

- 44% accuracy: better than logistic regression

# Quiz practice

In [44]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession.builder.getOrCreate()

In [45]:
# delete files from previous runs
!rm -f jfk_weather*

# download the file containing the data in CSV format
!wget http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz

# extract the data
!tar xvfz jfk_weather.tar.gz
    
# create a dataframe out of it by using the first row as field names and trying to infer a schema based on contents
df = spark.read.option("header", "true").option("inferSchema","true").csv('jfk_weather.csv')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-05-11 19:43:10--  http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz
Resolving max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)... 67.228.254.196
Connecting to max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)|67.228.254.196|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2575759 (2.5M) [application/x-tar]
Saving to: ‘jfk_weather.tar.gz’


2020-05-11 19:43:10 (68.4 MB/s) - ‘jfk_weather.tar.gz’ saved [2575759/2575759]

./._jfk_weather.csv
jfk_weather.csv


In [46]:
df.show()


The data set contains some null values, and some numeric columns contain stray characters (such as `s`, `,` or `*`), so schema inference did not assign a numeric type to every column. We therefore need to clean up the data set first. This is a normal task in any data science project, since real data is rarely clean. Don't worry if you don't understand all of the code; you won't be asked about it.

### data cleaning

In [47]:
import random
random.seed(42)  # note: seeds Python's own RNG only; it does not make Spark's randomSplit reproducible

from pyspark.sql.functions import translate, col

# cast wind speed and direction to double, and delete stray characters
# ('s' and ',' or '*') from the columns that were read as strings
df_cleaned = (df
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double'))
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double'))
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", ""))
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", ""))
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", ""))
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", "")))

# with the stray characters removed, the cleaned columns can be cast to double
df_cleaned = (df_cleaned
    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double'))
    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double'))
    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double'))
    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double')))

# keep only rows with a non-zero wind speed and no missing values in the columns used below
df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindSpeed IS NOT NULL
    and HOURLYWindDirection IS NOT NULL
    and HOURLYStationPressure IS NOT NULL
    and HOURLYPressureTendency IS NOT NULL
    and HOURLYPrecip IS NOT NULL
    and HOURLYRelativeHumidity IS NOT NULL
    and HOURLYDRYBULBTEMPC IS NOT NULL
""")

In [48]:
df_filtered.show()


In [49]:
# number of rows removed by the filter (missing or unparsable values, or zero wind speed)
df.count() - df_filtered.count()

92666
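In the cleaning cell, Spark's `translate(col, "s,", "")` deletes every `s` and `,` in the string (characters in the matching string with no counterpart in the replacement string are removed), and the following `cast('double')` turns the cleaned text into a number. The pure-Python sketch below mirrors those two steps for single values; the function name and the sample strings are made up for illustration.

```python
def clean_to_double(raw, strip_chars):
    """Delete every character in strip_chars from raw, then parse the rest as a float.

    Returns None when the remaining text is not a number, similar to the null that
    Spark's cast('double') yields for an unparsable string when ANSI mode is off.
    """
    if raw is None:
        return None
    cleaned = raw.translate(str.maketrans("", "", strip_chars))
    try:
        return float(cleaned)
    except ValueError:
        return None

# hypothetical raw values, for illustration only
print(clean_to_double("29.92s", "s,"))  # 29.92
print(clean_to_double("1,013", "s,"))   # 1013.0
print(clean_to_double("85*", "*"))      # 85.0
print(clean_to_double("abc", "s,"))     # None
```

Values that end up as None (null in Spark) are then dropped by the `IS NOT NULL` conditions of the filter.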

In [90]:
df_filtered.columns

['STATION',
 'STATION_NAME',
 'ELEVATION',
 'LATITUDE',
 'LONGITUDE',
 'DATE',
 'REPORTTPYE',
 'HOURLYSKYCONDITIONS',
 'HOURLYVISIBILITY',
 'HOURLYPRSENTWEATHERTYPE',
 'HOURLYDRYBULBTEMPF',
 'HOURLYDRYBULBTEMPC',
 'HOURLYWETBULBTEMPF',
 'HOURLYWETBULBTEMPC',
 'HOURLYDewPointTempF',
 'HOURLYDewPointTempC',
 'HOURLYRelativeHumidity',
 'HOURLYWindSpeed',
 'HOURLYWindDirection',
 'HOURLYWindGustSpeed',
 'HOURLYStationPressure',
 'HOURLYPressureTendency',
 'HOURLYPressureChange',
 'HOURLYSeaLevelPressure',
 'HOURLYPrecip',
 'HOURLYAltimeterSetting',
 'DAILYMaximumDryBulbTemp',
 'DAILYMinimumDryBulbTemp',
 'DAILYAverageDryBulbTemp',
 'DAILYDeptFromNormalAverageTemp',
 'DAILYAverageRelativeHumidity',
 'DAILYAverageDewPointTemp',
 'DAILYAverageWetBulbTemp',
 'DAILYHeatingDegreeDays',
 'DAILYCoolingDegreeDays',
 'DAILYSunrise',
 'DAILYSunset',
 'DAILYWeather',
 'DAILYPrecip',
 'DAILYSnowfall',
 'DAILYSnowDepth',
 'DAILYAverageStationPressure',
 'DAILYAverageSeaLevelPressure',
 'DAILYAverageWind

## Practice 1: Predict HOURLYWindSpeed 

In [81]:
# pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


# note: this indexer is not used in the regression pipelines below; a regression
# target such as HOURLYWindSpeed stays numeric and needs no StringIndexer
indexer = StringIndexer(inputCol="HOURLYWindSpeed", 
                        outputCol="label")

# combine the predictor columns into a single feature vector
vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"], 
                                    outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [82]:
# define a function to evaluate regression model performance - use root mean squared error
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(labelCol="HOURLYWindSpeed", 
                                    predictionCol="prediction", 
                                    metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)
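`RegressionEvaluator` with `metricName="rmse"` reports the root mean squared error, which is expressed in the same units as the label (here, HOURLYWindSpeed); lower is better. Its definition fits in a few lines of plain Python; the sketch below uses a hypothetical helper `rmse` and made-up values.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: the square root of the average squared difference."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# hypothetical wind speeds and predictions: errors of 3, 4 and 0
print(rmse([10.0, 5.0, 8.0], [7.0, 9.0, 8.0]))  # sqrt(25/3), about 2.887
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.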

In [83]:
# split the data
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

### model fitting and evaluation

In [84]:
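# Model 1: Linear Regression

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

# regParam=0.0 turns regularization off, i.e. ordinary least squares.
# note: featuresCol='features' trains on the raw (unnormalized) vector, so the
# Normalizer output 'features_norm' is computed but not used by this model
lr = LinearRegression(labelCol="HOURLYWindSpeed", 
                      featuresCol='features', 
                      maxIter=100, 
                      regParam=0.0, 
                      elasticNetParam=0.0)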
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 5.29776


In [85]:
# Model 2: Gradient Boosted Tree Regressor

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 5.07247


## Practice 2: predict HOURLYWindDirection

In [104]:
# pipeline

# need to turn the continuous wind direction into classes, using a Bucketizer:
# directions in [0, 180) become class 0.0, directions of 180 or more become class 1.0
from pyspark.ml.feature import Bucketizer, OneHotEncoder
bucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],
                        inputCol="HOURLYWindDirection", 
                        outputCol="HOURLYWindDirectionBucketized")
# note: this encoder is defined but not used in any of the pipelines below
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionBucketizedOHE")


from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer

# caution: HOURLYWindDirection is also the source of the label (via the Bucketizer),
# so including it as a feature leaks the label into the model inputs
vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"], 
                                    outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
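A Bucketizer maps each value to the index of the split interval it falls into: with splits `[0, 180, inf]`, directions in [0, 180) become 0.0 and directions of 180 or more become 1.0, because the last interval also includes its upper bound. Values outside the splits range cause an error, which is why the upper split is `float('Inf')`. A pure-Python sketch of that rule (the function `bucketize` is hypothetical; raising on NaN is a choice made for this sketch):

```python
import bisect
import math

def bucketize(value, splits):
    """Return the bucket index (as a float) of value for sorted split points."""
    if math.isnan(value) or value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} cannot be bucketized with splits {splits}")
    if value == splits[-1]:
        return float(len(splits) - 2)  # the last bucket includes its upper bound
    # bisect_right counts the split points <= value; minus 1 gives the interval index
    return float(bisect.bisect_right(splits, value) - 1)

splits = [0, 180, float("inf")]
for direction in [0.0, 90.0, 179.9, 180.0, 350.0]:
    print(direction, bucketize(direction, splits))
```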

In [105]:
# define a function to evaluate classification model performance
def classification_metrics(prediction):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    mcEval = (MulticlassClassificationEvaluator()
              .setMetricName("accuracy")
              .setPredictionCol("prediction")
              .setLabelCol("HOURLYWindDirectionBucketized"))
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)

In [106]:
# split the data
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

### model fitting and evaluation

In [108]:
# Model 1: Logistic Regression  - base model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)

pipeline = Pipeline(stages=[bucketizer,vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.977064


In [109]:
# Model 2: Random Forest

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=10)

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 1


In [110]:
# Model 3: Gradient Boosted Tree Classifier

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

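Accuracy on test data = 1

- Near-perfect accuracy here is a warning sign, not a success: the label HOURLYWindDirectionBucketized is computed from HOURLYWindDirection, which is also one of the input features, so the models can read the answer directly (label leakage). Removing HOURLYWindDirection from the VectorAssembler inputs would be needed for a meaningful accuracy estimate.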
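Because the label is derived from HOURLYWindDirection, which is also a feature, a single threshold on that raw feature reproduces the label exactly. The pure-Python sketch below searches for the best one-split rule (a decision stump) on a handful of made-up directions and finds a threshold between 170 and 180 with accuracy 1.0, the same kind of split a tree model can learn from the leaked feature. The function `best_stump` and the data are hypothetical; Spark's tree learners choose splits differently in detail.

```python
def best_stump(xs, ys):
    """Find the single-threshold rule with the highest accuracy.

    Candidate thresholds are midpoints between consecutive distinct x values; the rule
    predicts 1.0 when x > threshold (or the reverse when flipped is True).
    """
    values = sorted(set(xs))
    best_acc, best_t, best_flipped = 0.0, None, None
    for lo, hi in zip(values, values[1:]):
        t = (lo + hi) / 2
        for flipped in (False, True):
            preds = [float((x > t) != flipped) for x in xs]
            acc = sum(p == y for p, y in zip(preds, ys)) / len(ys)
            if acc > best_acc:
                best_acc, best_t, best_flipped = acc, t, flipped
    return best_acc, best_t, best_flipped

# made-up wind directions in degrees
directions = [10.0, 90.0, 170.0, 180.0, 200.0, 350.0, 30.0, 260.0]
# labels built with the same rule as Bucketizer(splits=[0, 180, inf])
labels = [float(d >= 180) for d in directions]

print(best_stump(directions, labels))  # (1.0, 175.0, False)
```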
