# Big Data Platforms

## PySpark Machine Learning

### MLlib applied to Wine reviews data 

**Dataset:**
https://www.kaggle.com/zynicide/wine-reviews


Copyright: 2018 [Ashish Pujari](mailto:apujari@uchicago.edu)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#create Spark session with every resource setting supplied on the builder.
#Spark clones the configuration when the context starts, so writing to the
#context's private _conf afterwards does not reconfigure the running
#application. Restart the kernel before changing any of these values.
spark_settings = [('spark.executor.memory', '5g'),
                  ('spark.app.name', 'Spark Updated Conf'),
                  ('spark.executor.cores', '4'),
                  ('spark.cores.max', '4'),
                  ('spark.driver.memory', '8g')]

builder = SparkSession.builder.appName('WineReviewsML')
for setting_key, setting_value in spark_settings:
    #a later 'spark.app.name' entry takes precedence over appName() above
    builder = builder.config(setting_key, setting_value)
spark = builder.getOrCreate()

#keep `conf` bound to the context's SparkConf for any later use
conf = spark.sparkContext.getConf()

#print spark configuration settings
conf.getAll()

[('spark.executor.id', 'driver'),
 ('spark.executor.memory', '5g'),
 ('spark.driver.port', '50400'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.app.name', 'Spark Updated Conf'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.app.id', 'local-1550715547292'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '10.150.158.6'),
 ('spark.ui.showConsoleProgress', 'true')]

## Read Data

In [3]:
#load the 150k-review file; the double quote is used both as the quote
#character and as the escape character inside quoted fields
df = (
    spark.read
    .options(quote="\"", escape="\"", ignoreLeadingWhiteSpace=True)
    .csv("/Users/rowena/Documents/wine-reviews/winemag-data_first150k.csv", inferSchema=True, header=True)
)

In [4]:
#show the column names and the types inferred for the first dataset
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [5]:
#load the 130k-review (v2) file with the same CSV options as the first dataset
df2 = (
    spark.read
    .options(quote="\"", escape="\"", ignoreLeadingWhiteSpace=True)
    .csv("/Users/rowena/Documents/wine-reviews/winemag-data-130k-v2.csv", inferSchema=True, header=True)
)

In [6]:
#show the column names and the types inferred for the second dataset
df2.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- taster_name: string (nullable = true)
 |-- taster_twitter_handle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [7]:
#peek at the first row of the second dataset
df2.head()

Row(_c0=0, country='Italy', description="Aromas include tropical fruit, broom, brimstone and dried herb. The palate isn't overly expressive, offering unripened apple, citrus and dried sage alongside brisk acidity.", designation='Vulkà Bianco', points=87, price=None, province='Sicily & Sardinia', region_1='Etna', region_2=None, taster_name='Kerin O’Keefe', taster_twitter_handle='@kerinokeefe', title='Nicosia 2013 Vulkà Bianco  (Etna)', variety='White Blend', winery='Nicosia')

In [8]:
#combine the two datasets; the columns that exist only in the second file are
#dropped first so both sides have the same set of columns.
#unionByName matches columns by name, so a difference in column order between
#the two files cannot silently misalign fields (plain union matches by position).
df = df.unionByName(df2.drop("taster_name", "taster_twitter_handle", "title"))

## Data Exploration

In [None]:
#summary statistics of the combined dataset, converted to pandas so the
#notebook renders them as a table
df.describe().toPandas()

In [None]:
#total number of rows in the combined dataset
df.count()

In [None]:
#display the first five rows
df.show(5)

In [None]:
#Count the rows that remain after dropping every row that has a missing value
#in any column (i.e. the number of complete rows, not the number of incomplete ones)
df.dropna().count()

In [None]:
#Number of missing (null) values in each column

from pyspark.sql.functions import isnan, when, count, col

#when(...) without an otherwise() yields null for non-matching rows and
#count() skips nulls, so each expression counts only the null rows of its column
null_count_exprs = [count(when(df[name].isNull(), name)).alias(name) for name in df.columns]
df.select(null_count_exprs).show()

In [None]:
#keep only the rows that have a country
df = df.where(df.country.isNotNull())

In [None]:
#mean of the price column, pulled back to the driver as a plain Python value
price_mean_row = df.agg({"price": "mean"}).first()
meanprice = price_mean_row[0]
meanprice

In [None]:
#replace missing prices with the mean price and missing region_1 with "Unknown"
df = df.fillna(meanprice, subset="price")
df = df.fillna("Unknown", subset="region_1")

In [None]:
#number of distinct countries
df.select('Country').dropDuplicates().count()

