In [None]:
def generate_responseOpenAI(prompt, max_tokens=200, temperature=0, timeout=20):
    """Use OpenAI GPT-4o via a direct HTTP request to generate a response.

    ``prompt`` is sent as the user message, preceded by a fixed system
    message asking the model to summarize topics.

    Args:
        prompt: User message text.
        max_tokens: Completion token cap (default 200, as before).
        temperature: Sampling temperature (default 0, as before).
        timeout: HTTP timeout in seconds (default 20, as before).

    Returns:
        The stripped assistant message content, or ``""`` when the request
        fails or the response does not have the expected shape (the failure
        is printed).
    """
    import os

    OPENAI_MODEL = "gpt-4o"
    OPENAI_ENDPOINT = "https://api.openai.com/v1/chat/completions"
    # Read the key from the environment instead of hard-coding a secret; the
    # original placeholder is only a fallback when the variable is unset.
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "abc")

    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": OPENAI_MODEL,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant that summarizes topics."},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    # The session serves a single request; the context manager closes its
    # connection pool instead of leaking one per call.
    with requests.Session() as session:
        try:
            response = session.post(OPENAI_ENDPOINT, headers=headers, json=payload, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error is not a RequestException subclass.
        except (requests.RequestException, ValueError) as e:
            print("Request failed:", e)
            return ""

    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as e:
        print("Unexpected response format:", e)
        return ""
    # content can be null (e.g. a refusal); treat that as an empty answer.
    return (content or "").strip()

def _parse_topic_lines(text):
    """Return the items of an LLM bullet/numbered list as clean topic strings.

    A leading bullet ("-", "•", "*") or list number ("1.", "2)") and markdown
    emphasis such as "**Topic**" are removed. Blank lines, header lines ending
    in ":" (e.g. "New topics:") and "none"-style replies are skipped so they
    are not mistaken for topics.
    """
    import re

    no_topic_replies = {"none", "n/a", "no new topics"}
    topics = []
    for raw_line in text.splitlines():
        line = re.sub(r"^(?:[-•*]|\d+[.)])\s*", "", raw_line.strip())
        line = line.strip("*_ \t")
        if not line or line.endswith(":"):
            continue
        if line.lower().rstrip(".") in no_topic_replies:
            continue
        topics.append(line)
    return topics


def generate_topics_from_customer_reviews(partitions, spark, max_batches=10):
    """Extract de-duplicated review topics using two LLM calls per batch.

    For every batch of '####'-delimited reviews, the LLM is first asked for 5
    short topics, then asked which of those are not already covered. New
    topics are accumulated across batches.

    Args:
        partitions: Iterable of review batches (strings).
        spark: SparkSession; kept for interface compatibility. The topic set
            is only used on the driver, so nothing is broadcast.
        max_batches: Number of batches to process (default 10, as before);
            ``None`` processes every batch.

    Returns:
        Sorted list of unique topic strings.
    """
    from itertools import islice

    TEXT_DELIMITER = '####'
    # A plain driver-side set: Spark executors never read these topics, so the
    # previous per-batch broadcast/unpersist only leaked broadcast variables.
    previous_topics = set()

    for i, batch in enumerate(islice(partitions, max_batches), start=1):
        print(f"\nProcessing Batch {i}")

        # First LLM call: Generate 5 summarized topics
        prompt_1 = f"""Below is a set of reviews delimited by {TEXT_DELIMITER}:\n{batch}\n
    Identify 5 summarized topics (1–4 words) from these reviews.
    Return the output as a bullet list, each line one concise topic. No descriptions."""

        text_1 = generate_responseOpenAI(prompt_1)
        print("Raw Topics Output:", text_1)

        extracted_topics = _parse_topic_lines(text_1)
        print("Extracted Topics:", extracted_topics)

        if not extracted_topics:
            # e.g. the first call failed and returned "". Asking the LLM to
            # filter an empty list tends to yield prose, not topics.
            print("No topics extracted; skipping filter step.")
            print("Updated Unique Topics:", previous_topics)
            print("-" * 50)
            continue

        # Second LLM call: Filter out previously seen topics
        prompt_2 = f"""Given the following topics:\n{extracted_topics}\n
    Previously covered topics are: {sorted(previous_topics)}\n
    Return only the new topics not present in the previously covered list.
    Output as a bullet list, one topic per line, 1–4 words, no description."""

        text_2 = generate_responseOpenAI(prompt_2)
        print("Filtered Topics Output:", text_2)

        # Safety net on top of the LLM filter: drop topics already covered
        # (case-insensitively) so repeats cannot return with different casing.
        seen = {topic.lower() for topic in previous_topics}
        new_topics = [t for t in _parse_topic_lines(text_2) if t.lower() not in seen]
        print("New Topics:", new_topics)

        previous_topics.update(new_topics)
        print("Updated Unique Topics:", previous_topics)
        print("-" * 50)

    print("Final Unique Topics:", previous_topics)
    # Sorted so the result is deterministic (set iteration order is not).
    return sorted(previous_topics)

In [None]:
def create_partitions(nykaa_customer_reviews, max_chars=2000, delimiter="####", verbose=True):
    """Greedily pack reviews into text batches bounded by ``max_chars``.

    Each review is prefixed with ``delimiter`` and appended to the current
    batch. A new batch is started when adding the next review would push the
    summed ``review_length`` above ``max_chars`` (delimiters are not counted).
    A single review longer than ``max_chars`` gets a batch of its own; it is
    never split.

    Args:
        nykaa_customer_reviews: DataFrame-like object whose ``collect()``
            returns rows exposing ``user_review`` and ``review_length``.
        max_chars: Character budget per batch (default 2000, as before).
        delimiter: Marker placed before every review (default "####").
        verbose: Print every batch (default True, as before).

    Returns:
        List of batch strings, in row order.
    """
    partitions = []
    pieces = []
    batch_length = 0

    for row in nykaa_customer_reviews.collect():
        review_length = row.review_length
        # Only flush a non-empty batch. Previously a first review longer than
        # max_chars produced an empty "" partition.
        if pieces and batch_length + review_length > max_chars:
            partitions.append("".join(pieces))
            pieces = []
            batch_length = 0
        pieces.append(delimiter + row.user_review)
        batch_length += review_length

    if pieces:
        partitions.append("".join(pieces))

    if verbose:
        for i, partition in enumerate(partitions, start=1):
            print(f"Partition {i}:\n")
            print(partition)
            print("\n" + "=" * 50 + "\n")
    return partitions

In [None]:
# Entry point for questions like "What are customers saying about this product online?"
def customer_review_topic_modeling(product_name):
    """Summarize the topics customers discuss in reviews of one product.

    Loads the Nykaa cosmetics review CSV, builds a ``user_review`` column
    ("<title>. <text>"), keeps rows whose ``product_title`` equals
    ``product_name``, batches them with ``create_partitions`` and extracts
    topics with ``generate_topics_from_customer_reviews``.

    Args:
        product_name: Exact ``product_title`` value to filter on.

    Returns:
        Tuple ``(partitions, final_topics)``.
    """
    from pyspark.sql import SparkSession
    # Imported here (as in customer_review_topic_modeling_CoherenceScore) so
    # this function does not rely on another cell having imported them.
    from pyspark.sql.functions import col, concat, length, lit

    spark = (
        SparkSession.builder.appName("BAX493_FinalProject")
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.driver.memory", "12g")
        .config("spark.executor.memory", "12g")
        .getOrCreate()
    )

    file_path = "/content/sample_data/nyka_top_brands_cosmetics_product_reviews.csv"
    # Source: https://www.kaggle.com/datasets/jithinanievarghese/cosmetics-and-beauty-products-reviews-top-brands

    nykaa_customer_reviews = spark.read.csv(file_path, header=True, inferSchema=True)

    nykaa_customer_reviews = nykaa_customer_reviews.select(
        "brand_name", "review_title", "review_text", "review_rating", "review_label",
        "product_title", "mrp", "product_rating", "product_rating_count", "product_url"
    )

    # concat() yields NULL when either part is NULL, so reviews missing a
    # title or a text are removed by the isNotNull filter below.
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn(
        'user_review', concat(col('review_title'), lit('. '), col('review_text'))
    )
    nykaa_customer_reviews = nykaa_customer_reviews.drop("review_text", "review_title")

    nykaa_customer_reviews = nykaa_customer_reviews.filter(col("user_review").isNotNull())
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn("review_length", length("user_review"))
    nykaa_customer_reviews = nykaa_customer_reviews.where(nykaa_customer_reviews.product_title == product_name)

    partitions = create_partitions(nykaa_customer_reviews)
    final_topics = generate_topics_from_customer_reviews(partitions, spark)
    return (partitions, final_topics)

In [None]:
import re
from collections import Counter

def analyze_sentiment_from_customer_reviews(partitions, spark, max_batches=10):
    """Label review sentiment with the LLM and tally the labels.

    For each batch, the LLM is asked to prefix every review with Positive,
    Neutral or Negative. Labelled lines are counted per batch and overall,
    and the majority label is printed for each.

    Args:
        partitions: Iterable of review batches ('####'-delimited strings).
        spark: Unused; kept for interface compatibility.
        max_batches: Number of batches to process (default 10, chosen to
            limit run time); ``None`` processes every batch.

    Returns:
        ``Counter`` mapping sentiment label to count across all processed
        batches. The original returned None, so existing callers that ignore
        the result are unaffected.
    """
    from itertools import islice

    TEXT_DELIMITER = '####'
    # Accepts the plain "Positive: ..." form plus an optional list marker
    # ("-", "*", "•", "1.", "2)") and markdown bold ("**Negative**:"), which
    # LLMs often add despite the requested format.
    label_pattern = re.compile(
        r'^\s*(?:[-*•]|\d+[.)])?\s*\**\s*(Positive|Neutral|Negative)\**\s*:',
        re.IGNORECASE,
    )
    overall_sentiment_counter = Counter()  # Track sentiment across all batches

    for i, batch in enumerate(islice(partitions, max_batches), start=1):
        print(f"\nProcessing Batch {i}")

        # Prompt for sentiment analysis
        prompt = f"""Below is a set of customer reviews delimited by '{TEXT_DELIMITER}':\n{batch}\n\n
For each review, assign one of the following sentiment labels: Positive, Neutral, or Negative.

Return the results as a list, with each line containing:
[Sentiment]: [Original Review]

Example:
Positive: I love this product!
Neutral: The packaging was okay.
Negative: It irritated my skin.

Now, return the labeled reviews below:"""

        text = generate_responseOpenAI(prompt)
        print("Sentiment Analysis Output:\n", text)

        # Batch-level stats (labels normalized to "Positive"/"Neutral"/"Negative")
        batch_sentiment_counter = Counter(
            match.group(1).capitalize()
            for match in map(label_pattern.match, text.splitlines())
            if match
        )
        majority_sentiment = batch_sentiment_counter.most_common(1)[0][0] if batch_sentiment_counter else "Unknown"
        print(f"Majority Sentiment (Batch {i}): {majority_sentiment}")
        print(f"Sentiment Counts (Batch {i}): {dict(batch_sentiment_counter)}")

        overall_sentiment_counter.update(batch_sentiment_counter)
        print("-" * 50)

    # Final overall stats
    overall_majority = overall_sentiment_counter.most_common(1)[0][0] if overall_sentiment_counter else "Unknown"
    print("\n===== Overall Sentiment Summary =====")
    print(f"Overall Sentiment Counts: {dict(overall_sentiment_counter)}")
    print(f"Overall Majority Sentiment: {overall_majority}")
    return overall_sentiment_counter

In [None]:
def customer_review_sentiment_modeling(product_name):
    """Run LLM sentiment analysis over the reviews of one product.

    Loads the Nykaa cosmetics review CSV, builds a ``user_review`` column
    ("<title>. <text>"), keeps rows whose ``product_title`` equals
    ``product_name``, batches them with ``create_partitions`` and passes the
    batches to ``analyze_sentiment_from_customer_reviews``.

    Args:
        product_name: Exact ``product_title`` value to filter on.

    Returns:
        Whatever ``analyze_sentiment_from_customer_reviews`` returns (the
        original discarded it and returned None).
    """
    from pyspark.sql import SparkSession
    # Imported here (as in customer_review_topic_modeling_CoherenceScore) so
    # this function does not rely on another cell having imported them.
    from pyspark.sql.functions import col, concat, length, lit

    spark = (
        SparkSession.builder.appName("BAX493_FinalProject")
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.driver.memory", "12g")
        .config("spark.executor.memory", "12g")
        .getOrCreate()
    )

    file_path = "/content/sample_data/nyka_top_brands_cosmetics_product_reviews.csv"
    # Source: https://www.kaggle.com/datasets/jithinanievarghese/cosmetics-and-beauty-products-reviews-top-brands

    nykaa_customer_reviews = spark.read.csv(file_path, header=True, inferSchema=True)

    nykaa_customer_reviews = nykaa_customer_reviews.select(
        "brand_name", "review_title", "review_text", "review_rating", "review_label",
        "product_title", "mrp", "product_rating", "product_rating_count", "product_url"
    )

    # concat() yields NULL when either part is NULL, so reviews missing a
    # title or a text are removed by the isNotNull filter below.
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn(
        'user_review', concat(col('review_title'), lit('. '), col('review_text'))
    )
    nykaa_customer_reviews = nykaa_customer_reviews.drop("review_text", "review_title")

    nykaa_customer_reviews = nykaa_customer_reviews.filter(col("user_review").isNotNull())
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn("review_length", length("user_review"))
    nykaa_customer_reviews = nykaa_customer_reviews.where(nykaa_customer_reviews.product_title == product_name)

    partitions = create_partitions(nykaa_customer_reviews)
    return analyze_sentiment_from_customer_reviews(partitions, spark)

In [None]:
def customer_review_topic_modeling_CoherenceScore(product_name):
    """Generate topics for a product's reviews and score each with c_v coherence.

    Steps:
      1. Load the review CSV and keep reviews of ``product_name``.
      2. Batch them with ``create_partitions`` and extract topics with
         ``generate_topics_from_customer_reviews``.
      3. For each topic, gather the preprocessed reviews that contain any of
         its words.
      4. Compute gensim ``c_v`` coherence of the 10 most frequent words in
         those reviews.

    Args:
        product_name: Exact ``product_title`` value to filter on.

    Returns:
        Dict mapping topic -> coherence score. Topics matched by fewer than
        two reviews are skipped and reported via print.
    """
    import re
    from collections import defaultdict

    import nltk
    from gensim.corpora import Dictionary
    from gensim.models import CoherenceModel
    from nltk.tokenize import TreebankWordTokenizer
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import concat, col, lit, length

    spark = (
        SparkSession.builder.appName("BAX493_FinalProject")
        .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.driver.memory", "12g")
        .config("spark.executor.memory", "12g")
        .getOrCreate()
    )

    file_path = "/content/sample_data/nyka_top_brands_cosmetics_product_reviews.csv"

    nykaa_customer_reviews = spark.read.csv(file_path, header=True, inferSchema=True)

    nykaa_customer_reviews = nykaa_customer_reviews.select(
        "brand_name", "review_title", "review_text", "review_rating", "review_label",
        "product_title", "mrp", "product_rating", "product_rating_count", "product_url"
    )

    # concat() yields NULL when either part is NULL; such rows are filtered out.
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn(
        'user_review', concat(col('review_title'), lit('. '), col('review_text'))
    )
    nykaa_customer_reviews = nykaa_customer_reviews.drop("review_text", "review_title")
    nykaa_customer_reviews = nykaa_customer_reviews.filter(col("user_review").isNotNull())
    nykaa_customer_reviews = nykaa_customer_reviews.withColumn("review_length", length("user_review"))
    nykaa_customer_reviews = nykaa_customer_reviews.where(nykaa_customer_reviews.product_title == product_name)

    partitions = create_partitions(nykaa_customer_reviews)

    # Step 1: Run topic generation
    final_topics = generate_topics_from_customer_reviews(partitions, spark)

    # Step 2: Collect actual reviews
    all_reviews = [row["user_review"] for row in nykaa_customer_reviews.collect() if row["user_review"]]

    # Step 3: Coherence calculation.
    # nltk.download skips packages that are already present. The previous
    # force=True re-downloaded all three on every call, then downloaded again.
    for package in ('punkt', 'stopwords', 'wordnet'):
        nltk.download(package)
    stop_words = set(nltk.corpus.stopwords.words('english'))
    tokenizer = TreebankWordTokenizer()

    def preprocess(text):
        """Lower-cased alphabetic tokens of ``text`` with English stopwords removed."""
        return [w.lower() for w in tokenizer.tokenize(text) if w.isalpha() and w.lower() not in stop_words]

    tokenized_reviews = [preprocess(review) for review in all_reviews]
    review_vocabularies = [set(tokens) for tokens in tokenized_reviews]

    topic_to_reviews = defaultdict(list)
    for topic in final_topics:
        # Split the topic on non-letters so "Price/Value" or "Fragrance," still
        # yield matchable words. Review tokens are purely alphabetic, so
        # whitespace-split words carrying punctuation (or stopwords) could
        # never match them.
        topic_words = {w for w in re.findall(r"[^\W\d_]+", topic.lower()) if w not in stop_words}
        if not topic_words:
            print(f"Skipping topic '{topic}' (no content words)")
            continue
        for tokens, vocabulary in zip(tokenized_reviews, review_vocabularies):
            if not topic_words.isdisjoint(vocabulary):
                topic_to_reviews[topic].append(tokens)

    coherence_scores = {}

    for topic, docs in topic_to_reviews.items():
        if len(docs) < 2:
            print(f"Skipping topic '{topic}' (not enough matching reviews)")
            continue

        dictionary = Dictionary(docs)
        word_freq = nltk.FreqDist(word for doc in docs for word in doc)
        top_words = [w for w, _ in word_freq.most_common(10)]

        coherence_model = CoherenceModel(
            topics=[top_words],
            texts=docs,
            dictionary=dictionary,
            coherence='c_v'
        )
        coherence_scores[topic] = coherence_model.get_coherence()

    print("\nFinal Coherence Scores:")
    for topic, score in coherence_scores.items():
        print(f"{topic}: {score:.4f}")
    return coherence_scores