# Introduction to Spark ML

## Predict chances of infant survival with ML

In [None]:
from pyspark.sql import SparkSession

# Configure a local session with 4 worker threads, then create it (or reuse
# an already running one).
session_builder = SparkSession.builder.master('local[4]').appName('spark_ml')
spark = session_builder.getOrCreate()

### Load the data

First, we load the data.

In [None]:
# Read the gzipped CSV: column names come from the header row and column types
# are inferred by Spark.
births = spark.read.csv('births_transformed.csv.gz', header=True, inferSchema=True)
births.printSchema()

### Create transformers

Having done this, we can now create our first `Transformer`.

In [None]:
import pyspark.sql.types as types
import pyspark.ml.feature as ft

# One-hot encode the BIRTH_PLACE column into the BIRTH_PLACE_VEC vector column.
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE',
    outputCol='BIRTH_PLACE_VEC',
)

Let's now create a single column with all the features collated together. 

In [None]:
# Assemble every column from the third one onward, plus the one-hot encoded
# birth place, into a single 'features' vector column.
# NOTE(review): presumably the two skipped columns are the label and the raw
# BIRTH_PLACE -- confirm against the printed schema.
assembler_inputs = births.columns[2:] + [encoder.getOutputCol()]
featuresCreator = ft.VectorAssembler(
    inputCols=assembler_inputs,
    outputCol='features',
)

### Create an estimator

In this example we will (once again) use the Logistic Regression model.

In [None]:
import pyspark.ml.classification as cl

# Logistic regression on the survival label: at most 10 iterations, with a
# regularization parameter of 0.01.
logistic = cl.LogisticRegression(
    maxIter=10,
    regParam=0.01,
    labelCol='INFANT_ALIVE_AT_REPORT',
)

### Create a pipeline

All that is left now is to create a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package.

In [None]:
from pyspark.ml import Pipeline

# Stages run in order: encode the birth place, assemble the feature vector,
# then fit the logistic regression.
model_stages = [encoder, featuresCreator, logistic]
pipeline = Pipeline(stages=model_stages)

### Fit the model

Conveniently, the `DataFrame` API has the `.randomSplit(...)` method.

In [None]:
# 70/30 train/test split; the fixed seed makes the split repeatable.
split_weights = [0.7, 0.3]
births_train, births_test = births.randomSplit(split_weights, seed=200)

Now run our `pipeline` and estimate our model.

In [None]:
# Fit every pipeline stage on the training split.
model = pipeline.fit(births_train)
# Apply the fitted pipeline to the held-out split.
test_model = model.transform(births_test)
# Show the first five scored rows.
test_model.take(5)

### Model performance

Obviously, we would like to now test how well our model did.

In [None]:
import pyspark.ml.evaluation as ev

# Evaluator that compares the 'probability' column against the true label.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(test_model, {evaluator.metricName: metric_name}))

### Saving the model

PySpark allows you to save the `Pipeline` definition for later use.

In [None]:
pipeline_path = './Logistic_Pipeline'

# Persist the (unfitted) pipeline definition, replacing any previous save.
pipeline_writer = pipeline.write().overwrite()
pipeline_writer.save(pipeline_path)

So, you can load it up later and use straight away to `.fit(...)` and predict.

In [None]:
# Restore the saved pipeline definition, refit it, and preview five predictions.
loaded_pipeline = Pipeline.load(pipeline_path)
refit_model = loaded_pipeline.fit(births_train)
refit_model.transform(births_test).take(5)

You can also save the whole fitted model.

In [None]:
from pyspark.ml import PipelineModel

model_path = './Logistic_Model'

# Persist the fitted pipeline model, replacing any previous save.
model_writer = model.write().overwrite()
model_writer.save(model_path)

# Load it back and score the test split without refitting.
loaded_model = PipelineModel.load(model_path)
reloaded_predictions = loaded_model.transform(births_test)
reloaded_predictions.take(5)

## Hyperparameter tuning

### Grid search

Load the `.tuning` part of the package. Specify our model and the list of parameters we want to loop through.

In [None]:
import pyspark.ml.tuning as tune

logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')

# 3 x 3 grid over the iteration cap and the regularization parameter.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

Next, we need some way of comparing the models.

In [None]:
# Evaluator used to rank the candidate models.
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

Create the logic that will do the validation work for us.

In [None]:
# Cross-validate the logistic regression over every grid combination.
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

Create a purely transforming `Pipeline`.

