In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext
sc = SparkContext(master = 'local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

# Linear regression without cross-valiation

In [None]:
from google.colab import files
files.upload()

In [None]:
ad = spark.read.csv('./Advertising.csv', header=True, inferSchema=True)
ad.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [None]:
type(ad)

pyspark.sql.dataframe.DataFrame

## Transform data structure

In [None]:
from pyspark.ml.linalg import Vectors
ad_df = ad.rdd.map(lambda x: [Vectors.dense(x[0:3]), x[-1]]).toDF(['features', 'label'])
ad_df.show(5)

+----------------+-----+
|        features|label|
+----------------+-----+
|[1.0,230.1,37.8]| 22.1|
| [2.0,44.5,39.3]| 10.4|
| [3.0,17.2,45.9]|  9.3|
|[4.0,151.5,41.3]| 18.5|
|[5.0,180.8,10.8]| 12.9|
+----------------+-----+
only showing top 5 rows



## Build linear regression model

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol = 'label')

In [None]:
type(lr)

pyspark.ml.regression.LinearRegression

## Fit the model

In [None]:
lr_model = lr.fit(ad_df)

In [None]:
type(lr_model)

pyspark.ml.regression.LinearRegressionModel

In [None]:
lr_model

LinearRegressionModel: uid=LinearRegression_beacdf2c049e, numFeatures=3

## Prediction

In [None]:
pred = lr_model.transform(ad_df)
pred.show(5)

+----------------+-----+------------------+
|        features|label|        prediction|
+----------------+-----+------------------+
|[1.0,230.1,37.8]| 22.1|20.605029205675923|
| [2.0,44.5,39.3]| 10.4|12.392493270268307|
| [3.0,17.2,45.9]|  9.3|12.381882445789145|
|[4.0,151.5,41.3]| 18.5|17.663642367397202|
|[5.0,180.8,10.8]| 12.9|13.277141826096056|
+----------------+-----+------------------+
only showing top 5 rows



## Module evaluation

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')
evaluator.setMetricName('r2').evaluate(pred)

0.8972276882820611

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')
evaluator.setMetricName('mse').evaluate(pred)

2.7836644997872146

In [None]:
evaluator.setMetricName('mae').evaluate(pred)

1.254575253603657

### Exercise

1. Do research in Spark documentaion to study & practice in other regression models.
2. Investigate other metrics for evaluating regression model in Spark.
3. Practice on `Advertising` dataset.
4. Do the same thing, but now you should split into `train` and `test` datasets, so the fitting modelling in the `train`, then do the evaluation in the `test`

# Linear regression with cross-validation in Spark

In [None]:
training, test = ad_df.randomSplit([0.8, 0.2], seed=123)

In [None]:
##=====build cross valiation model======

# estimator
lr = LinearRegression(featuresCol = 'features', labelCol = 'label')

# parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0, 0.5, 1]).\
    addGrid(lr.elasticNetParam, [0, 0.5, 1]).\
    build()

# evaluator
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

# cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [None]:
type(cv)

pyspark.ml.tuning.CrossValidator

In [None]:
cv_model = cv.fit(training)

In [None]:
type(cv_model)

pyspark.ml.tuning.CrossValidatorModel

In [None]:
cv_model

CrossValidatorModel_ecf7f70de668

In [None]:
pred_training_cv = cv_model.transform(training)
pred_test_cv = cv_model.transform(test)

In [None]:
# performance on training data
evaluator.setMetricName('r2').evaluate(pred_training_cv)

0.8906321535733986

In [None]:
# performance on test data
evaluator.setMetricName('r2').evaluate(pred_test_cv)

0.914580132831202

## Intercept and coefficients

In [None]:
print('Intercept: ', cv_model.bestModel.intercept, "\n",
     'coefficients: ', cv_model.bestModel.coefficients)

Intercept:  3.1016735129846187 
 coefficients:  [-0.0018772527957915963,0.046197905637639806,0.18389173053167443]


In [None]:
ad_df.show(5)

+----------------+-----+
|        features|label|
+----------------+-----+
|[1.0,230.1,37.8]| 22.1|
| [2.0,44.5,39.3]| 10.4|
| [3.0,17.2,45.9]|  9.3|
|[4.0,151.5,41.3]| 18.5|
|[5.0,180.8,10.8]| 12.9|
+----------------+-----+
only showing top 5 rows



## Get parameter values from the best model

In [None]:
print('best regParam: ' + str(cv_model.bestModel._java_obj.getRegParam()) + "\n" +
     'best ElasticNetParam:' + str(cv_model.bestModel._java_obj.getElasticNetParam()))

best regParam: 0.0
best ElasticNetParam:0.0


### Exercise

Wrap up the code of building cross-validation models in a Python class

### Exercise

Do the regression to forecast the `inside_sale` of this data: https://github.com/maks-p/restaurant_sales_forecasting/blob/master/csv/CSV_for_EDA_NEW.csv

