# PySpark Logistic Regression

Our full dataset file has around 9 million samples. When trying to run feature_generator.

Machine:
* 2018 Mac Mini - 6 core

Docker Configuration:
* 9 CPUs
* 24 GB Ram
* 3 GB swap


# References:

* https://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html
* https://towardsdatascience.com/countvectorizer-hashingtf-e66f169e2d4e
* https://medium.com/@dhiraj.p.rai/logistic-regression-in-spark-ml-8a95b5f5434c
* packaging spark job - https://developerzen.com/best-practices-writing-production-grade-pyspark-jobs-cb688ac4d20f



In [1]:
import pyspark
from pyspark import SparkContext, SparkFiles
from pyspark.sql import SparkSession, DataFrameReader, SQLContext

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import logging

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
sns.set()
%matplotlib inline

DEBUG = False

N_CLASSES = 5

if DEBUG:
    MIN_DF = 1
    DF_PERCENTAGE = 0.001
    DATA_FILE = "/home/jupyter/dataset/amazon_reviews/amazon_reviews_us_Wireless_v1_00-test1k-preprocessed.csv"
else:
    DF_PERCENTAGE = 0.001
    DATA_FILE = "/home/jupyter/dataset/amazon_reviews/amazon_reviews_us_Wireless_v1_00-1m-preprocessed.csv"
#     DATA_FILE = "/home/jupyter/dataset/amazon_reviews/amazon_reviews_us_Wireless_v1_00-all-preprocessed.csv"


spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Test NGram TFIDF (local)") \
            .config("spark.logConf", True) \
            .config("spark.driver.memory", "24g") \
            .config("spark.executor.memory", "24g") \
            .config("spark.maxResultSize", "20g") \
            .getOrCreate()




In [2]:
from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler, Tokenizer, IDF
from pyspark.ml import Pipeline
from datetime import datetime

class Timer(object):
    
    def __init__(self, description: str):
        self.start_time = datetime.now()
        self.description = description
        
    def stop(self):
        self.end_time = datetime.now()
        self.print_duration_min()
        
    def print_duration_min(self):
        self.duration = int((self.end_time - self.start_time).total_seconds() / 60)
        print(f"{self.description} duration: {self.duration} minutes")
    
    
def show_df(df, columns: list, rows: int=10, sample=True, truncate=False):
    """
    Prints out DF with sampling
    """
    if sample:
        sample_percent = min(rows / df.count(), 1.0)
        log.info(f'sampling percentage: {sample_percent}')
        df.select(columns).sample(False, sample_percent, seed=1).show(rows, truncate=truncate)
    else:
        df.select(columns).show(rows, truncate=truncate)








In [3]:
file_timer = Timer("file load time")
df = spark.read.csv(SparkFiles.get(DATA_FILE), 
                    header=True, 
                    inferSchema= True)
df.collect()
file_timer.stop()

file load time duration: 0 minutes


In [4]:
def build_ngrams(inputCol, min_df, n=3):
    log.info(f'Creating TFIDF using min_df: {min_df}')
    
    tokenizer = [Tokenizer(inputCol = inputCol, outputCol = "words")]
    
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    vectorizers = [
        CountVectorizer(minDF=min_df, inputCol="{0}_grams".format(i),
            outputCol="{0}_counts".format(i))
        for i in range(1, n + 1)
    ]

    assembler = [VectorAssembler(
        inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
        outputCol="raw_features"
    )]
    
    idf = [IDF(minDocFreq=min_df).setInputCol("raw_features").setOutputCol("features")]

    return Pipeline(stages=tokenizer + ngrams + vectorizers + assembler + idf)



pipeline_timer = Timer("pipeline time")
# calculate a reasonable min_df
min_df = max(int(df.count() * DF_PERCENTAGE), 1)

df = build_ngrams("review_body", min_df).fit(df).transform(df)
pipeline_timer.stop()


INFO:__main__:Creating TFIDF using min_df: 995


pipeline time duration: 1 minutes


In [5]:
df.printSchema()
show_df(df, ["star_rating", "features"], truncate=True)

root
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- 1_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 2_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 3_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 1_counts: vector (nullable = true)
 |-- 2_counts: vector (nullable = true)
 |-- 3_counts: vector (nullable = true)
 |-- raw_features: vector (nullable = true)
 |-- features: vector (nullable = true)



INFO:__main__:sampling percentage: 1.004330673865709e-05


