**Set Up**

In [1]:
import os

# Java
# Spark runs on the JVM, so a headless JDK 8 is installed and exposed through
# JAVA_HOME / PATH before any Spark process is launched.
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# Print the Java version found on PATH as a sanity check.
! java -version

# Pyspark
# Version is pinned; --ignore-installed reinstalls even if a copy is already present.
! pip install --ignore-installed pyspark==2.4.7

# Spark NLP
# Version is pinned; installed after pyspark so the import below finds both.
! pip install --ignore-installed spark-nlp==2.7.3

# SparkSession start
# Must run after the installs above; `spark` is the session object returned by Spark NLP.
import sparknlp
spark = sparknlp.start()

openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
Collecting pyspark==2.4.7
  Downloading pyspark-2.4.7.tar.gz (217.9 MB)
[K     |████████████████████████████████| 217.9 MB 45 kB/s 
[?25hCollecting py4j==0.10.7
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[K     |████████████████████████████████| 197 kB 19.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.7-py2.py3-none-any.whl size=218279466 sha256=dc4bff992861a6a116e6749ed7a5df39463dc30f80c4368c96e4d114090bd383
  Stored in directory: /root/.cache/pip/wheels/da/28/74/56054e5fe3413c8c58b67e4d7483d4864fe483920c9b8ec754
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.7
Collecting spark-nlp==2.7.3
  Downloading spark_nlp-2.7.3-

In [8]:
import re
import nltk
import sparknlp
from sparknlp.annotator import *
## Pretrained Pipeline from Spark NLP
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline # Start Spark Session with Spark NLP
from pyspark.sql.functions import col

In [3]:
# To use Spark and its API import the SparkContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Create Spark Context
# getOrCreate() returns the active context when one exists
# (presumably the one started by sparknlp.start() in the setup cell — TODO confirm).
sc = SparkContext.getOrCreate()
# SQLContext built on that context; it is the entry point used to read the CSV below.
sqlContext = SQLContext(sc)

**Compile Dataframe**

In [17]:
# load data
# The first row supplies the column names. No schema inference is requested,
# so every column is read as a string.
df_full = (
    sqlContext.read.format('csv')
    .option("delimiter", ",")
    .option("header", "true")
    .load('/content/samp3_3.csv')
)

# verify 11,382 records
# Count the frame that was just loaded; `df` is not defined until a later cell,
# so counting it here fails on a fresh kernel.
df_full.count()

11382

In [65]:
# Labelled subset: the `label` column plus the text column (`clnNoMent`),
# keeping only rows where neither value is null.
df = df_full.select(*[col(name) for name in ("label", "clnNoMent")])
df_label = df.dropna()
print(df_label.count())

# Full set: the same text column paired with `match2022`; null rows are kept.
df_all = df_full.select(*[col(name) for name in ("match2022", "clnNoMent")])
print(df_all.count())

563
11382


**Pipeline Set Up**

In [107]:
# taken from homework with small modifications
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
# --- Spark NLP text preparation -------------------------------------------
# Wrap the raw `text` column in a Spark NLP document annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)
# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# --- Token cleaning (used by the LR pipeline only) --------------------------
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
)
# Case-insensitive stop-word removal.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalized")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)
stemmer = (
    Stemmer()
    .setInputCols(["cleanTokens"])
    .setOutputCol("stem")
)

# --- Finishers: turn annotations into a plain array column -----------------
# Both write to `token_features`; they are never placed in the same pipeline.
# `finisher` exports the stemmed tokens (cleaned path).
finisher = (
    Finisher()
    .setInputCols(["stem"])
    .setOutputCols(["token_features"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)
# `finisher2` exports the raw tokens (no cleaning).
finisher2 = (
    Finisher()
    .setInputCols(["token"])
    .setOutputCols(["token_features"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

# --- TF-IDF features --------------------------------------------------------
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

# --- Label handling ---------------------------------------------------------
# Map the string `category` column to a numeric `label` column.
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")
# NOTE(review): the input column is `label`, so `article_class` is the indexed
# true label mapped back to a string, not the model's prediction — confirm
# this is intended.
label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")

# --- Classifiers ------------------------------------------------------------
lr = LogisticRegression(maxIter=10, regParam=0.2, elasticNetParam=0.0)
# `svm` is not part of any pipeline assembled in this cell.
svm = LinearSVC(maxIter=10, regParam=0.2)
svm2 = LinearSVC(regParam=0.455)

# --- Pipelines --------------------------------------------------------------
# LR: full cleaning chain (normalize, stop words, stem) before TF-IDF.
lr_pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    stemmer,
    finisher,
    hashingTF,
    idf,
    label_stringIdx,
    lr,
    label_to_stringIdx,
])
# SVM: raw tokens go straight to TF-IDF, with no additional text cleaning.
svm3_pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    finisher2,
    hashingTF,
    idf,
    label_stringIdx,
    svm2,
    label_to_stringIdx,
])

In [108]:
# evaluation + validation
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.metrics import roc_curve, auc, roc_auc_score, f1_score, precision_score, recall_score
from sklearn.model_selection import StratifiedKFold, cross_val_score, train_test_split, GridSearchCV, ShuffleSplit
from sklearn import metrics

In [109]:
# format a float as a string with four digits after the decimal point
def two_d(num):
  """Return `num` rendered in fixed-point notation with 4 decimal places.

  Despite the function's name, the format spec keeps four decimals, not two.
  """
  template = "{:.4f}"
  return template.format(num)

# compute precision, recall, f1, and accuracy for a pair of label sequences
def eval_metrics(p_labels, t_labels):
  """Score two label sequences with four sklearn classification metrics.

  Both arguments are forwarded unchanged, and in the same order, to
  precision_score, recall_score, f1_score and accuracy_score: `p_labels`
  goes in each scorer's first positional slot and `t_labels` in the second.

  NOTE(review): sklearn scorers take (y_true, y_pred). The calls in this
  notebook pass the true `label` column first and `prediction` second, so
  the parameter names read the opposite way round from how they are used —
  confirm before changing any call site.

  Returns:
    A tuple (precision, recall, f1, accuracy).
  """
  scorers = (precision_score, recall_score, f1_score, accuracy_score)
  return tuple(score(p_labels, t_labels) for score in scorers)

# print the four metrics, one per line
def eval_metrics_print(pr, re, fone, ac):
  """Print precision, recall, F1 and accuracy, each on its own line.

  Every value is formatted through two_d before printing. Inside this
  function the parameter `re` shadows the `re` module imported at the top
  of the notebook.

  Returns:
    None; the output goes to stdout only.
  """
  rows = (
    ('Precision: ', pr),
    ('Recall: ', re),
    ('F1: ', fone),
    ('Accuracy: ', ac),
  )
  for prefix, value in rows:
    print(prefix + two_d(value))

In [110]:
# 70/30 split for training and test data
# The pipelines read the text from `text` and the target from `category`,
# so the columns are renamed before the seeded random split.
trainingData, testData = (
    df_label
    .withColumnRenamed('clnNoMent', 'text')
    .withColumnRenamed('label', 'category')
    .randomSplit([0.7, 0.3], seed=4)
)

**LR Model**

In [111]:
# Fit every stage of the LR pipeline on the training split, then apply the
# fitted pipeline to the held-out test split.
lr_pipeline_model = lr_pipeline.fit(trainingData)
lrpredictions =  lr_pipeline_model.transform(testData)

Evaluate

In [112]:
# create pandasDF
# Collect the predicted class and the indexed true label into pandas so the
# sklearn metric helpers can score them.
pandasDF = lrpredictions.select(['prediction', 'label']).toPandas()

In [113]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# create function to get probs
# The UDF returns element 1 of the probability vector as a float
# (presumably the probability of the class indexed 1.0 — TODO confirm the
# index-to-class mapping produced by the label indexer).
secondelement = udf(lambda v:float(v[1]),FloatType())

# add probs to df
# NOTE(review): DataFrame.join aligns on the pandas index, so this assumes
# both toPandas() calls return the rows in the same order — confirm.
lrprobs = lrpredictions.select(secondelement('probability')).toPandas()
lrresults = pandasDF.join(lrprobs)

In [114]:
#print metrics
# The true `label` column is passed first and `prediction` second.
lrpre, lrrec, lrf1, lracc = eval_metrics(lrresults['label'], lrresults['prediction'])
eval_metrics_print(lrpre, lrrec, lrf1, lracc)

# AUC ROC
# Scored from the extracted probabilities rather than the hard predictions.
lr_roc_auc_scor = roc_auc_score(lrresults['label'], lrprobs)
print('AUC ROC: '+two_d(lr_roc_auc_scor))

Precision: 0.6667
Recall: 0.2857
F1: 0.4000
Accuracy: 0.9259
AUC ROC: 0.8359


Full Dataset

In [118]:
# Rename to the column names the pipelines read (`text`, `category`);
# here `match2022` takes the place of the target column.
df_all_F = df_all.withColumnRenamed('clnNoMent', 'text').withColumnRenamed('match2022', 'category')

In [119]:
# Apply the already-fitted LR pipeline (no refit) to the full dataset.
# NOTE(review): the label indexer was fitted on the training `category`
# values; this assumes `match2022` uses the same set of values — confirm.
lrFullpredictions =  lr_pipeline_model.transform(df_all_F)

In [125]:
# create pandas DF
# Predicted class and indexed label for every row of the full dataset.
pandasDF2 = lrFullpredictions.select(['prediction', 'label']).toPandas()

# add probs to df
lrFprobs = lrFullpredictions.select(secondelement('probability')).toPandas()
# Join onto pandasDF2, the full-dataset frame. `pandasDF` holds only the
# test-split predictions, so joining the full-dataset probabilities onto it
# pairs them with the wrong rows.
lrFresults = pandasDF2.join(lrFprobs)

In [127]:
#print metrics
# Full-dataset LR metrics; the `label` column is passed first and `prediction` second.
lrpr, lrre, lrf, lrac = eval_metrics(pandasDF2['label'], pandasDF2['prediction'])
eval_metrics_print(lrpr, lrre, lrf, lrac)

# AUC ROC
# Scored from the full-dataset probabilities extracted above.
lr_roc_auc_sco = roc_auc_score(pandasDF2['label'], lrFprobs)
print('AUC ROC: '+two_d(lr_roc_auc_sco))

Precision: 0.4792
Recall: 0.2081
F1: 0.2902
Accuracy: 0.9802
AUC ROC: 0.9588


**SVM Model**

In [115]:
# Fit the SVM pipeline (svm3_pipeline: raw tokens, svm2 classifier) on the
# training split, then apply it to the held-out test split.
svm_pipeline_model = svm3_pipeline.fit(trainingData)
svmpredictions =  svm_pipeline_model.transform(testData)

Evaluate

In [105]:
# create pandas DataFrame
# Only the predicted class and the indexed label are collected; no
# probability column is selected for the SVM.
svmresults = svmpredictions.select(['prediction', 'label']).toPandas()

In [106]:
#print metrics
# Test-split SVM metrics; the `label` column is passed first and `prediction` second.
svmpre, svmrec, svmf1, svmacc = eval_metrics(svmresults['label'], svmresults['prediction'])
eval_metrics_print(svmpre, svmrec, svmf1, svmacc)

Precision: 0.8571
Recall: 0.4286
F1: 0.5714
Accuracy: 0.9444


Full Dataset

In [129]:
# Apply the already-fitted SVM pipeline (no refit) to the full dataset.
# NOTE(review): the label indexer was fitted on the training `category`
# values; this assumes `match2022` uses the same set of values — confirm.
svmFullpredictions =  svm_pipeline_model.transform(df_all_F)

In [130]:
# create pandas DataFrame
# Predicted class and indexed label for every row of the full dataset.
svmFullresults = svmFullpredictions.select(['prediction', 'label']).toPandas()

In [131]:
#print metrics
# Full-dataset SVM metrics. They get their own names so they no longer
# overwrite the LR full-dataset metrics stored in lrpr, lrre, lrf and lrac.
svmFpre, svmFrec, svmFf1, svmFacc = eval_metrics(svmFullresults['label'], svmFullresults['prediction'])
eval_metrics_print(svmFpre, svmFrec, svmFf1, svmFacc)

Precision: 0.4329
Recall: 0.5837
F1: 0.4971
Accuracy: 0.9771


In [134]:
# Write the full-dataset SVM predictions and labels to a CSV in the working
# directory. The pandas index is written as well (to_csv default).
svmFullresults.to_csv('PySpSVMResults.csv')