In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql import SparkSession
from sparknlp.base import DocumentAssembler, Pipeline, EmbeddingsFinisher
from sparknlp.annotator import Tokenizer, WordEmbeddingsModel, SentenceEmbeddings
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as F
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.window import Window

In [None]:
# NOTE: on first start Spark resolves the Spark NLP package and its many
# dependency jars, so creating the session can take a while.

# Session settings, applied to the builder in this order.
_SPARK_SETTINGS = {
    "spark.driver.memory": "8G",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "2000M",
    "spark.driver.maxResultSize": "0",
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.5.3",
}

_session_builder = SparkSession.builder.appName("Spark-Text-Classification").master("local[*]")
for _setting_key, _setting_value in _SPARK_SETTINGS.items():
    _session_builder = _session_builder.config(_setting_key, _setting_value)

spark = _session_builder.getOrCreate()

# Read IMDB Dataset

In [None]:
# Source: https://huggingface.co/datasets/stanfordnlp/imdb
SAMPLES_PER_CLASS = 1000  # rows kept per label; raise it (or drop the filter) to use more data
SAMPLE_SEED = 42  # seeds the per-label shuffle so the sample is reproducible across runs

# Load the IMDB training split and strip every character that is not an ASCII
# letter, digit or whitespace.
imdb_raw = spark.read.parquet("data/imdb-train-00000-of-00001.parquet").withColumn(
    "text", regexp_replace(col("text"), "[^a-zA-Z0-9\\s]", "")
)

# Randomly order the rows within each label. Seeding rand() makes the ordering,
# and therefore the sample, repeatable. An unseeded rand() draws a fresh seed on
# every run. This assumes the same input files and partitioning.
windowSpec = Window.partitionBy("label").orderBy(F.rand(seed=SAMPLE_SEED))

# Keep the first SAMPLES_PER_CLASS rows per label. A small, class-balanced
# sample keeps this demo fast; for production you may want the full dataset.
imdb_dataset = (
    imdb_raw.withColumn("row_num", F.row_number().over(windowSpec))
    .filter(F.col("row_num") <= SAMPLES_PER_CLASS)
    .drop("row_num")
)

# Show the sampled dataset
imdb_dataset.show(5, truncate=50)

print(
    "Number of classes in the sampled dataset: ",
    imdb_dataset.select("label").distinct().count(),
    "total number of rows: ",
    imdb_dataset.count(),
)

# Create a pipeline that tokenizes the texts and turns them into sentence embeddings

In [None]:
# Spark NLP feature pipeline:
#   raw "text" -> document -> tokens -> word embeddings
#   -> one averaged embedding per document -> Spark ML vector column "features".
# Each setter below configures the stage object in place (and returns it).

# Stage 1: wrap the raw "text" column in document annotations.
document_assembler = DocumentAssembler()
document_assembler.setInputCol("text")
document_assembler.setOutputCol("document")

# Stage 2: split each document into token annotations.
tokenizer = Tokenizer()
tokenizer.setInputCols(["document"])
tokenizer.setOutputCol("token")

# Stage 3: pre-trained GloVe word embeddings, loaded from a locally saved model
# directory rather than downloaded.
word_embeddings = WordEmbeddingsModel.load("data/glove_100d")
word_embeddings.setInputCols(["document", "token"])
word_embeddings.setOutputCol("embeddings")

# Stage 4: pool the word vectors of each document into a single vector by averaging.
sentence_embeddings = SentenceEmbeddings()
sentence_embeddings.setInputCols(["document", "embeddings"])
sentence_embeddings.setOutputCol("sentence_embeddings")
sentence_embeddings.setPoolingStrategy("AVERAGE")

# Stage 5: convert the embedding annotations into an array of Spark ML vectors
# in "features", and drop the intermediate annotation columns.
finisher = EmbeddingsFinisher()
finisher.setInputCols("sentence_embeddings")
finisher.setOutputCols("features")
finisher.setOutputAsVector(True)
finisher.setCleanAnnotations(True)

# Chain the stages, in order, into a single pipeline.
nlp_pipeline = Pipeline(
    stages=[document_assembler, tokenizer, word_embeddings, sentence_embeddings, finisher]
)

In [None]:
# Fit the feature pipeline on the sampled IMDB reviews, then embed them.
imdb_nlp_model = nlp_pipeline.fit(imdb_dataset)

# The finisher yields an array of vectors in "features"; explode() turns each
# element into its own row, so the classifier sees one vector per row.
final_data = imdb_nlp_model.transform(imdb_dataset).selectExpr(
    "label", "explode(features) as features"
)

final_data.show(5, truncate=50)

# 1-Train a Logistic Regression Model

In [None]:
# Split data into training (80%) and testing (20%) sets. Cache final_data
# first: otherwise the expensive NLP transform behind it is recomputed for each
# split and for every later action (fit, transform, evaluate).
train_data, test_data = final_data.cache().randomSplit([0.8, 0.2], seed=42)

# Configure a Spark ML LogisticRegression (pyspark.ml.classification). It reads
# the embedding vectors from "features" and the target from "label".
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)

# Fit the model on the training split
model = lr.fit(train_data)

# Score the held-out split; this adds the "prediction" column used below
predictions = model.transform(test_data)

# Test accuracy: fraction of rows whose "prediction" equals the numeric "label"
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")

In [None]:
predictions.show(n=5, truncate=50)  # first 5 scored test rows, cells cut to 50 chars

# 2-Use the Trained Model on Unseen Data

In [None]:
"""
we have used Yelp reviews for outbound sentiment analysis https://huggingface.co/datasets/Yelp/yelp_review_full
Yelp dataset are presentedin train-test split which can also be used for 
trainin multiclass classification model sinde it has 5 classes for each review
for simplicity we have used only 2 classes for binary classification using IMDB dataset
"""

# Read the Yelp dataset from parquet file
yelp_dataset = (
    spark.read.parquet("data/yelp-test-00000-of-00001.parquet")
    .select("text")
    .withColumn("text", regexp_replace(col("text"), "[^a-zA-Z0-9\\s]", ""))
)

# Show a sample of the data
yelp_dataset.show(5, truncate=50)

## 2.1-Use the NLP pipeline to convert reviews into 100-dimensional embeddings

In [None]:
yelp_dataset = (
    nlp_pipeline.fit(yelp_dataset)
    .transform(yelp_dataset)
    .selectExpr("explode(features) as features")
)

yelp_dataset.show(5, truncate=50)

## 2.2-Use trained regression model to classify unseen data

In [None]:
yelp_predictions = model.transform(yelp_dataset)

In [None]:
yelp_predictions.show(5, truncate=50)