In [1]:
# installing spark in colab and creating spark session

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

sc = spark.sparkContext

In [2]:
from pyspark import SparkFiles
url = "/content/Tweets.csv"
spark.sparkContext.addFile(url)
TweeterSent = spark.read.csv("file://"+SparkFiles.get("Tweets.csv"), header = True)

In [3]:
for col in TweeterSent.dtypes:
    print(col[0]+" , "+col[1])

tweet_id , string
airline_sentiment , string
airline_sentiment_confidence , string
negativereason , string
negativereason_confidence , string
airline , string
airline_sentiment_gold , string
name , string
negativereason_gold , string
retweet_count , string
text , string
tweet_coord , string
tweet_created , string
tweet_location , string
user_timezone , string


In [4]:
#Checking for null values in the df columns
from pyspark.sql.functions import col,isnan,when,count
isNullFram = TweeterSent.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in TweeterSent.columns])

isNullFram.collect()

[Row(tweet_id=0, airline_sentiment=155, airline_sentiment_confidence=68, negativereason=5573, negativereason_confidence=4229, airline=179, airline_sentiment_gold=14788, name=196, negativereason_gold=14805, retweet_count=205, text=214, tweet_coord=13768, tweet_created=389, tweet_location=5010, user_timezone=5103)]

In [5]:
TweeterSent.take(10)

