# Exploring data

Importing libraries, starting session and reading file

In [1]:
### Importing necessary libraries

#General
import time
from datetime import datetime
import pandas as pd
import numpy as np
import json
import re

#Pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType, FloatType

#Matplot
import matplotlib.pyplot as plt
%matplotlib inline

#Wordcloud
from wordcloud import WordCloud, STOPWORDS

#VADER
from nltk.sentiment.vader import SentimentIntensityAnalyzer

#Preprocessing data
from sklearn.preprocessing import StandardScaler,MinMaxScaler
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.correlation import plot_corr, plot_corr_grid
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import kpss

In [None]:
## Starting Spark session
## Memory-related settings are passed to the builder so they are part of the
## configuration the session is created with. Setting them afterwards through
## spark.conf.set() cannot resize a JVM that is already running.
## NOTE(review): if a Spark session already exists in this kernel,
## getOrCreate() returns it and these settings may not take effect - restart
## the kernel in that case.
spark = SparkSession.builder \
    .master("local") \
    .appName("Greta") \
    .config("spark.sql.debug.maxToStringFields", 500) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "4g") \
    .getOrCreate()

In [None]:
## Read JSON file into dataframe
## NOTE(review): the .ndjson extension suggests one JSON record (tweet) per line - confirm
df = spark.read.json("hdfs://localhost:9000/ca2/Greta/greta.ndjson")

In [None]:
## Printing schema and showing
## show() is called without arguments, so only a preview of the rows is printed
df.printSchema()
df.show()

In [None]:
## Counting number of rows (tweets)
df.count()

In [None]:
## Printing 1st axis columns
## The loop variable is deliberately not named `col`: that name is bound to
## pyspark.sql.functions.col by the star import at the top of the notebook,
## and rebinding it to a string would break any later col(...) call.
for column_name in df.columns:
    print(column_name)

For the univariate analysis, only the timestamp and the text of each tweet are extracted.

In [None]:
## Selecting only necessary columns and displaying for review
df.select("created_at","full_text").show(10, truncate=True)

In [None]:
## Displaying full text for additional review
df.select("created_at","full_text").show(1, truncate=False)

# Cleaning data

Fixing timestamps, cleaning text, reducing data for further analysis

In [None]:
## Building the working dataframe from the timestamp and tweet text columns only
df_work = df.select(["created_at", "full_text"])

In [None]:
## Creating function for cleaning texts
import re
def clean_text(text):
    """
    Normalise a tweet so that only meaningful lowercase words remain.

    Steps: lowercase, strip URLs, strip @mentions, strip the search terms
    'greta' and 'thunberg', replace every non-letter with a space and keep
    only tokens longer than two characters.

    Parameters
    ----------
    text : str or None
        Raw tweet text. None (a missing tweet body) is treated as empty.

    Returns
    -------
    str
        The remaining tokens joined by single spaces; '' when nothing is left.
    """
    # A null tweet body would otherwise raise inside the Spark UDF
    if text is None:
        return ''
    text = text.lower()
    # URLs are removed first, while they are still intact: the later
    # substitutions (mentions, search terms) would split a URL into fragments
    # that survive as ordinary tokens. \S+ consumes the whole URL including
    # characters such as '_', '-', '?' and '='.
    text = re.sub(r'https?://\S+', ' ', text)
    # The dot is escaped so that only a literal 'www.' prefix matches; an
    # unescaped dot also matches a space and deletes ordinary words
    # (e.g. 'awww nice' lost both words).
    text = re.sub(r'www\.[^ ]+', ' ', text)
    # Leftover run-together addresses such as 'wwwexamplecom'
    text = re.sub(r'[a-zA-Z0-9]*www[a-zA-Z0-9]*com[a-zA-Z0-9]*', ' ', text)
    text = re.sub(r'@[a-zA-Z0-9_]+', ' ', text)
    # The search terms occur in every tweet and carry no information
    text = re.sub(r'greta', ' ', text)
    text = re.sub(r'thunberg', ' ', text)
    text = re.sub(r'[^a-zA-Z]', ' ', text)
    # split() without arguments already collapses runs of spaces and never
    # yields empty tokens, so no extra whitespace clean-up is needed
    return ' '.join(token for token in text.split() if len(token) > 2)

In [None]:
## Creating UDF with function for cleaning text to be applied on the column
cleanUDF = udf(lambda x:clean_text(x),StringType())

In [None]:
## Applying cleanUDF on new column
df_work = df_work.withColumn('Text', cleanUDF(F.col('full_text')))

