# **Machine Learning with Apache Spark ML**


*   Import the Spark ML and Statistics Libraries
*   Perform basic statistics operations using Spark
*   Build a simple linear regression model using Spark ML
*   Train the model and perform evaluation


## Setup


In [None]:
# pandas is a popular data science package for Python. In this lab, we use pandas to load a CSV file from disk into an in-memory pandas dataframe.
import pandas as pd
import matplotlib.pyplot as plt
# pyspark is the Spark API for Python. In this lab, we use pyspark to initialize the Spark context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Spark session


#### Creating the spark session and context


In [None]:
# Create a Spark context (getOrCreate reuses an existing context if this cell is re-run)
sc = SparkContext.getOrCreate()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Display the Spark session


In [None]:
spark

#### Importing Spark ML libraries

1.  (Feature library) VectorAssembler() - used to combine dataframe columns into a single feature vector column. Spark ML models and statistics functions expect their input in this form.

2.  (Stat library) Correlation() - used to calculate the correlation matrix between the components of the feature vectors.

3.  (Feature library) Normalizer() - used to rescale each feature vector (each row) to unit norm.

4.  (Feature library) StandardScaler() - used to rescale each feature (each column) to unit standard deviation. Features on a comparable scale usually help the model converge and make the coefficients easier to compare.

5.  (Regression library) LinearRegression() - used to create a linear regression model and train it.


In [None]:
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import LinearRegression

## Loading the data and Creating Feature Vectors


We first read the CSV file into a pandas dataframe and then convert it into a Spark dataframe.

We use a dataset that contains information about cars.


#### Loading data into a pandas dataframe


In [None]:
cars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/cars.csv')

In [None]:
cars.head()

In [None]:
cars2 = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/cars2.csv', header=None, names=["mpg", "hp", "weight"])
cars2.head()

#### Loading data into a Spark dataframe


In [None]:
sdf = spark.createDataFrame(cars2)

In [None]:
sdf.printSchema()

#### Converting data frame columns into feature vectors

We use the `VectorAssembler()` transformer to combine dataframe columns into a single feature vector column.
We use the horsepower ("hp") and weight ("weight") of the car as input features and the miles-per-gallon ("mpg") as the target label.


In [None]:
assembler = VectorAssembler(
    inputCols=["hp", "weight"],
    outputCol="features")

output = assembler.transform(sdf).select('features','mpg')
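
To make the idea of a feature vector concrete, here is a minimal pure-Python sketch of what the cell above produces. It is not Spark code: `assemble` is a hypothetical helper, the sample rows are made up, and it mimics the combined effect of `transform` followed by `select('features','mpg')`.

```python
# Pure-Python illustration only; `assemble` is a hypothetical helper,
# not part of pyspark.

def assemble(rows, input_cols, output_col="features"):
    """Pack the chosen columns of each row into one list of floats."""
    assembled = []
    for row in rows:
        vector = [float(row[col]) for col in input_cols]
        # Keep the remaining columns (here: the label) next to the vector.
        remaining = {k: v for k, v in row.items() if k not in input_cols}
        remaining[output_col] = vector
        assembled.append(remaining)
    return assembled

# Made-up sample rows that reuse the lab's column names.
sample_rows = [
    {"mpg": 18.0, "hp": 130, "weight": 3504},
    {"mpg": 15.0, "hp": 165, "weight": 3693},
]

assembled_rows = assemble(sample_rows, ["hp", "weight"])
print(assembled_rows[0])
```

Each row now carries one `features` list plus the `mpg` label, which is the shape the later steps work with.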

Split the data into a training set (75%) and a test set (25%).


In [None]:
train, test = output.randomSplit([0.75, 0.25])
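
`randomSplit` assigns rows to the splits at random, so the resulting sizes are only approximately 75% and 25%, and they change from run to run unless a seed is passed (for example `output.randomSplit([0.75, 0.25], seed=42)`). The following pure-Python sketch shows the general idea under that assumption. It is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split by drawing one uniform number per row."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split catches any draw not claimed earlier.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(100))
train_rows, test_rows = random_split(rows, [0.75, 0.25], seed=42)
print(len(train_rows), len(test_rows))
```

Running it with a different seed gives slightly different sizes, which is why fixing the seed helps when results need to be reproducible.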

## Statistics and Feature Engineering


#### Correlation



In [None]:
r1 = Correlation.corr(train, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

In [None]:
r2 = Correlation.corr(train, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))
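
The two cells above compute Pearson and Spearman correlation matrices. As a rough sketch of the difference, Pearson measures linear association on the raw values, while Spearman applies the same formula to the ranks of the values. The helpers and the sample numbers below are made up for illustration and do not come from the cars dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg_rank = (i + j) / 2 + 1
        for k in range(i, j + 1):
            result[order[k]] = avg_rank
        i = j + 1
    return result

def spearman(xs, ys):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(xs), ranks(ys))

# Made-up values: weight grows with hp, but not linearly.
hp = [100, 150, 200, 250]
weight = [2000, 2500, 4000, 9000]

print("Pearson: ", pearson(hp, weight))
print("Spearman:", spearman(hp, weight))
```

Because the relationship in this toy data is monotonic but not linear, the Spearman value is 1 while the Pearson value is below 1.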

In [None]:
plt.figure()
plt.scatter(cars2["hp"], cars2["weight"])
plt.xlabel("horsepower")
plt.ylabel("weight")
plt.title("Correlation between Horsepower and Weight")
plt.show()

#### Normalization


In [None]:
normalizer = Normalizer(inputCol="features", outputCol="features_normalized", p=1.0)
train_norm = normalizer.transform(train)
print("Normalized using L^1 norm")
train_norm.show(5, truncate=False)
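
Note that `Normalizer` works on each row: it divides a feature vector by its own p-norm, so the values in one row are rescaled relative to each other. The sketch below illustrates this with a hypothetical `normalize` helper and made-up vectors. It is meant as an illustration of the formula, not as Spark's implementation.

```python
def normalize(vector, p=2.0):
    """Divide a vector by its p-norm so that the result has unit p-norm."""
    norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0:
        # Assumption for this sketch: leave an all-zero vector unchanged.
        return list(vector)
    return [v / norm for v in vector]

# L^1 norm: the absolute values of the result sum to 1.
l1 = normalize([3.0, 1.0], p=1.0)
print(l1)

# L^2 norm: the squares of the result sum to 1.
l2 = normalize([3.0, 4.0], p=2.0)
print(l2)
```

For features with very different units, such as horsepower and weight, row-wise normalization lets the larger feature dominate, which is one reason the model in this lab is trained on standard-scaled features instead.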

#### Standard Scaling



In [None]:
standard_scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
train_model = standard_scaler.fit(train)
train_scaled = train_model.transform(train)
train_scaled.show(5, truncate=False)

In [None]:
test_scaled = train_model.transform(test)
test_scaled.show(5, truncate=False)
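
The two cells above fit the scaler on the training data only and then reuse the fitted model on the test data, so no information from the test set leaks into the preprocessing. With its default settings, `StandardScaler` divides each feature (column) by its standard deviation and does not subtract the mean. The pure-Python sketch below mirrors that behaviour with hypothetical helpers and made-up rows.

```python
import statistics

def fit_scaler(rows):
    """Compute one sample standard deviation per column (feature)."""
    columns = list(zip(*rows))
    return [statistics.stdev(col) for col in columns]

def apply_scaler(rows, stds):
    """Divide every value by the standard deviation of its column."""
    # Assumption for this sketch: a zero-variance column maps to 0.0.
    return [[v / s if s != 0 else 0.0 for v, s in zip(row, stds)]
            for row in rows]

# Made-up [hp, weight] rows.
train_rows = [[100.0, 2000.0], [150.0, 3000.0], [200.0, 4000.0]]
test_rows = [[120.0, 2500.0]]

stds = fit_scaler(train_rows)                # learned from training data only
train_scaled_rows = apply_scaler(train_rows, stds)
test_scaled_rows = apply_scaler(test_rows, stds)  # reuse the same statistics

print(stds)
print(train_scaled_rows)
print(test_scaled_rows)
```

After scaling, both columns have a standard deviation of 1 on the training data, so horsepower and weight contribute on a comparable scale.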

## Building and Training a Linear Regression Model


#### Create and Train model



In [None]:
lr = LinearRegression(featuresCol='features_scaled', labelCol='mpg', maxIter=100)
lrModel = lr.fit(train_scaled)

print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R-squared: %f" % trainingSummary.r2)
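
For reference, the two metrics printed above can be written out by hand. The following sketch uses the standard definitions of RMSE and R-squared on made-up values. The function names and numbers are illustrative and are not taken from the model trained in this lab.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error: typical size of a prediction error."""
    n = len(actual)
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / n)

def r_squared(actual, predicted):
    """Share of the label variance that the predictions explain."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot

# Made-up mpg values and predictions.
actual = [20.0, 25.0, 30.0, 35.0]
predicted = [22.0, 24.0, 29.0, 36.0]

print("RMSE: %f" % rmse(actual, predicted))
print("R-squared: %f" % r_squared(actual, predicted))
```

RMSE is expressed in the units of the label (here miles per gallon), while R-squared is unitless and equals 1 for a perfect fit.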

#### Predict on new data



In [None]:
lrModel.transform(test_scaled).show(5)

### Correlation


In [None]:
r1 = Correlation.corr(test, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

### Feature Normalization


In [None]:
normalizer_l2 = Normalizer(inputCol="features", outputCol="features_normalized", p=2.0)
train_norm_l2 = normalizer_l2.transform(train)
print("Normalized using L^2 norm\n" + str(train_norm_l2))
train_norm_l2.show(5, truncate=False)