### Kafka Consumer and Sentiment Analysis

**Original Author:** Walker Rowe.<br/>
**With modifications by:** Astrid Krickl.<br/>
**Additional Info:** Working with Streaming Twitter Data Using Kafka. https://www.bmc.com/blogs/working-streaming-twitter-data-using-kafka/

In [11]:
# Import the os module 
import os

# Tell PySpark to pull in the spark-sql-kafka connector package when the shell is launched
submit_args = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = submit_args

In [12]:
# Install the required pip packages in the current Jupyter kernel.
# pip is run through sys.executable, i.e. with the interpreter that executes this notebook.
import sys
# findspark is imported in a later cell
!{sys.executable} -m pip install findspark
# textblob provides the sentiment scores; -U upgrades an already installed version
!{sys.executable} -m pip install -U textblob
# pyspark is imported in a later cell
!{sys.executable} -m pip install pyspark

In [14]:
# Import the findspark module 
import findspark

# Point findspark at the full path of the local Spark installation
spark_home = "/usr/local/spark/"
findspark.init(spark_home)

#### Creating helper functions for processing the tweets
* Function preprocessing
    * Extract created_at, text and user from the JSON
    * Extract screen_name and location from user
    * Clean tweets 
* Function polarity_detection
    * Returns polarity of tweets
* Function subjectivity_detection
    * Returns subjectivity of tweets
* Function text_classification
    * Append polarity and subjectivity to dataframe

