In [None]:
# Summary (Part I)
## The conceptual model of Spark and the RDD distributed compute model
## The use of SparkContext and SparkSession to initiate Spark instances on a cluster
## Transformations, actions and where Spark performs lazy evaluation
## How to create, query and manipulate Spark SQL DataFrames
## Type conversion and filtering of Spark SQL DataFrames
## How to perform Exploratory Data Analysis (EDA) on large datasets using Spark

In [None]:
#Spark Session and Context
#Spark Session - maintains information about our connection with the cluster
#Components within the Spark Application: Driver, Master, the Cluster Manager, the Executor(s), and the Worker

In [None]:
# SparkContext: represents a connection to a remote or local Spark cluster; it
# was the main entry point in earlier versions of Spark (the "older style").
# The session is created in this cell because `spark` is otherwise only
# defined by a later cell, which made this cell fail with NameError on a fresh
# kernel. getOrCreate() returns the already-running session when called again,
# so building it here does not conflict with later getOrCreate() calls.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark_training').getOrCreate()
sc = spark.sparkContext
sc

In [None]:
# Spark session - which has evolved to include all the interface options from
# the Spark context. Use the Spark session as your entry point moving forward.
from pyspark.sql import SparkSession

# Name the application, then fetch the running session or start a new one.
session_builder = SparkSession.builder.appName('spark_training')
spark = session_builder.getOrCreate()

spark

In [None]:
# End a SparkSession.
# Stopping is guarded by a flag because the cells that follow still need a
# live session; an unconditional stop here breaks "Restart Kernel -> Run All".
# spark.stop() also stops the underlying SparkContext (sc).
STOP_SPARK_NOW = False  # set to True only once you are finished with Spark

if STOP_SPARK_NOW:
    spark.stop()

In [None]:
# Loading data: read the person CSV into a Spark DataFrame.
# header=True takes the column names from the first row; inferSchema=True asks
# Spark to infer the column types from the data.
person_csv_path = 'person_demo.csv'
df_person = spark.read.csv(person_csv_path, header=True, inferSchema=True)

In [None]:
# Examine the object.
# Only the last expression of a cell is echoed, so every intermediate result is
# printed explicitly; previously the first three were silently discarded.
print(df_person)          # DataFrame repr: column names and types
print(type(df_person))    # type of data
print(df_person.first())  # the first line, returned as a Row
# limit(3) is a lazy transformation and shows no data by itself; show() is the
# action that actually prints the first three lines.
df_person.limit(3).show()

In [None]:
# Column names of the person DataFrame
person_columns = df_person.columns
person_columns

In [None]:
# Display the data: keep at most 10 rows, then print them as a text table
first_ten = df_person.limit(10)
first_ten.show()

In [None]:
# Convert (at most) 100 rows to a pandas DataFrame and look at the first few.
# toPandas() collects the data onto the driver node, so use it only on small
# datasets (or after reducing/aggregating), as done here with limit(100).
person_sample_pd = df_person.limit(100).toPandas()
person_sample_pd.head()

In [None]:
# An RDD is the core data object in Spark.
# Spark DataFrames: build a small pandas DataFrame and convert it to Spark.
# numpy/pandas are imported here because this cell runs before the notebook's
# later import cell, which left `pd` and `np` undefined on a fresh kernel.
import numpy as np
import pandas as pd

n_cols = 5
n_rows = 10
# Columns are named 'A', 'B', ... (chr(65) == 'A'); column number x holds the
# values x, x + 1, ..., x + n_rows - 1, stored as float64.
pandas_df = pd.DataFrame(
    {chr(x + 65): range(x, x + n_rows) for x in range(n_cols)},
    dtype=np.float64)

spark_df = spark.createDataFrame(pandas_df)

In [None]:
# Spark DataFrame from pandas.
# Each result is printed explicitly: a cell only echoes its last expression,
# so head() and columns were previously computed and thrown away.
print(spark_df.head(5))   # first five rows, as a list of Row objects
print(spark_df.columns)   # column names
print(type(spark_df))     # the Spark DataFrame class, not pandas

In [None]:
# Schemas
# Print the schema Spark inferred for the person data
df_person.printSchema()

import pandas as pd
import pyspark.sql.types as st

