In [None]:
from datasets import load_dataset

# Download the Twitter financial-news sentiment dataset from the Hugging Face Hub.
# The recorded output below shows a train split (9543 examples) and a
# validation split (2388 examples) being generated.
ds = load_dataset("zeroshot/twitter-financial-news-sentiment")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

sent_train.csv: 0.00B [00:00, ?B/s]

sent_valid.csv: 0.00B [00:00, ?B/s]

Generating train split:   0%|          | 0/9543 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/2388 [00:00<?, ? examples/s]

In [None]:
from typing import List, Dict
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np


class TextClassifier:
    """
    A text classification model using Logistic Regression.

    This class wraps a vectorizer and a logistic regression model to perform
    text classification tasks such as sentiment analysis.
    """

    def __init__(self, vectorizer):
        """
        Initialize the TextClassifier.

        Args:
            vectorizer: A vectorizer instance (e.g., TfidfVectorizer, CountVectorizer)
                       that transforms text into numerical features. Its
                       fit_transform() and transform() methods are used.
        """
        self.vectorizer = vectorizer
        # Stays None until fit() has run; predict() relies on this to detect
        # an untrained classifier.
        self._model = None

    def fit(self, texts: List[str], labels: List[int]):
        """
        Train the text classifier on the given texts and labels.

        The vectorizer is (re)fitted on `texts`, and a fresh
        LogisticRegression replaces any previously trained model.

        Args:
            texts: List of text documents to train on
            labels: List of corresponding labels (e.g., 0 for negative, 1 for positive)

        Returns:
            self: Returns the instance itself for method chaining
        """
        # Learn the vocabulary and build the training feature matrix in one pass
        features = self.vectorizer.fit_transform(texts)

        # Fixed random_state keeps repeated runs reproducible
        self._model = LogisticRegression(solver='liblinear', random_state=42)
        self._model.fit(features, labels)

        return self

    def predict(self, texts: List[str]) -> List[int]:
        """
        Predict labels for new texts.

        Args:
            texts: List of text documents to predict labels for

        Returns:
            List of predicted labels

        Raises:
            ValueError: If the model has not been trained yet
        """
        if self._model is None:
            raise ValueError("Model has not been trained yet. Please call fit() first.")

        # transform(), not fit_transform(): reuse the vocabulary learned in fit()
        features = self.vectorizer.transform(texts)

        return self._model.predict(features).tolist()

    def evaluate(self, y_true: List[int], y_pred: List[int], *,
                 average: str = 'binary') -> Dict[str, float]:
        """
        Evaluate the model's predictions using various metrics.

        Args:
            y_true: List of true labels
            y_pred: List of predicted labels
            average: Averaging strategy forwarded to precision_score,
                recall_score and f1_score. The default 'binary' matches the
                previous behaviour and only works for two-class labels; pass
                'macro', 'micro' or 'weighted' for multiclass labels.

        Returns:
            Dictionary containing accuracy, precision, recall, and f1_score
        """
        # zero_division=0 makes an undefined precision/recall report 0 instead
        # of emitting a warning.
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average=average, zero_division=0),
            'recall': recall_score(y_true, y_pred, average=average, zero_division=0),
            'f1_score': f1_score(y_true, y_pred, average=average, zero_division=0)
        }


# Example usage and testing
# The recorded output below shows this branch ran, so every name assigned in
# it (texts, labels, classifier, ...) is left behind as a notebook variable.
if __name__ == "__main__":
    from sklearn.feature_extraction.text import TfidfVectorizer

    # Task 1: Data Preparation
    # Six hand-written reviews, alternating positive / negative.
    texts = [
        "This movie is fantastic and I love it!",
        "I hate this film, it's terrible.",
        "The acting was superb, a truly great experience.",
        "What a waste of time, absolutely boring.",
        "Highly recommend this, a masterpiece.",
        "Could not finish watching, so bad."
    ]
    labels = [1, 0, 1, 0, 1, 0]  # 1 for positive, 0 for negative

    # Initialize vectorizer
    vectorizer = TfidfVectorizer(max_features=100, stop_words='english')

    # Task 2: TextClassifier Implementation
    # Create and train the classifier
    classifier = TextClassifier(vectorizer)
    classifier.fit(texts, labels)

    # Make predictions on the training data
    predictions = classifier.predict(texts)

    # Evaluate the model
    # NOTE: these metrics are computed on the same six samples the model was
    # fitted on, so they say nothing about performance on unseen text.
    metrics = classifier.evaluate(labels, predictions)

    # Print results
    print("Training Data Predictions:")
    for text, true_label, pred_label in zip(texts, labels, predictions):
        sentiment = "Positive" if pred_label == 1 else "Negative"
        correct = "âœ“" if true_label == pred_label else "âœ—"
        print(f"{correct} [{sentiment}] {text}")

    print("\nEvaluation Metrics:")
    for metric_name, metric_value in metrics.items():
        print(f"{metric_name.capitalize()}: {metric_value:.4f}")

    # Test on new data
    # No ground-truth labels are supplied for these, so they are only printed.
    # The recorded run labelled all three as Negative.
    print("\nTesting on new data:")
    test_texts = [
        "This is an amazing film, loved every minute!",
        "Terrible movie, don't waste your money.",
        "Outstanding performance by the actors."
    ]

    test_predictions = classifier.predict(test_texts)
    for text, pred in zip(test_texts, test_predictions):
        sentiment = "Positive" if pred == 1 else "Negative"
        print(f"[{sentiment}] {text}")

Training Data Predictions:
âœ“ [Positive] This movie is fantastic and I love it!
âœ“ [Negative] I hate this film, it's terrible.
âœ“ [Positive] The acting was superb, a truly great experience.
âœ“ [Negative] What a waste of time, absolutely boring.
âœ“ [Positive] Highly recommend this, a masterpiece.
âœ“ [Negative] Could not finish watching, so bad.

Evaluation Metrics:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1_score: 1.0000

Testing on new data:
[Negative] This is an amazing film, loved every minute!
[Negative] Terrible movie, don't waste your money.
[Negative] Outstanding performance by the actors.


In [None]:
"""
Lab 5 Test: Text Classification with Train/Test Split
This file tests the TextClassifier implementation with proper data splitting.
"""

from typing import List
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
import re


# RegexTokenizer implementation (from previous labs)
class RegexTokenizer:
    """
    Lowercasing tokenizer: every non-overlapping match of a regular
    expression in the lowercased text becomes one token.
    """

    def __init__(self, pattern: str = r'\b\w+\b'):
        """
        Store the regex used to pick tokens out of a text.

        Args:
            pattern: Regular expression whose matches are the tokens; the
                default matches runs of word characters.
        """
        self.pattern = pattern

    def tokenize(self, text: str) -> List[str]:
        """
        Split a text into lowercase tokens.

        Args:
            text: Input text to tokenize

        Returns:
            The matches of the pattern in the lowercased text, in order
        """
        # Lowercase first so matching happens on the normalized text
        return re.findall(self.pattern, text.lower())


