In [1]:
# importing required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

In [2]:
# initializing spark session
# getOrCreate() reuses a session/context that is already running (e.g. when
# the kernel was launched through the PySpark shell or this cell is re-run),
# whereas constructing a second SparkContext directly raises an error.
spark = SparkSession.builder.appName("PySparkShell").getOrCreate()
# the rest of the notebook (StreamingContext) needs the underlying context
sc = spark.sparkContext

In [3]:
# schema of the training CSV: integer id, integer label, string tweet;
# every column is declared nullable
my_schema = tp.StructType([
    tp.StructField('id', tp.IntegerType(), True),
    tp.StructField('label', tp.IntegerType(), True),
    tp.StructField('tweet', tp.StringType(), True),
])

In [4]:
# load the labelled tweets with the explicit schema defined above;
# the first line of the CSV is treated as a header row
my_data = (
    spark.read
    .schema(my_schema)
    .option('header', True)
    .csv('file:///downloads/twitter_sentiments.csv')
)

In [5]:
# preview the first five rows of the training data
my_data.show(n=5)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
+---+-----+--------------------+
only showing top 5 rows



In [6]:
# print the column names, types and nullability of the loaded DataFrame
# to confirm that my_schema was applied
my_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



In [7]:
# stage 1: tokenize the 'tweet' column using the regex pattern '\\W'
stage_1 = RegexTokenizer(
    inputCol='tweet',
    outputCol='tokens',
    pattern='\\W',
)

# stage 2: drop stop words from the token list
stage_2 = StopWordsRemover(
    inputCol='tokens',
    outputCol='filtered_words',
)

# stage 3: embed the remaining words as a 100-dimensional vector
stage_3 = Word2Vec(
    inputCol='filtered_words',
    outputCol='vector',
    vectorSize=100,
)

# stage 4: logistic regression classifier on the vector, trained against 'label'
model = LogisticRegression(
    featuresCol='vector',
    labelCol='label',
)

In [8]:
# chain the four stages, in order, into a single pipeline
pipeline = Pipeline().setStages([stage_1, stage_2, stage_3, model])

# train every stage on the labelled tweets; the fitted pipeline is what the
# streaming callback uses to score incoming tweets
pipelineFit = pipeline.fit(my_data)

In [9]:
# define a function to compute sentiments of the received tweets
def get_prediction(tweet_text):
    """Predict and display the sentiment of the tweets in one batch RDD.

    Intended as the callback passed to ``foreachRDD``.

    Args:
        tweet_text: RDD of tweet strings.

    Returns:
        None. Prints 'No data' when the batch contains no non-empty tweets;
        otherwise shows a table with each tweet and its prediction. Any
        failure is reported on stdout instead of being raised, so one bad
        batch does not abort the caller.
    """
    try:
        # filter the tweets whose length is greater than 0
        non_empty = tweet_text.filter(lambda x: len(x) > 0)

        # An empty batch is the normal idle case. Test for it explicitly
        # rather than relying on createDataFrame failing on an empty RDD,
        # so that genuine errors are not mislabelled as 'No data'.
        if non_empty.isEmpty():
            print('No data')
            return

        # create a dataframe with column name 'tweet'; each row holds one tweet
        rowRdd = non_empty.map(lambda w: Row(tweet=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # transform the data using the pipeline and show the predicted sentiment
        pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
    except Exception as exc:
        # `except Exception` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate while still keeping the callback best-effort.
        print('Prediction failed:', repr(exc))
    
# initialize the streaming context with a batch duration of 3 (seconds)
ssc = StreamingContext(sc, batchDuration= 3)

# Create a DStream that will connect to hostname:port, like localhost:9991
lines = ssc.socketTextStream('localhost', 9991)

# split the tweet text by a keyword 'TWEET_APP' so that we can identify which set of words is from a single tweet
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)

try:
    # Start the computation
    ssc.start()

    # Wait for the computation to terminate (blocks until stopped or the
    # kernel is interrupted)
    ssc.awaitTermination()
finally:
    # Always stop the streaming context, including on KeyboardInterrupt, so
    # the socket receiver does not keep running in the background.
    # stopSparkContext=False keeps `sc`/`spark` usable in later cells.
    ssc.stop(stopSparkContext=False, stopGraceFully=False)

No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
+----------------+----------+
|           tweet|prediction|
+----------------+----------+
|very good person|       0.0|
+----------------+----------+

No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|@DinaPugliese Goo...|       0.0|
+--------------------+----------+

No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
+--------------------+----------+
|               tweet|prediction|
+--------------------+----------+
|retweet if you ag...|       1.0|
+--------------------+----------+

No data
No data
No data
No data
No data
No data
No data
No 

KeyboardInterrupt: 