# [TEST] pyspark_ls_script

## Testing model statements

### Import libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

### Spark Init and Config

Create a SparkSession instance

In [None]:
# Create the session used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("SocialApp")
    .getOrCreate()
)

Define the schema for the data

In [None]:
# Two string columns, in file order: the text and its category.
customSchema = StructType(
    [StructField(column, StringType()) for column in ("clean_text", "category")]
)

### Dataset

In [None]:
# Location of the published Google Sheets document holding the dataset.
# The literal is split into host/path, document id and query for readability;
# adjacent string literals are concatenated into a single value.
filename = (
    'https://docs.google.com/spreadsheets/d/e/'
    '2PACX-1vQrsbFzUZtypCv80I7lGN4qs1m56Qss5X54FzTH-gb0lx569sjkRKCtSRemMhF1tca38rVu-mQFhbez'
    '/pubhtml?gid=817597830&single=true'
)

Read the data from the CSV file

In [None]:
# Spark's DataFrameReader resolves paths through Hadoop file systems and cannot
# open an http(s) URL directly, so a remote sheet is first downloaded to a
# local temporary file and Spark reads that copy instead.
import pathlib
import urllib.request

# A Google Sheets ".../pubhtml" link serves an HTML page, not CSV. The same
# published sheet is exported as CSV from ".../pub" with "output=csv".
# NOTE(review): confirm the sheet is published in a way that allows CSV export.
csv_url = filename
if "/pubhtml" in csv_url:
    csv_url = csv_url.replace("/pubhtml", "/pub", 1)
    if "output=" not in csv_url:
        csv_url += ("&" if "?" in csv_url else "?") + "output=csv"

if csv_url.startswith(("http://", "https://")):
    # urlretrieve stores the response in a temporary file and returns its path.
    # NOTE(review): the file lives on the driver machine only; on a multi-node
    # cluster it would have to be placed on storage the executors can reach.
    local_csv, _ = urllib.request.urlretrieve(csv_url)
    csv_path = pathlib.Path(local_csv).as_uri()
else:
    # Already a path Spark can resolve on its own (local, HDFS, ...).
    csv_path = csv_url

# The first line is a header; column types come from customSchema.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(customSchema)
    .load(csv_path)
)

### Preprocessing

Remove rows with null values

In [None]:
# Keep only the rows in which every column has a value.
data = df.dropna(how="any")

Group data by the "category" column and count the categories

In [None]:
# Print the number of rows per category, most frequent category first.
# The grouped DataFrame is lazy, so an action (.show()) is required for the
# counts to be computed and displayed; the bare expression showed no counts.
(
    data.groupBy("category")
    .count()
    .orderBy(col("count").desc())
    .show()
)

Tokenize text using a regular expression

In [None]:
# Tokenizer stage: reads "clean_text", writes "words", using the regex \W
# (any non-word character) as its pattern.
regexTokenizer = RegexTokenizer(
    inputCol="clean_text",
    outputCol="words",
    pattern=r"\W",
)

Define stop words

In [None]:
# Stop-word list handed to the remover; it is set explicitly on the stage.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the"]

# Stop-word stage: reads "words", writes "filtered".
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords,
)

Create a "bag of words" representation from tokenized words

In [None]:
# Bag-of-words stage: turns the "filtered" tokens into a "features" vector,
# with the vocabulary size and minimum document frequency given below.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=30000,
    minDF=5,
)

Convert the "category" column to numeric labels

In [None]:
# Label stage: maps each "category" string to a numeric "label".
label_stringIdx = StringIndexer(
    inputCol="category",
    outputCol="label",
)

Define the pipeline

In [None]:
# Feature-engineering pipeline; the stages run in the order listed.
pipeline = Pipeline(
    stages=[
        regexTokenizer,    # clean_text -> words
        stopwordsRemover,  # words -> filtered
        countVectors,      # filtered -> features
        label_stringIdx,   # category -> label
    ]
)

Fit the pipeline to the data