# Specify the data types explicitly: one nullable field per column.
schema1 = st.StructType([
    st.StructField("Clust_id", st.StringType(), True),
    st.StructField("p_id", st.IntegerType(), True),
    st.StructField("Age", st.FloatType(), True),
    st.StructField("Baseline_date", st.StringType(), True),
    st.StructField("Gender", st.StringType(), True),
    st.StructField("Income", st.FloatType(), True),
    st.StructField("Education", st.StringType(), True),
    st.StructField("lab_value_1", st.FloatType(), True),
    st.StructField("lab_value_2", st.FloatType(), True),
    st.StructField("lab_value_3", st.FloatType(), True),
    st.StructField("outcome", st.StringType(), True),
])

# Create the pandas DataFrame.
# NOTE(review): df2_pd was never defined in this notebook, so the cell raised
# NameError. It is reconstructed here on the assumption that it is
# person_demo.csv loaded with pandas - confirm against the course material.
# String/float columns are read with the dtype the schema expects, because
# Spark checks Python value types against the declared field types.
_pandas_dtypes = {
    field.name: (str if isinstance(field.dataType, st.StringType) else float)
    for field in schema1.fields
    if isinstance(field.dataType, (st.StringType, st.FloatType))
}
df2_pd = pd.read_csv('person_demo.csv', dtype=_pandas_dtypes)
# Turn NaN into None so missing values are accepted by the nullable fields.
df2_pd = df2_pd.astype(object).where(df2_pd.notna(), None)

# Create the Spark DataFrame using the explicit schema
df2_spark = spark.createDataFrame(df2_pd, schema=schema1)

# Examine the result (this call was previously swallowed into a comment)
df2_spark.show()

In [None]:
# Spark SQL
# The SparkSession is the main entry point for Spark SQL applications.
# Typical data management
df_person.printSchema()
# count() returns a number rather than printing; print it, because a bare
# expression in the middle of a cell is never displayed.
print(df_person.count())
df_person.describe().show()
# Two equivalent ways to select a subset of columns
df_person[["Age", "p_id"]].show(5)
df_person.select('Age', 'p_id').show(5)
# Keep rows with Age > 90, then summarise Age on those rows only
df_person.filter(df_person['Age'] > 90).show(5)
df_person.filter(df_person['Age'] > 90).describe('Age').show(5)

In [None]:
# Converting column types: parse the 'Baseline_date' values using the
# 'MM/dd/yyyy' pattern and store the result as 'Baseline_date_dt'.
import pyspark.sql.functions as s_f

baseline_as_date = s_f.to_date(df_person['Baseline_date'], 'MM/dd/yyyy')
df_person = df_person.withColumn('Baseline_date_dt', baseline_as_date)

# Show the original and converted columns side by side
df_person.select('Baseline_date', 'Baseline_date_dt').show(10)

In [None]:
# Add new variables as a function of others: 'log_income' is the natural
# logarithm of 'Income'.
# (This cell used to be a syntax error: a dangling `df_person =` followed by
# two statements fused onto one line.)
df_person = df_person.withColumn('log_income', s_f.log(df_person['Income']))

df_person.select('Income', 'log_income').show(10)

In [None]:
# Gender variable example: inspect the values, then blank out the code '999'
df_person.select('Gender').distinct().show()

df_person.groupBy('Gender').count().show()

# Keep Gender where it differs from '999'; all remaining rows become null
gender_col = df_person['Gender']
gender_cleaned = s_f.when(gender_col != '999', gender_col).otherwise(None)
df_person = df_person.withColumn('Gender', gender_cleaned)

# Re-count to confirm the recoding
df_person.groupBy('Gender').count().show()

In [None]:
# Summary statistics: quantiles
# Calculate quantiles for Age. approxQuantile returns a Python list, so the
# results are printed; as bare mid-cell expressions they were never displayed.
# The last argument is the relative error; 0 requests exact quantiles, which
# can be expensive on large data.
age_non_null = df_person.select('Age').dropna()
print(age_non_null.approxQuantile('Age', [0.5], 0))              # median
print(age_non_null.approxQuantile('Age', [0.25, 0.5, 0.75], 0))  # quartiles

df_person.select('Age').describe().show()

# Keep Age only where it is below 100; every other value becomes null
df_person = df_person.withColumn('Age', s_f.when(
    df_person['Age'] < 100, df_person['Age']).otherwise(None))

# Summary again, after the recoding
df_person.select('Age').describe().show()


In [None]:
# Group-by analysis: average Income within each Gender
income_by_gender = df_person.groupBy('Gender').avg('Income')
income_by_gender.show()

