In [0]:
# Run once on a new cluster to install NLTK (%pip targets the notebook's Python environment)
# %pip install nltk

In [0]:
# import nltk
# nltk.download("stopwords")
# nltk.download('wordnet')
# nltk.download('omw-1.4')   #patch for error on wordnet lemmatizer

In [0]:
# kaggle dataset --- https://www.kaggle.com/datasets/kazanova/sentiment140

# Where the uploaded dataset lives and which reader format to use
file_location = "/FileStore/tables/data.csv"
file_type = "csv"

# CSV reader settings: no schema inference, no header row, comma-separated
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Read the file with the settings above (they only apply to CSV input),
# then spread the rows over 4 partitions.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
    .repartition(4)
)

display(df)

In [0]:
# Discard the metadata columns and give the two remaining columns readable names.
# NOTE(review): presumably _c1.._c4 are the tweet id, timestamp, query flag and
# user name of the Sentiment140 layout - confirm against the dataset description.
unused_columns = ("_c1", "_c2", "_c3", "_c4")
df = df.drop(*unused_columns).toDF("label", "tweet")
df.display()

In [0]:
# Print the schema of the two-column (label, tweet) DataFrame.
df.printSchema()

In [0]:
# Register the DataFrame as a temporary view so it can be queried from %sql cells

temp_table_name = "data_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Preview the rows of the temp view registered from the DataFrame */

select * from `data_csv`

In [0]:
%sql

-- Number of tweets per sentiment label. The label itself is selected so each
-- count can be attributed to its class, and count(*) replaces count("label"),
-- which counted a string literal rather than the column.
select label, count(*) as tweet_count from `data_csv` group by label order by label

In [0]:
# Sentiment140 marks positive tweets with 4 and negative tweets with 0.
# The label is remapped to a binary integer (4 -> 1, anything else -> 0)
# for simpler understanding and easier evaluation.

# NOTE(review): the wildcard import is kept on purpose - the transformer cell
# further down uses names such as DataFrame without importing them, so they may
# be coming from here. It also shadows builtins (sum, max, round, ...); prefer
# explicit imports once that dependency is confirmed/removed.
from pyspark.sql.functions import * 
df = df.withColumn("label", when(df["label"] == "4", 1).otherwise(0).cast("integer"))

In [0]:
# CUSTOM TRANSFORMER
import nltk
import string,re
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk import WordNetLemmatizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Transformer

