In [1]:
import numpy
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer,IDF
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, expr, when
import pandas as pd
from pyspark.ml.classification import LinearSVC
from sklearn.metrics import confusion_matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
# Load the reviews CSV (first row is the header; Spark infers column types).
# NOTE(review): `sqlContext` is not defined in this notebook — presumably the
# Spark SQL context injected by the Databricks runtime; confirm.
AFR_df = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/Reviews.csv")

# Drop every row that has a null in ANY column (not only the Summary/Score
# columns the model uses).
AFR_df = AFR_df.na.drop()

# Binary target derived from Score: Score > 3 -> 1.0, otherwise (including a
# neutral 3) -> 0.0. Spark ML classifiers read the target from a column named
# "label" by default and expect a double, so double literals are emitted directly
# instead of building an int column and casting it afterwards.
changedTypedf = AFR_df.withColumn("label", when(AFR_df.Score > 3, 1.0).otherwise(0.0))

In [3]:
# Keep only what the model needs: the review summary text (input) and the binary label (target).
X = changedTypedf.select("Summary", "label")

In [4]:
# Hold out roughly 30% of the labelled rows for evaluation; the fixed seed keeps the split repeatable.
train, test = X.randomSplit(weights=[0.7, 0.3], seed=2018)

In [5]:
# Peek at one training row to sanity-check the Summary/label columns.
train.first()

In [7]:
# Feature stages: Summary -> words -> hashed raw term counts -> TF-IDF "features".

# Lower-cases each summary and splits it on whitespace into a list of words;
# repeated words are kept because their counts feed the term frequency.
tokenizer = Tokenizer(inputCol="Summary", outputCol="words")

# Number of hash buckets, i.e. the length of the feature vector. Every word is
# hashed into one bucket, so distinct words may collide. This is NOT the number
# of terms in a summary.
NUM_HASH_FEATURES = 2000

# Raw term frequency: per document, the count of each (hashed) term. Spark's
# HashingTF does not divide by the document length.
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures", numFeatures=NUM_HASH_FEATURES)

# Rescales the raw counts by inverse document frequency, learned from the data
# passed to fit():
#   IDF(t) = log((N + 1) / (DF(t) + 1))
# where N is the number of documents and DF(t) the number containing term t.
# Terms that occur in many summaries are down-weighted.
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")

# The resulting "features" column (TF * IDF) is the input to the classifier stage
# added in the next cell.

In [8]:
# Linear SVM classifier. It reads the "features" column produced by the IDF stage
# and the "label" column built at load time (these are also LinearSVC's defaults).
svm = LinearSVC(featuresCol="features", labelCol="label")

# Chain the stages: Summary -> words -> rawFeatures -> features -> prediction.
stages = [tokenizer, hashingTF, idf, svm]
pipeline = Pipeline(stages=stages)

# Fit on the training split only; IDF weights and SVM coefficients are learned here.
model = pipeline.fit(train)

In [9]:
# Run the fitted pipeline on the held-out split; this adds the intermediate
# feature columns and the classifier's output columns (including "prediction").
prediction = model.transform(dataset=test)

# Only the predicted class column.
selected = prediction.select(prediction["prediction"])

In [10]:
# Evaluate the fitted pipeline on the held-out split.

# Accuracy: fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)

# Collect the true label and the prediction from the SAME rows in a single Spark
# action. Collecting `test` and `prediction` separately and pairing them by
# position assumes two independent jobs return rows in the same order, which
# Spark does not guarantee (and costs an extra job).
label_vs_prediction = prediction.select("label", "prediction").toPandas()
y_true = label_vs_prediction["label"]
y_pred = label_vs_prediction["prediction"]

# Labels are doubles (0.0 / 1.0), so list the classes with the same type.
classes = [0.0, 1.0]

# Rows are the true class, columns the predicted class, both in `classes` order.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=classes)

print("Accuracy = %g" % (accuracy))
print(cnf_matrix)

In [11]:
import matplotlib.pyplot as plt

In [12]:
# NOTE(review): `x` is a hard-coded placeholder list, not an error measured from
# `model`, so the "Train Error" title does not describe the plotted data yet.
# TODO replace with a real training metric.
fig, ax = plt.subplots(figsize=(10, 7))
x = [1, 2, 3, 4, 5, 6]

# A single sequence is plotted against its index (0..5).
ax.plot(x, label='X points')
ax.set_title('Train Error')
ax.set_xlabel('Index')
ax.set_ylabel('Value')
ax.legend(loc='lower right')

# Render the figure (one figure only; no stray empty figure is created).
plt.show()