In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
import sys
from pyspark.streaming.kafka import KafkaUtils
import json

In [2]:
spark.stop()

In [3]:
# create session

spark = SparkSession \
 .builder \
 .appName("tweeter_hate_speeach") \
 .master("local") \
 .getOrCreate()

In [4]:
# define the schema
# spark availabe types: see https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/types/package-summary.html
my_schema = tp.StructType([
  tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])

In [5]:
# read the dataset  
my_data = spark.read.csv('data/twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

In [6]:
# view the data
my_data.show(5)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
+---+-----+--------------------+
only showing top 5 rows



In [7]:
# print the schema of the file
my_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)



In [8]:
# count data
my_data.count()

31962

# create pipeline

## pipeline steps
**load data** -> **tokenize** -> **remove stop words** -> **word vector** -> **logistic regression** -> **predicted labels**

In [9]:
# define stage 1: tokenize the tweet text    
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')


In [10]:
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')


In [11]:
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)


In [12]:
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

In [13]:
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

# fit the pipeline model with the training data
pipelineFit = pipeline.fit(my_data)

# streaming predictions

In [14]:
# define a function to compute sentiments of the received tweets
def get_prediction(tweet_text):
	try:
    # filter the tweets whose length is greater than 0
		tweet_text = tweet_text.filter(lambda x: len(x) > 0)
    # create a dataframe with column name 'tweet' and each row will contain the tweet
		rowRdd = tweet_text.map(lambda w: Row(tweet=w))
    # create a spark dataframe
		wordsDataFrame = spark.createDataFrame(rowRdd)
    # transform the data using the pipeline and get the predicted sentiment
		pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
	except : 
		print('No data')

In [15]:
# initialize the streaming context 
ssc = StreamingContext(SparkContext.getOrCreate(), batchDuration= 3)

In [18]:
# Create a DStream that will connect to hostname:port, like localhost:9991
#lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
lines = ssc.socketTextStream('localhost', 9999)

In [19]:
#"kafkaStream = KafkaUtils.createStream(ssc, \
#     'localhost:2181', 'group_01', {'twitter_hater':1})

In [20]:
# split the tweet text by a keyword 'TWEET_APP' so that we can identify which set of words is from a single tweet
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

In [21]:
# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)

In [22]:
# Start the computation
ssc.start()   

No data
No data
No data
No data
No data
No data
No data


In [None]:
# Wait for the computation to terminate
ssc.awaitTermination()  

No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data
No data


# kafka

In [9]:
https://www.howtoforge.com/how-to-setup-apache-zookeeper-cluster-on-ubuntu-1804/

SyntaxError: invalid syntax (<ipython-input-9-7084fe453b13>, line 1)

In [None]:
https://www.tutorialspoint.com/apache_kafka/apache_kafka_basic_operations.htm