In [None]:
# %pip install boto3 python-dotenv pyspark torch transformers scikit-learn matplotlib --user

In [1]:
import boto3
import json
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

def load_jsonl_from_s3(bucket_name, file_key):
    """
    Load a JSONL file from an S3 bucket using credentials from environment variables.

    Parameters:
    - bucket_name: str - Name of the S3 bucket.
    - file_key: str - Key (path) of the JSONL file in the bucket.

    Returns:
    - List of Python dictionaries loaded from the JSONL file.
    """
    # Get AWS credentials from environment variables
    aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    region_name = os.getenv("AWS_REGION")
    
    # Initialize an S3 client
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name
    )
    
    # Retrieve the file content
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    content = response['Body'].read().decode('utf-8')
    
    # Parse the JSONL content
    data = [json.loads(line) for line in content.splitlines() if line.strip()]
    
    return data

# Example usage
if __name__ == "__main__":
    bucket_name = "small-reviews584"
    file_key = "data/reviews_small.jsonl"
    
    data = load_jsonl_from_s3(bucket_name, file_key)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
#import pipeline
from pyspark.ml import Pipeline


# Create a Spark session
spark = SparkSession.builder \
    .appName("AmazonReviewsLocalLR") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Convert Python list of dicts to Spark DataFrame
df = spark.createDataFrame(data)

# Ensure the expected columns exist: 'text' and 'rating'
required_columns = ["text", "rating"]
for col_name in required_columns:
    if col_name not in df.columns:
        raise ValueError(f"Expected column '{col_name}' not found in data")

# Create binary label: label=1 if rating >=3 else 0
df = df.withColumn("label", when(col("rating") >= 3, 1).otherwise(0))

# Filter out rows without text
df = df.filter(col("text").isNotNull())

# Define text processing and feature extraction pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.001)

# Create a pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# Split into train and test sets
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Fit the model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Evaluate model performance
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = binary_evaluator.evaluate(predictions)

# For multi-metric evaluation
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = multi_evaluator.setMetricName("accuracy").evaluate(predictions)
precision = multi_evaluator.setMetricName("weightedPrecision").evaluate(predictions)
recall = multi_evaluator.setMetricName("weightedRecall").evaluate(predictions)
f1 = multi_evaluator.setMetricName("f1").evaluate(predictions)

print("=== Performance Metrics ===")
print(f"Test AUC (Area Under ROC): {auc:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1-score: {f1:.4f}")

# # Show a confusion matrix-like table
# predictions.groupBy("label", "prediction").count().show()

# Stop the Spark session (if you do not need it further)
# spark.stop()

=== Performance Metrics ===
Test AUC (Area Under ROC): 0.7345
Test Accuracy: 0.8333
Test Precision: 0.8372
Test Recall: 0.8333
Test F1-score: 0.8352


In [3]:
import torch
from transformers import BertTokenizer, BertModel
from torch.utils.data import DataLoader, Dataset, random_split
from torch.nn import Module, Linear, Sigmoid
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Dataset Definition
class ReviewDataset(Dataset):
    def __init__(self, reviews, tokenizer, max_length=128):
        self.reviews = reviews
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.reviews)

    def __getitem__(self, idx):
        review = self.reviews[idx]
        text = review["title"] + " " + review["text"]
        rating = review["rating"]
        label = 1 if rating >= 4 else 0

        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )

        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "label": torch.tensor(label, dtype=torch.float32),
        }

# Model Definition
class ReviewClassifier(Module):
    def __init__(self, pretrained_model_name="bert-base-uncased", hidden_dim=128):
        super(ReviewClassifier, self).__init__()
        self.bert = BertModel.from_pretrained(pretrained_model_name)
        self.fc1 = Linear(self.bert.config.hidden_size, hidden_dim)
        self.fc2 = Linear(hidden_dim, 1)
        self.sigmoid = Sigmoid()

    def forward(self, input_ids, attention_mask):
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = bert_output.pooler_output
        x = F.relu(self.fc1(pooled_output))
        x = self.fc2(x)
        return self.sigmoid(x)

# Metrics Evaluation
def evaluate_metrics(model, data_loader, device):
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask).squeeze()
            preds = (outputs > 0.5).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    return accuracy, precision, recall, f1

# Training Loop
def train_epoch(model, data_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0

    for batch in data_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, attention_mask=attention_mask).squeeze()
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(data_loader)

# Initialize tokenizer and dataset
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
max_length = 128
dataset = ReviewDataset(data, tokenizer, max_length=max_length)

# Split dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16)

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ReviewClassifier().to(device)

# Loss and optimizer
criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)

# Training the model
epochs = 3
for epoch in range(epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}")

# Evaluate the model
print("Evaluating on the test set...")
accuracy, precision, recall, f1 = evaluate_metrics(model, test_loader, device)