In [None]:
# Group by multiple columns: average Income per (Gender, Education) pair
grouped_gender_education = df_person.groupBy('Gender', 'Education')
grouped_gender_education.avg('Income').show()

In [None]:
# Pivot table: one row per Gender, one column per Education value,
# each cell holding the average Income
gender_by_education = df_person.groupBy('Gender').pivot('Education')
gender_by_education.avg('Income').show()

In [None]:
# Correlation
# Currently, only the Pearson correlation coefficient is supported.
# corr() returns a plain number, so each value is printed; before, the first
# result was discarded and a stray "correlation coef" line (a comment that had
# wrapped onto its own line) made the whole cell a syntax error.
print(df_person.corr('Age', 'Income'))       # Pearson correlation coef
print(df_person.corr("Age", "lab_value_1"))  # Pearson correlation coef

In [None]:
# Spark SQL usage
# Registering the DataFrame as a temporary view named 'person' lets us query
# it with plain SQL through spark.sql().
df_person.createOrReplaceTempView('person')

older_than_55 = spark.sql('select p_id, age from person where age > 55')
older_than_55.show()

In [None]:
# Correlation of age and income, computed in SQL for rows with age > 55
corr_query = 'select corr(age,income) as cor from person where age > 55'
spark.sql(corr_query).show()

In [None]:
# Smallest and largest age among rows with age > 55
age_range_query = 'select min(age) as min, max(age) as max from person where age > 55'
spark.sql(age_range_query).show()

In [None]:
# Merging data
# Data can be merged with a pandas-like .join() method. A second dataset is
# loaded here so that it can be joined to the first.

# Create the Spark DataFrame for the cluster data and preview five rows
cluster_csv_path = 'cluster_demo.csv'
df_cluster = spark.read.csv(cluster_csv_path, header=True, inferSchema=True)
df_cluster.show(5)

In [None]:
# All three joins match person rows to cluster rows on the shared key column.
join_key = 'Clust_id'

# Left join
df_left = df_person.join(df_cluster, on=join_key, how='left_outer')

# Right join
df_right = df_person.join(df_cluster, on=join_key, how='right_outer')

# Inner join
df_inner = df_person.join(df_cluster, on=join_key, how='inner')

In [None]:
#Part II - PySpark ML

In [None]:
#Perform machine learning within the Spark environment, using the Spark ML API from the PySpark Python library
## How to perform regression within Spark
## Various classification routines
## How to construct a multi-layer perceptron (MLP) feed-forward neural network

In [None]:
#Loading library
import numpy as np 
import pandas as pd 
import os

import pyspark

In [None]:
# Data preparation
# To generate the labels column from categorical data.
# (The original `from ... import` was split across two lines without a
# continuation, which is a syntax error.)
from pyspark.ml.feature import StringIndexer

# OneHotEncoderEstimator was renamed to OneHotEncoder in Spark 3.0; fall back
# to the new class under the old name so this import works on either version.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# To generate the features vector
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [None]:
# Import the data
from pyspark.sql import SparkSession

# The session creation and the DATA_DIR assignment used to share one line,
# which is a syntax error; they are separate statements now.
spark = SparkSession.builder.appName('spark_ml').getOrCreate()
DATA_DIR = "../data"  # or wherever this might be

# Read the CNC tool-wear experiments; column types are inferred by Spark.
cnc_data = spark.read.csv(
    os.path.join(DATA_DIR, "manufacturing", "CNC_Tool_Wear", "cnc_experiments_all.csv"),
    header=True, inferSchema=True)

In [None]:
# Index the 'tool_condition' column into a numeric 'label' column
cnc_class_labels = StringIndexer(inputCol='tool_condition', outputCol='label')

# fit() learns the index mapping from the data; transform() appends 'label'
label_indexer_model = cnc_class_labels.fit(cnc_data)
cnc_data = label_indexer_model.transform(cnc_data)

In [None]:
# Converting features
## Convert the non-string features to a feature vector. The feature columns
## are picked by position: columns 0-46 plus columns 49-50.
leading_cols = cnc_data.columns[0:47]
trailing_cols = cnc_data.columns[49:51]
cnc_feature_labels = leading_cols + trailing_cols

# Note the strict naming convention here: the output column is "features"
cnc_feature_vector = VectorAssembler(inputCols=cnc_feature_labels,
                                     outputCol="features")

cnc_data = cnc_feature_vector.transform(cnc_data)

print("\nConverted features and labels for CNC wear dataset:")