In [None]:
# Fit all pipeline stages on the null-free dataset.
# NOTE(review): the fit happens before the train/test split further down, so
# the fitted stages have also seen the rows that later form the test set.
pipelineFit = pipeline.fit(data)
# Apply the fitted stages to the same rows, producing the columns each stage writes.
dataset = pipelineFit.transform(data)

### Training

Split the data into training and test sets

In [None]:
# 70 % / 30 % random split; the fixed seed keeps the split repeatable.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

### Model Training

Train a logistic regression model

In [None]:
# Logistic regression estimator with the iteration cap, regularisation
# strength and elastic-net mixing value given below.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)

# Train on the training split only.
lrModel = lr.fit(trainingData)

### Evaluation

Make predictions on the test set

In [None]:
# Score the held-out test split with the trained model; the result is later
# read for its "probability" and "prediction" columns.
predictions = lrModel.transform(testData)

Show the 10 test rows predicted as label 0, ordered by descending probability

In [None]:
# Of the rows predicted as label 0, print the 10 that sort highest on the
# "probability" column, truncating long cell values to 30 characters.
(
    predictions
    .where(col("prediction") == 0)
    .select("clean_text", "category", "probability", "label", "prediction")
    .orderBy(col("probability").desc())
    .show(n=10, truncate=30)
)

## Evaluate model metrics

### Training summary metrics (per label and weighted)

In [None]:
# Metrics in this cell come from the model's training summary.
trainingSummary = lrModel.summary

# For multiclass, metrics can be inspected on a per-label basis.
# Each entry pairs a report heading with a function that reads the matching
# per-label sequence from the summary; the lookup happens only after the
# heading has been printed, and the reports run in the order listed.
per_label_reports = (
    ("\nFalse positive rate by label:", lambda summary: summary.falsePositiveRateByLabel),
    ("\nTrue positive rate by label:", lambda summary: summary.truePositiveRateByLabel),
    ("\nPrecision by label:", lambda summary: summary.precisionByLabel),
    ("\nRecall by label:", lambda summary: summary.recallByLabel),
    ("\nF-measure by label:", lambda summary: summary.fMeasureByLabel()),
)
for heading, read_values in per_label_reports:
    print(heading)
    for label_index, value in enumerate(read_values(trainingSummary)):
        print("label %d: %s" % (label_index, value))

# Overall and weighted figures across all labels.
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall

overall_values = (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall)
print("\nAccuracy: %s\n\nFPR: %s\n\nTPR: %s\n\nF-measure: %s\n\nPrecision: %s\n\nRecall: %s"
      % overall_values)

## Cross Validation

Cross-validation is performed to tune the hyperparameters; only the logistic regression trained on the count vectors is tuned.

This makes it possible to evaluate and adjust the model in a more robust and precise way.

The objective of this section is to find the hyperparameter values that optimize the model's performance on unseen data (the test set) while avoiding overfitting.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# No metricName is passed, so the evaluator uses its default metric
# (f1 according to the PySpark documentation — confirm for the installed version).
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
# Score the test-set predictions; as the last expression of the cell the
# value is displayed by the notebook rather than stored.
evaluator.evaluate(predictions)

In [None]:
# Rebuild the feature pipeline, the transformed dataset, the 70/30 split and
# the base estimator so that this section can run from a clean state.
pipeline = Pipeline(
    stages=[
        regexTokenizer,
        stopwordsRemover,
        countVectors,
        label_stringIdx,
    ]
)

pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Base estimator whose regParam / elasticNetParam are varied by the grid search.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 3 x 3 grid over the regularisation strength and the elastic-net mixing value.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5])
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
    .build()
)

# 5-fold cross-validation; `evaluator` scores every candidate in the grid.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)

# Run the grid search on the training split, then score the test split with
# the resulting model.
cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)

# Evaluate the best model on the test split.
# MulticlassClassificationEvaluator does not offer an "areaUnderROC" metric
# (that metric belongs to BinaryClassificationEvaluator), so requesting it
# raises an error. Report metrics the multiclass evaluator supports instead.
test_f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
test_accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print("Test F1: " + str(test_f1))
print("Test accuracy: " + str(test_accuracy))

## Stop pyspark session

In [None]:
# Shut down the Spark session now that all cells have run.
spark.stop()