[Row(tweet_id='570306133677760513', airline_sentiment='neutral', airline_sentiment_confidence='1.0', negativereason=None, negativereason_confidence=None, airline='Virgin America', airline_sentiment_gold=None, name='cairdin', negativereason_gold=None, retweet_count='0', text='@VirginAmerica What @dhepburn said.', tweet_coord=None, tweet_created='2015-02-24 11:35:52 -0800', tweet_location=None, user_timezone='Eastern Time (US & Canada)'),
 Row(tweet_id='570301130888122368', airline_sentiment='positive', airline_sentiment_confidence='0.3486', negativereason=None, negativereason_confidence='0.0', airline='Virgin America', airline_sentiment_gold=None, name='jnardino', negativereason_gold=None, retweet_count='0', text="@VirginAmerica plus you've added commercials to the experience... tacky.", tweet_coord=None, tweet_created='2015-02-24 11:15:59 -0800', tweet_location=None, user_timezone='Pacific Time (US & Canada)'),
 Row(tweet_id='570301083672813571', airline_sentiment='neutral', airline_se

In [6]:
#removing columns that have too many nulls (4000 and above )
empty_columns = ["negativereason","airline_sentiment_gold","negativereason_gold","tweet_coord","negativereason_confidence","tweet_location","user_timezone"]
for col in empty_columns:
    TweeterSent = TweeterSent.drop(col)

In [7]:
TweeterSent = TweeterSent.dropna()
TweeterSent.count()

14448

In [8]:
# check if any df column contains null value
from pyspark.sql.functions import col,isnan,when,count
isNullFram = TweeterSent.select([count(when(
                            col(c).contains('NULL') | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in TweeterSent.columns])

In [9]:
isNullFram.collect()

[Row(tweet_id=0, airline_sentiment=0, airline_sentiment_confidence=0, airline=0, name=0, retweet_count=0, text=0, tweet_created=0)]

In [10]:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import expr
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import SQLTransformer
 
# tokenize 
tokenizer = Tokenizer(inputCol="text", outputCol="vector")
#stop word 
StopWords = StopWordsRemover(inputCol="vector", outputCol="text_filtered")
# TF IDF 
ht = HashingTF(inputCol="text_filtered", outputCol="features")
 
# Encoding String Indexer 
SI_tweet_created = StringIndexer(inputCol='tweet_created',outputCol='tweet_created_Index')
SI_airline = StringIndexer(inputCol='airline',outputCol='airline_Index')
SI_name = StringIndexer(inputCol='name',outputCol='name_Index')
SI_airline_sentiment_confidence = StringIndexer(inputCol='airline_sentiment_confidence',outputCol='airline_sentiment_confidence_Index')
SI_retweet_count = StringIndexer(inputCol='retweet_count',outputCol='retweet_count_Index')
SI_airline_sentiment = StringIndexer(inputCol='airline_sentiment',outputCol='airline_sentiment_Index')
 
# Sql Transformer for selecting required columns
sqlDropCols = SQLTransformer(statement="SELECT airline_sentiment_Index, features FROM __THIS__")
 
# Logistic Regression
lr = LogisticRegression(labelCol = 'airline_sentiment_Index')

In [11]:
#Test and train split
train, test = TweeterSent.randomSplit([0.9, 0.1], seed=12345)
train.cache()
test.cache()

DataFrame[tweet_id: string, airline_sentiment: string, airline_sentiment_confidence: string, airline: string, name: string, retweet_count: string, text: string, tweet_created: string]

In [12]:
train.take(5)

[Row(tweet_id='567588278875213824', airline_sentiment='neutral', airline_sentiment_confidence='1.0', airline='Delta', name='JetBlueNews', retweet_count='0', text="@JetBlue's new CEO seeks the right balance to please passengers and Wall ... - Greenfield Daily Reporter http://t.co/LM3opxkxch", tweet_created='2015-02-16 23:36:05 -0800'),
 Row(tweet_id='567590027375702016', airline_sentiment='negative', airline_sentiment_confidence='1.0', airline='Delta', name='nesi_1992', retweet_count='0', text='@JetBlue is REALLY getting on my nerves !! 😡😡 #nothappy', tweet_created='2015-02-16 23:43:02 -0800'),
 Row(tweet_id='567591480085463040', airline_sentiment='negative', airline_sentiment_confidence='1.0', airline='United', name='CPoutloud', retweet_count='0', text='@united yes. We waited in line for almost an hour to do so. Some passengers just left not wanting to wait past 1am.', tweet_created='2015-02-16 23:48:48 -0800'),
 Row(tweet_id='567592368451248130', airline_sentiment='negative', airline_

In [13]:
test.take(5)

[Row(tweet_id='567680108002291712', airline_sentiment='positive', airline_sentiment_confidence='0.6645', airline='Delta', name='TravellerLukose', retweet_count='0', text='@JetBlue No worries. Delay was minor and dealt with nicely. It was captain of flight 2324 by the way.', tweet_created='2015-02-17 05:40:59 -0800'),
 Row(tweet_id='567686758708817921', airline_sentiment='neutral', airline_sentiment_confidence='0.6890000000000001', airline='Southwest', name='Tinygami', retweet_count='0', text="@SouthwestAir Is it a temporary site glitch or are you no longer offering flights from GRR to GEG after Feb? Can't find any online :(", tweet_created='2015-02-17 06:07:25 -0800'),
 Row(tweet_id='567702414157824000', airline_sentiment='negative', airline_sentiment_confidence='1.0', airline='Southwest', name='lucasdavidson', retweet_count='0', text='@SouthwestAir on hold for 2 hours and then you hung up 3 Cancelled Flightled flights. Running out of daycare for our kids who are trapped at home.', twe

In [14]:
#parameter 
paramGrid = ParamGridBuilder() \
    .addGrid(ht.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
 
evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "automatic", metricName = "accuracy")
evaluator.setLabelCol("airline_sentiment_Index")

MulticlassClassificationEvaluator_58e1de7f1923

In [15]:
# creating pipeline
pipeline = Pipeline(stages = [SI_tweet_created,SI_airline,SI_name,SI_airline_sentiment_confidence,SI_retweet_count,SI_airline_sentiment,tokenizer, StopWords,ht, sqlDropCols, lr])

In [16]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator = evaluator,
                          numFolds=4)
 
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

In [17]:
# performing prediction on test data
prediction = cvModel.transform(test)

In [18]:
prediction.take(10)

[Row(airline_sentiment_Index=2.0, features=SparseVector(1000, {29: 1.0, 273: 1.0, 297: 1.0, 314: 1.0, 365: 1.0, 639: 1.0, 652: 1.0, 699: 1.0, 803: 1.0, 903: 1.0}), rawPrediction=DenseVector([0.8031, 0.1316, -0.9346]), probability=DenseVector([0.5928, 0.3029, 0.1043]), prediction=0.0),
 Row(airline_sentiment_Index=1.0, features=SparseVector(1000, {86: 1.0, 105: 1.0, 211: 1.0, 330: 1.0, 341: 1.0, 433: 1.0, 510: 1.0, 593: 1.0, 615: 1.0, 654: 1.0, 700: 1.0, 807: 1.0, 891: 1.0}), rawPrediction=DenseVector([1.0033, 0.1679, -1.1712]), probability=DenseVector([0.6463, 0.2803, 0.0735]), prediction=0.0),
 Row(airline_sentiment_Index=0.0, features=SparseVector(1000, {23: 1.0, 63: 1.0, 206: 1.0, 330: 1.0, 383: 1.0, 389: 1.0, 602: 1.0, 640: 1.0, 661: 1.0, 726: 1.0, 861: 1.0, 892: 1.0, 986: 1.0, 999: 1.0}), rawPrediction=DenseVector([2.1588, -0.5799, -1.5788]), probability=DenseVector([0.9187, 0.0594, 0.0219]), prediction=0.0),
 Row(airline_sentiment_Index=0.0, features=SparseVector(1000, {29: 1.0, 

In [19]:
# Instantiate metrics object
from pyspark.mllib.evaluation import MulticlassMetrics
predictionAndLabels = prediction.select(['prediction', 'airline_sentiment_Index']).withColumnRenamed('airline_sentiment_Index', 'label').rdd
metrics = MulticlassMetrics(predictionAndLabels)
metrics.confusionMatrix().toArray()

array([[835.,  52.,  25.],
       [165., 110.,  28.],
       [112.,  30.,  89.]])

In [20]:
from pyspark.sql.types import StructType, StructField, DoubleType, FloatType, StringType
# defining dataframe schema for metrics table inclusing productive metric terms as headers for each row of data labels
schema = StructType([
    StructField("Label", StringType(), True), 
    StructField("True Positive Rate", DoubleType(), True),
    StructField("False Positive Rate", DoubleType(), True),
    StructField("Precision", DoubleType(), True),
    StructField("Recall", DoubleType(), True),
    StructField("F1 Score", DoubleType(), True),
])
df = spark.createDataFrame([], schema)

In [21]:
#loop for classes 0.0 to 2.0 for 3 classes of data labels
for i in range(3):
    tempList = [str(float(i)),metrics.truePositiveRate(float(i)),metrics.falsePositiveRate(float(i)),metrics.precision(float(i)),metrics.recall(float(i)),metrics.fMeasure(float(i))] 
    tempDf = spark.createDataFrame([tempList])
    df = df.union(tempDf)
 
df.collect()

[Row(Label='0.0', True Positive Rate=0.9155701754385965, False Positive Rate=0.5187265917602997, Precision=0.7508992805755396, Recall=0.9155701754385965, F1 Score=0.8250988142292491),
 Row(Label='1.0', True Positive Rate=0.36303630363036304, False Positive Rate=0.07174103237095363, Precision=0.5729166666666666, Recall=0.36303630363036304, F1 Score=0.4444444444444444),
 Row(Label='2.0', True Positive Rate=0.3852813852813853, False Positive Rate=0.043621399176954734, Precision=0.6267605633802817, Recall=0.3852813852813853, F1 Score=0.47721179624664883)]

In [22]:
# model accuracy
print(metrics.accuracy)

0.715076071922545
