## CA 2 - Big Data & Advanced Analytics

### Using PySpark for data exploration

In [None]:
# clear the cache on the spark session
# NOTE(review): this cell and the two below use `spark` and `sc` before the
# session is created further down via SparkSession.builder...getOrCreate();
# presumably a PySpark-enabled kernel pre-defines them — confirm, or move
# these cells after the session is created.
spark.catalog.clearCache()

In [None]:
# Display the SparkContext; in Jupyter its rich display includes the Spark
# version, master and app name
sc

In [None]:
# sc.master is the master URL the context is connected to (e.g. local[*]
# when running locally)
sc.master

In [None]:
# findspark.init() locates the local Spark installation and adds it to
# sys.path so that `import pyspark` can resolve it, so it has to run before
# any pyspark import.
import findspark
findspark.init()

# Import regex module
import re
from operator import add

# Import PySpark
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
# NOTE: this wildcard import puts every pyspark.sql.functions name in scope
# (col, to_date, regexp_replace, ...) and can shadow Python builtins such as
# sum, max, min, round and abs for the rest of the notebook.
from pyspark.sql.functions import *

#### Import File from the Hadoop Directory

<b>References</b>
1. https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/
2. https://medium.com/@ashutoshkumar2048/spark-connect-apache-spark-3-4-9846c40484d0

In [None]:
# Start the Spark session for CA2 (getOrCreate reuses an existing session
# if one is already running).
spark = (
    SparkSession.builder
    .appName("ca2_V2")
    .getOrCreate()
)

In [None]:
# Explicit schema for the headerless CSV, built from (column name, Spark type)
# pairs; every column is nullable.
# NOTE(review): 'processed_tweet' is presumably not present in the raw file,
# in which case it is read as null and filled in later by the pre-processing
# UDF — confirm against the CSV.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("target", IntegerType()),
        ("id", StringType()),
        ("date", StringType()),
        ("query", StringType()),
        ("author", StringType()),
        ("tweet", StringType()),
        ("processed_tweet", StringType()),
    )
])


In [None]:
# Read the input CSV. The path is resolved against the cluster's default
# filesystem (presumably HDFS here). The file has no header row, so column
# names and types come from the explicit `schema`; inferSchema is not passed
# because schema inference is not used when a schema is supplied, and
# passing both is contradictory.
path = "/user1/twitter_DS_1yr.csv"

df = spark.read.csv(path, header=False, schema=schema)


In [None]:
# Convert the raw 'date' strings into a timestamp column 'date_parsed'.
# NOTE(review): values appear to look like "Mon Apr 06 22:19:45 PDT 2009"
# (judging by the ' PDT' stripping and substring offsets used later) —
# confirm against the data.
# Spark 3's datetime parser does not accept the day-of-week letter 'E', so
# the leading weekday and the timezone label are removed first and the
# remaining "Apr 06 22:19:45 2009" text is parsed. The time is interpreted
# in the Spark session time zone. A 'MM-DD' pattern cannot match this text
# ('DD' means day-of-year), and the result must be assigned for the
# conversion to take effect.
df = df.withColumn(
    'date_parsed',
    to_timestamp(
        regexp_replace(col('date'), r'^\w+ (\w+ \d+ [\d:]+) \w+ (\d+)$', '$1 $2'),
        'MMM dd HH:mm:ss yyyy',
    ),
)

#### Exploring the dataframe

In [None]:
# Shape of the DataFrame: count() runs a Spark job over every row, while the
# column count is read straight from the schema.
num_rows, num_columns = df.count(), len(df.columns)

# Report the shape
print("Number of rows: ", num_rows)
print("Number of columns: ", num_columns)

In [None]:
# Count the tweets for each polarity label (4 = positive, 0 = negative).
# A single groupBy aggregation replaces one full Spark job per label.
polarity_counts = {row['target']: row['count'] for row in df.groupBy('target').count().collect()}

print(f"There are {polarity_counts.get(4, 0)} positive values in the dataframe.")

print(f"There are {polarity_counts.get(0, 0)} negative values in the dataframe.")

In [None]:
# drop duplicate entries (rows identical across every column)
# NOTE: this runs after the polarity counts above, so those counts include duplicates.

df = df.dropDuplicates()

In [None]:
# Drop rows missing the label or the tweet text. DataFrames are immutable,
# so the result must be assigned for the drop to take effect.
# The null check is limited to the columns needed downstream: if the raw CSV
# has no seventh column, 'processed_tweet' is null on every row at this
# point (it is only filled in by the pre-processing UDF), and an
# unrestricted dropna() would then remove the whole dataset.
df = df.dropna(subset=['target', 'tweet'])

#### Feature Engineering

In [None]:
# import relevant libraries

from pyspark.sql.functions import udf, regexp_replace, lower
from pyspark.sql.types import StringType


In [None]:
# Remove noise from tweets: links, @mentions, #hashtags, HTML entities and
# redundant whitespace, then lowercase. (Stop words and punctuation are not
# removed here.)
# REFERENCE: https://medium.com/towards-artificial-intelligence/large-scale-sentiment-analysis-with-pyspark-bdccf9256e35

def pre_process(text):
    """Normalise a raw tweet for tokenisation.

    Removes URLs, @mentions and #hashtags, decodes the ``&amp;``, ``&lt;``
    and ``&gt;`` HTML entities, collapses whitespace, trims the ends and
    lowercases the result.

    Args:
        text: Raw tweet text, or ``None`` for a missing value.

    Returns:
        The cleaned, lowercased string, or ``None`` when ``text`` is
        ``None`` (so Spark stores a null instead of the UDF raising).
    """
    if text is None:
        return None

    # Remove links (anything starting with "http" up to the next whitespace)
    text = re.sub(r'http\S+', '', text)

    # Decode HTML entities; the trailing ';' is optional so "&amp;" does not
    # leave a stray ';' behind, and a truncated "&amp" is still handled
    text = re.sub(r'&amp;?', 'and', text)
    text = re.sub(r'&lt;?', '<', text)
    text = re.sub(r'&gt;?', '>', text)

    # Replace new line characters with a space
    text = re.sub(r'[\r\n]+', ' ', text)

    # Remove mentions
    text = re.sub(r'@\w+', '', text)

    # Remove hashtags
    text = re.sub(r'#\w+', '', text)

    # Collapse runs of whitespace and trim the ends, so removed leading
    # mentions/links do not leave a leading space that a whitespace-splitting
    # tokenizer would turn into an empty token
    text = re.sub(r'\s+', ' ', text).strip()

    # Convert all text to lowercase
    return text.lower()

In [None]:
# Register the pre_process function as a UDF (User-Defined Function)
# returning StringType

pre_process_udf = udf(pre_process, StringType())

In [None]:
# Apply the UDF to the 'tweet' column and store the result in the
# 'processed_tweet' column (withColumn replaces the existing column of that name)

df = df.withColumn('processed_tweet', pre_process_udf('tweet'))

In [None]:
# drop duplicate entries again: rows identical across every column, now
# including the recomputed 'processed_tweet'

df = df.dropDuplicates()

In [None]:
# Show the DataFrame with the new column

#df.show(10)

#### Feature Extraction

<b>References</b>
1. https://medium.com/towards-artificial-intelligence/large-scale-sentiment-analysis-with-pyspark-bdccf9256e35

2. https://medium.com/@chris_42047/an-easy-guide-to-basic-twitter-sentiment-analysis-python-tutorial-1630d5213ff6

3. https://www.kaggle.com/code/muhammetzahitaydn/pyspark-sentiment-analysis-with-word2vec-embedding


In [None]:
# Import the relevant libraries to create a pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector



In [None]:
# create a tokenizer: splits 'processed_tweet' into a 'words' array
# (Spark's Tokenizer lowercases and splits on whitespace)

tokenizer = Tokenizer(inputCol="processed_tweet", outputCol="words")

In [None]:
# HashingTF: Hashing Term Frequency
# REFERENCE - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html#pyspark.ml.feature.HashingTF
# Maps each 'words' array to a term-frequency vector ('tf') using the hashing trick

hashtf = HashingTF(inputCol="words", outputCol='tf')

In [None]:
# pass the hashtf output ('tf') to the IDF estimator
# REFERENCE - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.IDF.html#pyspark.ml.feature.IDF
# Compute the Inverse Document Frequency (IDF) over the collection of documents (the tweets df)
# and rescale 'tf' into the 'features' column

idf = IDF(inputCol='tf', outputCol="features")

In [None]:
# Index labels: map the 'target' values (0 / 4) to a numeric 'label' column
# (by default StringIndexer orders by label frequency, most frequent -> 0.0)

label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

In [None]:
from pyspark.ml.classification import LogisticRegression

# LogisticRegression reads the 'features' and 'label' columns by default,
# matching the output columns of the stages above
lr = LogisticRegression()

# NOTE(review): the pipeline is assembled but not fitted in the visible cells
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])

#### Filter data based on key word - 'weather'

##### REFERENCES 
1. https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
2. https://towardsdatascience.com/my-absolute-go-to-for-sentiment-analysis-textblob-3ac3a11d524

In [None]:
# keyword used to select weather-related tweets

search_for = "weather"


In [None]:
# Keep rows whose cleaned text contains the keyword as a substring (so e.g.
# "weatherman" also matches); rows with a null processed_tweet are dropped
# because the predicate evaluates to null for them
df2 = df.filter(df["processed_tweet"].contains(search_for))

In [None]:
# preview the first 10 matching rows
df2.show(10)

In [None]:
# remove unnecessary columns
# NOTE(review): DataFrame.drop returns a new DataFrame and the result is not
# assigned, so df2 keeps all of its columns. The later pandas step drops
# 'query', 'author' and 'tweet' by name and would raise KeyError if they were
# already gone, so update that step before assigning this result.

df2.drop('query','author', 'tweet')

In [None]:
# Get the number of rows in the weather-filtered df (runs a Spark job)
num_rows = df2.count()

# Get the number of columns (read from the schema)
num_columns = len(df2.columns)



In [None]:
# Print the shape
print("Number of rows: ", num_rows)
print("Number of columns: ", num_columns)

### **************************************************************



#### Sentiment extraction using TextBlob

##### REFERENCES

1. https://towardsdatascience.com/my-absolute-go-to-for-sentiment-analysis-textblob-3ac3a11d524
2. https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
3. https://towardsdatascience.com/my-absolute-go-to-for-sentiment-analysis-textblob-3ac3a11d524

In [None]:
from pyspark.sql import SparkSession
from textblob import TextBlob


In [None]:
# Define a function (registered as a Spark UDF in the next cell) that scores
# a tweet's sentiment polarity with TextBlob

def get_sentiment(processed_tweet):
    """Return the TextBlob polarity score for ``processed_tweet``.

    Args:
        processed_tweet: Cleaned tweet text, or ``None`` for a missing value.

    Returns:
        The polarity as a float (TextBlob reports it in [-1.0, 1.0]), or
        ``None`` when the input is ``None``, so Spark stores a null rather
        than the UDF raising.
    """
    if processed_tweet is None:
        return None
    return float(TextBlob(processed_tweet).sentiment.polarity)



In [None]:

# Register get_sentiment as a Spark SQL UDF. The return type is given
# explicitly because Spark otherwise defaults to StringType, which would
# store the polarity scores as strings.
get_sentiment_udf = spark.udf.register("get_sentiment", get_sentiment, "double")

In [None]:
# Apply the UDF to the 'processed_tweet' column and create a new column 'sentiment'
df2 = df2.withColumn('sentiment', get_sentiment_udf('processed_tweet'))


In [None]:

# Show the DataFrame with the 'processed_tweet' and 'sentiment' columns
# (first 20 rows, without truncating long values)
df2.select('processed_tweet', 'sentiment').show(truncate=False)

#### Save Weather Data to a new Dataframe

In [None]:
# REFERENCE - https://sparkbyexamples.com/pyspark/pyspark-write-dataframe-to-csv-file/

# Give the filtered DataFrame a new name. This binds the same DataFrame
# object as df2; it is not a copy.

weather_tweets2 = df2

In [None]:
# save the file to a csv file
# NOTE(review): the commented-out line refers to `weather_tweets`, which is
# not defined here; the DataFrame is named `weather_tweets2`.

#weather_tweets.write.csv("hdfs://localhost:9000/user1/weather_tweets2")

# commented out as the file already exists on Hadoop

### Time Series

##### References

1. https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9
2. https://medium.com/@y.s.yoon/scalable-time-series-forecasting-in-spark-prophet-cnn-lstm-and-sarima-a5306153711e
3. https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1
4. https://www.kaggle.com/code/qingyi/time-series-data-analysis-with-spark
5. https://www.analyticsvidhya.com/blog/2022/01/apache-spark-and-facebook-prophet/
****
fastai is a deep learning library that was used to complete the analysis. It was selected because it can be used alongside PySpark. It also works with both tabular and text data, so it can support both the time-series analysis and the sentiment analysis.

Note: the code cells below use statsmodels' ARIMA model for the time series and TextBlob for sentiment scoring.

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd

from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [None]:
# Split the raw 'date' string into calendar parts by character position.
# NOTE(review): offsets assume values like "Mon Apr 06 22:19:45 PDT 2009"
# (1-based: weekday 1-3, month 5-7, day-of-month 9-10, hour 12-13,
# minute 15-16, second 18-19) — confirm against the data.
weather_tweets2 = weather_tweets2.withColumn('Day', F.substring('date', 1, 3))         # weekday, e.g. "Mon"
weather_tweets2 = weather_tweets2.withColumn('Month', F.substring('date', 5, 3))       # month, e.g. "Apr"
weather_tweets2 = weather_tweets2.withColumn('Month_date', F.substring('date', 9, 2))  # day of month, e.g. "06"

In [None]:
# convert weather_tweets2 from a Spark df to a pandas df for processing
# NOTE: toPandas() collects every row to the driver; this relies on the
# keyword-filtered data being small enough to fit in driver memory.
pdf = weather_tweets2.toPandas()

In [None]:
# Drop the identifier/raw-text columns and the helper date-part columns,
# which the time-series steps below do not use. errors='ignore' keeps this
# cell working if any of them has already been removed upstream (for
# example by assigning the result of the Spark df2.drop(...) call).
pdf2 = pdf.drop(
    ['id', 'query', 'author', 'tweet', 'Month', 'Month_date', 'Day'],
    axis=1,
    errors='ignore',
)

In [None]:
# remove the ' PDT' timezone label from the date strings in pdf2
# (the next cell repeats this replacement before parsing)

pdf2['date'] = pdf2['date'].astype(str).str.replace(' PDT', '')

In [None]:
# Parse 'date' into timezone-aware UTC timestamps.
# NOTE(review): values appear to look like "Mon Apr 06 22:19:45 PDT 2009";
# confirm every row is in PDT. PDT is UTC-7, so after dropping the label the
# Pacific wall-clock time is shifted by +7 hours before being tagged as UTC.
# Tagging the wall-clock time directly as UTC would skew every timestamp by
# 7 hours. Values that do not match the format become NaT.
pdf2['date'] = (
    pd.to_datetime(
        pdf2['date'].astype(str).str.replace(' PDT', '', regex=False),
        format='%a %b %d %H:%M:%S %Y',
        errors='coerce',
    )
    + pd.Timedelta(hours=7)
).dt.tz_localize('UTC')

In [None]:
# put the data in chronological order by date (NaT values sort last)

pdf2 = pdf2.sort_values('date')

In [None]:
# inspect the column dtypes
pdf2.dtypes

In [None]:
# make the 'date' column the index of pdf2 (it is removed from the columns)

pdf2.set_index('date', inplace=True)

In [None]:
#pdf.info()

pdf2.head()

In [None]:
# Define a pandas UDF to apply an ARIMA model.
# NOTE(review): Spark calls a Series-to-Series pandas UDF once per Arrow
# batch, so each `time_series` is one batch of rows in whatever order Spark
# delivers them, not a date-ordered series. Revisit before relying on the
# forecasts.

@pandas_udf('double')
def forecast_arima(time_series: pd.Series) -> pd.Series:
    """Fit ARIMA(5, 1, 0) to one batch and broadcast its one-step forecast.

    Args:
        time_series: One batch of the input column.

    Returns:
        A float64 Series the same length as ``time_series`` (a
        Series-to-Series pandas UDF must return one value per input row).
        Every element holds the batch's one-step-ahead forecast, or NaN when
        the batch has too few non-null values to fit the model.
    """
    n_rows = len(time_series)
    # ARIMA works on floats; a fresh RangeIndex keeps the forecast's index predictable
    values = time_series.astype('float64').reset_index(drop=True)

    # Heuristic minimum: 5 AR lags plus one order of differencing need more
    # observations than this to leave anything to estimate
    if values.count() <= 6:
        return pd.Series([float('nan')] * n_rows, dtype='float64')

    model_fit = ARIMA(values, order=(5, 1, 0)).fit()

    # forecast() may return a Series whose index continues after the last
    # observation, so select by position; a label-based [0] lookup can fail
    forecast = float(pd.Series(model_fit.forecast(steps=1)).iloc[0])

    return pd.Series([forecast] * n_rows, dtype='float64')


In [None]:
# Apply the UDF to the 'target' column, not 'date'
# NOTE(review): this runs over the full `df` (not the weather subset) and over
# its polarity labels rather than a date-ordered 'sentiment' series. The UDF
# receives batches without a guaranteed ordering by date, so the per-batch
# "forecast" is unlikely to be meaningful; revisit the design.

df = df.withColumn('forecast', forecast_arima(col('target')))

In [None]:
# display the first 20 rows including the new 'forecast' column
df.show()

In [None]:
# Import the relevant libraries tocreate a pipleline

#from pyspark.ml import Pipeline
#from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector



In [None]:
# create a tokenizer 

#tokenizer = Tokenizer(inputCol="processed_tweet", outputCol="words")

In [None]:
# HashingTF: Hashing Term Frequency
# REFERENCE - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.HashingTF.html#pyspark.ml.feature.HashingTF
# Maps a sequence of terms to their term frequencies using the hashing

#hashtf = HashingTF(inputCol="words", outputCol='tf')

In [None]:
# pass the hashtf function to the IDF function
# REFERENCE - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.IDF.html#pyspark.ml.feature.IDF
# Compute the Inverse Document Frequency (IDF) given a collection of documents ie the tweets df

#idf = IDF(inputCol='tf', outputCol="features")

In [None]:
# Index labels

#label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")

In [None]:
#from pyspark.ml.classification import LogisticRegression

#lr = LogisticRegression()

#pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx, lr])