In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, IDF

In [2]:
# Obtain the SparkSession for this notebook: getOrCreate() reuses an
# already-running session if there is one, otherwise it starts a new one
# under the application name 'counterVector'.
session_builder = SparkSession.builder.appName('counterVector')
spark = session_builder.getOrCreate()

In [3]:
# Load airlines.csv into a DataFrame, treating the first line of the file
# as the column header, then preview the first rows.
csv_reader = spark.read.format("csv").option("header", "true")
dataframe = csv_reader.load("airlines.csv")
dataframe.show()

+--------------------+
|      Airline Tweets|
+--------------------+
|@VirginAmerica pl...|
|@VirginAmerica se...|
|@VirginAmerica do...|
|@VirginAmerica Ar...|
|@VirginAmerica aw...|
+--------------------+



In [4]:
# Split every tweet in the "Airline Tweets" column into a list of tokens,
# stored in a new "words" column (tokens come out lower-cased, as the
# preview below shows).
tokened = Tokenizer(
    inputCol="Airline Tweets",
    outputCol="words",
)
tokened_transformed = tokened.transform(dataframe)
tokened_transformed.show()

+--------------------+--------------------+
|      Airline Tweets|               words|
+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|
|@VirginAmerica se...|[@virginamerica, ...|
|@VirginAmerica do...|[@virginamerica, ...|
|@VirginAmerica Ar...|[@virginamerica, ...|
|@VirginAmerica aw...|[@virginamerica, ...|
+--------------------+--------------------+



In [5]:
# Filter unwanted tokens out of "words" into a new "filtered" column.
# Only the tokens listed here are removed: passing stopWords replaces the
# remover's default stop-word list, so common words such as "the" remain.
stop_list = [
    "@VirginAmerica",
    "$30",
    "@virginamerica",
]
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=stop_list,
)
removed_frame = remover.transform(tokened_transformed)
removed_frame.show()

+--------------------+--------------------+--------------------+
|      Airline Tweets|               words|            filtered|
+--------------------+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|[plus, you've, ad...|
|@VirginAmerica se...|[@virginamerica, ...|[seriously, would...|
|@VirginAmerica do...|[@virginamerica, ...|[do, you, miss, m...|
|@VirginAmerica Ar...|[@virginamerica, ...|[are, the, hours,...|
|@VirginAmerica aw...|[@virginamerica, ...|[awaiting, my, re...|
+--------------------+--------------------+--------------------+



In [6]:
# Turn each filtered token list into a sparse term-count vector.
# vocabSize=20 caps the vocabulary at 20 terms; minTF=1.0 and minDF=1.0
# keep any term that occurs at least once in a document / in one document.
countvectorizer = CountVectorizer(
    inputCol='filtered',
    outputCol='Vectors',
    vocabSize=20,
    minTF=1.0,
    minDF=1.0,
)

# Learn the vocabulary from the corpus, then encode every row with it.
model = countvectorizer.fit(removed_frame)
result = model.transform(removed_frame)
result.show()

+--------------------+--------------------+--------------------+--------------------+
|      Airline Tweets|               words|            filtered|             Vectors|
+--------------------+--------------------+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|[plus, you've, ad...|(20,[0,3,9,14],[1...|
|@VirginAmerica se...|[@virginamerica, ...|[seriously, would...|(20,[0,1,4,5,7,8,...|
|@VirginAmerica do...|[@virginamerica, ...|[do, you, miss, m...|     (20,[13],[1.0])|
|@VirginAmerica Ar...|[@virginamerica, ...|[are, the, hours,...|(20,[0,1,2,5,6,16...|
|@VirginAmerica aw...|[@virginamerica, ...|[awaiting, my, re...|(20,[2,3,4,10,11,...|
+--------------------+--------------------+--------------------+--------------------+



In [7]:
# Rescale the raw term counts in "Vectors" by inverse document frequency.
# The output column name spells out the layout of the sparse vector that
# show() prints: (size, [indexes], [TF-IDF weights]).
idf = IDF(
    inputCol="Vectors",
    outputCol="Features, [Indexes], [TF-IDF]",
)
idfModel = idf.fit(result)                  # learn document frequencies
rescaledData = idfModel.transform(result)   # append the TF-IDF column

In [8]:
# Show only the filtered tokens next to their TF-IDF vectors.
tfidf_view = rescaledData.select("filtered", "Features, [Indexes], [TF-IDF]")
tfidf_view.show()

+--------------------+-----------------------------+
|            filtered|Features, [Indexes], [TF-IDF]|
+--------------------+-----------------------------+
|[plus, you've, ad...|         (20,[0,3,9,14],[0...|
|[seriously, would...|         (20,[0,1,4,5,7,8,...|
|[do, you, miss, m...|         (20,[13],[1.09861...|
|[are, the, hours,...|         (20,[0,1,2,5,6,16...|
|[awaiting, my, re...|         (20,[2,3,4,10,11,...|
+--------------------+-----------------------------+



### Check Vector Index

In [9]:
# Flatten the stop-word-filtered token lists into a single 'terms' column,
# one row per token occurrence. Duplicates are kept on purpose so that the
# frequency of each term can be counted downstream.
#
# explode() does this inside the DataFrame API, avoiding the round trip
# through a Python RDD (rdd.flatMap + two toDF calls) that the cell used before.
from pyspark.sql.functions import explode
from pyspark.sql.types import StringType  # no longer needed here; retained in case other cells rely on it

df_vocab = removed_frame.select(explode('filtered').alias('terms'))
df_vocab.show()

+-------------+
|        terms|
+-------------+
|         plus|
|       you've|
|        added|
|  commercials|
|           to|
|          the|
|experience...|
|       tacky.|
|    seriously|
|        would|
|          pay|
|            a|
|       flight|
|          for|
|        seats|
|         that|
|       didn't|
|         have|
|         this|
|     playing.|
+-------------+
only showing top 20 rows



In [10]:
# Set up a StringIndexer that maps each term in 'terms' to a numeric index,
# written to the 'StringIndexer(index)' column.
from pyspark.ml.feature import StringIndexer

stringindexer = StringIndexer(
    inputCol='terms',
    outputCol='StringIndexer(index)',
)

In [11]:
# Fit the indexer on every term occurrence, attach the index to each row,
# then collapse repeated terms and list them in index order.
# NOTE(review): these indices are not guaranteed to match the positions used
# in the CountVectorizer vectors (e.g. tie ordering may differ); the fitted
# CountVectorizerModel's `vocabulary` attribute is the authoritative mapping.
indexer_model = stringindexer.fit(df_vocab)
indexed_terms = indexer_model.transform(df_vocab)
indexed_terms.distinct().orderBy('StringIndexer(index)').show()

+--------+--------------------+
|   terms|StringIndexer(index)|
+--------+--------------------+
|     the|                 0.0|
|     for|                 1.0|
|     are|                 2.0|
|   would|                 3.0|
|  online|                 4.0|
|    that|                 5.0|
|      to|                 6.0|
|    it's|                 7.0|
|current?|                 8.0|
|    this|                 9.0|
|    have|                10.0|
|    your|                11.0|
|   worry|                12.0|
|  didn't|                13.0|
|    plus|                14.0|
|  flight|                15.0|
|awaiting|                16.0|
|  return|                17.0|
|      do|                18.0|
|    club|                19.0|
+--------+--------------------+
only showing top 20 rows



In [12]:
# Stop the Spark session started at the top of the notebook now that all
# results have been displayed.
spark.stop()