In [1]:
# --- Standard library ---
import csv
import json
import os
import string
from decimal import Decimal

# --- Third-party ---
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from afinn import Afinn
# clean_text needs the NLTK stopwords corpus; fetch it once with nltk.download('stopwords').
from nltk.corpus import stopwords
from pyspark.sql.functions import udf
# NOTE(review): star import; the visible cells only use FloatType, StringType and IntegerType.
from pyspark.sql.types import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics import classification_report
# sklearn.cross_validation was removed in scikit-learn 0.20; train_test_split now lives in model_selection.
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Single AFINN lexicon scorer, used by text_to_sentiment_score.
afinn = Afinn()

In [2]:
# Define functions to clean the raw text, compute the AFINN sentiment score, and categorize the sentiment as positive, negative, or neutral.

In [3]:
def clean_text(normalized_text, stop_words=None):
    """Strip punctuation from a line of text and drop English stop words.

    Args:
        normalized_text: The text to tokenize. ``None`` yields an empty list.
        stop_words: Optional iterable of lowercase stop words. When omitted,
            NLTK's English stop-word list is used.

    Returns:
        The remaining whitespace-separated tokens, in their original order and casing.
    """
    if normalized_text is None:
        return []
    # Build the lookup set once per call rather than once per token.
    if stop_words is None:
        stop_words = set(stopwords.words('english'))
    else:
        stop_words = set(stop_words)
    # Delete every ASCII punctuation character (same result as a char-by-char filter).
    nopunc = normalized_text.translate(str.maketrans('', '', string.punctuation))
    return [word for word in nopunc.split() if word.lower() not in stop_words]

In [4]:
def text_to_sentiment_score(text, scorer=None):
    """Return the AFINN sentiment score of ``text`` as a float.

    Registered below as a Spark UDF with ``FloatType``, so the result is
    coerced to ``float`` to match the declared type.

    Args:
        text: The text to score. ``None`` (a SQL null) yields ``None``.
        scorer: Object with a ``score(text)`` method; defaults to the
            module-level ``afinn`` instance.

    Returns:
        The score as a float, or ``None`` for null input.
    """
    if text is None:
        return None
    if scorer is None:
        scorer = afinn
    return float(scorer.score(text))

In [5]:
def sentiment_score_to_category(score):
    """Map a numeric sentiment score to 'positive', 'negative' or 'neutral'.

    Args:
        score: Numeric score, or ``None`` for a SQL null.

    Returns:
        'positive' for score > 0, 'negative' for score < 0, 'neutral' for 0,
        and ``None`` for a null or NaN score (NaN compares false to everything).
    """
    # Comparing None with a number raises TypeError in Python 3, which would fail the Spark task.
    if score is None:
        return None
    if score > 0:
        return 'positive'
    if score < 0:
        return 'negative'
    if score == 0:
        return 'neutral'
    return None