In [None]:
# Feature-preparation stages only (no estimator), fitted on the training split.
transform_stages = [encoder, featuresCreator]
pipeline = Pipeline(stages=transform_stages)
data_transformer = pipeline.fit(births_train)

Having done this, we are ready to find the optimal combination of parameters for our model.

In [None]:
# Build the feature vectors for the training split, then run the grid search.
train_features = data_transformer.transform(births_train)
cvModel = cv.fit(train_features)

The `cvModel` will return the best model estimated. We can now use it to see if it performed better than our previous model.

In [None]:
# Prepare the test split with the same fitted transformers and score it.
data_test = data_transformer.transform(births_test)
results = cvModel.transform(data_test)

# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: metric_name}))

What parameters does the best model have? The answer is a little bit convoluted, but here's how you can extract it.

In [None]:
# Display the parameter maps that the cross-validator evaluated.
cvModel.getEstimatorParamMaps()

In [None]:
# Display the cross-validation metrics; they are paired positionally with the
# parameter maps in the cell that follows.
cvModel.avgMetrics

In [None]:
# Pair every evaluated parameter map with its average metric. Each entry is
# ([{param_name: value}, ...], metric).
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

# Largest metric first, so the best parameter combination is at the top.
sorted(results, key=lambda x: x[1], reverse=True)

### Train-Validation splitting

Use the `ChiSqSelector` to select only the top 5 features, thus limiting the complexity of our model.

In [None]:
# Keep the 5 top features of the assembled vector according to the chi-squared
# selector, writing them to 'selectedFeatures'.
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    featuresCol=featuresCreator.getOutputCol(),
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT',
)

# The classifier trains on the reduced vector rather than on 'features'.
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures',
)

selection_stages = [encoder, featuresCreator, selector]
pipeline = Pipeline(stages=selection_stages)
feature_transformer = pipeline.fit(births_train)

The `TrainValidationSplit` object gets created in the same fashion as the `CrossValidator` model.

In [None]:
# Same 3 x 3 grid as before, bound to the new logistic regression instance.
grid = (
    tune.ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])
    .build()
)

# Train/validation split over the grid, collecting the fitted sub-models.
tvs = tune.TrainValidationSplit(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    collectSubModels=True,
)

As before, we fit our data to the model, and calculate the results.

In [None]:
# Fit on the training split after feature selection.
selected_train = feature_transformer.transform(births_train)
tvsModel = tvs.fit(selected_train)

# Score the test split prepared with the same fitted transformers.
data_test = feature_transformer.transform(births_test)
results = tvsModel.transform(data_test)

# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ('areaUnderROC', 'areaUnderPR'):
    print(evaluator.evaluate(results, {evaluator.metricName: metric_name}))

In [None]:
# Display the validation metrics; they are paired positionally with the
# parameter maps in the cell that follows.
tvsModel.validationMetrics

In [None]:
# Pair every evaluated parameter map with its validation metric. Each entry is
# ([{param_name: value}, ...], metric).
results = [
    ([{param.name: value} for param, value in params.items()], metric)
    for params, metric in zip(tvsModel.getEstimatorParamMaps(), tvsModel.validationMetrics)
]

# Largest metric first, so the best parameter combination is at the top.
sorted(results, key=lambda x: x[1], reverse=True)

### Random Forest Classification

We will now use the `RandomForestClassifier` to model the chances of survival for an infant.

First, we need to cast the label feature to `DoubleType`.

In [None]:
import pyspark.sql.functions as fn
import pyspark.sql.types as types

# Recast the label column as a double, then redo the 70/30 split on the
# recast DataFrame with the same seed as before.
label_as_double = fn.col('INFANT_ALIVE_AT_REPORT').cast(types.DoubleType())
births = births.withColumn('INFANT_ALIVE_AT_REPORT', label_as_double)
births_train, births_test = births.randomSplit([0.7, 0.3], seed=200)

We are ready to build our model.

In [None]:
# A small forest: 5 trees, each at most 5 levels deep.
classifier = cl.RandomForestClassifier(
    numTrees=5,
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)

forest_stages = [encoder, featuresCreator, classifier]
pipeline = Pipeline(stages=forest_stages)

# Fit on the training split and score the test split.
model = pipeline.fit(births_train)
test = model.transform(births_test)

Let's now see how the `RandomForestClassifier` model performs compared to the `LogisticRegression`.

In [None]:
# Evaluator with the default raw-prediction column and the survival label.
evaluator = ev.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')

# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ("areaUnderROC", "areaUnderPR"):
    print(evaluator.evaluate(test, {evaluator.metricName: metric_name}))

Let's test how well one tree would do, then.

In [None]:
# A single decision tree with the same depth limit as the forest's trees.
classifier = cl.DecisionTreeClassifier(
    maxDepth=5,
    labelCol='INFANT_ALIVE_AT_REPORT',
)

tree_stages = [encoder, featuresCreator, classifier]
pipeline = Pipeline(stages=tree_stages)

# Fit on the training split and score the test split.
model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = ev.BinaryClassificationEvaluator(labelCol='INFANT_ALIVE_AT_REPORT')

# Print area under the ROC curve first, then area under the PR curve.
for metric_name in ("areaUnderROC", "areaUnderPR"):
    print(evaluator.evaluate(test, {evaluator.metricName: metric_name}))

### Regression

In this section we will try to predict the `MOTHER_WEIGHT_GAIN`.

In [None]:
# Candidate predictor columns for the weight-gain regression.
features = [
    'MOTHER_AGE_YEARS',
    'MOTHER_HEIGHT_IN',
    'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE',
    'DIABETES_GEST',
    'HYP_TENS_PRE',
    'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM',
    'CIG_BEFORE',
    'CIG_1_TRI',
    'CIG_2_TRI',
    'CIG_3_TRI',
]

First, we will collate all the features together and use the `ChiSqSelector` to select only the top 5 most important features.

In [None]:
# Assemble the predictor columns into a 'features' vector.
# NOTE(review): the slice skips the first entry of `features`
# (MOTHER_AGE_YEARS) -- confirm that leaving it out is intended.
regression_inputs = features[1:]
featuresCreator = ft.VectorAssembler(
    inputCols=regression_inputs,
    outputCol='features',
)

# Keep the 5 top features with respect to the weight-gain label.
selector = ft.ChiSqSelector(
    numTopFeatures=5,
    outputCol="selectedFeatures",
    labelCol='MOTHER_WEIGHT_GAIN',
)

In order to predict the weight gain we will use the gradient boosted trees regressor.

In [None]:
import pyspark.ml.regression as reg

# Gradient-boosted trees regressor for MOTHER_WEIGHT_GAIN.
# featuresCol points at the ChiSqSelector output ('selectedFeatures'). Without
# it the regressor trains on its default 'features' column and the selector
# stage in the pipeline has no effect on the fitted model.
regressor = reg.GBTRegressor(
    maxIter=15,
    maxDepth=3,
    labelCol='MOTHER_WEIGHT_GAIN',
    featuresCol='selectedFeatures',
)

Finally, again, we put it all together into a `Pipeline`.

In [None]:
# Assemble the features, select the top ones, then fit the regressor -- all on
# the training split.
regression_stages = [featuresCreator, selector, regressor]
pipeline = Pipeline(stages=regression_stages)
weight_gain = pipeline.fit(births_train)

Having created the `weight_gain` model, let's see if it performs well on our testing data.

In [None]:
# Compare the 'prediction' column against the observed weight gain.
evaluator = ev.RegressionEvaluator(
    predictionCol="prediction",
    labelCol='MOTHER_WEIGHT_GAIN',
)

# Print R-squared first, then root-mean-squared error, on the test split.
for metric_name in ('r2', 'rmse'):
    print(evaluator.evaluate(weight_gain.transform(births_test), {evaluator.metricName: metric_name}))

### Clustering

In this example we will use the k-means model to find similarities in the births data.

In [None]:
import pyspark.ml.clustering as clus

# k-means with 5 clusters over the assembled 'features' vector.
kmeans = clus.KMeans(k=5, featuresCol='features')

clustering_stages = [encoder, featuresCreator, kmeans]
pipeline = Pipeline(stages=clustering_stages)
model = pipeline.fit(births_train)

Having estimated the model, let's see if we can find some differences between clusters.

In [None]:
# Assign training rows to clusters, then report the row count and the average
# mother's height for each cluster.
train = model.transform(births_train)
cluster_summary = train.groupBy('prediction').agg(
    fn.count('*'),
    fn.avg('MOTHER_HEIGHT_IN'),
)
cluster_summary.collect()

In [None]:
# Same per-cluster summary (row count, average mother's height) for the test
# split.
test = model.transform(births_test)
cluster_summary = test.groupBy('prediction').agg(
    fn.count('*'),
    fn.avg('MOTHER_HEIGHT_IN'),
)
cluster_summary.collect()

## Text Mining with Spark ML

