In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression, NaiveBayes, NaiveBayesModel, LogisticRegressionModel
from pyspark.ml.feature import HashingTF, Tokenizer,RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import concat, col, lit, monotonically_increasing_id
from pyspark.ml.linalg import Vectors, Vector
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StopWordsRemover
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.functions import sum as _sum
import nltk
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import lit
import os
import re
import json
import requests
import sys
import string

In [None]:
def process_twitter_stream(model, sc, bootstrap_servers="localhost:9092",
                           topics="<your_kafka_topic_name>"):
    """Classify tweets arriving on a Kafka topic and print running counts.

    Subscribes to `topics` through Structured Streaming, runs every incoming
    message through `model`, and writes the number of messages per predicted
    class to the console. Blocks until the streaming query terminates.

    Args:
        model: Fitted pipeline model whose first stage reads a string column
            named "text" (as produced by createNBModel).
        sc: Active SparkContext used to obtain the SparkSession.
        bootstrap_servers: Kafka "host:port" list to connect to.
        topics: Topic name(s) passed to the Kafka source's "subscribe" option.
    """
    spark = SparkSession(sc)

    # Keep only the message payload, cast to a string column named "value".
    socketDF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topics) \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    socketDF.printSchema()
    print("COLUMNS:", socketDF.columns)

    # The model was trained on a column called "text", so the payload must be
    # exposed under that same name or transform() cannot find its input.
    # Records without a payload cannot be tokenized and are skipped.
    socketDF = socketDF.withColumnRenamed("value", "text") \
        .where("text IS NOT NULL") \
        .withColumn("label", lit(0))  # placeholder label; not used for prediction

    prediction = getTweetSentiment(model, socketDF)

    prediction.printSchema()

    print(prediction.schema)

    # Count tweets per predicted class (positive / negative).
    sentiment_counts = prediction.groupBy("prediction").count()

    # "update" mode prints only the classes whose count changed in each batch.
    query1 = sentiment_counts.writeStream \
        .outputMode("update") \
        .format('console') \
        .start()

    query1.awaitTermination()
    

def loadDataset(sc, path):
    """Read the CSV file at `path` into a two-column DataFrame.

    The file is parsed with a fixed schema: a nullable string column "text"
    followed by a nullable double column "label".

    Args:
        sc: SparkContext used to build the SparkSession.
        path: Location of the CSV file to read.

    Returns:
        A `(DataFrame, SparkSession)` tuple.
    """
    session = SparkSession(sc)

    tweet_schema = StructType([
        StructField('text', StringType(), True),
        StructField('label', DoubleType(), True),
    ])

    return (session.read.csv(path, schema=tweet_schema), session)


def createNBModel(sc):
    """Train a TF-IDF + multinomial Naive Bayes sentiment pipeline.

    Loads the labelled tweets from "grtweetdataset/grTweets.csv" and fits a
    pipeline of Tokenizer -> HashingTF -> IDF -> NaiveBayes on them.

    Args:
        sc: SparkContext used to load the training data.

    Returns:
        The fitted PipelineModel. It expects a string column named "text" and
        adds a "prediction" column when used to transform a DataFrame.
    """
    (df, spark) = loadDataset(sc, "grtweetdataset/grTweets.csv")

    # Rows with a missing text or label cannot be tokenized or trained on.
    df = df.dropna(subset=["text", "label"])

    # HashingTF needs an array of terms, so the raw text is split into words first.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='featTF')
    # IDF writes to "features", the column NaiveBayes reads by default.
    idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol='features')

    nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

    pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

    # Train the data
    model = pipeline.fit(df)

    return model


def getTweetSentiment(model, df):
    """Apply `model` to `df` and return whatever `model.transform` produces.

    Args:
        model: Object exposing a `transform(df)` method.
        df: Input passed unchanged to `model.transform`.
    """
    return model.transform(df)


# The Kafka source for Structured Streaming is not bundled with Spark, so its
# package is requested here. This must be set before the SparkContext is
# created. NOTE(review): the coordinates (Scala 2.11, Spark 2.3.1) should match
# the installed Spark build - confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 pyspark-shell'

# Initialize the spark config
conf = SparkConf().setAppName('TwitterStream').setMaster("local[*]")

# Create the spark context
sc = SparkContext.getOrCreate(conf=conf)

# Suppress debug messages
sc.setLogLevel("ERROR")

try:
    # Create Naive Bayes model
    model = createNBModel(sc)

    # Blocks until the streaming query terminates.
    process_twitter_stream(model, sc)
finally:
    # Release the context even if training or the stream fails or is interrupted.
    sc.stop()