# Look at a small random sample (fraction 0.001) of the assembled rows
cnc_preview = cnc_data.sample(0.001).select("features", "label")
cnc_preview.show(10)

In [None]:
# Splitting data: 75% training / 25% testing, with seed 42
split_weights = [0.75, 0.25]
cnc_train, cnc_test = cnc_data.randomSplit(split_weights, 42)

# Count each part (training first, then testing) and report both
n_train = cnc_train.count()
n_test = cnc_test.count()
print(f"\nRows of data for training: {n_train}, testing: {n_test}\n")

In [None]:
# Fitting a model: logistic regression on the CNC training split
from pyspark.ml.classification import LogisticRegression

# Hyperparameters are collected in one place and passed as keyword arguments
logreg_settings = {"maxIter": 10, "regParam": 0.3, "elasticNetParam": 0.8}
logReg = LogisticRegression(**logreg_settings)
logRegModel = logReg.fit(cnc_train)

fitted_coefficients = logRegModel.coefficients
fitted_intercept = logRegModel.intercept
print(f"Coefficients: {fitted_coefficients}; Intercept: {fitted_intercept}")

In [None]:
# Evaluating a model
## Logistic regression classifier (elastic net), scored on the test split.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

validation = logRegModel.transform(cnc_test).select("rawPrediction", "label")

evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC")

# The metric is the area under the ROC curve, not accuracy, so it is labelled
# as such and no longer reported as a percentage.
area_under_roc = evaluator.evaluate(validation)
print(f"\nValidation area under ROC: {area_under_roc}\n")

In [None]:
# Linear regression: load the concrete dataset, letting Spark infer types
concrete_csv_path = os.path.join(DATA_DIR, "manufacturing", "concrete", "concrete.csv")
concrete_data = spark.read.csv(concrete_csv_path, header=True, inferSchema=True)

In [None]:
# Preparing data
# The last column (compressive strength) is renamed to 'label' and the first
# eight columns are gathered into a feature vector, following the same logic
# as the logistic regression example. Once that logic is clear it can be
# formalised as a pipeline, which applies the data transforms and the model
# fit in a single operation.
label_source_col = concrete_data.columns[-1]
concrete_data = concrete_data.withColumnRenamed(label_source_col, "label")

concrete_feature_labels = concrete_data.columns[0:8]

# Note the strict naming convention here: the output column is "features"
concrete_feature_vector = VectorAssembler(inputCols=concrete_feature_labels,
                                          outputCol="features")

In [None]:
# Split the data: 75% training / 25% testing, with seed 42
concrete_train, concrete_test = concrete_data.randomSplit([0.75, 0.25], 42)

# The message used to be a single-quoted f-string broken across three lines,
# which is a syntax error; it is a one-line f-string now.
n_concrete_train = concrete_train.count()
n_concrete_test = concrete_test.count()
print(f"\nRows of data for training: {n_concrete_train}, testing: {n_concrete_test}\n")

In [None]:
# Instantiate the model and fit it through a pipeline
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.regression import LinearRegression

# Instantiate: the regressor with its hyperparameters
linReg = LinearRegression(maxIter=10,
                          regParam=0.3,
                          elasticNetParam=0.8,
                          solver='normal')

# Fit: the pipeline assembles the feature vector first, then fits the regressor
pipeline_stages = [concrete_feature_vector, linReg]
lin_pipeline = Pipeline(stages=pipeline_stages)
linRegModel = lin_pipeline.fit(concrete_train)

In [None]:
# Coefficients and intercept: the fitted regressor is the pipeline's last stage
fitted_regression = linRegModel.stages[-1]

print(f"Coefficients: {fitted_regression.coefficients}")

print(f"Intercept: {fitted_regression.intercept}")

In [None]:
# Summarize the model using the training summary of the fitted regressor
# (the last stage of the fitted pipeline).
trainingSummary = linRegModel.stages[-1].summary

print(f"numIterations:{trainingSummary.totalIterations}")
print(f"objectiveHistory: {trainingSummary.objectiveHistory}")

trainingSummary.residuals.show(10)

# The attribute name was split as "rootMeanSquaredErr or", a syntax error;
# the summary attribute is rootMeanSquaredError.
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"r2: {trainingSummary.r2}")

In [None]:
# Regression evaluator: score the fitted regressor on the test split
from pyspark.ml.evaluation import RegressionEvaluator

