#Spark with Machine Learning Notebook

These are my notes from the DataCamp course [Machine Learning with Apache Spark](https://www.datacamp.com/courses/machine-learning-with-apache-spark).

The `pyspark.sql` module deals with structured data. It contains many more important classes than the ones used here; see the [pyspark.sql module documentation](https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

##Content



*   Load & Prepare Data
*   Classifiers
      1.   Decision Tree
      2.   Logistic Regression

*   Regression
       1.  Linear Regression
       2.  Penalized Regression
*   Pipelines
*   Cross-validation & grid search
*   Ensembles



##Preparation and Connecting to Spark

In [0]:
pip install pyspark

In [0]:
import pyspark
pyspark.__version__
from pyspark.sql import SparkSession

In [0]:
# Create (or reuse) a session on a local cluster named 'test'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)



In [0]:
# Shut down the Spark session. The cells below still call `spark.read`,
# so run this only once you are finished with the notebook.
spark.stop()

## Loading Data

In [0]:
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Manually set the schema instead of letting Spark infer it.
schema = StructType([
    StructField("maker", StringType()),
    StructField("cyl", IntegerType()),
    StructField("size", DoubleType()),
])

cars = spark.read.csv("cars.csv", header=True, schema=schema, nullValue='NA')

# Selected DataFrame methods.
cars.count()
cars.show()
cars.printSchema()

# Selected attribute: check the column data types.
cars.dtypes

In [0]:
# Read data from CSV file.
# nullValue is case sensitive: only the exact string 'NA' is read as null.
flights = spark.read.csv('flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights.count())

# View the first five records
flights.show(5)

# Check column data types (the DataFrame attribute is `dtypes`, plural)
flights.dtypes

## Data Preparation

In [0]:
"select the columns"
# NOTE(review): the empty strings below are placeholders — substitute the
# names of the columns to drop / keep before running.
cars = cars.drop("")
cars = cars.select("")

# Count missing values, then remove the affected rows.
cars.filter("cyl IS NULL").count()
cars = cars.filter('cyl IS NOT NULL')
# dropna() removes rows with a null in any column.
cars = cars.dropna()

# Mutating columns.
# Import the module under an alias so Spark's round() does not shadow the
# Python builtin of the same name.
from pyspark.sql import functions as F
cars = cars.withColumn("mass", F.round(cars.weight / 2, 0))
# NOTE(review): 0.0254 presumably converts inches to metres — confirm units.
cars = cars.withColumn("length", F.round(cars.length * 0.0254, 3))





"index categorical data"

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='type', outputCol='type_idx')

# Assign index values to strings.
# By default the most frequent string gets index 0 and the least frequent the
# highest index; use stringOrderType to change the ordering.
indexer = indexer.fit(cars)
# Create a column with the index values.
cars = indexer.transform(cars)

# Create a new column based on an original column: 1 if delay >= 15, else 0.
# NOTE(review): `flights_km` is not defined in this notebook — confirm source.
flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))






"index categorical data - flights data"
from pyspark.ml.feature import StringIndexer

# Carrier: build the indexer, learn the categories, then add carrier_idx.
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')
indexer_model = indexer.fit(flights)
flights_indexed = indexer_model.transform(flights)

# Origin: same three steps, applied on top of the carrier-indexed data.
org_indexer = StringIndexer(inputCol='org', outputCol='org_idx')
org_indexer_model = org_indexer.fit(flights_indexed)
flights_indexed = org_indexer_model.transform(flights_indexed)







"Assembling columns"
from pyspark.ml.feature import VectorAssembler

# Combine the predictor columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=['cyl', 'size'], outputCol='features')
# Keep the result so that later cells see the 'features' column.
cars = assembler.transform(cars)

# flights case 
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=['mon','dom','dow','carrier_idx','org_idx','km','depart','duration'], outputCol='features')

# Consolidate predictor columns
# NOTE(review): carrier_idx and org_idx were added to `flights_indexed` above,
# not to `flights` — confirm that `flights` really carries these columns here.
flights_assembled = assembler.transform(flights)

# Check the resulting column
flights_assembled.select('features', 'delay').show(5, truncate=False)



##Classification

## Decision Tree

In [0]:
# 80/20 train/test split; the fixed seed makes the split reproducible.
split_weights = [0.8, 0.2]
cars_train, cars_test = cars.randomSplit(split_weights, seed=23)


In [0]:
from pyspark.ml.classification import DecisionTreeClassifier

# Fit a decision tree on the training split; `tree` then holds the fitted model.
tree = DecisionTreeClassifier()
tree = tree.fit(cars_train)

# Evaluate on the held-out split.
prediction = tree.transform(cars_test)

# Confusion matrix: counts per (label, prediction) pair.
prediction.groupBy("label", "prediction").count().show()
# True negatives / true positives: prediction agrees with the label.
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
# False negatives / false positives: prediction disagrees with the label.
FN = prediction.filter('prediction = 0 AND label =1').count()
FP = prediction.filter('prediction=1 AND label = 0').count()

##Logistic Regression

Precision = TP/(TP+FP)
Recall = TP/(TP+FN)

In [0]:
from pyspark.ml.classification import LogisticRegression

# Train on the training split; `logistic` ends up bound to the fitted model.
logistic = LogisticRegression().fit(cars_train)

# Score the held-out split.
prediction = logistic.transform(cars_test)

**weightedRecall, accuracy, f1**

In [0]:
"""weighted metrics"""


from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Weighted precision; the metric is chosen per call through the param map.
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(prediction, {evaluator.metricName: 'weightedPrecision'})

# Area under the ROC curve for the same predictions.
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: 'areaUnderROC'})

##Turning Text into Table

In [0]:
"""remove punctuation"""
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, Tokenizer

# Remove punctuation: commas and hyphens are replaced by a space.
REGEX = '[,\\-]'
books = books.withColumn('text', regexp_replace(books.text, REGEX, ' '))

# Text to tokens (the input column is the lowercase 'text' written above).
books = Tokenizer(inputCol='text', outputCol='tokens').transform(books)

# Stop words: inspect the default list.
stopwords = StopWordsRemover()
stopwords.getStopWords()

# Remove stop words; the setters are methods on the remover and return it,
# so they can be chained.
stopwords = stopwords.setInputCol('tokens').setOutputCol('words')
books = stopwords.transform(books)

# Feature hashing: numFeatures sets the length of the hashed vector
# (32 here, far below Spark's default).
hasher = HashingTF(inputCol="words", outputCol="hash", numFeatures=32)
books = hasher.transform(books)

# Inverse document frequency: down-weights terms that occur in many documents.
books = IDF(inputCol='hash', outputCol="features").fit(books).transform(books)



In [0]:

# Import the necessary functions
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import HashingTF, IDF, StopWordsRemover, Tokenizer

# Remove punctuation (REGEX provided) and numbers
wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

# Merge multiple spaces
wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

# Split the text into words
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(4, truncate=False)

# Remove stop words. This must run on `wrangled`: it is the DataFrame that
# carries the 'words' column produced by the tokenizer above.
wrangled = StopWordsRemover(inputCol='words', outputCol='terms')\
      .transform(wrangled)

# Apply the hashing trick
wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)\
      .transform(wrangled)

# Convert hashed symbols to TF-IDF
tf_idf = IDF(inputCol='hash', outputCol='features')\
      .fit(wrangled).transform(wrangled)

tf_idf.select('terms', 'features').show(4, truncate=False)

In [0]:
"train a spam classifier"
# 80/20 split with a fixed seed.
# NOTE(review): this splits `sms`; the TF-IDF 'features' column was written to
# `tf_idf` in the previous cell — confirm `sms` carries features and a label.
split_weights = [0.8, 0.2]
sms_train, sms_test = sms.randomSplit(split_weights, seed=13)

# Regularised logistic regression, fitted on the training split.
from pyspark.ml.classification import LogisticRegression
spam_estimator = LogisticRegression(regParam=0.2)
logistic = spam_estimator.fit(sms_train)

# Score the testing split.
prediction = logistic.transform(sms_test)

# Confusion matrix: counts per (label, prediction) pair.
confusion = prediction.groupBy("label", "prediction").count()
confusion.show()

##Regression

##One-Hot Encoding

In [0]:
from pyspark.ml.feature import OneHotEncoderEstimator

# Encode the indexed 'type' column as a dummy-variable vector.
onehot = OneHotEncoderEstimator(inputCols=['type_idx'], outputCols=['type_dummy'])

# Fitting returns the model; `onehot` is rebound to it.
onehot = onehot.fit(cars)

# How many category levels?
onehot.categorySizes

cars = onehot.transform(cars)
cars.select("type", "type_idx", "type_dummy").distinct().sort('type_idx').show()


from pyspark.mllib.linalg import DenseVector, SparseVector
# Dense vector: stores every value, including zeros, explicitly in a list.
# Sparse vector: stores only the non-zero values together with their positions.





In [0]:
"Use Case - Flight"
# One-hot encoder class
from pyspark.ml.feature import OneHotEncoderEstimator

# Build the encoder for org_idx and fit it; `onehot` is bound to the model.
onehot = OneHotEncoderEstimator(inputCols=['org_idx'], outputCols=['org_dummy']).fit(flights)

# Add the org_dummy column to the flights data.
flights_onehot = onehot.transform(flights)

# Show one row per origin, ordered by its index.
encoded_levels = flights_onehot.select('org', 'org_idx', 'org_dummy').distinct()
encoded_levels.sort('org_idx').show()

##Regression

Minimize the loss function: $\text{MSE} = \frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2$

In [0]:
"""Assemble predictors"""

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit a linear regression with 'consumption' as the label.
regression = LinearRegression(labelCol='consumption')
regression = regression.fit(cars_train)

# Predict on the held-out split so the error is not measured on the data the
# model was trained on.
predictions = regression.transform(cars_test)

# Find RMSE (the evaluator's default metric); mae, r2 and mse are also available.
RegressionEvaluator(labelCol='consumption').evaluate(predictions)

regression.intercept

regression.coefficients



In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train a linear regression for 'duration' on the training split.
duration_estimator = LinearRegression(labelCol='duration')
regression = duration_estimator.fit(flights_train)

# Predict on the testing split and peek at label versus prediction.
predictions = regression.transform(flights_test)
predictions.select('duration', 'prediction').show(5, truncate=False)

# RMSE on the testing split.
RegressionEvaluator(labelCol="duration").evaluate(predictions)

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Train on the training split; `regression` is the fitted model.
duration_estimator = LinearRegression(labelCol='duration')
regression = duration_estimator.fit(flights_train)

# Score the testing split.
predictions = regression.transform(flights_test)

# Testing RMSE.
RegressionEvaluator(labelCol='duration').evaluate(predictions)

##Bucketing & Engineering

Operations on a single column: log(), sqrt(), pow()

operations on two columns: product, ratio

In [0]:
from pyspark.ml.feature import Bucketizer

# A single list of splits applies to a single column, so use the singular
# inputCol / outputCol parameters.
bucketizer = Bucketizer(splits=[3500, 4500, 6000, 6500], inputCol="rpm", outputCol='rpm_bin')

cars = bucketizer.transform(cars)

# Operations on two columns: ratio of mass to length.
cars = cars.withColumn('density_line', cars.mass / cars.length)



In [0]:
from pyspark.ml.feature import Bucketizer, OneHotEncoderEstimator

# Bucket boundaries every 3 hours across the day.
three_hour_splits = [0, 3, 6, 9, 12, 15, 18, 21, 24]
buckets = Bucketizer(splits=three_hour_splits, inputCol='depart', outputCol='depart_bucket')

# Assign each departure time to its bucket.
bucketed = buckets.transform(flights)
bucketed.select('depart', 'depart_bucket').show(5)

# One-hot encode the bucket index.
onehot = OneHotEncoderEstimator(inputCols=['depart_bucket'], outputCols=['depart_dummy'])
onehot_model = onehot.fit(bucketed)
flights_onehot = onehot_model.transform(bucketed)
flights_onehot.select('depart', 'depart_bucket', 'depart_dummy').show(5)

##Regularization

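Minimize the penalized loss function: $\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2 + \lambda f(\beta)$.
The penalty term $\lambda f(\beta)$ penalizes models with too many variables.

Lasso: $f(\beta)$ is the sum of the absolute values of the coefficients. Ridge: $f(\beta)$ is the sum of the squares of the coefficients.

$\lambda = 0$: no regularization.

$\lambda \to \infty$: complete regularization.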

In [0]:
from pyspark.ml.regression import LinearRegression

# Ridge: elasticNetParam = 0. Keep the fitted model rather than discarding it.
ridge = LinearRegression(labelCol='consumption', elasticNetParam=0, regParam=0.1)
ridge = ridge.fit(cars_train)

# Lasso: elasticNetParam = 1.
lasso = LinearRegression(labelCol='consumption', elasticNetParam=1, regParam=0.1)
lasso = lasso.fit(cars_train)




In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Fit Lasso model (α = 1) to training data
regression = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1).fit(flights_train)

# Calculate the RMSE on testing data
rmse = RegressionEvaluator(labelCol='duration').evaluate(regression.transform(flights_test))
print("The test RMSE is", rmse)

# Look at the model coefficients
coeffs = regression.coefficients
print(coeffs)

# Number of zero coefficients: count the entries that equal 0 (summing a
# constant 0 per coefficient, as before, always yields 0).
zero_coeff = sum(beta == 0 for beta in regression.coefficients)
print("Number of ceofficients equal to 0:", zero_coeff)

##Ensemble and Pipeline

##Pipeline



In [0]:
from pyspark.ml import Pipeline

# Chain the indexer, encoder, assembler and regression into one estimator.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# Fitting returns the pipeline model; `pipeline` is rebound to it.
pipeline = pipeline.fit(cars_train)

prediction = pipeline.transform(cars_test)

# The fitted stages live in `stages`; index 3 is the regression model.
pipeline.stages[3]

# `intercept` is an attribute of the regression model, not a method.
pipeline.stages[3].intercept

In [0]:
# Stage 1: origin strings -> index values
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# Stage 2: one-hot encode the origin index and the day of week
onehot = OneHotEncoderEstimator(inputCols=['org_idx', 'dow'],
                                outputCols=['org_dummy', 'dow_dummy'])

# Stage 3: gather the predictors into a single 'features' column
predictor_cols = ['km', 'org_dummy', 'dow_dummy']
assembler = VectorAssembler(inputCols=predictor_cols, outputCol='features')

# Stage 4: linear regression on 'duration'
regression = LinearRegression(labelCol='duration')

# Pipeline class
from pyspark.ml import Pipeline

# Assemble the four stages in order.
flight_stages = [indexer, onehot, assembler, regression]
pipeline = Pipeline(stages=flight_stages)

# Fit on the training split; `pipeline` is rebound to the fitted model.
pipeline = pipeline.fit(flights_train)

# Predict on the testing split.
predictions = pipeline.transform(flights_test)



In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Tokenise the 'text' column into 'words'.
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Each later stage reads the output column of the stage before it.
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Classifier at the end of the chain.
logistic = LogisticRegression()

# Tokenise -> remove stop words -> hash -> TF-IDF -> classify.
text_stages = [tokenizer, remover, hasher, idf, logistic]
pipeline = Pipeline(stages=text_stages)

##Cross-Validation

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Empty parameter grid; build() must be called to obtain the list of param maps.
params = ParamGridBuilder().build()

# Estimator and evaluator for the cars data, defined here so the cell does
# not depend on whichever `regression` / `evaluator` an earlier cell left behind.
regression = LinearRegression(labelCol='consumption')
evaluator = RegressionEvaluator(labelCol='consumption')

# 10-fold cross-validation with a fixed seed.
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator, numFolds=10, seed=13)

In [0]:
# Fit the cross-validator; `cv` is rebound to the fitted model.
cv = cv.fit(cars_train)

cv.avgMetrics  # average RMSE

# Evaluate the cross-validated model on the held-out split.
evaluator.evaluate(cv.transform(cars_test))

In [0]:
# Empty grid: cross-validate a single parameter combination.
params = ParamGridBuilder().build()

# Model and metric, both using 'duration' as the label.
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# 5-fold cross-validator.
cv = CrossValidator(
    estimator=regression,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

# Fit across the folds of the training data; `cv` is rebound to the result.
cv = cv.fit(flights_train)

# NOTE: Since cross-validation builds multiple models, the fit() method can take a little while to complete.

In [0]:
# Create an indexer for the org field
indexer = StringIndexer(inputCol="org", outputCol='org_idx')

# Create a one-hot encoder for the indexed org field
onehot = OneHotEncoderEstimator(inputCols=['org_idx'], outputCols=['org_dummy'])

# Assemble the km and one-hot encoded fields
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Create a pipeline and cross-validator.
# The label column is 'duration' with no leading space.
regression = LinearRegression(labelCol='duration')
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(estimator=pipeline,
          estimatorParamMaps=params,
          evaluator=evaluator)

##Grid Search

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Keep `regression` as the unfitted estimator: the grid refers to its params.
regression = LinearRegression(labelCol='consumption', fitIntercept=True)
evaluator = RegressionEvaluator(labelCol='consumption')

# Baseline RMSE from a single fit, for comparison with the grid search.
baseline = regression.fit(cars_train)
evaluator.evaluate(baseline.transform(cars_test))

# Grid over one parameter: with and without an intercept.
params = ParamGridBuilder()
params = params.addGrid(regression.fitIntercept, [True, False])
params = params.build()

# Cross-validate every grid point; bestModel exists only after fitting.
cv = CrossValidator(estimator=regression, estimatorParamMaps=params, evaluator=evaluator)
cv = cv.fit(cars_train)

cv.bestModel
# The fitted cross-validator predicts with its best model.
predictions = cv.transform(cars_test)

cv.bestModel.explainParam('fitIntercept')

# A larger grid: 2 x 2 x 5 = 20 parameter combinations.
params = ParamGridBuilder().addGrid(regression.fitIntercept, [True, False])\
                                    .addGrid(regression.regParam, [0.001, 0.01])\
                                    .addGrid(regression.elasticNetParam, [0, 0.25, 0.5, 0.75, 1])\
                                    .build()
                                    
                

In [0]:
# Grid over regParam (4 values) and elasticNetParam (3 values).
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(regression.regParam, [0.01,0.1,1,10])
grid_builder = grid_builder.addGrid(regression.elasticNetParam, [0.0,0.5,1.0])

# Materialise the list of parameter maps.
params = grid_builder.build()
print('Number of models to be tested: ', len(params))

# 5-fold cross-validator over the pipeline.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

In [0]:
# NOTE(review): assumes `cv` has already been fitted (cv = cv.fit(...));
# no fit call is visible in the preceding cell — confirm.
best_model = cv.bestModel

# Stages of the best pipeline model.
print(best_model.stages)

# Parameters of the stage at index 3 (the LinearRegression stage).
regression_stage = best_model.stages[3]
regression_stage.extractParamMap()

# Predict on the testing data with the best model, then compute the metric.
predictions = best_model.transform(flights_test)
evaluator.evaluate(predictions)

In [0]:
# Grid over the hashing-trick parameters and the logistic regression
# parameters, added in the same order as before and then built into a list.
params = (
    ParamGridBuilder()
    .addGrid(hasher.numFeatures, [1024,4096,16384])
    .addGrid(hasher.binary,[True, False])
    .addGrid(logistic.regParam, [0.01,0.1,1,10])
    .addGrid(logistic.elasticNetParam, [0,0.5,1])
    .build()
)

##Ensemble

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Five-tree random forest, fitted on the training split.
forest = RandomForestClassifier(numTrees=5).fit(cars_train)

# Relative importance of each feature in the fitted forest.
forest.featureImportances

In [0]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees with maxIter = 10, fitted on the training split.
gbt = GBTClassifier(maxIter=10).fit(cars_train)

In [0]:
# Unfitted random forest whose params the grid refers to.
forest = RandomForestClassifier()

# Grid: 4 feature-subset strategies x 3 tree depths.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2'])
grid_builder = grid_builder.addGrid(forest.maxDepth, [2, 5, 10])
params = grid_builder.build()

# Binary classification evaluator for the cross-validator.
evaluator = BinaryClassificationEvaluator()

# 5-fold cross-validator over the forest.
cv = CrossValidator(
    estimator=forest,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

In [0]:
# NOTE(review): assumes `cv` has already been fitted (cv = cv.fit(...));
# no fit call is visible in the preceding cell — confirm.

# Average metric per grid point, and the largest of them.
avg_auc = cv.avgMetrics
best_model_auc = max(avg_auc)

# Parameter descriptions from the best model.
winner = cv.bestModel
opt_max_depth = winner.explainParam('maxDepth')
opt_feat_substrat = winner.explainParam('featureSubsetStrategy')

# Metric for the best model on the testing data.
test_predictions = cv.transform(flights_test)
best_auc = evaluator.evaluate(test_predictions)