print(f"Test Metrics - Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

# Save the model
torch.save(model.state_dict(), "review_classifier.pt")
print("Model saved to review_classifier.pt")


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Epoch 1/3, Train Loss: 0.5004
Epoch 2/3, Train Loss: 0.2962
Epoch 3/3, Train Loss: 0.1739
Evaluating on the test set...
Test Metrics - Accuracy: 0.9300, Precision: 0.9593, Recall: 0.9593, F1 Score: 0.9593
Model saved to review_classifier.pt


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Start Spark session
spark = SparkSession.builder \
    .appName("SentimentClassifier") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Create a binary label: favorable (1) for ratings >= 3, not favorable (0) otherwise
df = df.withColumn("label", when(col("rating") >= 3, 1).otherwise(0))

# Process text column
text_tokenizer = Tokenizer(inputCol="text", outputCol="text_words")
text_remover = StopWordsRemover(inputCol="text_words", outputCol="filtered_text_words")
text_hashingTF = HashingTF(inputCol="filtered_text_words", outputCol="text_rawFeatures", numFeatures=10000)
text_idf = IDF(inputCol="text_rawFeatures", outputCol="features")

# Process title column
title_tokenizer = Tokenizer(inputCol="title", outputCol="title_words")
title_remover = StopWordsRemover(inputCol="title_words", outputCol="filtered_title_words")
title_hashingTF = HashingTF(inputCol="filtered_title_words", outputCol="title_rawFeatures", numFeatures=5000)
title_idf = IDF(inputCol="title_rawFeatures", outputCol="title_features")

# Combine features from text and title
feature_assembler = VectorAssembler(
    inputCols=["features", "title_features"], 
    outputCol="assembled_features"
)

# Random Forest Classifier
rf = RandomForestClassifier(featuresCol="assembled_features", labelCol="label", numTrees=100, maxDepth=10, seed=42)

# Pipeline to chain all the stages together
rf_pipeline = Pipeline(stages=[
    text_tokenizer, text_remover, text_hashingTF, text_idf,
    title_tokenizer, title_remover, title_hashingTF, title_idf,
    feature_assembler, rf
])

# Train/test data split
rf_train_df, rf_test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
rf_model = rf_pipeline.fit(rf_train_df)

# Make predictions on the test set
rf_predictions = rf_model.transform(rf_test_df)

# Evaluate model performance and metrics
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = binary_evaluator.evaluate(rf_predictions)
print(f"Test AUC: {auc}")

multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(rf_predictions, {multi_evaluator.metricName: "f1"})

print(f"Test Accuracy: {accuracy}")
print(f"Test Precision: {precision}")
print(f"Test Recall: {recall}")
print(f"Test F1 Score: {f1}")

# Stop the Spark session
# spark.stop()

                                                                                

Test AUC: 0.702440458688621


                                                                                

Test Accuracy: 0.9040404040404041
Test Precision: 0.817289052137537
Test Recall: 0.9040404040404041
Test F1 Score: 0.8584786860648931


In [6]:
#naive bayes
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Start Spark session
spark = SparkSession.builder \
    .appName("SentimentClassifier") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Create a binary label: favorable (1) for ratings >= 3, not favorable (0) otherwise
df = df.withColumn("label", when(col("rating") >= 3, 1).otherwise(0))

# Process text column
text_tokenizer = Tokenizer(inputCol="text", outputCol="text_words")
text_remover = StopWordsRemover(inputCol="text_words", outputCol="filtered_text_words")
text_hashingTF = HashingTF(inputCol="filtered_text_words", outputCol="text_rawFeatures", numFeatures=10000)
text_idf = IDF(inputCol="text_rawFeatures", outputCol="features")

# Process title column
title_tokenizer = Tokenizer(inputCol="title", outputCol="title_words")
title_remover = StopWordsRemover(inputCol="title_words", outputCol="filtered_title_words")
title_hashingTF = HashingTF(inputCol="filtered_title_words", outputCol="title_rawFeatures", numFeatures=5000)
title_idf = IDF(inputCol="title_rawFeatures", outputCol="title_features")

# Combine features from text and title
feature_assembler = VectorAssembler(
    inputCols=["features", "title_features"], 
    outputCol="assembled_features"
)

# Naive Bayes Classifier
nb = NaiveBayes(featuresCol="assembled_features", labelCol="label", smoothing=1.0)

# Pipeline to chain all the stages together
nb_pipeline = Pipeline(stages=[
    text_tokenizer, text_remover, text_hashingTF, text_idf,
    title_tokenizer, title_remover, title_hashingTF, title_idf,
    feature_assembler, nb
])

# Train/test data split
nb_train_df, nb_test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
nb_model = nb_pipeline.fit(nb_train_df)

# Make predictions on the test set
nb_predictions = nb_model.transform(nb_test_df)

# Evaluate model performance and metrics
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

auc = binary_evaluator.evaluate(nb_predictions)

print(f"Test AUC: {auc}")

multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

accuracy = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "weightedRecall"})
f1 = multi_evaluator.evaluate(nb_predictions, {multi_evaluator.metricName: "f1"})
print(f"Test Accuracy: {accuracy}")
print(f"Test Precision: {precision}")
print(f"Test Recall: {recall}")
print(f"Test F1 Score: {f1}")

# Stop the Spark session
spark.stop() # Commented out to avoid stopping the Spark session before using it for the next example, add to last algo cell


                                                                                

Test AUC: 0.49456042340488093
Test Accuracy: 0.8434343434343434
Test Precision: 0.879016354016354
Test Recall: 0.8434343434343434
Test F1 Score: 0.8585264513630095
