In [None]:
import findspark

import os

# Prefer an explicit SPARK_HOME; otherwise fall back to the local install path.
# Raw string: "\S" and "\s" in a normal literal are invalid escape sequences
# (SyntaxWarning on Python 3.12+), and e.g. a "\t..." segment would be corrupted.
findspark.init(os.environ.get("SPARK_HOME", r"C:\Spark\spark-3.1.2-bin-hadoop3.2"))

In [None]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, RegexTokenizer
from pyspark.ml.classification import LinearSVC
from pyspark.sql import Row
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
import preprocessor as p
import re

# One SparkContext for the whole notebook; the SQL session below and the
# StreamingContext created in the streaming cell both wrap it.
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sparkContext=sc)

In [None]:
# Characters deleted outright (no replacement): . ; : ! ' ? , " | ( ) [ ] % $ < > { }
# Raw string avoids the invalid "\." / "\;" escape warnings of a plain literal.
REPLACE_NO_SPACE = re.compile(r"[.;:!'?,\"|()\[\]%$<>{}]")
# Sequences replaced with a single space. Because REPLACE_NO_SPACE runs first in
# clean_tweets and already removes '<', '>' and ':', only '-' and '/' can
# actually match here in that pipeline.
REPLACE_WITH_SPACE = re.compile(r"(<br\s/><br\s/?)|(-)|(/)|(:).")


def clean_tweets(df):
    """Normalise raw tweet texts for the classifier.

    Each text is passed through ``preprocessor.clean`` (tweet-specific cleanup;
    exactly what it strips depends on that library's defaults), lower-cased,
    stripped of the punctuation in ``REPLACE_NO_SPACE``, has ``-``/``/`` turned
    into spaces, and is finally trimmed.

    Parameters
    ----------
    df : iterable of str or None
        Raw tweet texts, e.g. a pandas Series column. (The name is kept for
        backward compatibility; it is not a DataFrame.) Missing values
        (``None`` from Spark, ``NaN`` from pandas) are accepted.

    Returns
    -------
    list of str
        One cleaned string per input, in the same order. Missing inputs map to
        ``""`` so callers can realign the result with other columns by position.
    """
    cleaned = []
    for line in df:
        # Null tweets would make p.clean fail and abort the whole batch; emit ""
        # instead so the output stays one-to-one with the input.
        if not isinstance(line, str):
            cleaned.append("")
            continue
        text = p.clean(line)
        text = REPLACE_NO_SPACE.sub("", text.lower())
        text = REPLACE_WITH_SPACE.sub(" ", text)
        cleaned.append(text.strip())
    return cleaned

In [None]:
# Explicit schema for train.csv: integer id, integer class label, raw tweet text.
my_schema = tp.StructType([
    tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='label', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='tweet', dataType=tp.StringType(), nullable=True),
])

# Read the labelled training set (header row skipped; the schema supplies names).
my_data = spark.read.csv('train.csv', schema=my_schema, header=True)
# Rows without a label cannot be used to train LinearSVC, so drop them up front.
df = my_data.dropna(subset=['label'])
# toPandas() collects the whole training set onto the driver; the cleaner is
# plain Python, so it runs there.
pandas_df = df.toPandas()

# Name every column explicitly; clean_tweets returns one string per row, in
# order, so it lines up with id/label positionally.
clean_df = pd.DataFrame({
    'tweet': clean_tweets(pandas_df['tweet']),
    'id': pandas_df['id'],
    'label': pandas_df['label'],
})
clean_spark = spark.createDataFrame(clean_df, ['tweet', 'id', 'label'])

clean_spark.show(5)

clean_spark.printSchema()

In [None]:
# Training pipeline stages, configured through the fluent setter API.
# 1) tokenize: the pattern is used as the delimiter (split on non-word characters)
stage_1 = RegexTokenizer().setInputCol('tweet').setOutputCol('tokens').setPattern(r'\W')
# 2) drop common stop words from the token lists
stage_2 = StopWordsRemover().setInputCol('tokens').setOutputCol('filtered_words')
# 3) bag-of-words term counts as a sparse feature vector
stage_3 = CountVectorizer().setInputCol('filtered_words').setOutputCol('vector')
# 4) linear SVM classifier trained on those vectors against the 'label' column
model = LinearSVC().setFeaturesCol('vector').setLabelCol('label')