# Add the feature vector to the test data, then predict with the regressor
# (the last stage of the fitted pipeline).
concrete_test = concrete_feature_vector.transform(concrete_test)
fitted_regression = linRegModel.stages[-1]
validation = fitted_regression.transform(concrete_test).select("prediction", "label")

evaluator = RegressionEvaluator()
test_rmse = evaluator.evaluate(validation)

print(f"\nRMSE: {test_rmse}\n")

In [None]:
#Hyperparameter Tuning
#Tuning library in Spark ML

In [None]:
# Single validation split
## One possibility is to split a single validation set at random from the
## training data and select the best model from a given range of
## hyperparameters (grid search), according to some evaluation metric.
## This is the purpose of TrainValidationSplit.
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# The chain is wrapped in parentheses instead of backslash continuations: two
# of the backslashes were followed by a trailing space, which is a syntax error.
# Grid size: 3 regParam values x 5 elasticNetParam values x 2 fitIntercept
# settings = 30 parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(linReg.regParam, [0.1, 0.01, 5.0])
    .addGrid(linReg.elasticNetParam, np.arange(0.0, 1.25, 0.25))
    .addGrid(linReg.fitIntercept, [False, True])
    .build()
)

In [None]:
# 25% of the training data is split off as a single validation set
# (trainRatio=0.75). The linReg regressor and the parameter grid from above
# are reused, together with a RegressionEvaluator (default output metric:
# RMSE). Fitting explores the grid and keeps the best model according to the
# evaluator.
tvs_settings = {
    "estimator": linReg,
    "estimatorParamMaps": paramGrid,
    "evaluator": RegressionEvaluator(),
    "trainRatio": 0.75,
}
trainValSplit = TrainValidationSplit(**tvs_settings)

# linReg is used without the pipeline here, so add the feature vector first
concrete_train = concrete_feature_vector.transform(concrete_train)
linReg_tvs = trainValSplit.fit(concrete_train)

In [None]:
# The default RegressionEvaluator requires a prediction and a label column.
linReg_valid = linReg_tvs.transform(concrete_test).select("prediction", "label")
evaluator = RegressionEvaluator()
tuned_rmse = evaluator.evaluate(linReg_valid)

print(f"\nRMSE: {tuned_rmse}\n")

# We can see what coefficients were chosen for the best model
best_linreg = linReg_tvs.bestModel
print(f"Model coefficients: {best_linreg.coefficients}")
print(f"Model intercept: {best_linreg.intercept}")

In [None]:
# K-fold cross validation, reusing the estimator and parameter grid from above
from pyspark.ml.tuning import CrossValidator

cv_settings = {
    "estimator": linReg,
    "estimatorParamMaps": paramGrid,
    "evaluator": RegressionEvaluator(),
    "numFolds": 5,
}
crossValid = CrossValidator(**cv_settings)

In [None]:
# Here k=5, and the paramGrid, estimator and evaluator are recycled from above.
# This may take a while to train, depending on the parameter grid size and
# the number of folds.
linReg_cv = crossValid.fit(concrete_train)

# Predict on the test split and show features, label and prediction together
cv_predictions = linReg_cv.transform(concrete_test)
cv_predictions.select("features", "label", "prediction").show()

In [None]:
#Deep Learning with Spark
## Spark ML has a Multilayer Perceptron (MLP) built-in function. 
## This is a feed-forward artificial neural network, used in many classification problems. 
## It is a very powerful classifier and is relatively simple to implement in Spark. 
## However, non-trivial models require a lot of computational power to train.
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# MLP layer sizes: the input width equals the number of feature columns,
# followed by two layers of 128 units and a final layer of 2 units.
n_inputs = len(cnc_feature_labels)
mlp_architecture = [n_inputs, 128, 128, 2]

In [None]:
# Instantiate the MLP classifier, then train it (fit) on the CNC training split
mlp_settings = {
    "maxIter": 100,
    "layers": mlp_architecture,
    "blockSize": 128,
    "seed": 13579,
}
mlp_template = MultilayerPerceptronClassifier(**mlp_settings)
mlp_model = mlp_template.fit(cnc_train)

In [None]:
# Validate against the test data.
# The printed figure is labelled "accuracy", so it is now computed with the
# MulticlassClassificationEvaluator (imported alongside the MLP classifier)
# using metricName="accuracy". The previous BinaryClassificationEvaluator
# reported area under the ROC curve, which is a different metric.
validation = mlp_model.transform(cnc_test).select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

print(f"\nValidation accuracy: {evaluator.evaluate(validation)*100}%\n")