# Run sentiment analysis on the tweets stream and store the results as JSON

In [None]:
from pyspark.sql import SparkSession

# Spark session: local mode with two worker threads. The Kafka connector
# package is requested through spark.jars.packages.
session_builder = SparkSession.builder.master("local[2]")
session_builder = session_builder.appName('twitter-read-event-consumer')
session_builder = session_builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2",
)
spark = session_builder.getOrCreate()

# Streaming reader over the Kafka topic that carries the tweet data.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",  # kafka server
    "subscribe": "twitterdata",                   # topic
    "startingOffsets": "earliest",                # start from beginning
}
lines = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)
print(type(lines))

## Analysis: part-1

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from textblob import TextBlob
import numpy as np
import pandas as pd
import re

# Alternatives, tried left to right at each position: an @mention, any single
# character that is not an ASCII letter, digit, space or tab, or a
# scheme://... URL. Written as a raw string so the regex escapes reach `re`
# untouched instead of being (invalid) Python string escapes.
_TWEET_NOISE = re.compile(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)")


def clean_tweet(tweet):
    """Return ``tweet`` with mentions, URLs and punctuation removed.

    Every match of the noise pattern is replaced by a space, after which the
    text is split on whitespace and re-joined with single spaces, so the
    result has no leading, trailing or repeated whitespace.

    Args:
        tweet: The raw tweet text.

    Returns:
        The cleaned text (possibly an empty string).
    """
    return ' '.join(_TWEET_NOISE.sub(" ", tweet).split())

def analyze_sentiment(tweet):
    """Classify the sentiment of a tweet as positive, neutral or negative.

    The text is first normalised with ``clean_tweet`` and then scored with
    TextBlob; only the sign of the polarity is reported.

    Args:
        tweet: The raw tweet text.

    Returns:
        1 if the polarity is greater than zero, 0 if it is exactly zero,
        -1 otherwise.
    """
    # clean_tweet is a plain module-level function, so it is called directly;
    # there is no `self` in scope here.
    polarity = TextBlob(clean_tweet(tweet)).sentiment.polarity

    if polarity > 0:
        return 1
    if polarity == 0:
        return 0
    return -1

def tweets_to_data_frame(tweets):
    """Build a pandas DataFrame with one row per tweet.

    Args:
        tweets: Iterable of objects exposing the attributes ``text``, ``id``,
            ``created_at``, ``source``, ``favorite_count`` and
            ``retweet_count``. One-shot iterators such as generators are
            accepted.

    Returns:
        A DataFrame with the columns ``tweets``, ``id``, ``len``, ``date``,
        ``source``, ``likes`` and ``retweets``.
    """
    # Materialise once: the input is traversed once per column below, which
    # would leave a generator exhausted after the first column.
    tweets = list(tweets)

    df = pd.DataFrame(data=[tweet.text for tweet in tweets], columns=['tweets'])

    df['id'] = np.array([tweet.id for tweet in tweets])
    df['len'] = np.array([len(tweet.text) for tweet in tweets])
    df['date'] = np.array([tweet.created_at for tweet in tweets])
    df['source'] = np.array([tweet.source for tweet in tweets])
    df['likes'] = np.array([tweet.favorite_count for tweet in tweets])
    df['retweets'] = np.array([tweet.retweet_count for tweet in tweets])

    return df


if __name__ == "__main__":

    # NOTE(review): `lines` is the Kafka streaming DataFrame created earlier,
    # whereas tweets_to_data_frame reads .text/.id/... from every element it
    # iterates over -- confirm this cell works with that input.
    df = tweets_to_data_frame(lines)

    # One sentiment label (1, 0 or -1) per row of the 'tweets' column.
    sentiment_labels = list(map(analyze_sentiment, df['tweets']))
    df['sentiment'] = np.array(sentiment_labels)

    print(df.head(10))

## Analysis: part-2

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
import json
import os