# TextClassifier implementation (from Task 2)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from typing import Dict


class TextClassifier:
    """
    A text classification model using Logistic Regression.

    Pairs a caller-supplied vectorizer with a LogisticRegression model.
    """

    def __init__(self, vectorizer):
        """
        Initialize the TextClassifier.

        Args:
            vectorizer: A vectorizer instance; its fit_transform() and
                transform() methods are used.
        """
        self.vectorizer = vectorizer
        # None marks "not trained yet"; predict() checks for it.
        self._model = None

    def fit(self, texts: List[str], labels: List[int]):
        """
        Train the text classifier.

        Args:
            texts: List of text documents
            labels: List of corresponding labels

        Returns:
            self, so calls can be chained
        """
        features = self.vectorizer.fit_transform(texts)
        # Fixed random_state keeps repeated runs reproducible
        self._model = LogisticRegression(solver='liblinear', random_state=42)
        self._model.fit(features, labels)
        return self

    def predict(self, texts: List[str]) -> List[int]:
        """
        Predict labels for new texts.

        Args:
            texts: List of text documents

        Returns:
            List of predicted labels

        Raises:
            ValueError: If fit() has not been called yet
        """
        if self._model is None:
            raise ValueError("Model has not been trained yet. Please call fit() first.")

        # Reuse the vocabulary learned during fit()
        features = self.vectorizer.transform(texts)
        return self._model.predict(features).tolist()

    def evaluate(self, y_true: List[int], y_pred: List[int], *,
                 average: str = 'binary') -> Dict[str, float]:
        """
        Evaluate the model's predictions.

        Args:
            y_true: List of true labels
            y_pred: List of predicted labels
            average: Averaging strategy forwarded to precision_score,
                recall_score and f1_score. The default 'binary' keeps the
                previous two-class behaviour; use 'macro', 'micro' or
                'weighted' for multiclass labels.

        Returns:
            Dictionary containing accuracy, precision, recall, and f1_score
        """
        # zero_division=0: report 0 rather than warn when a score is undefined
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average=average, zero_division=0),
            'recall': recall_score(y_true, y_pred, average=average, zero_division=0),
            'f1_score': f1_score(y_true, y_pred, average=average, zero_division=0)
        }


