In [1]:
import re
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import nltk
import findspark
findspark.init()
from nltk.corpus import stopwords
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf,col, lower, regexp_replace
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.ml.feature import CountVectorizer, StringIndexer, Tokenizer, StopWordsRemover
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import confusion_matrix
from pyspark.ml import PipelineModel, Pipeline, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable

nltk.download('stopwords')
nltk.download('punkt')

# Define English stopwords
stop_words = stopwords.words('english')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\mvenk\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\mvenk\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Text Classification with PySpark") \
    .getOrCreate()

In [4]:
data = spark.read.csv('twitter_training.csv', header=False, inferSchema=True)
validation = spark.read.csv('twitter_validation.csv', header=False, inferSchema=True)

In [6]:

# Define column names
columns = ['id', 'Company', 'Label', 'Text']

# Rename columns
for i, col in enumerate(columns):
    data = data.withColumnRenamed('_c{}'.format(i), col)
    validation = validation.withColumnRenamed('_c{}'.format(i), col)

In [7]:
data.printSchema()


root
 |-- id: integer (nullable = true)
 |-- Company: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- Text: string (nullable = true)



In [8]:
data = data.dropna(subset=['Text'])
validation = validation.dropna(subset=['Text'])

In [9]:

data.select("Text").show(10)

+--------------------+
|                Text|
+--------------------+
|im getting on bor...|
|I am coming to th...|
|im getting on bor...|
|im coming on bord...|
|im getting on bor...|
|im getting into b...|
|So I spent a few ...|
|So I spent a coup...|
|So I spent a few ...|
|So I spent a few ...|
+--------------------+
only showing top 10 rows



In [10]:

# Define the StringIndexer for the label column (index the labels)
label_indexer = StringIndexer(inputCol="Label", outputCol="Label2")

# # Define your index mapping
# class_index_mapping = { "Negative": 0, "Positive": 1, "Neutral": 2, "Irrelevant": 3 }

# Fit StringIndexer on data
label_indexer_model = label_indexer.fit(data)
data = label_indexer_model.transform(data)
validation = label_indexer_model.transform(validation)

# Extract label mapping
label_mapping = label_indexer_model.labels

# Print label mapping
print("Label Mapping:")
for index, label in enumerate(label_mapping):
    print(f"Index {index} --> Label '{label}'")

Label Mapping:
Index 0 --> Label 'Negative'
Index 1 --> Label 'Positive'
Index 2 --> Label 'Neutral'
Index 3 --> Label 'Irrelevant'


In [11]:

def clean_text(df, inputCol="Text", outputCol="cleaned_text"):
    # Remove links starting with https://, http://, www., or containing .com
    df = df.withColumn(outputCol, regexp_replace(df[inputCol], r'https?://\S+|www\.\S+|S+\.com\S+|youtu\.be/\S+', ''))
    # Remove words starting with # or @
    df = df.withColumn(outputCol, regexp_replace(df[outputCol], r'(@|#)\w+', ''))
    # Convert text to lowercase
    df = df.withColumn(outputCol, lower(df[outputCol]))
    # Remove non-alpha characters
    df = df.withColumn(outputCol, regexp_replace(df[outputCol], r'[^a-zA-Z\s]', ''))
    
    return df

In [12]:
cleaned_data = clean_text(data, inputCol="Text", outputCol="Text")
cleaned_validation = clean_text(validation, inputCol="Text", outputCol="Text")

In [13]:
# Define tokenizer
tokenizer = Tokenizer(inputCol="Text", outputCol="tokens")

# Define stopwords remover
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stop_words)

# Define CountVectorizer
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="features", vocabSize=10000, minDF=5)

# Define Logistic Regression
lr = LogisticRegression(maxIter=10, labelCol="Label2", featuresCol="features")

In [14]:
# create the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, lr])

# Apply the pipeline to the data
model = pipeline.fit(cleaned_data)
processed_data = model.transform(cleaned_data)

In [15]:
processed_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Company: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- Label2: double (nullable = false)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [16]:
processed_data.select("Text", "Label2", "prediction").show()

+--------------------+------+----------+
|                Text|Label2|prediction|
+--------------------+------+----------+
|im getting on bor...|   1.0|       1.0|
|i am coming to th...|   1.0|       1.0|
|im getting on bor...|   1.0|       1.0|
|im coming on bord...|   1.0|       1.0|
|im getting on bor...|   1.0|       1.0|
|im getting into b...|   1.0|       1.0|
|so i spent a few ...|   1.0|       1.0|
|so i spent a coup...|   1.0|       1.0|
|so i spent a few ...|   1.0|       1.0|
|so i spent a few ...|   1.0|       1.0|
| so i spent a few...|   1.0|       1.0|
|                 was|   1.0|       0.0|
|rockhard la varlo...|   2.0|       2.0|
|rockhard la varlo...|   2.0|       2.0|
|rockhard la varlo...|   2.0|       2.0|
|rockhard la vita ...|   2.0|       2.0|
|live rock  hard m...|   2.0|       2.0|
|ihard like me rar...|   2.0|       2.0|
|that was the firs...|   1.0|       1.0|
|this was the firs...|   1.0|       1.0|
+--------------------+------+----------+
only showing top

In [22]:
import joblib
from pyspark.ml.linalg import Vectors

# Assuming 'model' is the trained pipeline model
lr_model = model.stages[-1]  # Access the LogisticRegression model from the pipeline

# Check if it is a multiclass model by accessing 'coefficientMatrix' and 'interceptVector'
model_params = {
    'coefficientMatrix': lr_model.coefficientMatrix.toArray(),  # Matrix of coefficients for each class
    'interceptVector': lr_model.interceptVector.toArray(),      # Vector of intercepts for each class
    'numClasses': lr_model.numClasses,
    'numFeatures': lr_model.numFeatures
}

# Save model parameters to a pickle file
joblib.dump(model_params, "logistic_regression_model_params.pkl")

print("Multiclass logistic regression model parameters saved successfully.")


Multiclass logistic regression model parameters saved successfully.


In [25]:
import joblib
import numpy as np
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.linalg import DenseMatrix, Vectors

# Load the model parameters
loaded_model = LogisticRegressionModel.load("logistic_regression_model_params.pkl")




Py4JJavaError: An error occurred while calling o582.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/e:/SEM-5-PROJ/BDA/Twitter-sentiment-analysis/ml-model/logistic_regression_model_params.pkl/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1471)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1465)
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1506)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1506)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
	at org.apache.spark.ml.classification.LogisticRegressionModel$LogisticRegressionModelReader.load(LogisticRegression.scala:1326)
	at org.apache.spark.ml.classification.LogisticRegressionModel$LogisticRegressionModelReader.load(LogisticRegression.scala:1320)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.io.IOException: Input path does not exist: file:/e:/SEM-5-PROJ/BDA/Twitter-sentiment-analysis/ml-model/logistic_regression_model_params.pkl/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 33 more
