In [1]:
# Importing Spark SQL libraries
from pyspark.sql.types import *
# from pyspark.sql.functions import * 
from pyspark.sql.functions import sum as _sum

In [2]:
# Reading files from createed table
csv = spark.sql('select * from azureData4')

In [3]:
csv.createTempView('mytable3')

In [4]:
csv.columns

In [5]:
spark.sql('select count(*) from mytable3').show()

In [6]:
# Importing the feature-extraction stages (tokenize -> remove stop words -> bag-of-words).
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Split `processed_text` on runs of whitespace into an array column `words`.
regexTokenizer = RegexTokenizer(inputCol="processed_text", outputCol="words", pattern="\\s+")

# Custom stop-word list. NOTE: setStopWords *replaces* Spark's default English
# stop-word list instead of extending it, so only the tokens below are removed.
add_stopwords = ["http", "https", "amp", "rt", "t", "c", "can",  # noise tokens (URL fragments, single letters, filler)
                 "#keithlamontscott", "#charlotteprotest", "#charlotteriots", "#keithscott"]  # keywords used to pull the data
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Bag-of-words term counts. The vocabulary is capped at 10,000 terms because larger
# vocabularies took hours to compute and sometimes failed; minDF=5 keeps only terms
# that occur in at least 5 documents.
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [7]:
# Creating pipeline of basic data cleaning and creating dataset with all features.
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(csv)
# Transform dataset with new pipelined features.
dataset = pipelineFit.transform(csv)

In [8]:
dataset.dtypes

In [9]:
# Implementing logistic regression

In [10]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [11]:
trainingData.show()

In [12]:
trainingData.dtypes

In [13]:
from pyspark.ml.classification import LogisticRegression
# Build the model
lr = LogisticRegression(labelCol='y', maxIter=20, regParam=0.3, elasticNetParam=0, family = "binomial")

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [14]:
trainingSummary = lrModel.summary

In [15]:
import matplotlib.pyplot as plt
objectiveHistory = trainingSummary.objectiveHistory
plt.plot(objectiveHistory)
plt.ylabel('Objective Function')
plt.xlabel('Iteration')
plt.show()

In [16]:
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

In [17]:
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(lrModel.coefficients)

plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [18]:
predictions = lrModel.transform(testData)

In [19]:
from pyspark.sql.functions import col
see = predictions.select('y', col('y').alias('label'), 'rawPrediction')

In [20]:
see.show()

In [21]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(see, {evaluator.metricName: "areaUnderROC"})))

In [22]:
# Implementing a decision tree classifier.

In [23]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="y", featuresCol="features", maxDepth=3)
# Only for cli
# dt = DecisionTreeClassifier(labelCol="y", featuresCol="features")
# Train model with Training Data
dtModel = dt.fit(trainingData)

In [25]:
# Evaluate model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = dtModel.transform(testData)

In [26]:
predictions.show()

In [27]:
from pyspark.sql.functions import col
see = predictions.select('y', col('y').alias('label'), 'rawPrediction')

In [28]:
see.show()

In [29]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(see, {evaluator.metricName: "areaUnderROC"})))

In [30]:
print ("numNodes = ", dtModel.numNodes)
print ("depth = ", dtModel.depth)

In [31]:
# Implementing random forest classifier.

In [32]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [33]:
trainingData.show()

In [34]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="y", \
                            featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [35]:
predictions = rfModel.transform(testData)

In [36]:
predictions.show()

In [37]:
from pyspark.sql.functions import col
see = predictions.select('y', col('y').alias('label'), 'rawPrediction')

In [38]:
see.show()

In [39]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(see, {evaluator.metricName: "areaUnderROC"})))

In [40]:
# Implementing Naive Bayes Algorithm

In [41]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [42]:
trainingData.show()

In [43]:
from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(labelCol="y", smoothing=1, modelType="multinomial")

# train the model
model = nb.fit(trainingData)

In [44]:
predictions = model.transform(testData)

In [45]:
from pyspark.sql.functions import col
see = predictions.select('y', col('y').alias('label'), 'rawPrediction')

In [46]:
see.show()

In [47]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(see, {evaluator.metricName: "areaUnderROC"})))

In [48]:
# Implementing cross validation

In [49]:
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [50]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="y", maxIter=20, regParam=0.3, elasticNetParam=0, family = "binomial")
# lr = LogisticRegression()

In [51]:
from pyspark.sql.functions import col
updatedTrainingData = trainingData.select('y', col('y').alias('label'), 'features')

In [52]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=3)

# Run cross validations
cvModel = cv.fit(updatedTrainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [53]:
from pyspark.sql.functions import col
updatedTestData = testData.select('y', col('y').alias('label'), 'features')

In [54]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(updatedTestData)

In [55]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [56]:
evaluator.evaluate(predictions)

In [57]:
predictions.show()

In [58]:
from pyspark.sql.functions import col
updatedPredictions = predictions.select('prediction', col('prediction').alias('ctx'), 'label')