def main():
    """
    Run the Lab 5 walkthrough: split a small labelled review set, train a
    TextClassifier on the training part, then print metrics and predictions
    for the training set, the test set and a few unlabelled reviews.
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    def announce(title):
        """Print a blank line, then the title framed by two light rules."""
        print("\n" + light_rule)
        print(title)
        print(light_rule)

    def show_metrics(heading, scores):
        """Print the heading followed by one aligned line per metric."""
        print(heading)
        for name, value in scores.items():
            print(f"  {name.capitalize():12s}: {value:.4f}")

    def polarity(label):
        """Return "Positive" for label 1 and "Negative" for anything else."""
        return "Positive" if label == 1 else "Negative"

    print(heavy_rule)
    print("LAB 5 TEST: TEXT CLASSIFICATION WITH TRAIN/TEST SPLIT")
    print(heavy_rule)

    # Task 3: the dataset as (review, label) pairs; 1 = positive, 0 = negative.
    # The order matters because the seeded split below depends on it.
    samples = [
        ("This movie is fantastic and I love it!", 1),
        ("I hate this film, it's terrible.", 0),
        ("The acting was superb, a truly great experience.", 1),
        ("What a waste of time, absolutely boring.", 0),
        ("Highly recommend this, a masterpiece.", 1),
        ("Could not finish watching, so bad.", 0),
        ("Amazing storyline, kept me engaged throughout.", 1),
        ("Disappointing and poorly executed.", 0),
        ("Brilliant cinematography and great performances.", 1),
        ("Not worth watching, very dull.", 0),
        ("Exceptional movie, one of the best I've seen.", 1),
        ("Awful, I regret watching this.", 0),
        ("Wonderful experience, loved every scene.", 1),
        ("Boring and predictable plot.", 0),
        ("Outstanding film with excellent direction.", 1),
        ("Terrible waste of money and time.", 0),
        ("Incredible acting and beautiful visuals.", 1),
        ("So bad, couldn't watch till the end.", 0),
        ("Superb entertainment, highly enjoyable.", 1),
        ("Worst movie ever, absolutely horrible.", 0),
    ]
    texts = [review for review, _ in samples]
    labels = [label for _, label in samples]

    positives = sum(labels)
    print(f"\nTotal dataset size: {len(texts)} samples")
    print(f"Positive samples: {positives}")
    print(f"Negative samples: {len(labels) - positives}")

    # 80/20 split; stratifying on the labels keeps the class balance in both parts
    X_train, X_test, y_train, y_test = train_test_split(
        texts, labels, test_size=0.2, random_state=42, stratify=labels
    )

    print(f"\nTraining set size: {len(X_train)} samples")
    print(f"Testing set size: {len(X_test)} samples")

    # Step 1: tokenizer (only demonstrated here; it is not passed to the vectorizer)
    announce("Step 1: Initialize RegexTokenizer")
    tokenizer = RegexTokenizer(pattern=r'\b\w+\b')
    print("âœ“ RegexTokenizer initialized")

    first_sample = X_train[0]
    print(f"\nExample tokenization:")
    print(f"Text: '{first_sample}'")
    print(f"Tokens: {tokenizer.tokenize(first_sample)}")

    # Step 2: TF-IDF features over unigrams and bigrams
    announce("Step 2: Initialize TfidfVectorizer")
    vectorizer = TfidfVectorizer(
        max_features=100, stop_words='english', ngram_range=(1, 2), min_df=1
    )
    print("âœ“ TfidfVectorizer initialized")
    print(f"  - max_features: 100")
    print(f"  - stop_words: 'english'")
    print(f"  - ngram_range: (1, 2)")

    # Step 3: classifier wrapping the vectorizer
    announce("Step 3: Initialize TextClassifier")
    classifier = TextClassifier(vectorizer)
    print("âœ“ TextClassifier initialized with TfidfVectorizer")

    # Step 4: training
    announce("Step 4: Train the Classifier")
    print("Training in progress...")
    classifier.fit(X_train, y_train)
    print("âœ“ Classifier trained successfully")

    # Step 5: metrics on the data the model was fitted on
    announce("Step 5: Evaluate on Training Data")
    train_metrics = classifier.evaluate(y_train, classifier.predict(X_train))
    show_metrics("\nTraining Set Metrics:", train_metrics)

    # Step 6: metrics on the held-out part
    announce("Step 6: Evaluate on Test Data")
    test_predictions = classifier.predict(X_test)
    test_metrics = classifier.evaluate(y_test, test_predictions)
    show_metrics("\nTest Set Metrics:", test_metrics)

    # Per-sample breakdown of the held-out predictions
    announce("Test Set Predictions Details:")
    for position, (sample, expected, got) in enumerate(
            zip(X_test, y_test, test_predictions), start=1):
        mark = "âœ“" if expected == got else "âœ—"
        print(f"\n{position}. {mark} Text: '{sample}'")
        print(f"   True: {polarity(expected)} | Predicted: {polarity(got)}")

    # Reviews without ground truth: predictions are only displayed
    print("\n" + heavy_rule)
    print("BONUS: Testing on New Unseen Data")
    print(heavy_rule)

    new_texts = [
        "This is an amazing film, loved every minute!",
        "Terrible movie, don't waste your money.",
        "Outstanding performance by the actors.",
        "Very disappointing, expected much better.",
        "Absolute masterpiece, beautifully crafted."
    ]

    for position, (sample, got) in enumerate(
            zip(new_texts, classifier.predict(new_texts)), start=1):
        emoji = "ðŸ˜Š" if got == 1 else "ðŸ˜ž"
        print(f"\n{position}. [{polarity(got)} {emoji}] {sample}")

    print("\n" + heavy_rule)
    print("TEST COMPLETED SUCCESSFULLY!")
    print(heavy_rule)


if __name__ == "__main__":
    main()

LAB 5 TEST: TEXT CLASSIFICATION WITH TRAIN/TEST SPLIT

Total dataset size: 20 samples
Positive samples: 10
Negative samples: 10

Training set size: 16 samples
Testing set size: 4 samples

----------------------------------------------------------------------
Step 1: Initialize RegexTokenizer
----------------------------------------------------------------------
âœ“ RegexTokenizer initialized

Example tokenization:
Text: 'Worst movie ever, absolutely horrible.'
Tokens: ['worst', 'movie', 'ever', 'absolutely', 'horrible']

----------------------------------------------------------------------
Step 2: Initialize TfidfVectorizer
----------------------------------------------------------------------
âœ“ TfidfVectorizer initialized
  - max_features: 100
  - stop_words: 'english'
  - ngram_range: (1, 2)

----------------------------------------------------------------------
Step 3: Initialize TextClassifier
----------------------------------------------------------------------
âœ“ TextClassif

In [None]:
# Install Java (the PySpark cell below checks for a `java` executable before starting Spark)
!apt-get update
!apt-get install -y openjdk-8-jdk-headless

# Set JAVA_HOME
# Points at the install location of the openjdk-8 package installed above.
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

# Verify Java installation
!java -version

0% [Working]            Hit:1 https://cli.github.com/packages stable InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.82)] [Connecting to security.                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [83.2 kB]
Hit:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,411 kB]
Hit:8 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu 

In [None]:
# Install PySpark, which the Spark sentiment-analysis cell below imports.
!pip install pyspark



In [None]:
"""
Lab 5 Spark: Advanced Sentiment Analysis with PySpark
This file demonstrates how to build a text classification pipeline using Apache Spark
for handling large-scale datasets that don't fit into a single machine's memory.
"""

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import os


def create_sample_data(spark, output_path="data/sentiments.csv"):
    """
    Create a sample sentiments dataset for demonstration.

    Builds a 30-row DataFrame with columns "text" and "sentiment" and writes
    it as CSV with a header, overwriting whatever is already at
    `output_path`.

    Args:
        spark: SparkSession instance
        output_path: Path to save the CSV output. The caller in this file
            notes that Spark writes a directory containing a part file here,
            not a single file.

    Returns:
        The `output_path` that was written.
    """
    # Sample data with sentiment labels (-1 for negative, 1 for positive)
    data = [
        ("This movie is fantastic and I love it!", 1),
        ("I hate this film, it's terrible.", -1),
        ("The acting was superb, a truly great experience.", 1),
        ("What a waste of time, absolutely boring.", -1),
        ("Highly recommend this, a masterpiece.", 1),
        ("Could not finish watching, so bad.", -1),
        ("Amazing storyline, kept me engaged throughout.", 1),
        ("Disappointing and poorly executed.", -1),
        ("Brilliant cinematography and great performances.", 1),
        ("Not worth watching, very dull.", -1),
        ("Exceptional movie, one of the best I've seen.", 1),
        ("Awful, I regret watching this.", -1),
        ("Wonderful experience, loved every scene.", 1),
        ("Boring and predictable plot.", -1),
        ("Outstanding film with excellent direction.", 1),
        ("Terrible waste of money and time.", -1),
        ("Incredible acting and beautiful visuals.", 1),
        ("So bad, couldn't watch till the end.", -1),
        ("Superb entertainment, highly enjoyable.", 1),
        ("Worst movie ever, absolutely horrible.", -1),
        ("Compelling story with emotional depth.", 1),
        ("Poorly written and badly directed.", -1),
        ("Captivating from start to finish.", 1),
        ("Complete disaster, very disappointing.", -1),
        ("Excellent performances all around.", 1),
        ("Waste of time, not recommended.", -1),
        ("Beautiful cinematography and soundtrack.", 1),
        ("Dull and uninspiring movie.", -1),
        ("Must watch, absolutely brilliant.", 1),
        ("Horrible experience, very bad.", -1),
    ]

    # Create DataFrame
    df = spark.createDataFrame(data, ["text", "sentiment"])

    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Save to CSV; coalesce(1) collapses the data to one partition first
    df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")
    print(f"âœ“ Sample data created at: {output_path}")

    return output_path


def check_java_installation():
    """
    Check if Java is installed and try to install it if it is not.

    Runs `java -version`; when that is missing, times out, or exits with a
    non-zero status, falls back to installing openjdk-8 through apt-get.
    JAVA_HOME is set only when this function performs the installation.

    Returns:
        True if Java is available (already present or freshly installed),
        False if the automatic installation failed.
    """
    import subprocess

    try:
        # Check if Java is installed
        result = subprocess.run(['java', '-version'],
                                capture_output=True,
                                text=True,
                                timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        # No java executable, or it hung: treat both as "not installed"
        result = None

    if result is not None and result.returncode == 0:
        print("âœ“ Java is installed")
        return True

    # Try to install Java on Colab
    print("âš  Java not found. Installing Java...")
    try:
        subprocess.run(['apt-get', 'update'], check=True, capture_output=True)
        subprocess.run(['apt-get', 'install', '-y', 'openjdk-8-jdk-headless'],
                       check=True, capture_output=True)
    except (OSError, subprocess.SubprocessError):
        # OSError: apt-get is missing or not executable.
        # SubprocessError: apt-get ran but exited with a non-zero status.
        # Deliberately not a bare `except`, so KeyboardInterrupt/SystemExit
        # still propagate.
        print("âœ— Failed to install Java automatically")
        print("\nPlease run these commands in a Colab cell:")
        print("!apt-get update")
        print("!apt-get install -y openjdk-8-jdk-headless")
        print("import os")
        print("os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'")
        return False

    # Set JAVA_HOME
    os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
    print("âœ“ Java installed successfully")
    return True


def _run_pipeline(spark):
    """
    Run steps 2-8 of the demo (load, split, train, evaluate, bonus
    predictions) on an already-initialized Spark session.

    Args:
        spark: Active SparkSession. It is not stopped here; the caller owns it.

    Returns:
        True when the whole pipeline ran, False when it stopped early because
        no labelled rows were available.
    """
    # Step 2: Load Data
    print("\nStep 2: Load Data")
    print("-" * 80)

    data_path = "data/sentiments.csv"

    # Create sample data if it doesn't exist
    if not os.path.exists(data_path):
        print("Creating sample dataset...")
        create_sample_data(spark, data_path)

    # Read the CSV file
    # Note: Since we used coalesce(1), there will be a part file inside the directory
    try:
        df = spark.read.csv(data_path, header=True, inferSchema=True)
    except Exception:
        # If direct read fails, try reading from the directory.
        # `except Exception` rather than a bare `except`, so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        df = spark.read.csv(f"{data_path}/*.csv", header=True, inferSchema=True)

    print(f"âœ“ Data loaded from: {data_path}")

    # Show initial row count
    initial_row_count = df.count()
    print(f"  - Total rows: {initial_row_count}")

    # Show schema
    print("\nDataset Schema:")
    df.printSchema()

    # Show sample data
    print("\nSample Data (first 5 rows):")
    df.show(5, truncate=50)

    # Drop rows with null sentiment values
    df = df.dropna(subset=["sentiment"])
    rows_after_drop = df.count()
    print(f"\nâœ“ Dropped {initial_row_count - rows_after_drop} rows with null sentiments")

    # The split percentages below divide by this count, so stop before that
    # when nothing is left to train on.
    if rows_after_drop == 0:
        print("No labelled rows remain after dropping null sentiments; skipping training.")
        return False

    # Convert -1/1 labels to 0/1 for binary classification
    df = df.withColumn("label", (col("sentiment").cast("integer") + 1) / 2)
    print("âœ“ Converted sentiment labels: -1 â†’ 0 (negative), 1 â†’ 1 (positive)")

    # Show label distribution
    print("\nLabel Distribution:")
    df.groupBy("label").count().show()

    # Step 3: Split Data
    print("\nStep 3: Split Data into Training and Test Sets")
    print("-" * 80)

    # Split data: 80% training, 20% testing (seeded for reproducibility)
    trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)

    train_count = trainingData.count()
    test_count = testData.count()

    print(f"âœ“ Training set: {train_count} samples ({train_count/rows_after_drop*100:.1f}%)")
    print(f"âœ“ Test set: {test_count} samples ({test_count/rows_after_drop*100:.1f}%)")

    # Step 4: Build Preprocessing Pipeline
    print("\nStep 4: Build Preprocessing Pipeline")
    print("-" * 80)

    # Tokenizer: Splits text into words
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    print("âœ“ Tokenizer: Splits text into words")

    # StopWordsRemover: Removes common stop words
    stopwordsRemover = StopWordsRemover(
        inputCol="words",
        outputCol="filtered_words"
    )
    print("âœ“ StopWordsRemover: Removes common stop words")

    # HashingTF: Converts tokens into feature vectors using hashing
    hashingTF = HashingTF(
        inputCol="filtered_words",
        outputCol="raw_features",
        numFeatures=10000
    )
    print("âœ“ HashingTF: Converts tokens to feature vectors (10,000 features)")

    # IDF: Inverse Document Frequency - rescales feature vectors
    idf = IDF(
        inputCol="raw_features",
        outputCol="features"
    )
    print("âœ“ IDF: Rescales features using inverse document frequency")

    # Step 5: Initialize the Model
    print("\nStep 5: Initialize Logistic Regression Model")
    print("-" * 80)

    # LogisticRegression: Binary classification model
    lr = LogisticRegression(
        maxIter=10,
        regParam=0.001,
        featuresCol="features",
        labelCol="label"
    )
    print("âœ“ Logistic Regression initialized")
    print(f"  - Max Iterations: 10")
    print(f"  - Regularization Parameter: 0.001")

    # Step 6: Assemble and Train the Pipeline
    print("\nStep 6: Assemble and Train Pipeline")
    print("-" * 80)

    pipeline = Pipeline(stages=[tokenizer, stopwordsRemover, hashingTF, idf, lr])
    print("âœ“ Pipeline created with 5 stages:")
    print("  1. Tokenizer")
    print("  2. StopWordsRemover")
    print("  3. HashingTF")
    print("  4. IDF")
    print("  5. LogisticRegression")

    # Train the model
    print("\nTraining model (this may take a moment)...")
    model = pipeline.fit(trainingData)
    print("âœ“ Model trained successfully!")

    # Step 7: Make Predictions
    print("\nStep 7: Make Predictions on Test Data")
    print("-" * 80)

    predictions = model.transform(testData)
    print("âœ“ Predictions generated")

    # Show predictions
    print("\nSample Predictions (first 5 rows):")
    predictions.select("text", "label", "prediction", "probability").show(5, truncate=50)

    # Step 8: Evaluate the Model
    print("\nStep 8: Evaluate Model Performance")
    print("-" * 80)

    # Same evaluator configuration for every metric; only metricName changes.
    reported_metrics = [
        ("accuracy", "Accuracy"),
        ("f1", "F1 Score"),
        ("weightedPrecision", "Weighted Precision"),
        ("weightedRecall", "Weighted Recall"),
    ]
    for metric_name, display_name in reported_metrics:
        evaluator = MulticlassClassificationEvaluator(
            labelCol="label",
            predictionCol="prediction",
            metricName=metric_name
        )
        score = evaluator.evaluate(predictions)
        print(f"âœ“ {display_name}: {score:.4f}")

    # Confusion Matrix (manual calculation)
    print("\nConfusion Matrix:")
    print("-" * 80)

    # Calculate confusion matrix components
    tp = predictions.filter((col("label") == 1) & (col("prediction") == 1)).count()
    tn = predictions.filter((col("label") == 0) & (col("prediction") == 0)).count()
    fp = predictions.filter((col("label") == 0) & (col("prediction") == 1)).count()
    fn = predictions.filter((col("label") == 1) & (col("prediction") == 0)).count()

    print(f"                Predicted")
    print(f"                Neg (0)  Pos (1)")
    print(f"Actual  Neg (0)   {tn:3d}      {fp:3d}")
    print(f"        Pos (1)   {fn:3d}      {tp:3d}")

    # Test on new data
    print("\n" + "=" * 80)
    print("BONUS: Test Pipeline on New Unseen Data")
    print("=" * 80)

    new_data = [
        ("This is an amazing film, loved every minute!",),
        ("Terrible movie, don't waste your money.",),
        ("Outstanding performance by the actors.",),
        ("Very disappointing, expected much better.",),
        ("Absolute masterpiece, beautifully crafted.",)
    ]

    new_df = spark.createDataFrame(new_data, ["text"])
    new_predictions = model.transform(new_df)

    print("\nPredictions on New Data:")
    result = new_predictions.select("text", "prediction", "probability").collect()

    for i, row in enumerate(result, 1):
        sentiment = "Positive ðŸ˜Š" if row["prediction"] == 1.0 else "Negative ðŸ˜ž"
        # Confidence = the larger of the class probabilities
        confidence = max(row["probability"].toArray()) * 100
        print(f"\n{i}. [{sentiment}] (Confidence: {confidence:.1f}%)")
        print(f"   Text: '{row['text']}'")

    return True


def main():
    """
    Main function to run PySpark sentiment analysis pipeline.

    Checks for Java, starts a local Spark session, runs the pipeline, and
    always stops the session afterwards, even when a pipeline step raises.
    """
    print("=" * 80)
    print("PYSPARK SENTIMENT ANALYSIS PIPELINE")
    print("=" * 80)

    # Check Java installation
    print("\nChecking Java installation...")
    print("-" * 80)
    if not check_java_installation():
        print("\nâœ— Cannot proceed without Java. Please install Java first.")
        return

    # Step 1: Initialize Spark Session
    print("\nStep 1: Initialize Spark Session")
    print("-" * 80)

    try:
        spark = (
            SparkSession.builder
            .appName("SentimentAnalysis")
            .master("local[*]")
            .config("spark.driver.memory", "2g")
            .config("spark.driver.host", "127.0.0.1")
            .getOrCreate()
        )

        # Set log level to reduce verbosity
        spark.sparkContext.setLogLevel("ERROR")
        print("âœ“ Spark Session initialized")
        print(f"  - App Name: SentimentAnalysis")
        print(f"  - Spark Version: {spark.version}")
    except Exception as e:
        print(f"âœ— Failed to initialize Spark Session: {str(e)}")
        print("\nTroubleshooting tips:")
        print("1. Make sure Java 8 or 11 is installed")
        print("2. Set JAVA_HOME environment variable")
        print("3. Try running: !apt-get install -y openjdk-8-jdk-headless")
        return

    completed = False
    try:
        completed = _run_pipeline(spark)
    finally:
        # Stop Spark Session, also when a step above raised, so the session
        # is not left running.
        print("\n" + "=" * 80)
        print("Stopping Spark Session...")
        spark.stop()
        print("âœ“ Spark Session stopped")
        print("=" * 80)

    if completed:
        print("PYSPARK SENTIMENT ANALYSIS COMPLETED SUCCESSFULLY!")
        print("=" * 80)


if __name__ == "__main__":
    main()

PYSPARK SENTIMENT ANALYSIS PIPELINE

Checking Java installation...
--------------------------------------------------------------------------------
âœ“ Java is installed

Step 1: Initialize Spark Session
--------------------------------------------------------------------------------
âœ“ Spark Session initialized
  - App Name: SentimentAnalysis
  - Spark Version: 3.5.1

Step 2: Load Data
--------------------------------------------------------------------------------
Creating sample dataset...
âœ“ Sample data created at: data/sentiments.csv
âœ“ Data loaded from: data/sentiments.csv
  - Total rows: 30

Dataset Schema:
root
 |-- text: string (nullable = true)
 |-- sentiment: integer (nullable = true)


Sample Data (first 5 rows):
+------------------------------------------------+---------+
|                                            text|sentiment|
+------------------------------------------------+---------+
|          This movie is fantastic and I love it!|        1|
|                I

In [None]:
"""
Lab Task 4: Evaluating and Improving Model Performance