class PreprocessingTweetTransformer(Transformer):
    """
        Pipeline stage that cleans raw tweet text and turns it into a list of tokens.

        Steps, in the order they are applied: lower-casing, URL removal, stop-word
        removal, punctuation removal, collapsing of repeated characters, number
        removal, whitespace tokenization, stemming and lemmatization.

        The input DataFrame must have the columns "label" and "tweet"; the output
        DataFrame has the same two columns, with "tweet" replaced by the token list.
        TF-IDF vectorization is NOT done here; it is left to later pipeline stages.
    """
    def __init__(self):
        super(PreprocessingTweetTransformer,self).__init__()
        self.STOPWORDS = stopwords.words('english')
        # Set copy of STOPWORDS for O(1) membership tests; built once here, so
        # later changes to self.STOPWORDS are not reflected in it.
        self._stopword_set = frozenset(self.STOPWORDS)
        self.english_punctuations = string.punctuation
        self.punctuations_list = self.english_punctuations
        # Raw string so that "\s" is a regex class and not an invalid string escape.
        self.tokenizer = RegexpTokenizer(pattern = r"\s+",gaps=True)
        self.st = nltk.PorterStemmer()
        self.lm = WordNetLemmatizer()
        
    def _transform(self, df:"DataFrame") -> "DataFrame":
        """ 
            Input: DataFrame with columns "label" and "tweet"
            Output: DataFrame with columns "label" (unchanged type) and "tweet" (array of strings)
            Description: Runs the whole preprocessing chain on every row in a single pass over the data.
        """
        # Local import keeps this method independent of what other cells imported.
        from pyspark.sql.types import ArrayType, StringType, StructField, StructType

        # An explicit schema avoids one schema-inference job per step and keeps
        # working when the first rows produce empty token lists.
        output_schema = StructType([
            StructField("label", df.schema["label"].dataType, True),
            StructField("tweet", ArrayType(StringType()), True),
        ])
        processed_rows = df.rdd.map(lambda row: (row.label, self._preprocess_tweet(row.tweet)))
        return processed_rows.toDF(output_schema)

    def _preprocess_tweet(self, text):
        """ 
            Input: String (or None)
            Output: List
            Description: Applies every cleaning step to one tweet and returns its tokens.
                         A missing (None) tweet yields an empty token list.
        """
        if text is None:
            return []
        text = self.to_lower(text)
        # URLs must be removed while "://" and "." are still present, and before
        # repeated characters are collapsed (which would turn "www" into "w").
        text = self.cleaning_URLs(text)
        text = self.removing_stopwords(text)
        text = self.cleaning_punctuations(text)
        text = self.cleaning_repeating_char(text)
        text = self.cleaning_numbers(text)
        tokens = self.tokenize(text)
        tokens = self.stemming(tokens)
        return self.lemmatizer(tokens)
    
    def to_lower(self, text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Converts the given string to lower case.
        """
        return text.lower()
    
    def removing_stopwords(self, text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Removes stop words such as at, a, an, the etc. which do not add meaning to the given text.
                         Words are split on whitespace and re-joined with single spaces.
        """
        return " ".join([word for word in str(text).split() if word not in self._stopword_set])
    
    def cleaning_punctuations(self,text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Removes every punctuation character listed in punctuations_list from the given text.
        """
        translator = str.maketrans('', '', self.punctuations_list)
        return text.translate(translator)
    
    def cleaning_repeating_char(self, text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Collapses runs of the same character into one, such as aaaah is converted to ah.
                         Note that legitimate double letters are collapsed as well (good -> god).
        """
        # \1 is a backreference to the captured character; without the backslash
        # the pattern would match a literal "1" instead of a repetition.
        return re.sub(r'(.)\1+', r'\1', text)

    def cleaning_URLs(self, text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Replaces every URL (starting with www., http:// or https://) in the text with a space.
        """
        # \S+ consumes the URL up to the next whitespace character.
        return re.sub(r'(www\.\S+)|(https?://\S+)', ' ', text)
    
    def cleaning_numbers(self,text:str) -> str:
        """ 
            Input: String
            Output: String
            Description: Removes the numerals if present in the text.
        """
        return re.sub('[0-9]+', '', text)
    
    def tokenize(self, text:str) -> list:
        """ 
            Input: String
            Output: List
            Description: Splits the text on whitespace into tokens. For example, "Hello World" is converted to ["Hello","World"]
        """
        return self.tokenizer.tokenize(text)
    
    def stemming(self, data:list) -> list:
        """ 
            Input: List
            Output: List
            Description: Converts every word to its stem using the Porter stemmer.
        """
        # Return the stemmed words (the input list itself is left untouched).
        return [self.st.stem(word) for word in data]
    
    def lemmatizer(self, data:list) -> list:
        """ 
            Input: List
            Output: List
            Description: Converts every word to its WordNet lemma.
        """
        return [self.lm.lemmatize(word) for word in data]

In [0]:
# Pipeline stages are collected in order; the custom tweet preprocessor comes first.
transformer = PreprocessingTweetTransformer()
stages = [transformer]

In [0]:
from pyspark.ml.feature import HashingTF, IDF

# Term-frequency hashing of the token lists, followed by IDF re-weighting.
hashingTF = HashingTF(inputCol="tweet", outputCol="hashedTF_tweet")
idf = IDF(inputCol="hashedTF_tweet", outputCol="idf_features")

# Rebuild the stage list instead of appending to it, so re-running this cell
# does not add duplicate HashingTF/IDF stages to the pipeline.
stages = [transformer, hashingTF, idf]

In [0]:
from pyspark.ml import Pipeline

# Chain the collected stages into one pipeline, fit it and apply it to the data.
# NOTE(review): the pipeline (including the IDF stage) is fitted on the full
# dataset before the train/test split further down, so the IDF weights also see
# the test rows; consider splitting first and fitting on the training split only.
preprocessing_pipeline = Pipeline(stages=stages)
model = preprocessing_pipeline.fit(df)
df_updated = model.transform(df)

In [0]:
# 70/30 train/test split with a fixed seed
train_df, test_df = df_updated.randomSplit(weights=[0.7, 0.3], seed=24)

In [0]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression on the IDF features, trained on the training split only.
lr = LogisticRegression(
    featuresCol='idf_features',
    labelCol='label',
    maxIter=10000,
)
lrModel = lr.fit(train_df)

In [0]:
import matplotlib.pyplot as plt
import numpy as np
# Plot the fitted coefficients in ascending order.
beta = np.sort(lrModel.coefficients)
fig, ax = plt.subplots()
ax.plot(beta)
ax.set_ylabel('Beta Coefficients')
plt.show()

In [0]:
# ROC curve of the model on its training data.
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x axis and TPR on the y axis, so the labels must match
# that order (they were swapped before).
plt.plot(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [0]:
# Score the test split with the trained logistic regression model.
predictions = lrModel.transform(test_df)

In [0]:
# Show the true label next to the predicted class and the class probabilities.
predictions.select("label","prediction","probability").display()

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Area under the ROC curve on the test predictions; the arguments below are
# the evaluator's defaults, written out for readability.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

In [0]:
# lrModel.summary describes the TRAINING data; label it as such and report the
# accuracy on the held-out test split alongside it.
print('Training set accuracy:', lrModel.summary.accuracy)
print('Test set accuracy:', lrModel.evaluate(test_df).accuracy)