In [None]:
## Creating function for reshaping timestamp
def createTimestamp(created_at):
    """
    Reformat a tweet timestamp string.

    Parses text of the form 'Wed Oct 10 20:19:24 +0000 2018' (the '+0000'
    offset is matched literally) and returns it as 'YYYY-MM-DD HH:MM:SS'.
    """
    parsed = datetime.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    return parsed.strftime('%Y-%m-%d %H:%M:%S')

In [None]:
## Creating UDF with function for reshaping timestamp to be applied on the column
timestampUDF = udf(lambda x:createTimestamp(x),StringType())

In [None]:
## Applying timestampUDF new on column
df_work = df_work.withColumn('Timestamp', timestampUDF(F.col('created_at')))

In [None]:
## dropping old columns
df_work = df_work.drop('created_at')
df_work = df_work.drop('full_text')

In [None]:
## After cleaning text, remove all rows without alphabetic characters
df_work = df_work.filter(F.col('Text').rlike('[a-zA-Z]'))

In [None]:
## Display dataset for inspection
df_work.show()

Using RDD for creating index column and returning to dataframe

In [None]:
## Converting dataframe to RDD with additional rowID (index) column
from pyspark.sql.types import LongType, StructField, StructType

new_schema = StructType([StructField('rowId',LongType(),True)]
                        + df_work.schema.fields)
zip_rdd = df_work.rdd.zipWithIndex()

In [None]:
## Create map for new RDD
new_rdd = zip_rdd.map(lambda args: ([args[1]+1] + list(args[0])))

In [None]:
## Rewriting df with new data from RDD
df_work = spark.createDataFrame(new_rdd,new_schema)

In [None]:
## Inspecting dataset
df_work.show(5)

In [None]:
## Extract every 10th row because HW doesn't support this number of rows
df_work = df_work.where(df_work.rowId%10==0)

In [None]:
## Getting number of rows
df_work.count()

# Sentiment analysis

Creating a word cloud to inspect the 50 most frequently used words, then extracting the compound polarity (sentiment) of each tweet.

In [None]:
## Reading text into variable
blob = df_work.select('Text').toPandas()

In [None]:
# Passing text variable to library
wc = WordCloud(background_color='white',
                    stopwords =  set(STOPWORDS),
                    max_words = 50, 
                    random_state = 42,)
wc.generate(' '.join(blob['Text']))

In [None]:
## Creating word cloud image
plt.figure(figsize=(10,10))
plt.imshow(wc)

In [None]:
## Calling VADER function for sentimental analysis
## One analyser instance is created here; the polarity function reads it as a global
analyser = SentimentIntensityAnalyzer()

In [None]:
# Creating function to extract compound sentiment from tweets
def polarity(text):
    """Return the 'compound' entry of the analyser's polarity scores for text."""
    scores = analyser.polarity_scores(text)
    return scores['compound']

In [None]:
# Using sentiment function over UDF
polarityUDF = udf(lambda x:polarity(x),FloatType())

In [None]:
# Applying UDF on tweet text column and storing result in new polarity column
df_work = df_work.withColumn('polarity', polarityUDF(F.col('Text')))

In [None]:
# Inspecting result of sentiment
df_work.show(5)

In [None]:
# Converting dataframe to pandas for extensive analysis
df_pd = df_work.toPandas()

In [None]:
# Checking number of rows to confirm dataset conversion passed
df_pd.count()

In [None]:
# Checking info for column names and types
df_pd.info()

In [None]:
# Checking empty rows with empty text just in case
df_pd['Text'].isna().sum()

In [None]:
# Rearranging columns
df_pd = df_pd.reindex(columns=['rowId', 'Timestamp', 'Text', 'polarity'])

In [None]:
# Checking dataset for column names
df_pd.head(20)

In [None]:
# checking number of neutral sentiment rows
df_pd.query('polarity == 0.0000').count()

The final dataset will have one row per day, but the current dataset contains more than one tweet per day. To reduce it, the sentiment is averaged per day. Rows with neutral sentiment are excluded from this average.

In [None]:
# Inspecting unique timestamps
df_pd['Timestamp'].unique()

In [None]:
# Slicing timestamp for only YYYY-MM-DD because we only need days
df_pd['Timestamp'] = df_pd['Timestamp'].str.slice(stop=10)

In [None]:
# Inspecting dataset
df_pd

In [None]:
# Creating new df that will be used for modeling and predictions
df_model = pd.DataFrame(columns=['Timestamp','polarity'])

In [None]:
# Creating function for aggregating non neutral sentiment rows and dividing by number of rows to get average compound sentiment per day
a=0
b=0
c=0
for i in df_pd['Timestamp'].unique():
    a = df_pd['polarity'][(df_pd['polarity'] != 0.0000) &
                          (df_pd['Timestamp'] == i)].sum()
    b = len(df_pd[(df_pd['polarity'] != 0.0000) &
                  (df_pd['Timestamp'] == i)])
    c=a/b
    new_row = pd.DataFrame({'Timestamp':i,'polarity':c},index=[0])
    df_model = pd.concat([new_row,df_model.loc[:]]).reset_index(drop=True)