This module demonstrates various techniques to improve text classification performance:
1. Improved preprocessing and feature selection
2. Advanced embedding methods (Word2Vec)
3. Multiple model architectures (Naive Bayes, Random Forest, Gradient Boosting)
"""

import re
import numpy as np
from typing import List, Dict, Tuple
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import classification_report, confusion_matrix
import warnings
# Silence every warning raised after this point: no category or module filter
# is given, so this applies to all libraries used in the session.
warnings.filterwarnings('ignore')


class AdvancedTextPreprocessor:
    """
    Advanced text preprocessor with noise filtering and vocabulary reduction.

    Cleaning steps, in order: optional lowercasing, HTML tag removal,
    optional URL removal, optional removal of every character that is not an
    ASCII letter or whitespace, whitespace normalisation, and dropping of
    words shorter than ``min_word_length``.
    """

    # Compiled once at class level so cleaning a large corpus does not have
    # to look the patterns up again for every document.
    _HTML_TAG_RE = re.compile(r'<.*?>')
    # Case-insensitive so URLs are still caught when lowercase=False.
    _URL_RE = re.compile(r'http\S+|www\S+', flags=re.IGNORECASE)
    _NON_LETTER_RE = re.compile(r'[^a-zA-Z\s]')

    def __init__(self, remove_urls=True, remove_special_chars=True,
                 min_word_length=2, lowercase=True):
        """
        Initialize the preprocessor with various options.

        Args:
            remove_urls: Remove URLs from text
            remove_special_chars: Remove special characters
            min_word_length: Minimum word length to keep
            lowercase: Convert text to lowercase
        """
        self.remove_urls = remove_urls
        self.remove_special_chars = remove_special_chars
        self.min_word_length = min_word_length
        self.lowercase = lowercase

    def clean_text(self, text: str) -> str:
        """
        Clean and preprocess text.

        Args:
            text: Input text

        Returns:
            Cleaned text: surviving words joined by single spaces (may be
            the empty string).
        """
        if self.lowercase:
            text = text.lower()

        # Strip markup BEFORE URLs: the URL pattern runs to the next
        # whitespace, so on '<a href="http://x">our site</a>' it would eat
        # the closing '">' and part of the link text, and the tag pattern
        # would then swallow the rest. Tags become a space (not '') so that
        # 'great<br>film' does not collapse into 'greatfilm'.
        text = self._HTML_TAG_RE.sub(' ', text)

        if self.remove_urls:
            text = self._URL_RE.sub('', text)

        # Characters are deleted, not replaced, so "couldn't" -> "couldnt".
        if self.remove_special_chars:
            text = self._NON_LETTER_RE.sub('', text)

        # split() with no argument collapses whitespace runs and trims both
        # ends, which also takes care of whitespace normalisation.
        kept = [word for word in text.split()
                if len(word) >= self.min_word_length]
        return ' '.join(kept)

    def preprocess_corpus(self, texts: List[str]) -> List[str]:
        """
        Preprocess a list of texts.

        Args:
            texts: List of texts

        Returns:
            List of cleaned texts, in the same order as the input
        """
        return [self.clean_text(text) for text in texts]


class ImprovedTextClassifier:
    """
    Improved text classifier with multiple model options and advanced features.

    Wraps a text vectorizer and one of several scikit-learn classifiers
    behind a small fit / predict / evaluate / cross_validate interface.
    """

    def __init__(self, vectorizer, model_type='logistic_regression', **model_params):
        """
        Initialize the classifier.

        Args:
            vectorizer: Vectorizer instance (TfidfVectorizer or CountVectorizer)
            model_type: Type of model ('logistic_regression', 'naive_bayes',
                       'random_forest', 'gradient_boosting')
            **model_params: Additional parameters for the model

        Raises:
            ValueError: If model_type is not one of the supported names.
        """
        self.vectorizer = vectorizer
        self.model_type = model_type
        self.model_params = model_params
        self._model = None

        # Initialize model based on type
        self._init_model()

    def _init_model(self):
        """
        Initialize the classification model based on model_type.

        Caller-supplied model_params override the per-model defaults.

        Raises:
            ValueError: If model_type is not one of the supported names.
        """
        if self.model_type == 'logistic_regression':
            default_params = {'solver': 'liblinear', 'random_state': 42}
            default_params.update(self.model_params)
            self._model = LogisticRegression(**default_params)

        elif self.model_type == 'naive_bayes':
            self._model = MultinomialNB(**self.model_params)

        elif self.model_type == 'random_forest':
            default_params = {'n_estimators': 100, 'random_state': 42}
            default_params.update(self.model_params)
            self._model = RandomForestClassifier(**default_params)

        elif self.model_type == 'gradient_boosting':
            default_params = {'n_estimators': 100, 'random_state': 42}
            default_params.update(self.model_params)
            self._model = GradientBoostingClassifier(**default_params)

        else:
            raise ValueError(f"Unknown model type: {self.model_type}")

    def fit(self, texts: List[str], labels: List[int]):
        """
        Train the classifier.

        Fits the vectorizer on ``texts`` and then the model on the resulting
        feature matrix.

        Returns:
            self, to allow method chaining.
        """
        X = self.vectorizer.fit_transform(texts)
        self._model.fit(X, labels)
        return self

    def predict(self, texts: List[str]) -> List[int]:
        """Predict labels for new texts (requires a prior call to fit)."""
        X = self.vectorizer.transform(texts)
        predictions = self._model.predict(X)
        return predictions.tolist()

    def predict_proba(self, texts: List[str]) -> np.ndarray:
        """
        Predict probabilities for each class.

        Raises:
            ValueError: If the underlying model has no predict_proba method.
        """
        X = self.vectorizer.transform(texts)
        if hasattr(self._model, 'predict_proba'):
            return self._model.predict_proba(X)
        else:
            raise ValueError(f"Model {self.model_type} doesn't support predict_proba")

    def evaluate(self, y_true: List[int], y_pred: List[int],
                 average: str = 'binary') -> Dict[str, float]:
        """
        Evaluate the model's predictions.

        Args:
            y_true: Ground-truth labels
            y_pred: Predicted labels
            average: Averaging mode passed to precision/recall/F1. The
                default 'binary' only works for two-class labels; pass
                e.g. 'macro' or 'weighted' for multi-class data.

        Returns:
            Dict with keys 'accuracy', 'precision', 'recall', 'f1_score'.
        """
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average=average, zero_division=0),
            'recall': recall_score(y_true, y_pred, average=average, zero_division=0),
            'f1_score': f1_score(y_true, y_pred, average=average, zero_division=0)
        }
        return metrics

    def cross_validate(self, texts: List[str], labels: List[int], cv=5,
                       scoring='f1') -> Dict[str, float]:
        """
        Perform cross-validation.

        The vectorizer and the model are cross-validated together as one
        pipeline, so the vocabulary and IDF weights are learned from each
        training fold only and nothing leaks from the held-out fold.
        cross_val_score fits clones, so this instance's own vectorizer and
        model are left untouched.

        Args:
            texts: Raw text documents
            labels: Corresponding labels
            cv: Number of folds (or any cv value cross_val_score accepts)
            scoring: scikit-learn scoring name. The default 'f1' is
                binary-only; use e.g. 'f1_macro' for multi-class labels.

        Returns:
            Dict with 'mean_f1', 'std_f1' and the per-fold 'scores'. The key
            names are kept for backward compatibility even when another
            ``scoring`` is requested.
        """
        # Local import keeps this block self-contained.
        from sklearn.pipeline import make_pipeline

        pipeline = make_pipeline(self.vectorizer, self._model)
        scores = cross_val_score(pipeline, texts, labels, cv=cv, scoring=scoring)
        return {
            'mean_f1': scores.mean(),
            'std_f1': scores.std(),
            'scores': scores.tolist()
        }


class SimpleWord2VecClassifier:
    """
    A simple Word2Vec-style classifier using averaged word embeddings.

    The embeddings here are fixed random vectors (one per vocabulary word),
    not trained Word2Vec vectors; each document is represented by the mean
    of its known words' vectors.
    Note: For production use, consider using gensim's Word2Vec or pre-trained embeddings.
    """

    def __init__(self, embedding_dim=100, model_type='logistic_regression'):
        """
        Initialize Word2Vec classifier.

        Args:
            embedding_dim: Dimension of word embeddings
            model_type: Type of classifier to use ('logistic_regression' or
                'gradient_boosting')

        Raises:
            ValueError: If model_type is not one of the supported names.
        """
        self.embedding_dim = embedding_dim
        self.model_type = model_type
        self.word_vectors = {}
        self._model = None

        if model_type == 'logistic_regression':
            self._model = LogisticRegression(solver='liblinear', random_state=42)
        elif model_type == 'gradient_boosting':
            self._model = GradientBoostingClassifier(n_estimators=100, random_state=42)
        else:
            # Fail here rather than with an AttributeError on None in fit().
            raise ValueError(f"Unknown model type: {model_type}")

    def _build_vocabulary(self, texts: List[str]):
        """
        Build vocabulary from texts.

        Returns:
            Sorted list of the distinct lowercased, whitespace-split tokens.
            Sorting makes the order (and therefore the vector assigned to
            each word) independent of set iteration order.
        """
        vocab = set()
        for text in texts:
            vocab.update(text.lower().split())
        return sorted(vocab)

    def _initialize_embeddings(self, vocab: List[str]):
        """Initialize random word embeddings (simplified version)."""
        # Local generator: same seed as before, but the global NumPy random
        # state used by the rest of the program is left alone.
        rng = np.random.RandomState(42)
        for word in vocab:
            self.word_vectors[word] = rng.randn(self.embedding_dim) * 0.1

    def _text_to_vector(self, text: str) -> np.ndarray:
        """
        Convert text to averaged word vector.

        Words without an embedding are ignored; a text with no known words
        maps to the zero vector.
        """
        words = text.lower().split()
        vectors = [self.word_vectors[word]
                   for word in words if word in self.word_vectors]

        if not vectors:
            return np.zeros(self.embedding_dim)

        return np.mean(vectors, axis=0)

    def fit(self, texts: List[str], labels: List[int]):
        """
        Train the classifier.

        Returns:
            self, to allow method chaining.
        """
        # Start from a clean table so refitting does not keep vectors for
        # words that only appeared in an earlier training set.
        self.word_vectors = {}

        # Build vocabulary and initialize embeddings
        vocab = self._build_vocabulary(texts)
        self._initialize_embeddings(vocab)

        # Convert texts to vectors
        X = np.array([self._text_to_vector(text) for text in texts])

        # Train model
        self._model.fit(X, labels)
        return self

    def predict(self, texts: List[str]) -> List[int]:
        """Predict labels for new texts (requires a prior call to fit)."""
        X = np.array([self._text_to_vector(text) for text in texts])
        predictions = self._model.predict(X)
        return predictions.tolist()

    def evaluate(self, y_true: List[int], y_pred: List[int],
                 average: str = 'binary') -> Dict[str, float]:
        """
        Evaluate predictions.

        Args:
            y_true: Ground-truth labels
            y_pred: Predicted labels
            average: Averaging mode passed to precision/recall/F1. The
                default 'binary' only works for two-class labels; pass
                e.g. 'macro' or 'weighted' for multi-class data.

        Returns:
            Dict with keys 'accuracy', 'precision', 'recall', 'f1_score'.
        """
        metrics = {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average=average, zero_division=0),
            'recall': recall_score(y_true, y_pred, average=average, zero_division=0),
            'f1_score': f1_score(y_true, y_pred, average=average, zero_division=0)
        }
        return metrics


def compare_models(X_train, X_test, y_train, y_test) -> Dict:
    """
    Train and score each supported model type on the same train/test split.

    Every model gets its own fresh TF-IDF vectorizer (unigrams + bigrams,
    at most 1000 features, min_df=2, English stop words), is fitted on the
    training texts, and is evaluated on the test texts. Accuracy and F1 are
    printed as each model finishes.

    Args:
        X_train: Training texts
        X_test: Test texts
        y_train: Training labels
        y_test: Test labels

    Returns:
        Dictionary mapping each model's display name to its metrics dict
    """
    # (display name, model_type key, extra keyword arguments for the model)
    candidates = [
        ('Logistic Regression', 'logistic_regression', {}),
        ('Naive Bayes', 'naive_bayes', {}),
        ('Random Forest', 'random_forest', {}),
        ('Gradient Boosting', 'gradient_boosting', {}),
    ]

    # Shared vectorizer settings; a new vectorizer is built per model so no
    # fitted state is carried from one model to the next.
    tfidf_settings = {
        'max_features': 1000,
        'ngram_range': (1, 2),
        'min_df': 2,
        'stop_words': 'english',
    }

    summary = {}
    for display_name, model_key, extra_kwargs in candidates:
        print(f"\nTraining {display_name}...")

        clf = ImprovedTextClassifier(
            TfidfVectorizer(**tfidf_settings), model_key, **extra_kwargs
        )
        clf.fit(X_train, y_train)

        scores = clf.evaluate(y_test, clf.predict(X_test))
        summary[display_name] = scores

        print(f"  Accuracy: {scores['accuracy']:.4f}")
        print(f"  F1 Score: {scores['f1_score']:.4f}")

    return summary


def main():
    """
    Main function demonstrating all improvement techniques.

    Runs, on a small hard-coded movie-review dataset (30 samples, balanced
    binary labels):
      1. text cleaning plus a comparison of several TF-IDF configurations,
      2. the averaged-embedding classifier,
      3. a comparison of the supported model architectures,
    and finally prints a list of recommendations. All results are printed;
    nothing is returned.
    """
    print("=" * 80)
    print("TASK 4: EVALUATING AND IMPROVING MODEL PERFORMANCE")
    print("=" * 80)

    # Extended dataset
    texts = [
        "This movie is fantastic and I love it!",
        "I hate this film, it's terrible.",
        "The acting was superb, a truly great experience.",
        "What a waste of time, absolutely boring.",
        "Highly recommend this, a masterpiece.",
        "Could not finish watching, so bad.",
        "Amazing storyline, kept me engaged throughout.",
        "Disappointing and poorly executed.",
        "Brilliant cinematography and great performances.",
        "Not worth watching, very dull.",
        "Exceptional movie, one of the best I've seen.",
        "Awful, I regret watching this.",
        "Wonderful experience, loved every scene.",
        "Boring and predictable plot.",
        "Outstanding film with excellent direction.",
        "Terrible waste of money and time.",
        "Incredible acting and beautiful visuals.",
        "So bad, couldn't watch till the end.",
        "Superb entertainment, highly enjoyable.",
        "Worst movie ever, absolutely horrible.",
        "Compelling story with emotional depth.",
        "Poorly written and badly directed.",
        "Captivating from start to finish.",
        "Complete disaster, very disappointing.",
        "Excellent performances all around.",
        "Waste of time, not recommended.",
        "Beautiful cinematography and soundtrack.",
        "Dull and uninspiring movie.",
        "Must watch, absolutely brilliant.",
        "Horrible experience, very bad.",
    ]

    # Labels alternate positive (1) / negative (0), matching the texts above.
    labels = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0,
              1, 0, 1, 0, 1, 0, 1, 0, 1, 0]

    # Split data (stratified so both classes appear in the small test set)
    X_train, X_test, y_train, y_test = train_test_split(
        texts, labels, test_size=0.2, random_state=42, stratify=labels
    )

    print(f"\nDataset: {len(texts)} samples")
    print(f"Training: {len(X_train)} | Testing: {len(X_test)}")

    # ========================================================================
    # 1. IMPROVED PREPROCESSING
    # ========================================================================
    print("\n" + "=" * 80)
    print("1. IMPROVED PREPROCESSING AND FEATURE SELECTION")
    print("=" * 80)

    preprocessor = AdvancedTextPreprocessor(
        remove_urls=True,
        remove_special_chars=True,
        min_word_length=2,
        lowercase=True
    )

    print("\nCleaning training data...")
    X_train_clean = preprocessor.preprocess_corpus(X_train)
    X_test_clean = preprocessor.preprocess_corpus(X_test)

    print("\nExample preprocessing:")
    print(f"Original: '{X_train[0]}'")
    print(f"Cleaned:  '{X_train_clean[0]}'")

    # Test different vectorizer configurations
    print("\n" + "-" * 80)
    print("Comparing different TF-IDF configurations:")
    print("-" * 80)

    vectorizer_configs = {
        'Basic TF-IDF': {'max_features': None, 'ngram_range': (1, 1)},
        'TF-IDF + Bigrams': {'max_features': 1000, 'ngram_range': (1, 2)},
        'TF-IDF + min_df=2': {'max_features': 1000, 'ngram_range': (1, 2), 'min_df': 2},
        'TF-IDF Reduced (500)': {'max_features': 500, 'ngram_range': (1, 2), 'min_df': 2}
    }

    for config_name, params in vectorizer_configs.items():
        vectorizer = TfidfVectorizer(stop_words='english', **params)
        classifier = ImprovedTextClassifier(vectorizer, 'logistic_regression')
        classifier.fit(X_train_clean, y_train)

        y_pred = classifier.predict(X_test_clean)
        metrics = classifier.evaluate(y_test, y_pred)

        print(f"\n{config_name}:")
        print(f"  Accuracy: {metrics['accuracy']:.4f} | F1: {metrics['f1_score']:.4f}")

    # ========================================================================
    # 2. ADVANCED EMBEDDING METHODS
    # ========================================================================
    print("\n" + "=" * 80)
    print("2. ADVANCED EMBEDDING METHODS (Word2Vec)")
    print("=" * 80)

    print("\nTraining Word2Vec-based classifier...")
    w2v_classifier = SimpleWord2VecClassifier(
        embedding_dim=100,
        model_type='logistic_regression'
    )

    w2v_classifier.fit(X_train_clean, y_train)
    y_pred_w2v = w2v_classifier.predict(X_test_clean)
    metrics_w2v = w2v_classifier.evaluate(y_test, y_pred_w2v)

    print("\nWord2Vec Results:")
    print(f"  Accuracy:  {metrics_w2v['accuracy']:.4f}")
    print(f"  Precision: {metrics_w2v['precision']:.4f}")
    print(f"  Recall:    {metrics_w2v['recall']:.4f}")
    print(f"  F1 Score:  {metrics_w2v['f1_score']:.4f}")

    # ========================================================================
    # 3. COMPARING DIFFERENT MODEL ARCHITECTURES
    # ========================================================================
    print("\n" + "=" * 80)
    print("3. COMPARING DIFFERENT MODEL ARCHITECTURES")
    print("=" * 80)

    results = compare_models(X_train_clean, X_test_clean, y_train, y_test)

    # Summary table
    print("\n" + "-" * 80)
    print("PERFORMANCE SUMMARY")
    print("-" * 80)
    print(f"{'Model':<25} {'Accuracy':<12} {'Precision':<12} {'Recall':<12} {'F1 Score':<12}")
    print("-" * 80)

    for model_name, metrics in results.items():
        print(f"{model_name:<25} "
              f"{metrics['accuracy']:<12.4f} "
              f"{metrics['precision']:<12.4f} "
              f"{metrics['recall']:<12.4f} "
              f"{metrics['f1_score']:<12.4f}")

    # Find best model (highest F1; ties go to the first model listed)
    best_model = max(results.items(), key=lambda x: x[1]['f1_score'])
    print("-" * 80)
    print(f"  Best Model: {best_model[0]} (F1 Score: {best_model[1]['f1_score']:.4f})")

    # ========================================================================
    # 4. RECOMMENDATIONS
    # ========================================================================
    print("\n" + "=" * 80)
    print("RECOMMENDATIONS FOR IMPROVEMENT")
    print("=" * 80)

    # The check marks and bullets were mis-decoded UTF-8 ("âœ“", "â€¢");
    # restored to the intended characters.
    recommendations = """
    ✓ Preprocessing Improvements:
      • Add spell correction for noisy text
      • Use lemmatization instead of simple tokenization
      • Experiment with different stop word lists

    ✓ Feature Engineering:
      • Try character n-grams for capturing style
      • Add sentiment lexicon features
      • Combine TF-IDF with hand-crafted features

    ✓ Advanced Embeddings:
      • Use pre-trained Word2Vec (Google News)
      • Try GloVe embeddings
      • Experiment with FastText for OOV words
      • Use contextual embeddings (BERT, if resources allow)

    ✓ Model Architecture:
      • Try ensemble methods (voting, stacking)
      • Experiment with neural networks (LSTM, CNN)
      • Use hyperparameter tuning (GridSearch, RandomSearch)

    ✓ Data Improvements:
      • Collect more training data
      • Balance the dataset if imbalanced
      • Use data augmentation techniques
      • Add cross-validation for robust evaluation
    """

    print(recommendations)

    print("\n" + "=" * 80)
    print("TASK 4 COMPLETED SUCCESSFULLY!")
    print("=" * 80)


# Run the Task 4 demonstration only when this cell/module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()

TASK 4: EVALUATING AND IMPROVING MODEL PERFORMANCE

Dataset: 30 samples
Training: 24 | Testing: 6

1. IMPROVED PREPROCESSING AND FEATURE SELECTION

Cleaning training data...

Example preprocessing:
Original: 'So bad, couldn't watch till the end.'
Cleaned:  'so bad couldnt watch till the end'

--------------------------------------------------------------------------------
Comparing different TF-IDF configurations:
--------------------------------------------------------------------------------

Basic TF-IDF:
  Accuracy: 0.8333 | F1: 0.8000

TF-IDF + Bigrams:
  Accuracy: 0.8333 | F1: 0.8000

TF-IDF + min_df=2:
  Accuracy: 0.5000 | F1: 0.5714

TF-IDF Reduced (500):
  Accuracy: 0.5000 | F1: 0.5714

2. ADVANCED EMBEDDING METHODS (Word2Vec)

Training Word2Vec-based classifier...

Word2Vec Results:
  Accuracy:  0.6667
  Precision: 0.6667
  Recall:    0.6667
  F1 Score:  0.6667

3. COMPARING DIFFERENT MODEL ARCHITECTURES

Training Logistic Regression...
  Accuracy: 0.5000
  F1 Score: 0.5714