# Generalized regression

In [None]:
from google.colab import files
files.upload()

In [None]:
cuse = spark.read.csv('./cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)

+---+---------+---------+---+
|age|education|wantsMore|  y|
+---+---------+---------+---+
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
+---+---------+---------+---+
only showing top 5 rows



In [None]:
cuse.columns[0:3]
# cuse.select('age').distinct().show()
cuse.select('age').rdd.countByValue()
# cuse.select('education').rdd.countByValue()

defaultdict(int,
            {Row(age='<25'): 397,
             Row(age='25-29'): 404,
             Row(age='30-39'): 612,
             Row(age='40-49'): 194})

In [None]:
# string index each categorical string columns
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in ('age', 'education', 'wantsMore')]
pipeline = Pipeline(stages=indexers)
indexed_cuse = pipeline.fit(cuse).transform(cuse)
indexed_cuse.select('age', 'indexed_age').distinct().show(5)

+-----+-----------+
|  age|indexed_age|
+-----+-----------+
|30-39|        0.0|
|  <25|        2.0|
|25-29|        1.0|
|40-49|        3.0|
+-----+-----------+



In [None]:
# onehotencode each indexed categorical columns
from pyspark.ml.feature import OneHotEncoder
columns = indexed_cuse.columns[0:3]
onehoteencoders = [OneHotEncoder(inputCol="indexed_"+column, outputCol="onehotencode_"+column) for column in columns]
pipeline = Pipeline(stages=onehoteencoders)
onehotencode_columns = ['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore', 'y']
onehotencode_cuse = pipeline.fit(indexed_cuse).transform(indexed_cuse).select(onehotencode_columns)
onehotencode_cuse.distinct().show(5)

+----------------+----------------------+----------------------+---+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|  y|
+----------------+----------------------+----------------------+---+
|   (3,[1],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
|   (3,[2],[1.0])|         (1,[0],[1.0])|             (1,[],[])|  1|
|   (3,[0],[1.0])|         (1,[0],[1.0])|         (1,[0],[1.0])|  0|
|       (3,[],[])|         (1,[0],[1.0])|         (1,[0],[1.0])|  1|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
+----------------+----------------------+----------------------+---+
only showing top 5 rows



In [None]:
# assemble all feature columns into on single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore'], outputCol='features')
cuse_df_2 = assembler.transform(onehotencode_cuse).withColumnRenamed('y', 'label')
cuse_df_2.show(5)

+----------------+----------------------+----------------------+-----+-------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label|           features|
+----------------+----------------------+----------------------+-----+-------------------+
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
+----------------+----------------------+----------------------+-----+-------------------+
only showing top 5 rows



In [None]:
# split data into training and test datasets
training, test = cuse_df_2.randomSplit([0.8, 0.2], seed=1234)
training.show(5)

+----------------+----------------------+----------------------+-----+---------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|
+----------------+----------------------+----------------------+-----+---------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
+----------------+----------------------+----------------------+-----+---------+
only showing top 5 rows



In [None]:
test.show(5)

+----------------+----------------------+----------------------+-----+---------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|
+----------------+----------------------+----------------------+-----+---------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
+----------------+----------------------+----------------------+-----+---------+
only showing top 5 rows



In [None]:
## ======= build cross validation model ===========

# estimator
from pyspark.ml.regression import GeneralizedLinearRegression
glm = GeneralizedLinearRegression(featuresCol='features', labelCol='label', family='binomial')

# parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(glm.regParam, [0, 0.5, 1, 2, 4]).\
    build()

# evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')

# build cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=glm, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [None]:
# fit model
# cv_model = cv.fit(training)
cv_model = cv.fit(cuse_df_2)

In [None]:
type(cv_model)

pyspark.ml.tuning.CrossValidatorModel

In [None]:
# prediction
pred_training_cv = cv_model.transform(training)
pred_test_cv = cv_model.transform(test)

pred_training_cv.show(5)
pred_test_cv.show(5, truncate=False)

+----------------+----------------------+----------------------+-----+---------+------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|        prediction|
+----------------+----------------------+----------------------+-----+---------+------------------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151293|
+----------------+----------------------+----------------------+-----+---------+------------------+
only showing top 5 rows

+----------------+----------------------+----------------------+-----+-----

In [None]:
cv_model.bestModel.coefficients

DenseVector([-0.2806, -0.7999, -1.1892, 0.325, -0.833])

In [None]:
cv_model.bestModel.intercept

0.056024275169240606

In [None]:
evaluator.evaluate(pred_training_cv)

0.6716478245974649

In [None]:
evaluator.evaluate(pred_test_cv)

0.6830864197530864

### Exercise

1. Do the generalized regression to forecast the `inside_sale` of this data: https://github.com/maks-p/restaurant_sales_forecasting/blob/master/csv/CSV_for_EDA_NEW.csv

2. Wrap your code in a pipeline as a Python class