In [6]:
def convert_to_int(value):
    """Convert ``value`` to a Python ``int`` for use in an ``IntegerType`` UDF.

    Floats are truncated toward zero (``int()`` semantics). ``None`` and values
    ``int()`` cannot parse (e.g. "abc", "3.5") return ``None`` (a SQL null)
    instead of raising inside the Spark task.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

In [7]:
# Wrap the Python functions as Spark UDFs with their Spark SQL return types.

In [8]:
# Wrap the plain-Python helpers as Spark UDFs. The declared return type should
# match what each function actually returns.
maxUdf = udf(text_to_sentiment_score, returnType=FloatType())         # text  -> AFINN score
maxUdf1 = udf(sentiment_score_to_category, returnType=StringType())   # score -> sentiment label
integerUdf = udf(convert_to_int, returnType=IntegerType())
# NOTE(review): clean_text returns a list of tokens, so a UDF for it would need
# ArrayType(StringType()) rather than StringType().
#cleanUdf=udf(clean_text,StringType())

In [9]:
# The dataset contains data for 300+ episodes, but for this analysis I stream the dialogue of only 4 episodes.
# The code below picks up one CSV file at a time (each CSV holds one episode's dialogue), streams it, and computes sentiment with the functions defined above.

In [10]:
# Directory of episode CSV files (one episode's dialogue per file).
inputPath = "/FileStore/tables/"

# Static read of the same files, used only to infer the schema: file-based
# streaming sources need an explicit schema.
df_episode = spark.read.format('csv').options(header='true', inferSchema='true').load(inputPath)

# Streaming counterpart of df_episode: readStream instead of read.
streamingInputDF = (
  spark
    .readStream
    .schema(df_episode.schema)        # reuse the inferred CSV schema
    .option("header", "true")         # skip each file's header row, as the static read does
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream: one file per micro-batch
    .csv(inputPath)
    .filter("episode_id is not NULL")
    .filter("normalized_text is not NULL")
)

# Score each line with AFINN, then label the score positive/negative/neutral.
new_df_episode = streamingInputDF.withColumn("sentiment_score", maxUdf('normalized_text'))
new_df_episode_sentiment = new_df_episode.withColumn("sentiment", maxUdf1('sentiment_score'))

display(new_df_episode_sentiment)

# Running count of lines per (character, sentiment).
streamingCountsDF = (
  new_df_episode_sentiment
    .groupBy(
      new_df_episode_sentiment.raw_character_text,
      new_df_episode_sentiment.sentiment)
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [11]:
# Use 3 shuffle partitions instead of Spark's default (200) to keep shuffles small for this small dataset.
spark.conf.set("spark.sql.shuffle.partitions", "3")

# Materialize the per-character sentiment counts into an in-memory table named
# "counts" so the %sql cells below can query it.
# NOTE(review): later cells rebind `query`; the handle to this query is lost but it keeps running.
query = (
    streamingCountsDF.writeStream
    .queryName("counts")      # name of the in-memory table
    .outputMode("complete")   # rewrite the whole aggregate after each micro-batch
    .format("memory")         # memory sink: driver-side table, meant for debugging/small data
    .start()
)

In [12]:
# The data is now processed, and each line's sentiment is labeled positive, negative, or neutral.
# spark.sql.shuffle.partitions = 3 sets the number of partitions used when shuffling data for the aggregation. It is a performance setting and is unrelated to the three sentiment categories.
# The results can be visualized by querying the in-memory `counts` table.
# The code above processes the stream in micro-batches, one file per trigger.

In [13]:
# Block until the most recently started streaming query (`query`) has processed every
# file currently available in the source and committed it to the memory sink.
# A fixed sleep races with micro-batch execution and can leave the table empty or partial.
query.processAllAvailable()

In [14]:
%sql SELECT raw_character_text, sentiment, `count`
-- Read from the in-memory table written by the "counts" streaming query.
FROM counts
WHERE raw_character_text IN ('Homer Simpson', 'Bart Simpson', 'Lisa Simpson', 'Marge Simpson', 'Grampa Simpson')
ORDER BY raw_character_text, sentiment

In [15]:
# The pivot table above shows the number of positive, negative, and neutral lines spoken by each member of the Simpson family.

In [16]:
%sql SELECT c.raw_character_text, c.sentiment, c.`count`
-- Same rows as the earlier family query; presumably re-rendered with a different chart type (verify notebook plot settings).
FROM counts AS c
WHERE c.raw_character_text IN ('Homer Simpson', 'Bart Simpson', 'Lisa Simpson', 'Marge Simpson', 'Grampa Simpson')
ORDER BY c.raw_character_text, c.sentiment

In [17]:
# Stacked bar graph visualizations of sentiment counts per episode

In [18]:
# Directory of episode CSV files (one episode's dialogue per file).
inputPath = "/FileStore/tables/"

# Static read of the same files, used only to infer the schema: file-based
# streaming sources need an explicit schema.
df_episode = spark.read.format('csv').options(header='true', inferSchema='true').load(inputPath)

# Streaming counterpart of df_episode: readStream instead of read.
streamingInputDF = (
  spark
    .readStream
    .schema(df_episode.schema)        # reuse the inferred CSV schema
    .option("header", "true")         # skip each file's header row, as the static read does
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream: one file per micro-batch
    .csv(inputPath)
    .filter("episode_id is not NULL")
    .filter("normalized_text is not NULL")
)

# Score each line with AFINN, then label the score positive/negative/neutral.
new_df_episode = streamingInputDF.withColumn("sentiment_score", maxUdf('normalized_text'))
new_df_episode_sentiment = new_df_episode.withColumn("sentiment", maxUdf1('sentiment_score'))

display(new_df_episode_sentiment)

# Running count of lines per (episode, sentiment).
streamingCountsDF = (
  new_df_episode_sentiment
    .groupBy(
      new_df_episode_sentiment.episode_id,
      new_df_episode_sentiment.sentiment)
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [19]:
# Use 3 shuffle partitions instead of Spark's default (200) to keep shuffles small for this small dataset.
spark.conf.set("spark.sql.shuffle.partitions", "3")

# Materialize the per-episode sentiment counts into an in-memory table named "counts1".
# NOTE(review): this rebinds `query`; the previously started query keeps running.
query = (
    streamingCountsDF.writeStream
    .queryName("counts1")     # name of the in-memory table
    .outputMode("complete")   # rewrite the whole aggregate after each micro-batch
    .format("memory")         # memory sink: driver-side table, meant for debugging/small data
    .start()
)

In [20]:
# Block until the most recently started streaming query (`query`, the "counts1" query) has
# processed every file currently available in the source and committed it to the memory sink.
# A fixed sleep races with micro-batch execution and can leave the table empty or partial.
query.processAllAvailable()

In [21]:
# Directory of episode CSV files (one episode's dialogue per file).
inputPath = "/FileStore/tables/"

# Static read of the same files, used only to infer the schema: file-based
# streaming sources need an explicit schema.
df_episode = spark.read.format('csv').options(header='true', inferSchema='true').load(inputPath)

# Streaming counterpart of df_episode: readStream instead of read.
streamingInputDF = (
  spark
    .readStream
    .schema(df_episode.schema)        # reuse the inferred CSV schema
    .option("header", "true")         # skip each file's header row, as the static read does
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream: one file per micro-batch
    .csv(inputPath)
    .filter("episode_id is not NULL")
    .filter("normalized_text is not NULL")
)

# Score each line with AFINN, then label the score positive/negative/neutral.
new_df_episode = streamingInputDF.withColumn("sentiment_score", maxUdf('normalized_text'))
new_df_episode_sentiment = new_df_episode.withColumn("sentiment", maxUdf1('sentiment_score'))

display(new_df_episode_sentiment)

# Running count of lines per (episode, exact score, sentiment label).
streamingCountsDF = (
  new_df_episode_sentiment
    .groupBy(
      new_df_episode_sentiment.episode_id,
      new_df_episode_sentiment.sentiment_score,
      new_df_episode_sentiment.sentiment)
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [22]:
# Use 3 shuffle partitions instead of Spark's default (200) to keep shuffles small for this small dataset.
spark.conf.set("spark.sql.shuffle.partitions", "3")

# Materialize the per-episode, per-score sentiment counts into an in-memory table named "counts2".
# NOTE(review): this rebinds `query`; previously started queries keep running.
query = (
    streamingCountsDF.writeStream
    .queryName("counts2")     # name of the in-memory table
    .outputMode("complete")   # rewrite the whole aggregate after each micro-batch
    .format("memory")         # memory sink: driver-side table, meant for debugging/small data
    .start()
)

In [23]:
# Conclusion: a basic Structured Streaming analysis of the Simpsons dataset is complete.
# We were able to see how sentiment varies within an episode while it streams, and across episodes.