In [None]:
#number of wines per country, largest first
(
    df.groupby('Country')
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
#number of distinct varieties
df.select('Variety').dropDuplicates().count()

In [None]:
#number of wines per variety, largest first
(
    df.groupby("variety")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
#number of wines per (country, province) pair, largest first
(
    df.groupBy('Country', 'province')
    .count()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
#number of wines per country / province / region_1 combination, largest first.
#cube() also emits subtotal rows with nulls in the rolled-up columns;
#dropna() removes every row containing a null, which discards those subtotals
(
    df.cube("Country", "province", "region_1")
    .count()
    .dropna()
    .orderBy("count", ascending=False)
    .show()
)

In [None]:
#min, max and average price per country, ordered by the highest maximum price
price_aggregates = [F.min('price'), F.max('price'), F.avg('price')]
(
    df.groupBy('Country')
    .agg(*price_aggregates)
    .orderBy("max(price)", ascending=False)
    .show()
)

In [None]:
#Highest rated wine - using API
#F.max is used rather than `from pyspark.sql.functions import max`, because that
#import would replace Python's built-in max() for every later cell
df.agg(F.max(df.points)).head()[0]

In [None]:
#min, max and average points per country, highest average first
points_aggregates = [F.min('points'), F.max('points'), F.avg('points')]
df2 = (
    df.groupBy('Country')
    .agg(*points_aggregates)
    .orderBy("avg(points)", ascending=False)
)
df2.show()

In [None]:
#collect the per-country summary into a pandas DataFrame and draw the
#average points per country as a bar chart
pdf = df2.toPandas()
pdf.plot.bar(x='Country', y='avg(points)', figsize=(18,4), rot=30, ylim=(80, 95))

##  Feature Engineering

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

#correlation coefficient between points and price
#(author's reading of the result: a weak uphill relationship)
df.corr("points", "price")

In [None]:
#min, max and mean price computed in one aggregation, so the data is scanned
#by a single Spark job instead of three separate ones
price_stats_row = df.agg(F.min("price"), F.max("price"), F.avg("price")).first()
min_price, max_price, mean_price = price_stats_row

print ("Minimum Price : ", min_price, ", Maximum Price : ", max_price, ", Mean Price : ", mean_price)

In [None]:
#min-max scaling: price_norm = (price - min_price) / (max_price - min_price)
shifted_price = F.col("price") - min_price
df = df.withColumn('price_norm', shifted_price / (max_price - min_price))
df.show(2)

In [None]:
from pyspark.ml.feature import QuantileDiscretizer

#bucket price into three quantile-based categories (High Medium Low)
discretizer = QuantileDiscretizer(
    numBuckets=3,
    inputCol="price",
    outputCol="price_category",
)
price_bucket_model = discretizer.fit(df)
df = price_bucket_model.transform(df)
df.show(3)

In [None]:
#bucket points into three quantile-based categories (High Medium Low)
discretizer = QuantileDiscretizer(
    numBuckets=3,
    inputCol="points",
    outputCol="points_category",
)
points_bucket_model = discretizer.fit(df)
df = points_bucket_model.transform(df)
df.show(3)

### One Hot Encoding

In [None]:
from pyspark.ml.feature import VectorAssembler

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
#OneHotEncoderEstimator only exists in Spark 2.3/2.4; Spark 3.0 renamed it to
#OneHotEncoder (same inputCols/outputCols parameters), so fall back to that name
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

#convert relevant categorical into one hot encoded
#(handleInvalid="skip" makes the indexers drop rows they cannot index)
indexer1 = StringIndexer(inputCol="country", outputCol="countryIdx").setHandleInvalid("skip")
indexer2 = StringIndexer(inputCol="province", outputCol="provinceIdx").setHandleInvalid("skip")
indexer3 = StringIndexer(inputCol="variety", outputCol="varietyIdx").setHandleInvalid("skip")
indexer4 = StringIndexer(inputCol="winery", outputCol="wineryIdx").setHandleInvalid("skip")

#gather all indexers as inputs to the One Hot Encoder
inputs = [indexer1.getOutputCol(), indexer2.getOutputCol(), \
          indexer3.getOutputCol(), indexer4.getOutputCol()]

#create the one hot encoder
encoder = OneHotEncoderEstimator(inputCols=inputs,  \
                                 outputCols=["countryVec", "provinceVec", \
                                             "varietyVec", "wineryVec"])

#run it through a pipeline; the Pipeline object gets its own name so that
#`pipeline` only ever refers to the transformed DataFrame used by later cells
encoding_pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, encoder])
pipeline = encoding_pipeline.fit(df).transform(df)
#no NA fill is applied to the encoded output
#pipeline = pipeline.na.fill(0) 

pipeline.show(5)

In [None]:
#combine the four one-hot vectors and the points column into one 'features' vector
feature_columns = ['countryVec', 'provinceVec', 'varietyVec', 'wineryVec', 'points']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
pipeline = assembler.transform(pipeline)

In [None]:
#80/20 train/test split with a fixed seed
train_df, test_df = pipeline.randomSplit(weights=[.8, .2], seed=1234)
train_df.show(1)

##  Regression

Let us try to predict the price from features such as country, province, variety, winery and points.

### Linear Regression

In [None]:
%%time
from pyspark.ml.regression import LinearRegression

#Elastic Net
lr = LinearRegression(featuresCol = 'features', labelCol='price', regParam=0.3, elasticNetParam=0.8, maxIter=20)
lrm = lr.fit(train_df)

In [None]:
#coefficients (left disabled)
#print("Coefficients: " + str(lrm.coefficients))
fitted_intercept = lrm.intercept
print("Intercept: " + str(fitted_intercept))

#fit statistics taken from the model's training summary
training_summary = lrm.summary
print("RMSE: %f" % training_summary.rootMeanSquaredError)
print("r2: %f" % training_summary.r2)

#p-values are not provided in this model for the solver being used
#print("pValues: " + str(lrm.summary.pValues))

In [None]:
#make predictions: apply the fitted linear regression model to the held-out test set
predictions = lrm.transform(test_df)

In [None]:
from itertools import chain

#the "attrs" metadata of the features column is a mapping whose values are
#lists of attribute dicts; flatten those lists and sort the (idx, name) pairs
features_field = predictions.schema[lrm.summary.featuresCol]
attr_groups = features_field.metadata["ml_attr"]["attrs"].values()
attrs = sorted(
    (attr["idx"], attr["name"]) for attr in chain.from_iterable(attr_groups)
)

#[(name, lrm.summary.pValues[idx]) for idx, name in attrs]

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

#named reg_evaluator rather than `eval`, so that Python's built-in eval() is
#not replaced for every later cell
reg_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error (the evaluator's configured metric)
rmse = reg_evaluator.evaluate(predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error (metric overridden for this call only)
mse = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "r2"})
print("r2: %.3f" %r2)

In [None]:
#show actual price next to predicted price, sorted by the displayed columns
comparison_columns = ["country", "region_1", "winery", "variety", "points", "price", "prediction"]
predictions.select(*comparison_columns).orderBy(comparison_columns).show(20)

<b>Exercise</b>: <font color='red'>Tune the model hyperparameters and see whether adding additional attributes from the dataset improves the model.</font>

## Classification

Let us try to predict `price_category` (the three-bucket price quantile computed above) from features such as country, province, variety, winery and points.

### Logistic Regression

In [None]:
%%time
from pyspark.ml.classification import LogisticRegression

# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol = 'features', labelCol='price_category')

# Fit the model to the data.
lgrm = lgr.fit(train_df)

# Given a dataset, predict each point's label, and show the results.
predictions = lgrm.transform(test_df)

In [None]:
#show the true price_category next to the predicted one
label_columns = ["country", "region_1", "winery", "variety", "points", "price_category", "prediction"]
predictions.select(*label_columns).show(20)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#evaluator comparing price_category against the predicted class
evaluator = MulticlassClassificationEvaluator(labelCol="price_category", predictionCol="prediction")

#print accuracy, then f1
for metric in ("accuracy", "f1"):
    print(evaluator.evaluate(predictions, {evaluator.metricName: metric}))

### Random Forest

In [None]:
%%time
from pyspark.ml.classification import RandomForestClassifier

# Set parameters for the Random Forest.
#rfc = RandomForestClassifier(maxDepth=5, numTrees=15, impurity="gini", labelCol="price_category", predictionCol="prediction")

# Fit the model to the data.
#rfcm = rfc.fit(train_df)

# Given a dataset, predict each point's label, and show the results.
#predictions = rfcm.transform(test_df)

In [None]:
#print accuracy, then f1, for whatever `predictions` currently holds

for metric in ("accuracy", "f1"):
    print(evaluator.evaluate(predictions, {evaluator.metricName: metric}))

<b>Exercise</b>: <font color='red'>Tune the model hyperparameters - increase the number of trees to see if the model improves.</font>

<b>Exercise</b>: <font color='red'>Implement a different classifier from Spark ML and compare metrics</font>

#### Feature importance selector 

https://www.timlrx.com/2018/06/19/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator/