In [1]:
#dbutils.fs.rm("FileStore/tables/train.csv")

In [2]:
#Imports
from pyspark.sql.types import StringType
from pyspark.sql.functions import lower
import re
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [3]:
# Load the labelled training tweets registered as the `train_csv` table.
sentimentdf = spark.table("train_csv")
display(sentimentdf)

In [4]:
# Data cleaning
def text_cleaning(text):
    """Normalise one raw tweet into a space-separated string of plain words.

    The text is lower-cased and split on whitespace. A token is dropped
    entirely when it contains 'http' (links), '@' (mentions), an HTML tag such
    as '<br>', any ASCII digit, or an underscore. Every surviving token has all
    characters that are neither word characters nor whitespace removed and is
    followed by a single space. The result therefore ends with a trailing
    space, and a token made only of punctuation contributes an extra space
    (later removed as an empty token downstream).

    Args:
        text: raw tweet text. None (a SQL NULL reaching the UDF) is treated as
            an empty string instead of raising AttributeError inside Spark.

    Returns:
        The cleaned string, or "" when no token survives.
    """
    if text is None:
        return ""
    kept = []
    for token in text.lower().split():
        # '<.*?>' must be matched as a regex; a plain `in` test only matched the
        # literal characters "<.*?>" and so never filtered real HTML tags.
        if ('http' in token
                or '@' in token
                or re.search(r'<.*?>', token)
                or re.search(r'[0-9]', token)
                or '_' in token):
            continue
        kept.append(re.sub(r'[^\w\s]', '', token) + " ")
    return "".join(kept)

def data_cleaning(sentimentdf, SentimentText1):
  """Return `sentimentdf` with an extra "CleanSentimentText" string column.

  Args:
    sentimentdf: input DataFrame.
    SentimentText1: name of the column holding the raw text; each value is
      passed through text_cleaning via a string-returning UDF.
  """
  cleaner = udf(text_cleaning, StringType())
  raw_text = sentimentdf[SentimentText1]
  return sentimentdf.withColumn("CleanSentimentText", cleaner(raw_text))
  
clean_df = data_cleaning(sentimentdf, SentimentText1="SentimentText")
clean_df.show(n=2)

In [5]:
# Tokenize
def tokenize(df1, CleanSentimentText1):
  """Return `df1` with a "tokens1" array column built by Spark ML's Tokenizer
  from the text column named `CleanSentimentText1`."""
  return Tokenizer(inputCol=CleanSentimentText1, outputCol="tokens1").transform(df1)

tokenized_df = tokenize(df1=clean_df, CleanSentimentText1="CleanSentimentText")
display(tokenized_df)

In [6]:
# Remove empty tokens
def clean_string(text1):
  """Return the elements of `text1` that are not the empty string, in order.

  Presumably the Tokenizer output contains "" entries where the cleaned text
  had consecutive spaces; this drops them.
  """
  return [token for token in text1 if token != ""]
  

def remove_space(sentimentdf1, SentimentText2):
  """Return `sentimentdf1` with a "CleanTokens" array<string> column: the array
  column `SentimentText2` with empty-string tokens removed by clean_string."""
  drop_empty = udf(clean_string, ArrayType(StringType()))
  source_col = sentimentdf1[SentimentText2]
  return sentimentdf1.withColumn("CleanTokens", drop_empty(source_col))

clean_df1 = remove_space(sentimentdf1=tokenized_df, SentimentText2="tokens1")
display(clean_df1)

In [7]:
# Remove stop words
def stopwords_remover(df, tokens):
  """Return `df` with a "Tweets" array column: the array column `tokens` with
  stop words removed by a StopWordsRemover using its default settings."""
  remover = StopWordsRemover(inputCol=tokens, outputCol="Tweets")
  return remover.transform(df)

tweetsData = stopwords_remover(df=clean_df1, tokens="CleanTokens")
display(tweetsData)

In [8]:
import pyspark.sql.functions as F
# Keep only rows that still contain at least one word after stop-word removal.
df = tweetsData.withColumn("size", F.size("Tweets"))
df_filtered = df.where(F.col("size") >= 1)
display(df_filtered)

In [9]:
twitter_df = df_filtered.select(["Tweets", "Sentiment"])
display(twitter_df)

In [10]:
# How many rows have no Sentiment value?
g = twitter_df.filter(twitter_df["Sentiment"].isNull())
g.count()

In [11]:
# Add an integer "label" column cast from Sentiment (the column Spark ML
# classifiers read their target from). Sentiment itself is kept. Null or
# uncastable Sentiment values yield a null label; rows with a null label are
# filtered out before training.
df1 = twitter_df.withColumn("label", twitter_df.Sentiment.cast(IntegerType()))

In [12]:
# Count rows whose Tweets array is null.
# NOTE(review): rows with an empty or null Tweets array were already dropped by
# the size >= 1 filter, so this is likely always 0. Confirm whether a null
# *label* check was intended here.
g = df1.filter(df1["Tweets"].isNull())
g.count()

In [13]:
# Keep only rows that have a non-null integer label.
clean_df = df1.where(df1["label"].isNotNull())
display(clean_df)

In [14]:
# Learn a mapping from words to vectors.
def vector_representation(df, tweets):
  """Fit Word2Vec on `df` and return `df` with an added "features" vector column.

  The model is configured with vectorSize=10 and minCount=5 and reads the
  array column named `tweets`.

  NOTE: a new Word2Vec model is fitted on every call, so frames passed in
  separate calls are embedded in unrelated vector spaces.
  """
  fitted = Word2Vec(vectorSize=10, minCount=5, inputCol=tweets, outputCol="features").fit(df)
  return fitted.transform(df)
  
vector_df = vector_representation(df=clean_df, tweets="Tweets")
vector_df.show(n=2)

In [15]:
# Inspect the labelled tweets together with their Word2Vec "features" column.
display(vector_df)

In [16]:
# Sanity check: count null labels in vector_df itself. Its input (clean_df) was
# already filtered on label.isNotNull(), so this is expected to be 0. Counting
# nulls on a copy that has just been filtered on label.isNotNull() would be 0
# by construction and could never detect a problem.
df2 = vector_df.filter(vector_df.label.isNotNull())
g = vector_df.where(col("label").isNull())
g.count()

In [17]:
final_twitterdf = vector_df.select(col("Tweets"), col("features"), col("label"))
display(final_twitterdf)

In [18]:
# Divide the data into a 70% training split and a 30% test split.
# Without an explicit seed, randomSplit draws a new random seed on every run,
# so the split (and the reported accuracy) changed on each Restart & Run All.
# A fixed seed makes it reproducible for the same input rows and partitioning.
SPLIT_SEED = 42
training, test = final_twitterdf.randomSplit(weights=[0.7, 0.3], seed=SPLIT_SEED)
training.cache()
test.cache()

In [19]:
# Inspect the cached 70% training split (Tweets, features, label).
display(training)

In [20]:
# Predict with a Random Forest classifier; the evaluation with
# MulticlassClassificationEvaluator follows further down in this cell.
def predict(trainData, testData, numTrees=10, seed=None):
  """Fit a RandomForestClassifier on `trainData` and return its predictions on `testData`.

  A fresh forest is trained on every call; the fitted model is not returned.
  The classifier reads the "features" column and, for training, the "label" column.

  Args:
    trainData: DataFrame to fit on (needs "features" and "label").
    testData: DataFrame to score (needs "features").
    numTrees: number of trees in the forest (defaults to the original 10).
    seed: optional random seed for the forest. When None, no seed is passed
      and Spark's default is used.

  Returns:
    `testData` with the columns added by the fitted model's transform,
    including "prediction".
  """
  rf_params = {"labelCol": "label", "featuresCol": "features", "numTrees": numTrees}
  if seed is not None:
    # Only forward an explicit seed rather than passing a None param value through to Spark.
    rf_params["seed"] = seed
  rf = RandomForestClassifier(**rf_params)

  # Train the forest and score the test frame.
  model = rf.fit(trainData)
  return model.transform(testData)
  
pred = predict(trainData=training, testData=test)
# Show the scored test rows.
display(pred)

# Accuracy of "prediction" against "label" on the test split
# (the test error printed later is 1 - accuracy).
evaluator = MulticlassClassificationEvaluator(
  labelCol="label",
  predictionCol="prediction",
  metricName="accuracy",
)
accuracy = evaluator.evaluate(pred)

In [21]:
predicted_model = pred.select(["label", "prediction", "features"])
display(predicted_model)

In [22]:
# Report the test error, i.e. 1 - accuracy on the held-out split.
print("Test Error = {:g}".format(1.0 - accuracy))

In [23]:
# Read and preprocess the streamed tweets with the same cleaning steps used for training.
streamedtdf = spark.sql("SELECT Tweet FROM streamtweets")
streamed_clean_df = data_cleaning(streamedtdf, "Tweet")
stream_tokenized_df = tokenize(streamed_clean_df, "CleanSentimentText")
stream_remove_space = remove_space(stream_tokenized_df, "tokens1")
streamTweetsData = stopwords_remover(stream_remove_space, "CleanTokens")
streamTwitter_df = streamTweetsData.select("Tweets")

# The stream must be embedded in the SAME vector space as the data the classifier
# is trained on. vector_representation() fits a brand-new Word2Vec model on
# whatever frame it receives, so applying it to the stream produced features
# unrelated to the training features. Instead, fit one Word2Vec model (same
# settings as vector_representation) on the labelled tweets in clean_df and use
# it for both sides. final_twitterdf is rebuilt from this model so the forest
# trained on it in the next cell sees matching features.
stream_w2v_model = Word2Vec(vectorSize=10, minCount=5, inputCol="Tweets", outputCol="features").fit(clean_df)
final_twitterdf = stream_w2v_model.transform(clean_df).select("Tweets", "features", "label")
streamVector_df = stream_w2v_model.transform(streamTwitter_df)
streamVector_df.show(2)

In [24]:
# Score the streamed tweets with a Random Forest trained on all labelled tweets.
streampred = predict(trainData=final_twitterdf, testData=streamVector_df)
display(streampred)

In [25]:
# Pair each streamed tweet with its predicted label.
finalresult = streampred.select(col("Tweets"), col("prediction"))
display(finalresult)