In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, StringIndexer
from sklearn.model_selection import train_test_split

In [0]:
# Load the airline-tweet CSV from S3; the first row supplies the column names.
# NOTE(review): if tweet text contains embedded newlines or quoted commas, line-based
# CSV parsing can split/shift rows — consider multiLine=True / escape='"'; verify.
df = (
    spark.read
    .options(header="True")
    .csv("s3://kadentranbigdata/Tweets.csv")
)
df.show(10)

In [0]:
# describe() only builds a lazy summary DataFrame (a bare expression renders just
# its schema); .show() runs it and prints the per-column summary statistics.
df.describe().show()

In [0]:
# Remove rows whose `text` field is null: there is nothing to tokenize in them.
df = df.dropna(subset=["text"])
df.count()

In [0]:
# Split the dataset into ~80% training / ~20% test rows; the fixed seed makes
# the split repeatable.
train, test = df.randomSplit([0.8, 0.2], seed=200)

for split_name, split_df in (("training", train), ("testing", test)):
    print(f"No. of {split_name} examples: {split_df.count()}")

In [0]:
# ML pipeline, in order:
#   text -> words1 (tokenize) -> words2 (stop words removed) -> features (hashed TF)
#   airline_sentiment -> label (string index), then logistic regression on features/label.
tokenizer = Tokenizer(inputCol="text", outputCol="words1")
stopwordsremover = StopWordsRemover(inputCol="words1", outputCol="words2")
tf = HashingTF(inputCol="words2", outputCol="features")
sentimentIndexer = StringIndexer(inputCol="airline_sentiment", outputCol="label")
lr = LogisticRegression(maxIter=20, labelCol="label")

pipeline = Pipeline(stages=[tokenizer, stopwordsremover, tf, sentimentIndexer, lr])

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 3 x 3 grid: hashed feature-space size crossed with regularization strength.
paramGrid = (
    ParamGridBuilder()
    .addGrid(tf.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01, 0.001])
    .build()
)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 2-fold cross-validation over every paramGrid combination.
# - metricName="f1" is the evaluator's default; it is spelled out so the model
#   selection criterion is visible.
# - seed pins the fold assignment (same seed as the train/test split). Without it,
#   PySpark derives a default seed from hash() of the class name, which is not
#   stable across Python processes, so fold membership could differ between runs.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=2,
                          seed=200)

In [0]:
# Fit the whole pipeline under cross-validation on the training split.
cvmodel = crossval.fit(train)

In [0]:
# Score the held-out test split with the cross-validated model and preview
# predictions alongside the original sentiment and text.
transformed = cvmodel.transform(test)
preview_cols = ["airline_sentiment", "label", "text", "probability", "prediction"]
selected = transformed.select(*preview_cols)
selected.show()

In [0]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics consumes an RDD of (prediction, label) pairs.
results = transformed.select(['prediction', 'label'])
predictionAndLabels = results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

# Confusion matrix as a numpy array: rows = actual label, columns = predicted label,
# both ordered by ascending label value.
cm = metrics.confusionMatrix().toArray()

# The label comes from a StringIndexer over airline_sentiment, which may have more
# than two values (presumably negative/neutral/positive — verify). Fixed 2x2
# formulas such as (cm[0][0] + cm[1][1]) silently drop every other cell, so use the
# full diagonal / column / row instead. For a 2x2 matrix these reduce to the
# original formulas.
accuracy = cm.trace() / cm.sum()           # all correct predictions / all rows
precision = cm[0, 0] / cm[:, 0].sum()      # class 0.0: TP / everything predicted 0.0
recall = cm[0, 0] / cm[0, :].sum()         # class 0.0: TP / everything actually 0.0

print("Logistic Regression metrics")
print("Accuracy ", accuracy)
print("Precision ", precision)
print("Recall ", recall)

In [0]:
# Plain-text summary of the evaluation metrics, one "<name> <value>" per line.
outputstring = "\n".join([
    "Logistic Regression metrics",
    "Accuracy " + str(accuracy),
    "Precision " + str(precision),
    "Recall " + str(recall),
])

In [0]:
import os
from urllib.parse import quote

# AWS credentials for the S3 mount. Never hardcode them in a notebook: read them
# from the environment (empty-string fallback matches the former placeholders).
# NOTE(review): on Databricks, dbutils.secrets.get(scope=..., key=...) is the usual
# alternative — the scope/key names are not known here.
access_key = os.environ.get("AWS_ACCESS_KEY_ID", "")

SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

# The secret is embedded in the mount URL, so percent-encode every reserved
# character, not only "/" (AWS secret keys can also contain "+").
encoded_secret_key = quote(SECRET_KEY, safe="")

aws_bucket_name = "kadentranbigdata"

mount_name = "ktbigdata"
mount_point = "/mnt/%s" % mount_name

# Mounts persist beyond the notebook session and mounting an existing mount point
# fails, so only mount when it is not already registered (keeps the cell re-runnable).
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), mount_point)
display(dbutils.fs.ls(mount_point))

In [0]:
# List the files under the S3 mount point.
display(dbutils.fs.ls(f"/mnt/{mount_name}"))

path,name,size
dbfs:/mnt/ktbigdata/Tweets.csv,Tweets.csv,3421431


In [0]:
# Persist the metrics summary on the S3 mount. dbutils.fs.put refuses to replace
# an existing file by default, so overwrite=True keeps the notebook re-runnable.
dbutils.fs.put("dbfs:/mnt/ktbigdata/outputpart2.txt", outputstring, overwrite=True)