In [16]:
def preprocessing(lines):
    """Extract the tweet fields from raw JSON and strip noise from the text.

    Args:
        lines: DataFrame with a string column ``json_data`` containing the
            tweet JSON (fields ``created_at``, ``text`` and ``user`` are read).

    Returns:
        DataFrame with the columns ``word`` (cleaned tweet text),
        ``created_at``, ``screen_name`` and ``location``. Rows in which any
        of these fields is missing or an empty string are dropped before the
        text is cleaned.
    """
    # Pull the timestamp, the tweet text and the nested user object out of the JSON
    words = lines.select(
        json_tuple('json_data', 'created_at', 'text', 'user').alias('created_at', 'word', 'json_user')
    )

    # Extract screen_name and location from the user object
    words = words.select(
        'word',
        'created_at',
        json_tuple('json_user', 'screen_name', 'location').alias('screen_name', 'location'),
    )

    # Treat empty strings as missing, then drop every row with a missing field
    words = words.na.replace('', None)
    words = words.na.drop()

    # Clean up the tweet text. The patterns are applied in this order:
    # links, @mentions, hash signs, the retweet marker, colons.
    # Raw strings keep the regex backslashes away from Python's own escape handling.
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', r'@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    # Word boundaries restrict the match to the stand-alone retweet marker, so
    # the letters "RT" inside other words (e.g. "SMART") are left intact.
    words = words.withColumn('word', F.regexp_replace('word', r'\bRT\b', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    return words

# Define methods from TextBlob
def polarity_detection(text):
    """Return the polarity score TextBlob assigns to ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity

def subjectivity_detection(text):
    """Return the subjectivity score TextBlob assigns to ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.subjectivity

# We need to create user defined functions for the Textblob methods in order to use them
def text_classification(words):
    """Append the TextBlob sentiment scores of the ``word`` column.

    Args:
        words: DataFrame with a ``word`` column holding the tweet text.

    Returns:
        The DataFrame with two additional columns, ``polarity`` and
        ``subjectivity``. Both are declared as ``StringType``.
    """
    # Column name -> plain Python scoring function, in the order the columns are appended
    scorers = (
        ("polarity", polarity_detection),
        ("subjectivity", subjectivity_detection),
    )
    for column_name, scorer in scorers:
        # Wrap the function as a user defined function so Spark can apply it to a column
        scorer_udf = udf(scorer, StringType())
        words = words.withColumn(column_name, scorer_udf("word"))
    return words

#### Build a SparkSession

In [17]:
# Import the spark sql and sql.types, and sql.functions, time and textblob modules 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
import time
from pyspark.sql import functions as F #NEW
from textblob import TextBlob   #NEW

# Reuse the existing SparkSession if there is one; otherwise create a new one
# from the options set on this builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("WeatherApp")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)


#### Open Read Stream from Kafka with Spark

In [18]:
# Open the Kafka topic as a streaming DataFrame. A failure is reported rather
# than raised, so the notebook keeps running; tweet_df stays undefined in that case.
try:
    # readStream is the interface used to load a streaming DataFrame from an external source
    tweet_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
        .option("subscribe", "weather") \
        .option("startingOffsets", "latest") \
        .load()
except Exception:
    # Catch Exception rather than everything, so KeyboardInterrupt and SystemExit
    # still propagate. Print the full exception info (type, value, traceback)
    # instead of only the exception class, so the error message is not lost.
    print("Unexpected error:", sys.exc_info())

#### Open Write Stream
* Cast tweets as json data
* Preprocess the tweets
* Add the polarity and subjectivity to the tweets
* Write the streamed data into memory under the query name "tweetquery", triggering a new batch every 10 seconds

In [19]:
# Build the processing pipeline and start the streaming query. A failure is
# reported rather than raised, so the notebook keeps running.
try:
    # Cast the Kafka record value to a string column holding the tweet JSON
    tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING) as json_data")

    # Preprocess the data
    words = preprocessing(tweet_df_string)

    # Text classification to add polarity and subjectivity
    words = text_classification(words)

    # Collapse the result into a single partition
    words = words.repartition(1)

    # writeStream is the interface used to write a streaming DataFrame to an external sink.
    # Write the data into the in-memory table "tweetquery". Output mode is "append",
    # so each trigger adds only the newly processed rows; the trigger fires every 10 seconds.
    writeTweet = words.writeStream. \
        format("memory"). \
        queryName("tweetquery"). \
        outputMode("append"). \
        trigger(processingTime='10 seconds'). \
        start()

    # Print banner text
    print("----- streaming is running -------")
except Exception:
    # Catch Exception rather than everything, so KeyboardInterrupt and SystemExit
    # still propagate; print the full exception info.
    print("Unexpected error:", sys.exc_info())

----- streaming is running -------


#### Creating dataframe
* Selecting all columns from tweetquery where the location is not null, newest tweets first

In [42]:
# Query the in-memory table filled by the streaming query. A failure is
# reported rather than raised; df stays undefined in that case.
try:
    # NOTE(review): created_at is extracted from the JSON without a cast, so this
    # ordering is presumably a string comparison - confirm it is chronological.
    df = spark.sql("SELECT * from tweetquery WHERE location IS NOT NULL ORDER BY created_at DESC")
except Exception:
    # Catch Exception rather than everything, so KeyboardInterrupt and SystemExit
    # still propagate; print the full exception info.
    print("Unexpected error:", sys.exc_info())

#### Splitting dataframe into sub-dataframes
* Creating new dataframes for each city according to the location condition
* Saving the dataframe as json locally 

In [43]:
# Lower-case the location once so that every city match below is case-insensitive
location_lc = lower(df['location'])

# One sub-dataframe per city: keep the rows whose location mentions the city name
df_london = df.filter(location_lc.contains('london'))
df_dublin = df.filter(location_lc.contains('dublin'))
df_belfast = df.filter(location_lc.contains('belfast'))
df_manchester = df.filter(location_lc.contains('manchester'))
df_liverpool = df.filter(location_lc.contains('liverpool'))
df_miami = df.filter(location_lc.contains('miami'))
df_los_angeles = df.filter(location_lc.contains('los angeles'))
df_dallas = df.filter(location_lc.contains('dallas'))

# Folder labels and dataframes are kept in the same order so they can be paired up
cities = ["London","Dublin","Belfast","Manchester","Liverpool","Miami","LA","Dallas"]
dfs = [df_london, df_dublin, df_belfast, df_manchester, df_liverpool, df_miami, df_los_angeles, df_dallas]

# Save each city's tweets as JSON in its own folder, overwriting earlier results
for city, city_df in zip(cities, dfs):
    target_path = "./results_tweets/" + "tweets_city_" + city
    city_df.coalesce(1).write.mode('overwrite').save(target_path, format="json")


In [44]:
# Stop the Spark session at the end of the notebook
spark.stop() 