In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Twitter US Airline Sentiment PYSPARK').getOrCreate()

tweets  = spark.read.csv('tweets.csv', inferSchema = True, header = True)


In [2]:
from pyspark.sql.functions import col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

from pyspark.ml.feature import StringIndexer


In [3]:
tweets.toPandas().head()

Unnamed: 0,tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
0,570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0.0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
1,570301130888122368,positive,0.3486,,0.0,Virgin America,,jnardino,,0.0,@VirginAmerica plus you've added commercials t...,,2015-02-24 11:15:59 -0800,,Pacific Time (US & Canada)
2,570301083672813571,neutral,0.6837,,,Virgin America,,yvonnalynn,,0.0,@VirginAmerica I didn't today... Must mean I n...,,2015-02-24 11:15:48 -0800,Lets Play,Central Time (US & Canada)
3,570301031407624196,negative,1.0,Bad Flight,0.7033,Virgin America,,jnardino,,0.0,"""@VirginAmerica it's really aggressive to blas...",,2015-02-24 11:15:36 -0800,,Pacific Time (US & Canada)
4,570300817074462722,negative,1.0,Can't Tell,1.0,Virgin America,,jnardino,,0.0,@VirginAmerica and it's a really big bad thing...,,2015-02-24 11:14:45 -0800,,Pacific Time (US & Canada)


In [4]:
tweets.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)



In [5]:
tweets = tweets.select(['airline_sentiment','text'])

In [6]:
tweets = tweets.na.drop()

In [7]:
tweets.groupBy("airline_sentiment").count().orderBy(col("count").desc()).show(3)

+-----------------+-----+
|airline_sentiment|count|
+-----------------+-----+
|         negative| 9170|
|          neutral| 3099|
|         positive| 2363|
+-----------------+-----+



In [8]:
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")


remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# bag of words count
cv = CountVectorizer(inputCol="filtered", outputCol="features")



In [11]:
label_stringIdx = StringIndexer(inputCol = "airline_sentiment", outputCol = "label").setHandleInvalid("skip")

pipeline = Pipeline(stages=[regexTokenizer, remover, cv, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(tweets)
dataset = pipelineFit.transform(tweets)
dataset.show(5)

+-----------------+--------------------+--------------------+--------------------+--------------------+-----+
|airline_sentiment|                text|               words|            filtered|            features|label|
+-----------------+--------------------+--------------------+--------------------+--------------------+-----+
|          neutral|@VirginAmerica Wh...|[virginamerica, w...|[virginamerica, d...|(14822,[29,141,14...|  1.0|
|         positive|@VirginAmerica pl...|[virginamerica, p...|[virginamerica, p...|(14822,[29,35,118...|  2.0|
|          neutral|@VirginAmerica I ...|[virginamerica, i...|[virginamerica, d...|(14822,[27,29,38,...|  1.0|
|         negative|"@VirginAmerica i...|[virginamerica, i...|[virginamerica, r...|(14822,[17,29,69,...|  0.0|
|         negative|@VirginAmerica an...|[virginamerica, a...|[virginamerica, r...|(14822,[29,69,131...|  0.0|
+-----------------+--------------------+--------------------+--------------------+--------------------+-----+
only showi

In [12]:
dataset.printSchema()
print("Size of the DataFrame: {} records".format(dataset.count()))


root
 |-- airline_sentiment: string (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)

Size of the DataFrame: 14632 records


In [13]:
# set seed for reproducibility
(train, test) = dataset.randomSplit([0.7, 0.3], seed = 42)


In [14]:
from pyspark.ml.classification import GBTClassifier
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)


lrModel = lr.fit(train)

predictions = lrModel.transform(test)

predictions.filter(predictions['prediction'] == 0) \
    .select("text","label","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-----+------------------------------+-----+----------+
|                          text|label|                   probability|label|prediction|
+------------------------------+-----+------------------------------+-----+----------+
|@united 2100$ ticket,12h bi...|  0.0|[0.9934631982790163,0.00295...|  0.0|       0.0|
|@USAirways denied me standb...|  0.0|[0.988337589918016,0.002264...|  0.0|       0.0|
|.@united being delayed 3 ho...|  0.0|[0.9879675810980598,0.00675...|  0.0|       0.0|
|@USAirways @AmericanAir hav...|  0.0|[0.9877055173065495,0.00567...|  0.0|       0.0|
|@USAirways DCA-&gt;HPN, u r...|  0.0|[0.9875270669343706,0.00937...|  0.0|       0.0|
|@USAirways Baggage Team? Re...|  0.0|[0.9859962952629228,0.00612...|  0.0|       0.0|
|@united what the hell? Flig...|  0.0|[0.9858488472559372,0.00647...|  0.0|       0.0|
|@SouthwestAir Cancelled Fli...|  0.0|[0.9856895131100212,0.00774...|  0.0|       0.0|
|@AmericanAir probs the wors...|  0.0|[0.98

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7079556137864562