### Feature extraction

Here we use an Airbnb review dataset covering all properties in the Denver area.

In [None]:
# Read the gzipped reviews CSV: header row for column names, inferred types,
# multi-line records enabled, and a double quote as the escape character.
reviews = spark.read.csv(
    'reviews.csv.gz',
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='\"',
)
reviews.show()

In [None]:
# Drop rows whose 'comments' value is null.
reviews = reviews.dropna(subset=['comments'])

# Raw string so the regex escapes reach the regex engine without Python's
# invalid-escape-sequence warning; the pattern value itself is unchanged.
punctuation_pattern = r'([^\s\w_]|_)+'

# First turn runs of punctuation/underscores into a single space ...
without_punctuation = fn.regexp_replace(fn.col("comments"), punctuation_pattern, ' ')
reviews = reviews.withColumn('comments', without_punctuation)
# ... then replace every newline or carriage return with a space.
reviews = reviews.withColumn('comments', fn.regexp_replace(fn.col("comments"), '[\n\r]', ' '))
reviews.show()

First, we need to tokenize this text.

In [None]:
# Split comments on runs of whitespace or on any of the listed punctuation
# characters. The pattern is a raw string with the same value as before; it
# only avoids Python's invalid-escape-sequence warning.
tokenizer = ft.RegexTokenizer(
    inputCol='comments',
    outputCol='comments_tok',
    pattern=r'\s+|[,."/!]',
)

The output of the tokenizer looks similar to this.

In [None]:
# Tokenize the comments and keep only the token column for inspection.
tokenized_reviews = tokenizer.transform(reviews)
tok = tokenized_reviews.select('comments_tok')
tok.take(5)

Use the `StopWordsRemover(...)`.

In [None]:
# Remove stop words from the tokenizer's output column.
stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol='comments_stop',
)

The output of the method looks as follows

In [None]:
# Preview the first five token lists after stop-word removal.
stopwords.transform(tok).select('comments_stop').take(5)

Build `NGram` model and the `Pipeline`.

In [None]:
# Build bigrams (n=2) from the stop-word-filtered tokens.
ngram = ft.NGram(
    n=2,
    inputCol=stopwords.getOutputCol(),
    outputCol="nGrams",
)

text_stages = [tokenizer, stopwords, ngram]
pipeline = Pipeline(stages=text_stages)

Now that we have the `pipeline`, we proceed in much the same fashion as before.

In [None]:
# Fit the text pipeline on the reviews and apply it to the same data.
ngram_model = pipeline.fit(reviews)
data_ngram = ngram_model.transform(reviews)
data_ngram.select('nGrams').take(5)

That's it. We got our n-grams and we can then use them in further NLP processing.

First, we will once again use the `RegexTokenizer` and the `StopWordsRemover` models.

Next, to model the text, we add a `CountVectorizer` to our pipeline.

In [None]:
# Despite its name, `stringIndexer` holds a CountVectorizer over the
# stop-word-filtered tokens.
stringIndexer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(),
    outputCol="comments_indexed",
)

# Tokenize and filter the reviews, then fit the vectorizer and preview its output.
tokenized = stopwords.transform(tokenizer.transform(reviews))
count_model = stringIndexer.fit(tokenized)
count_model.transform(tokenized).select('comments_indexed').take(5)

We will use the `LDA` model - the Latent Dirichlet Allocation model - to extract the topics.

In [None]:
import pyspark.ml.clustering as clus

# LDA with 5 topics and the online optimizer, reading the count vectors.
lda = clus.LDA(
    k=5,
    optimizer='online',
    featuresCol=stringIndexer.getOutputCol(),
)

Put these transformers and estimators together.

In [None]:
# Tokenize, drop stop words, vectorize the counts, then fit LDA.
topic_stages = [tokenizer, stopwords, stringIndexer, lda]
pipeline = Pipeline(stages=topic_stages)

Let's see if we have properly uncovered the topics.

In [None]:
# Fit the whole topic pipeline on the reviews.
pipeline_model = pipeline.fit(reviews)
# Apply the fitted pipeline back to the same reviews.
topics = pipeline_model.transform(reviews)
# Preview the topic distribution of the first five reviews.
topics.select('topicDistribution').take(5)

In [None]:
#vectorized_model = stringIndexer.fit(tokenized)
#vectorized = vectorized_model.transform(tokenized)
#lda = clus.LDA(k=5, optimizer='online', featuresCol=stringIndexer.getOutputCol())
#topic_model = lda.fit(vectorized)
#topics = topic_model.transform(vectorized)
#topics.select('topicDistribution').take(5)