+-----------+--------------------+
|star_rating|            features|
+-----------+--------------------+
|          2|(4073,[16,20,35,3...|
|          4|(4073,[13,17,18,2...|
|          5|(4073,[3],[1.4694...|
|          1|(4073,[0,5,12,31,...|
|          5|(4073,[1,17,20,22...|
|          5|(4073,[0,1,2,3,4,...|
|          4|(4073,[3,4,13,39,...|
|          4|(4073,[0,7,8,9,14...|
|          4|(4073,[0,1,2,3,8,...|
+-----------+--------------------+



# Split training and test data

In [6]:
train, test = df.randomSplit([0.9, 0.1], seed=1)

In [7]:
train_size = train.count()
test_size = test.count()

print(f'Training size: {train_size} Test size: {test_size}')

Training size: 896004 Test size: 99684


# Assign class weights to handle imbalanced classes

In [8]:
from sklearn.utils.class_weight import compute_class_weight

# only do this for small files - takes too long for large datasets - we will custom compute this
if DEBUG:
    labels = train.select("star_rating").toPandas().astype({"star_rating": np.int8})
    class_weights_sklearn = compute_class_weight('balanced', sorted(labels.star_rating.unique()), labels.star_rating.tolist())
    print(f'sklearn class weights: {class_weights_sklearn}')
    

# custom calculate class weights
n_samples = train.count()
class_weights = []

for i in np.arange(1, N_CLASSES + 1):
    class_samples = train.filter(f"star_rating == {i}").count()
    class_weights.append(n_samples / (N_CLASSES * class_samples))
    
print(f'calculated class weights: {class_weights}')

calculated class weights: [1.4238671487028725, 3.0216301891882775, 2.212055153003913, 1.200225041190575, 0.37292633489134824]


In [9]:
from pyspark.sql.functions import when, lit, col
from pyspark.sql.types import NumericType



train = train.withColumn("class_weights", lit(0))
train = train.withColumn("class_weights", when(train.star_rating == 1, class_weights[0]).otherwise(train.class_weights))
train = train.withColumn("class_weights", when(train.star_rating == 2, class_weights[1]).otherwise(train.class_weights))
train = train.withColumn("class_weights", when(train.star_rating == 3, class_weights[2]).otherwise(train.class_weights))
train = train.withColumn("class_weights", when(train.star_rating == 4, class_weights[3]).otherwise(train.class_weights))
train = train.withColumn("class_weights", when(train.star_rating == 5, class_weights[4]).otherwise(train.class_weights))


show_df(train, ["star_rating", "class_weights", "features"], 20, truncate=True)


INFO:__main__:sampling percentage: 2.2321328922638738e-05


+-----------+-------------------+--------------------+
|star_rating|      class_weights|            features|
+-----------+-------------------+--------------------+
|          1| 1.4238671487028725|(4073,[0,2,9,12,2...|
|          1| 1.4238671487028725|(4073,[0,5,6,24,4...|
|          5|0.37292633489134824|(4073,[3,5,10,116...|
|          5|0.37292633489134824|(4073,[134,169,25...|
|          4|  1.200225041190575|(4073,[17,22,176,...|
|          5|0.37292633489134824|(4073,[2,20,186,2...|
|          5|0.37292633489134824|(4073,[1,2,5,13,1...|
|          2| 3.0216301891882775|(4073,[0,1,3,4,5,...|
|          5|0.37292633489134824|(4073,[8],[1.7774...|
|          4|  1.200225041190575|(4073,[6,8,9,19,2...|
|          4|  1.200225041190575|(4073,[6,16,19,11...|
|          4|  1.200225041190575|(4073,[2,8,11,25,...|
|          4|  1.200225041190575|(4073,[15,16,101,...|
|          1| 1.4238671487028725|(4073,[0,19,24,48...|
|          4|  1.200225041190575|(4073,[0,6,7,8,9,...|
|         

# Train our Model

In [10]:
from pyspark.ml.classification import LogisticRegression

train_timer = Timer("traing time")
lr = LogisticRegression(labelCol="star_rating", 
                        featuresCol="features", 
                        weightCol="class_weights",
                        maxIter=100)
model=lr.fit(train)
train_timer.stop()

predict_timer = Timer("predict time")
predict_train=model.transform(train)
predict_test=model.transform(test)
predict_timer.stop()


show_df(predict_test, ["star_rating", "prediction", "rawPrediction", "probability"], truncate=True)



traing time duration: 1 minutes
predict time duration: 0 minutes


INFO:__main__:sampling percentage: 0.00010031700172545243


+-----------+----------+--------------------+--------------------+
|star_rating|prediction|       rawPrediction|         probability|
+-----------+----------+--------------------+--------------------+
|          5|       5.0|[-10.081784191813...|[1.04346451398456...|
|          5|       5.0|[-10.082017947351...|[1.73272704012591...|
|          5|       4.0|[-10.082291993279...|[7.22390572680019...|
|          5|       4.0|[-10.081778525523...|[6.41747648247169...|
|          5|       5.0|[-10.082146324969...|[1.02695767828834...|
|          1|       1.0|[-10.084304030478...|[8.11113555667061...|
|          2|       1.0|[-10.081712626159...|[9.94228306826467...|
|          5|       5.0|[-10.083307101262...|[9.41287116474760...|
|          2|       1.0|[-10.085158843896...|[2.68945012184320...|
|          5|       5.0|[-10.088940966399...|[6.95275870100088...|
+-----------+----------+--------------------+--------------------+
only showing top 10 rows



In [11]:
predict_test.printSchema()

root
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- 1_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 2_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 3_grams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- 1_counts: vector (nullable = true)
 |-- 2_counts: vector (nullable = true)
 |-- 3_counts: vector (nullable = true)
 |-- raw_features: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



# Evaluate our Model

Reference:
* https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#multiclass-classification

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="star_rating",
                                             predictionCol="prediction")
accuracy = evaluator.evaluate(predict_test, {evaluator.metricName: "accuracy"})
print(f"Overall Accuracy = {accuracy}")
f1 = evaluator.evaluate(predict_test, {evaluator.metricName: "f1"})
print(f"Overall F1 = {f1}")
weightedPrecision = evaluator.evaluate(predict_test, {evaluator.metricName: "weightedPrecision"})
print(f"Weighted Precision = {weightedPrecision}")
weightedRecall = evaluator.evaluate(predict_test, {evaluator.metricName: "weightedRecall"})
print(f"Weighted Recall = {weightedRecall}")


Overall Accuracy = 0.6413165603306449
Overall F1 = 0.6551891250685951
Weighted Precision = 0.6748378796407741
Weighted Recall = 0.6413165603306449


## Calculating class specific Recall, Precision, F1-Score

$precision = TP / (TP + FP)$

$recall = TP / (TP + FN)$

$F1 = 2 * (precision * recall) / (precision + recall)$


All weighted average metrics are calculated in the following manner:
$$weighted=\frac{1}{total support}\sum_{i=1}^n{metric}$$

support is the number of occurrences of each class in y_true


In [13]:
from sklearn.metrics import classification_report
from pprint import pprint

# check results from sklearn in debug mode
if DEBUG:
    print("sklearn classification report")
    pprint(classification_report(predict_test.select("star_rating").toPandas().star_rating.tolist(),
                               predict_test.select("prediction").toPandas().prediction.tolist(),
                               output_dict=True))


def pyspark_classification_report(test_df, truth_column, prediction_column, classes: int):
    """
    """
    precisions = []
    recalls = []
    f1s = []
    weighted_precisions = []
    weighted_recalls = []
    weighted_f1s = []
    
    total_support = test_df.count()
    
    dr_dict = {}
    
    print()
    for i in np.arange(1, classes + 1):
        support = test_df.filter(f'{truth_column} == {i}').count()
        
        predicted = test_df.filter(f'{prediction_column} == {i}')
        tp = predicted.filter(f'{truth_column} == {prediction_column}').count()
        fp = predicted.count() - tp
        
        predicted_count = predicted.count()
        precision = 0 if predicted_count == 0 else tp / predicted.count()
        precisions.append(precision)
        weighted_precisions.append(precision * support)
        
        recall = tp / support
        recalls.append(recall)
        weighted_recalls.append(recall * support)
        
        f1 = 0 if precision == 0 and recall == 0 else 2 * (precision * recall) / (precision + recall)
        f1s.append(f1)
        weighted_f1s.append(f1 * support)
        print(f'Class {i} precision: {precision} recall: {recall} f1: {f1} support: {support}')
        
        d = {
            "precision":precision,
            "recall":recall,
            "f1-score":f1,
            "support":support
        }
        dr_dict[str(i)] = d
        
    print(f'Total Support: {total_support}')
        
    accuracy = test_df.filter("star_rating == prediction").count() / total_support
    dr_dict["accuracy"] = accuracy
    print(f'Accuracy: {accuracy}')
        
    macro_avg_precision = sum(precisions) / len(precisions)
    macro_avg_recall = sum(recalls) / len(recalls)
    macro_avg_f1 = sum(f1s) / len(f1s)
    
    dr_dict["macro avg"] = {
        "precision": macro_avg_precision,
        "recall": macro_avg_recall,
        "f1-score": macro_avg_f1,
        "support": total_support
    }

    print(f'Macro Avg Precision: {macro_avg_precision} Recall: {macro_avg_recall} F1: {macro_avg_f1}')
    
    weighted_avg_precision = sum(weighted_precisions) / total_support
    weighted_avg_recall = sum(weighted_recalls) / total_support
    weighted_avg_f1 = sum(weighted_f1s) / total_support
    
    dr_dict["weighted avg"] = {
        "precision": weighted_avg_precision,
        "recall": weighted_avg_recall,
        "f1-score": weighted_avg_f1,
        "support": total_support
    }
    
    print(f'Weighted Avg Precision: {weighted_avg_precision} Recall: {weighted_avg_recall} F1: {weighted_avg_f1}')
    
    return dr_dict
        
        

cr = pyspark_classification_report(predict_test, "star_rating", "prediction", N_CLASSES)

print()
pprint(cr)
    



Class 1 precision: 0.7026024905554779 recall: 0.7121684867394695 f1: 0.7073531483307508 support: 14102
Class 2 precision: 0.2902851843264549 recall: 0.3864793949683593 f1: 0.3315458457464415 support: 6479
Class 3 precision: 0.3212204319506342 recall: 0.41272987556436513 f1: 0.3612704226709721 support: 9081
Class 4 precision: 0.4012675668228162 recall: 0.43798123195380173 f1: 0.4188213638586097 support: 16624
Class 5 precision: 0.8594705118101851 recall: 0.7557024607663209 f1: 0.8042531564838713 support: 53398
Total Support: 99684
Accuracy: 0.6413165603306449
Macro Avg Precision: 0.5149692370931136 Recall: 0.5410122899984633 F1: 0.5246487874181291
Weighted Avg Precision: 0.6748378796407741 Recall: 0.6413165603306449 F1: 0.6551891250685951

{'1': {'f1-score': 0.7073531483307508,
       'precision': 0.7026024905554779,
       'recall': 0.7121684867394695,
       'support': 14102},
 '2': {'f1-score': 0.3315458457464415,
       'precision': 0.2902851843264549,
       'recall': 0.3864793949

## Confusion Matrix

In [14]:
from sklearn.metrics import confusion_matrix


if DEBUG:
    print("sklearn confusion matrix")
    print(confusion_matrix(predict_test.select("star_rating").toPandas().star_rating.tolist(),
                            predict_test.select("prediction").toPandas().prediction.tolist()))
    

def pyspark_confusion_matrix(test_df, truth_column, prediction_column, n_classes):
    """
    Calculates confusiion matrix like sklearn would using pyspark dataframe
    
    :param test_df: pyspark dataframe with labels and predictions
    :param truth_column: column name with the truth
    :param prediction_column: column name for prediction
    :param n_classes: number of classes in predictions
    :return: np array with confusion matrix
    """
    cm = []
    for i in np.arange(1, n_classes + 1):
        current = []
        for j in np.arange(1, n_classes + 1):
            count = test_df.filter(f'({truth_column} == {i}) AND ({prediction_column} == {j})').count()

            current.append(count)
        cm.append(current)
    return np.asarray(cm)

cm = pyspark_confusion_matrix(predict_test, "star_rating", "prediction", N_CLASSES)

print(cm)
    

[[10043  2635   928   224   272]
 [ 1991  2504  1399   334   251]
 [  962  1887  3748  1697   787]
 [  431   742  2882  7281  5288]
 [  867   858  2711  8609 40353]]


In [15]:
def calculate_score(cr: dict):
    
    values = []
    values.append(cr["1"]["recall"])
    values.append(cr["2"]["recall"])
    values.append(cr["3"]["recall"])
    values.append(cr["4"]["recall"])
    values.append(cr["5"]["precision"])
    
    mean = 0
    for v in values:
        if v == 0:
            mean = 0
            break
        else:
            mean += 1 / v
    if mean > 0:
        mean = len(values) / mean

    return mean




score = calculate_score(cr)
print(f'Overall score: {score}')

Overall score: 0.5070364344705827
