# In [0]:

import glob
import os
import shutil
# NOTE: pandas and BeautifulSoup are intentionally no longer imported;
# the cleaning steps below use Spark SQL functions instead.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lower, trim, regexp_replace, size, split, array_distinct, when
)
from pyspark.sql.types import StringType

class ReviewDataset:
    """Build a sentiment training set from the newest review CSV in a folder.

    The pipeline picks the most recently modified ``*.csv`` file in
    ``raw_folder``, strips HTML tags from ``review_content``, drops empty
    reviews, flags suspicious ones, maps ``rating`` to a sentiment label and
    writes ``review_content`` / ``sentiment`` as CSV into ``output_folder``.
    """

    def __init__(self, spark, input_folder, raw_folder, output_folder):
        """Store the Spark session and the folders used by the pipeline.

        Args:
            spark: SparkSession used to read the raw CSV.
            input_folder: Folder where new files arrive (used by the optional
                ingest step).
            raw_folder: Folder that ``run_pipeline`` cleans the latest CSV from.
            output_folder: Folder that ``run_pipeline`` writes the result to.
        """
        self.input_folder = input_folder
        self.raw_folder = raw_folder
        self.output_folder = output_folder
        self.spark = spark

    @staticmethod
    def get_latest_file_in_folder(folder_path: str, pattern: str = "*.csv") -> str:
        """Return the most recently modified file in *folder_path* matching *pattern*.

        Raises:
            FileNotFoundError: If no file in the folder matches the pattern.
        """
        files = glob.glob(os.path.join(folder_path, pattern))
        if not files:
            raise FileNotFoundError(f"No files found in {folder_path} with pattern {pattern}")
        return max(files, key=os.path.getmtime)

    @staticmethod
    def move_file_to_folder(src_file_path: str, dest_folder: str) -> str:
        """Move *src_file_path* into *dest_folder* (created if missing).

        Returns:
            The path of the file at its new location.
        """
        os.makedirs(dest_folder, exist_ok=True)
        filename = os.path.basename(src_file_path)
        dest_file_path = os.path.join(dest_folder, filename)
        shutil.move(src_file_path, dest_file_path)
        print(f"Moved file {filename} to {dest_folder}")
        return dest_file_path

    @staticmethod
    def _to_spark_path(path: str) -> str:
        """Turn a filesystem path into a URI that Spark can read.

        ``dbfs:/`` paths are returned unchanged. Anything else is treated as a
        local path and gets the ``file:`` scheme. The path is made absolute
        first, because ``file:`` followed by a relative path is not a valid
        URI.
        """
        if path.startswith("dbfs:/"):
            return path
        return f"file:{os.path.abspath(path)}"

    @staticmethod
    def remove_html_spark(df, review_col: str = "review_content"):
        """Return *df* with HTML tags removed from *review_col*.

        Tags are deleted with a regular expression, then the text is
        lower-cased and trimmed. The column is replaced in place.
        """
        return df.withColumn(
            review_col,
            trim(lower(regexp_replace(col(review_col), "<[^>]+>", "")))
        )

    @staticmethod
    def map_sentiment_spark(df, rating_col: str = "rating"):
        """Return *df* with a ``sentiment`` column derived from *rating_col*.

        The rating is cast to double: ``>= 4`` is "positive", ``<= 2`` is
        "negative", and everything else is "neutral". "Everything else"
        includes 3, fractional ratings strictly between 2 and 4, and ratings
        that are null or not numeric.
        """
        rating_d = col(rating_col).cast("double")
        # An explicit "== 3.0" branch is unnecessary: it yields the same label
        # as the fallback.
        return df.withColumn(
            "sentiment",
            when(rating_d >= 4.0, "positive")
            .when(rating_d <= 2.0, "negative")
            .otherwise("neutral")
        )

    @staticmethod
    def detect_suspicious_reviews_spark(df, review_col: str = "review_content", min_word_len: int = 5):
        """Return *df* with a boolean ``suspicious_review`` column.

        A review is suspicious when it has fewer than *min_word_len* words or
        when all of its words are identical.

        Words are separated by any run of whitespace, so repeated spaces, tabs
        and newlines do not create empty tokens that would distort the counts.
        The flag is computed from expressions rather than scratch columns, so
        no existing column of the input is overwritten or dropped.
        """
        # Split once and reuse the token array for both checks.
        tokens = split(trim(col(review_col)), r"\s+")
        too_short = size(tokens) < min_word_len
        repetitive = size(array_distinct(tokens)) == 1
        return df.withColumn("suspicious_review", too_short | repetitive)

    def ingest_latest_file(self, input_folder, output_folder):
        """Move the newest CSV from *input_folder* to *output_folder*.

        Returns:
            The path of the moved file.
        """
        latest_file = self.get_latest_file_in_folder(input_folder, "*.csv")
        moved_file = self.move_file_to_folder(latest_file, output_folder)
        print(f"Ingested file is now at: {moved_file}")
        return moved_file

    def clean_data(self, raw_folder):
        """Load the newest CSV in *raw_folder* and return the cleaned DataFrame.

        Cleaning removes HTML from ``review_content``, drops rows whose review
        is null or empty afterwards, and adds the ``suspicious_review`` flag.

        Raises:
            FileNotFoundError: If *raw_folder* contains no CSV file.
        """
        latest_file = self.get_latest_file_in_folder(raw_folder, "*.csv")
        print(f"Cleaning latest file: {latest_file}")

        # Local paths need an explicit file: scheme; dbfs:/ paths pass through.
        read_path = self._to_spark_path(latest_file)
        df = self.spark.read.option("header", True).csv(read_path)

        df = self.remove_html_spark(df, review_col="review_content")

        # Drop rows where review_content is null or empty after cleaning.
        df = df.filter(col("review_content").isNotNull() & (trim(col("review_content")) != ""))

        df = self.detect_suspicious_reviews_spark(df, review_col="review_content")
        return df

    def transform(self, df):
        """Add the sentiment label and keep only the training columns.

        Returns:
            A DataFrame with the columns ``review_content`` and ``sentiment``.
        """
        df = self.map_sentiment_spark(df, rating_col="rating")
        return df.select("review_content", "sentiment")

    def save_output(self, df, output_folder, output_filename="training_dataset.csv"):
        """Write *df* as CSV with a header under *output_folder*.

        Spark writes a directory, so ``output_filename`` becomes a folder that
        holds the part file(s). ``coalesce(1)`` is used so the data is written
        from a single partition. An existing output is overwritten.
        """
        os.makedirs(output_folder, exist_ok=True)
        output_path = os.path.join(output_folder, output_filename)

        # NOTE(review): the folder is created on the local filesystem, but the
        # write below uses the path without the "file:" scheme that clean_data
        # adds. Confirm both resolve to the same location in your environment.
        df.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path)
        print(f"Output saved to {output_path}")

    def run_pipeline(self):
        """Run clean -> transform -> save, printing previews along the way.

        Any exception is caught and printed rather than re-raised, so callers
        never see a failure as an exception.
        """
        try:
            # Step 1: Ingest latest file (optional)
            # self.ingest_latest_file(self.input_folder, self.output_folder)

            # Step 2: Clean data
            df = self.clean_data(self.raw_folder)
            df.printSchema()
            df.show(20, truncate=False)

            # Step 3: Transform data
            df_final = self.transform(df)
            df_final.show(20, truncate=False)

            # Step 4: Save output
            self.save_output(df_final, self.output_folder)
        except Exception as e:
            print(f"Error: {e}")

if __name__ == "__main__":
    spark = SparkSession.builder.appName("review_dataset").getOrCreate()

    # Folder locations for this environment; update them to match yours.
    # If using DBFS, prefer dbfs:/ paths.
    input_file_path = "/Workspace/Users/rojsun25@gmail.com/messy_review_dataset_model/input"
    raw_file_path = "/Workspace/Users/rojsun25@gmail.com/messy_review_dataset_model/raw"
    output_file_path = "/Workspace/Users/rojsun25@gmail.com/messy_review_dataset_model/output"

    # The original line here was a garbled duplicate of this call and did not
    # parse; this is the single intended constructor call.
    # NOTE(review): nothing visible here starts the pipeline; confirm that
    # task.run_pipeline() is invoked afterwards.
    task = ReviewDataset(spark, input_file_path, raw_file_path, output_file_path)