In [None]:
# Resetting index of dataset
df_model = df_model[::-1].reset_index(drop=True)

In [None]:
# Inspecting dataset
df_model

In [None]:
# Converting timestamp from string to pandas datetime type
df_model['Timestamp'] = pd.to_datetime(df_model['Timestamp'],format='%Y/%m/%d')

In [None]:
# Moving timestamp to index of df
df_model = df_model.set_index('Timestamp')

In [None]:
# Removing name of the index column
df_model.index.name = None

In [None]:
# Inspecting dataset
df_model

In [None]:
# Sentiment is originally [-1,1]. For modeling scaling the data to [0,1]
# NOTE(review): MinMaxScaler is fitted on this data, so 0 and 1 presumably
# correspond to the smallest and largest daily value observed, not to -1 and 1
# - confirm this is the intended scaling.
scaler = MinMaxScaler()
# NOTE(review): scaled_data holds the return value of fit(), presumably the
# fitted scaler itself rather than scaled values - it is not used below.
scaled_data = scaler.fit(df_model)
# The whole frame is transformed and the result written back to 'polarity'
df_model['polarity'] = scaler.transform(df_model)

In [None]:
# Inspecting
df_model

In [None]:
# Plotting sentiment data
# Axis labels and tick rotation are set on the current pyplot axes first,
# then the daily polarity is drawn against the date index
plt.ylabel('polarity')
plt.xlabel('Date')
plt.xticks(rotation=45)
plt.plot(df_model.index, df_model['polarity'], )

In [None]:
# Decomposing sentiment with an additive model
result = seasonal_decompose(df_model['polarity'], model='additive')
# matplotlib.pyplot is imported as `plt` in this notebook; there is no name
# `pyplot`, so the figure size has to be set through `plt`
plt.rc("figure", figsize=(10, 8))
result.plot()
plt.show()

In [None]:
# Correlation plot
# plot_corr draws a correlation matrix, so the correlations are computed
# first instead of handing it the raw observations.
# NOTE(review): df_model appears to hold a single column at this point, which
# makes this a 1x1 matrix - the plot becomes informative once more series are added.
corr_matrix = df_model.corr()
plot_corr(corr_matrix.values, xnames=list(corr_matrix.columns))
plt.show()

In [None]:
# ACF plot
plot_acf(df_model['polarity'])
plt.show()

In [None]:
# PACF plot
plot_pacf(df_model['polarity'])
plt.show()

The ADF test produces a test statistic that measures the strength of evidence against the null hypothesis of non-stationarity. The test statistic is compared against critical values to determine whether the null hypothesis can be rejected or not. If the test statistic is lower than the critical values, it suggests that the time series is stationary and does not require differencing. Conversely, if the test statistic is higher than the critical values, it indicates that the series is non-stationary.

In [None]:
# Augmented Dickey-Fuller Test function
def adf_test(series,title=''):
    """
    Run the Augmented Dickey-Fuller test on a time series and print a report.

    The report lists the test statistic, p-value, number of lags used,
    number of observations and the critical values, followed by a verdict
    based on a 0.05 p-value threshold. Nothing is returned.

    series : the time series to test
    title  : optional label printed in the report header
    """
    print(f'Augmented Dickey-Fuller Test: {title}')
    adf_result = adfuller(series, autolag='AIC')

    # The first four entries are scalar statistics; entry 4 maps each
    # significance level to its critical value
    stat_names = ['ADF test statistic','p-value','# lags used','# observations']
    report = pd.Series(adf_result[0:4], index=stat_names)
    for level, critical in adf_result[4].items():
        report[f'critical value ({level})'] = critical

    print(report.to_string())

    p_value = adf_result[1]
    if p_value > 0.05:
        print("Weak evidence against the null hypothesis")
        print("Fail to reject the null hypothesis")
        print("Data has a unit root and is non-stationary")
        return

    print("Strong evidence against the null hypothesis")
    print("Reject the null hypothesis")
    print("Data has no unit root and is stationary")

In [None]:
# Execute ADF test
adf_test(df_model['polarity'],title='Sentiment')

In [None]:
# The Kwiatkowski-Phillips-Schmidt-Shin Test
result = kpss(df_model['polarity'])
print(result)
print('KPSS Test Statistics: %.2f' % result[0])
print('1%% Critical Value: %.2f' % result[3]['1%'])
print('5%% Critical Value: %.2f' % result[3]['5%'])
print('10%% Critical Value: %.2f' % result[3]['10%'])
print('p-value: %.2f' % result[1])

# Modeling and prediction