In [None]:
# A `from __future__` import must be the first statement of the cell;
# placing it after another import raises a SyntaxError.
from __future__ import print_function

import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
# Reuse the running SparkContext if this cell is executed again; constructing
# a second SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate(SparkConf().setAppName("StreamingTwitterAnalysis"))

# Only ERROR-level messages are logged, which keeps warnings out of the output.
sc.setLogLevel("ERROR")

# Streaming context with a batch interval of 10 seconds.
ssc = StreamingContext(sc, 10)



# Machine learning

In [None]:
# importing required libraries
import pyspark.ml.feature
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row


## Reading the csv file

In [None]:
# Wrap the existing SparkContext in a SparkSession for the DataFrame API.
spark = SparkSession(sparkContext=sc)

In [None]:
# Column layout used when reading the CSV: integer id, integer label and the
# tweet text; every column is nullable.
my_schema = tp.StructType([
    tp.StructField('id', tp.IntegerType(), True),
    tp.StructField('label', tp.IntegerType(), True),
    tp.StructField('tweet', tp.StringType(), True),
])

In [None]:
# Location of the tweets CSV. Set the TWEETS_CSV_PATH environment variable to
# point at another copy; without it the original path is used.
import os

tweets_csv_path = os.environ.get('TWEETS_CSV_PATH', '/home/swati/Desktop/tweets.csv')

# Read the CSV with the explicit schema; the first row is a header.
my_data = spark.read.csv(tweets_csv_path,
                         schema=my_schema,
                         header=True)

In [None]:
# preview the first five rows of the loaded data
my_data.show(n=5)


In [None]:
# Print the column names, types and nullability of the loaded DataFrame
# so they can be compared with `my_schema`.
my_data.printSchema()

## Pipeline

In [None]:
# define stage 1: tokenize the tweet text
# RegexTokenizer treats the pattern as a delimiter by default (gaps=True).
# The delimiter covers @mentions, URLs, any character that is not a letter,
# digit, space or tab, and runs of whitespace. Whitespace has to be listed
# explicitly: without it the text is only cut at mentions/punctuation/URLs
# and each "token" is a whole multi-word phrase. Empty fragments between
# adjacent delimiters are dropped by the default minimum token length of 1.
stage_1 = RegexTokenizer(
    pattern=r'(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|(\s+)',
    inputCol='tweet',
    outputCol='tokens',
)
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol='vector', labelCol='label')

In [None]:
# Chain the tokenizer, stop-word remover, Word2Vec stage and classifier
# into a single pipeline, in that order.
pipeline = Pipeline().setStages([stage_1, stage_2, stage_3, model])

# Fit every stage of the pipeline on the training data.
pipelineFit = pipeline.fit(my_data)

In [None]:
# define a function to compute sentiments of the received tweets
def get_prediction(tweet_text):
    """Predict and display the sentiment of each tweet in an RDD.

    Parameters
    ----------
    tweet_text : RDD of str
        One element per tweet text, as handed over by ``foreachRDD``.

    The function prints its results and returns ``None``. When the RDD holds
    no non-empty tweet it prints ``'No data'``. Any other failure is reported
    with its error message instead of being silently labelled as "no data",
    and does not stop the streaming job.
    """
    try:
        # keep only the tweets whose length is greater than 0
        non_empty = tweet_text.filter(lambda x: len(x) > 0)

        # Nothing arrived in this window: say so explicitly instead of
        # relying on DataFrame creation failing on an empty RDD.
        if non_empty.isEmpty():
            print('No data')
            return

        # one Row per tweet, with a single column named 'tweet'
        rowRdd = non_empty.map(lambda w: Row(tweet=w))
        # create a spark dataframe
        wordsDataFrame = spark.createDataFrame(rowRdd)
        # transform the data using the pipeline and show the predicted sentiment
        pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
    except Exception as exc:
        # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate, and surface the real cause of the failure.
        print('Prediction failed: {}'.format(exc))

# Streaming Data

Setting the log level to ERROR means warning messages will not be printed

Setting a batch interval of 10s

In [None]:
# Text stream read from a TCP socket on the local machine, port 6006.
socket_stream = ssc.socketTextStream(hostname="127.0.0.1", port=6006)

All analysis will be done over a window of 60 seconds

In [None]:
# Group the incoming text into windows of 60 seconds.
lines = socket_stream.window(windowDuration=60)

In [None]:
# hashtags = lines.flatMap(lambda text : text.split('')).filter(lambda word : word.lower().startswith('#')).map(lambda word: (word.lower(),1)).reduceByKey(lambda a,b:a+b)

In [None]:
# A received line can hold several tweets separated by the keyword
# 'TWEET_APP'; split on it so that each piece is the text of a single tweet.
tweet_segments = lines.flatMap(lambda line: line.split('TWEET_APP'))

# The lower-cased string is used only as a truthiness test here: empty pieces
# are dropped, and the kept pieces are passed on unchanged (not lower-cased).
words = tweet_segments.filter(lambda word: word.lower())

# run the sentiment prediction on every RDD of the stream
words.foreachRDD(get_prediction)

In [None]:
# author_count_sorted_dstream = hashtags.transform(lambda foo : foo.sortBy(lambda x: x[0].lower()).sortBy(lambda x:x[1],ascending=False))

In [None]:
# author_count_sorted_dstream.pprint()

In [None]:
# lines.pprint()

# Starting computations for all batches

In [None]:
# Start the streaming computation; the operations registered on the stream
# in the earlier cells begin to run from this point on.
ssc.start()

In [None]:
# Block this cell until the streaming context terminates (no timeout is
# given, so it waits indefinitely).
ssc.awaitTermination()