Now we want to know the topics. Here is how we extract them.

In [None]:
# Stage index 2 of the fitted pipeline is the fitted CountVectorizer.
vectorized_model = pipeline_model.stages[2]
# Stage index 3 is the fitted LDA model.
topic_model = pipeline_model.stages[3]
# Show the top 10 terms per topic (first five topics).
topic_model.describeTopics(10).take(5)

In [None]:
# Vocabulary learned by the CountVectorizer; term indices index into it.
vocab = vectorized_model.vocabulary
topic_words_list = topic_model.describeTopics(10)

# Translate each topic's term indices into the corresponding vocabulary words.
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(
    lambda row: [vocab[term_idx] for term_idx in row['termIndices']]
).collect()

separator = "*"*25
for topic_no, words in enumerate(topics_words):
    print("topic: {}".format(topic_no))
    print(separator)
    for term in words:
        print(term)
    print(separator)

In [None]:
# Count-vectorize the bigrams this time (the variable name is kept although
# it holds a CountVectorizer).
stringIndexer = ft.CountVectorizer(
    inputCol=ngram.getOutputCol(),
    outputCol="comments_n_indexed",
)

# LDA with 5 topics and the online optimizer over the bigram counts.
nlda = clus.LDA(
    k=5,
    optimizer='online',
    featuresCol=stringIndexer.getOutputCol(),
)

ngram_topic_stages = [tokenizer, stopwords, ngram, stringIndexer, nlda]
pipeline = Pipeline(stages=ngram_topic_stages)

# Fit on the reviews and preview five topic distributions.
pipeline_model = pipeline.fit(reviews)
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

In [None]:
#n_vectorized_model = stringIndexer.fit(data_ngram)
#n_vectorized = n_vectorized_model.transform(data_ngram)

#n_topic_model = nlda.fit(n_vectorized)
#n_topics = n_topic_model.transform(n_vectorized)
#n_topics.select('topicDistribution').take(5)

In [None]:
# Stage index 3 is the fitted bigram CountVectorizer, stage index 4 the fitted
# LDA model.
n_vectorized_model = pipeline_model.stages[3]
n_topic_model = pipeline_model.stages[4]

vocab = n_vectorized_model.vocabulary
n_topic_words_list = n_topic_model.describeTopics(10)

# Translate each topic's term indices into the corresponding bigrams.
n_topic_words_rdd = n_topic_words_list.rdd
n_topics_words = n_topic_words_rdd.map(
    lambda row: [vocab[term_idx] for term_idx in row['termIndices']]
).collect()

separator = "*"*25
for topic_no, words in enumerate(n_topics_words):
    print("topic: {}".format(topic_no))
    print(separator)
    for term in words:
        print(term)
    print(separator)

Lastly, we use TF-IDF to train a topic model. Training LDA on TF-IDF vectors is not always useful, as plain term frequencies are sometimes sufficient; here we simply try it.

In [None]:
# Use TF-IDF vectors as the LDA input: term counts first, then IDF weighting.
tf = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(),
    outputCol="comments_tf",
)
idf = ft.IDF(
    inputCol=tf.getOutputCol(),
    outputCol="comments_tfidf",
)
lda = clus.LDA(
    k=5,
    optimizer='online',
    maxIter=10,
    featuresCol=idf.getOutputCol(),
)

tfidf_stages = [tokenizer, stopwords, tf, idf, lda]
pipeline = Pipeline(stages=tfidf_stages)
pipeline_model = pipeline.fit(reviews)

# Apply the fitted pipeline and preview five topic distributions.
topics = pipeline_model.transform(reviews)
topics.select('topicDistribution').take(5)

In [None]:
# Stage index 2 is the fitted CountVectorizer, stage index 4 the fitted LDA model.
tf_model = pipeline_model.stages[2]
topic_model = pipeline_model.stages[4]

vocab = tf_model.vocabulary
topic_words_list = topic_model.describeTopics(20)

# Translate each topic's top 20 term indices into vocabulary words.
topic_words_rdd = topic_words_list.rdd
topics_words = topic_words_rdd.map(
    lambda row: [vocab[term_idx] for term_idx in row['termIndices']]
).collect()

separator = "*"*25
for topic_no, words in enumerate(topics_words):
    print("topic: {}".format(topic_no))
    print(separator)
    for term in words:
        print(term)
    print(separator)