def preprocessing(lines):
    """Split the raw stream into fragments and strip tweet noise from them.

    Args:
        lines: DataFrame with a ``value`` column; each value is split on the
            literal marker ``t_end``.

    Returns:
        DataFrame with a single ``word`` column holding one fragment per row.
        Null and empty fragments are dropped before the cleanup, which then
        removes URLs, @mentions, ``#``, ``RT`` and ``:`` from each fragment.
    """
    words = lines.select(F.explode(F.split(lines.value, "t_end")).alias("word"))

    # Turn empty fragments into nulls so a single drop removes both.
    words = words.na.replace('', None).na.drop()

    # Applied in this order. Raw strings keep the regex escapes intact.
    # NOTE(review): 'RT' is removed wherever it occurs, including inside
    # longer words -- confirm that is intended.
    noise_patterns = (r'http\S+', r'@\w+', '#', 'RT', ':')
    for pattern in noise_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))
    return words

# text classification
def polarity_detection(text):
    """Return the TextBlob sentiment polarity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity

def subjectivity_detection(text):
    """Return the TextBlob sentiment subjectivity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.subjectivity

def text_classification(words):
    """Add ``polarity`` and ``subjectivity`` columns computed from ``word``.

    Args:
        words: DataFrame with a ``word`` column.

    Returns:
        The DataFrame with the two extra columns, each produced by a UDF
        wrapping the matching TextBlob helper.
    """
    # NOTE(review): both UDFs are declared with StringType although the
    # helpers return TextBlob's polarity/subjectivity scores -- confirm that
    # string-typed columns are intended.
    scorers = (
        ("polarity", polarity_detection),
        ("subjectivity", subjectivity_detection),
    )
    for column_name, scorer in scorers:
        scorer_udf = udf(scorer, StringType())
        words = words.withColumn(column_name, scorer_udf("word"))
    return words
    
if __name__ == "__main__":

    # Clean the raw stream first ...
    cleaned = preprocessing(lines)

    # ... then attach the polarity and subjectivity columns.
    words = text_classification(cleaned)
    print(type(words))

    # Collapse the result to a single partition.
    words = words.repartition(1)


In [None]:
# Preview the polarity column.
# A streaming DataFrame cannot be displayed with .show() (Spark requires such
# queries to be started through writeStream). For a stream, run a one-shot
# query into an in-memory table and display that table instead.
# NOTE(review): the memory sink is meant for debugging and keeps the rows it
# receives in memory -- confirm the topic is small enough for a preview.
if words.isStreaming:
    preview_query = (
        words.select('polarity')
        .writeStream
        .format("memory")
        .queryName("polarity_preview")
        .outputMode("append")
        .trigger(once=True)
        .start()
    )
    # trigger(once=True) processes the available data once and then stops.
    preview_query.awaitTermination()
    spark.table("polarity_preview").show()
else:
    words.select('polarity').show()

In [None]:
# Output locations: the directory the stream writes its JSON files to and the
# checkpoint directory (located inside that output directory).
data_path = os.path.join(os.path.pardir, 'data', 'processed', 'json')
checkpoint_path = os.path.join(os.path.pardir, 'data', 'processed', 'json', './check')

# The streaming query is only started further down, so the output directory
# may not exist yet; create it before writing the schema file into it.
os.makedirs(data_path, exist_ok=True)

# Store the schema of `words` in a text file next to the data.
# NOTE(review): this writes the str() form of the schema; `schema.json()`
# may be preferable if the file is meant to be loaded back -- confirm.
schema = words.schema
schema_path = os.path.join(data_path, 'tweets.txt')
with open(schema_path, 'w') as schema_file:
    schema_file.write(str(schema))

In [None]:
# Stream `words` as JSON files into data_path, triggering a micro-batch
# every 60 seconds, then block until the query terminates.
sink_options = {
    "path": data_path,
    "checkpointLocation": checkpoint_path,
}
query = (
    words.writeStream
    .format("json")
    .outputMode("append")
    .queryName("all_tweets")
    .options(**sink_options)
    .trigger(processingTime='60 seconds')
    .start()
)
query.awaitTermination()