In [None]:
# Chain the four stages: tokenize -> remove stop words -> count-vectorize -> LinearSVC.
pipeline = Pipeline().setStages([stage_1, stage_2, stage_3, model])
# Fitting trains every stage on the cleaned training frame; the resulting
# PipelineModel is what get_prediction applies to each streamed batch.
pipelineFit = pipeline.fit(dataset=clean_spark)

# Start the TCP socket server (listening on localhost:5555) before running the cell below.

In [None]:
def get_prediction(tweet_text):
    """Classify one micro-batch of streamed tweets and append the results to MySQL.

    Used as the ``foreachRDD`` callback for the socket stream. Judging by the
    ``getItem`` indices below, each non-empty record is expected to look like
    ``user_id splitterT23 tweet splitterT23 user_followers splitterT23 actual_tweeter``.
    Records with no tweet field are skipped.

    Parameters
    ----------
    tweet_text : pyspark.RDD of str
        Raw records received in this batch.

    Side effects
    ------------
    Prints the predictions with ``show()`` and appends
    ``user_id, tweet (cleaned), user_followers, actual_tweeter, prediction`` to the
    ``livetweets`` table over JDBC. Connection settings may be overridden with the
    ``MYSQL_URL``, ``MYSQL_USER`` and ``MYSQL_PASSWORD`` environment variables;
    the defaults are the previous hard-coded values.

    Any exception is reported with its traceback and swallowed, so one bad batch
    does not stop the streaming job.
    """
    import os
    import traceback

    try:
        records = tweet_text.filter(lambda x: len(x) > 0)
        # Most micro-batches are empty, and createDataFrame cannot infer a schema
        # from an empty RDD, so skip them instead of raising every batch.
        if records.isEmpty():
            return

        raw_df = spark.createDataFrame(records.map(lambda w: Row(tweet=w)))
        # Split once and pull every field from the same array.
        parts = split(F.col("tweet"), " splitterT23 ")
        fields_df = raw_df.select(
            parts.getItem(0).alias("user_id"),
            parts.getItem(1).alias("tweet"),
            parts.getItem(2).alias("user_followers"),
            parts.getItem(3).alias("actual_tweeter"),
        ).where(F.col("tweet").isNotNull())  # malformed record: no tweet field

        # Clean the text in pandas while keeping the metadata on the same rows.
        # This replaces the fragile row_number()/monotonically_increasing_id()
        # re-join of two independently evaluated DataFrames.
        pandas_df = fields_df.toPandas()
        if pandas_df.empty:
            return
        pandas_df["tweet"] = clean_tweets(pandas_df["tweet"])
        # Reuse the Spark schema: a column that is entirely null in this batch
        # would otherwise break type inference.
        clean_spark = spark.createDataFrame(pandas_df, schema=fields_df.schema)

        # The fitted pipeline keeps the input columns and appends 'prediction'.
        predictions = pipelineFit.transform(clean_spark).select(
            "user_id", "tweet", "user_followers", "actual_tweeter", "prediction")
        predictions.show()
        predictions.write.format('jdbc').options(
            url=os.environ.get("MYSQL_URL", "jdbc:mysql://localhost:3306/demo"),
            driver='com.mysql.cj.jdbc.Driver',
            dbtable='livetweets',
            user=os.environ.get("MYSQL_USER", "root"),
            password=os.environ.get("MYSQL_PASSWORD", "root"),
        ).mode('append').save()
    except Exception:
        # Best effort: report with traceback but keep the stream alive.
        traceback.print_exc()

# Receive lines of text from the local socket server in 3-second micro-batches.
ssc = StreamingContext(sc, 3)
lines = ssc.socketTextStream(hostname="localhost", port=5555)
# One received line may hold several records separated by the literal marker 't_end'.
words = lines.flatMap(lambda received: received.split('t_end'))
# Score and persist every micro-batch as it arrives.
words.foreachRDD(get_prediction)
ssc.start()

In [None]:
# Stop only the streaming job. Keeping the SparkContext alive leaves `spark` and
# `pipelineFit` usable, so the streaming cell above can be re-run without
# restarting the kernel. (The default, stopSparkContext=True, would stop `sc` too.)
ssc.stop(stopSparkContext=False)