In [None]:
import logging
import os
import sys

import asyncio
import json
import re
import textwrap
import time
from collections import defaultdict
from uuid import uuid4

import pandas as pd
from asynciolimiter import Limiter, StrictLimiter
from google.cloud import aiplatform, bigquery
from pydantic import BaseModel, ValidationError, Field, field_validator
from langchain_google_vertexai import VertexAI, VertexAIEmbeddings
from tqdm.asyncio import tqdm_asyncio
from typing import List

In [None]:
# GCP settings. Environment variables take precedence so the notebook can run
# without editing this cell; otherwise replace the placeholder project name.
# Authentication is assumed to be configured already.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "YOUR-GCP-PROJECT-NAME")
LOCATION = os.environ.get("GOOGLE_CLOUD_LOCATION", "us-central1")  # Your GCP project location
aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [None]:
# Gemini model used for aspect extraction. temperature=0 is intended to keep
# replies as repeatable as possible between runs.
LLM_NAME = 'gemini-1.5-flash'
llm = VertexAI(
    model_name=LLM_NAME,
    temperature=0,
    top_k=40,
    top_p=0.8,
    max_output_tokens=2048,  # upper bound on the length of each reply
    verbose=True,
)

In [None]:
# NOTE(review): unpickling can execute arbitrary code -- only load a trusted file.
review_data = pd.read_pickle(filepath_or_buffer='product_reviews.pkl')
review_data.head()

In [None]:
from tqdm import tqdm

def reformat_data(data: pd.DataFrame):
    """Convert the review table into the list of records embedded in the LLM prompt.

    Args:
        data: Review table with at least the ``short_name``, ``review_text`` and
            ``review_id`` columns; any other columns are ignored.

    Returns:
        One ``{"PRODUCT_NAME", "REVIEW_TEXT", "REVIEW_ID"}`` dict per row, in row
        order. Key order matches the record format shown in the prompt.
    """
    column_to_key = {
        "short_name": "PRODUCT_NAME",
        "review_text": "REVIEW_TEXT",
        "review_id": "REVIEW_ID",
    }
    # A column select + to_dict("records") avoids building a Series per row,
    # which is what made the iterrows loop slow.
    return data[list(column_to_key)].rename(columns=column_to_key).to_dict("records")

In [None]:
# Prompt-ready records: one dict per review row; preview the first few.
raw_reviews = reformat_data(data=review_data)
raw_reviews[0:5]

In [None]:
# Prompt template for aspect extraction. ``<<REVIEW>>`` is replaced with a
# JSON-encoded batch of review records; the reply must be a bare JSON array whose
# objects carry the REVIEW_ID / ID / FEATURE / SENTIMENT / REPR_SENTENCE keys.
PROMPT = """We have a list of customer reviews for a product. Extract at most 5 features from each REVIEW_TEXT. Features must be relevant to the product attributes or specifications; they must not describe a person or an animal. Avoid generic features such as "best", "product", or "good". Acronyms should be capitalized according to standard usage (e.g., GPS, USB, RAM).

Here is the review list, formatted as [{"PRODUCT_NAME": "", "REVIEW_TEXT": "", "REVIEW_ID": ""}]:
--------------
<<REVIEW>>
--------------

For each feature, output the REVIEW_ID copied exactly from the review it came from, the feature index, the feature name (at most two words), the representative sentence from the review, and the associated customer sentiment (Positive or Negative only). Put the features from all reviews into a single JSON array with the following format:

**ONLY output the following JSON array. Do not include any other text.**

```json
[
 {"REVIEW_ID": "", "ID": 0, "FEATURE": "", "SENTIMENT": "Positive" or "Negative", "REPR_SENTENCE": ""},
 {"REVIEW_ID": "", "ID": 1, "FEATURE": "", "SENTIMENT": "Positive" or "Negative", "REPR_SENTENCE": ""}
 // ...more objects as needed...
]
```
"""

In [None]:
# Sentiment labels accepted from the model. The prompt asks for Positive/Negative
# only; Neutral and Mixed are tolerated rather than discarded.
VALID_SENTIMENTS = ("Positive", "Negative", "Neutral", "Mixed")


class LLMOutput(BaseModel):
    """One extracted aspect (feature) of a single review, as returned by the LLM.

    Fields are populated from the model's upper-case JSON keys via aliases;
    ``populate_by_name`` also allows construction with the Python field names.
    """

    ReviewId: str = Field(..., alias="REVIEW_ID")
    AspectId: int = Field(..., alias="ID")  # per-review feature index emitted by the model
    Aspect: str = Field(..., alias="FEATURE")
    Sentiment: str = Field(..., alias="SENTIMENT")
    RepresentativeSentence: str = Field(..., alias="REPR_SENTENCE")

    model_config = {'populate_by_name': True}

    @field_validator('Sentiment')
    @classmethod
    def check_sentiment(cls, v):
        """Reject any sentiment label not listed in VALID_SENTIMENTS."""
        if v not in VALID_SENTIMENTS:
            raise ValueError("Invalid sentiment value. Must be one of: 'Positive', 'Negative', 'Neutral', 'Mixed'")
        return v


# Request throttling shared by all LLM calls. asynciolimiter interprets the rate
# as requests per second, hence the per-minute budget divided by 60 s.
requests_per_minute = 100
time_window = 60  # seconds
rate_limiter = StrictLimiter(requests_per_minute/time_window)

In [None]:
async def executer(review_objs, progress):
    """Send one batch of reviews to the LLM, rate-limited, and return the raw reply.

    Args:
        review_objs: Review records JSON-encoded into ``PROMPT`` in place of
            ``<<REVIEW>>``.
        progress: Progress bar advanced once per batch, whether the request
            succeeded or failed.

    Returns:
        The LLM reply, or ``None`` if the request failed (the error is printed).
    """
    try:
        # The shared StrictLimiter spaces out request starts; no extra sleep is
        # needed afterwards -- sleeping here would only delay returning results.
        await rate_limiter.wait()
        prompt = PROMPT.replace("<<REVIEW>>", json.dumps(review_objs))
        return await llm.ainvoke(prompt)
    except Exception as e:
        # Best-effort: one failed batch must not abort the surrounding asyncio.gather.
        print(f"Error in executer: {e}, Review Objects: {review_objs}")
        return None  # or some other default value indicating failure
    finally:
        progress.update()

In [None]:
async def async_llm_executer(review_list, batch_size):
    """Split ``review_list`` into batches and run them concurrently through ``executer``.

    Args:
        review_list: Review records to send to the LLM.
        batch_size: Maximum number of reviews per LLM request; must be positive.

    Returns:
        One entry per batch, in batch order: the raw LLM reply, or ``None`` for a
        batch whose request failed.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    batches = [review_list[start:start + batch_size] for start in range(0, len(review_list), batch_size)]
    # The context manager closes the bar even if gather raises.
    with tqdm(total=len(batches), position=0, ncols=90) as pbar:
        pbar.set_description(desc=f"Requests/min = {requests_per_minute}", refresh=True)
        return await asyncio.gather(*(executer(batch, pbar) for batch in batches))

In [None]:
def post_process_results(raw_aspects):
    """Parse every raw LLM reply and flatten all aspects into one list of dicts.

    Args:
        raw_aspects: Sized iterable of reply lists, e.g. the result of
            ``asyncio.gather(async_llm_executer(...))``, which wraps the per-batch
            replies in one extra list. Empty entries and ``None`` replies are skipped.

    Returns:
        List of ``LLMOutput.model_dump()`` dicts keyed by field name
        (``ReviewId``, ``AspectId``, ...).
    """
    aspects_flattened = []
    for batch_results in tqdm(raw_aspects, total=len(raw_aspects), desc="Processing Batches"):
        if not batch_results:
            continue
        for raw_reply in batch_results:
            # None marks a failed request; executer has already reported it.
            if raw_reply is None:
                continue
            aspects_flattened.extend(output.model_dump() for output in parse_llm_output(raw_reply))
    return aspects_flattened

In [None]:
def parse_llm_output(llm_output_str) -> List[LLMOutput]:
    """Parse one raw LLM reply into a list of validated LLMOutput objects.

    The reply should be a JSON array of aspect objects, but models often wrap it
    in a Markdown fence (```json ... ```) or emit Python-style literals. The fence
    is stripped, the payload is decoded with ``json.loads`` and, failing that,
    with ``ast.literal_eval`` (literals only, so safe on untrusted text). A single
    object is treated as a one-element array.

    Args:
        llm_output_str: Raw reply text. Non-string input (e.g. ``None``) is
            reported and yields ``[]``.

    Returns:
        The items that validate as LLMOutput; invalid items are reported and
        skipped. An undecodable reply yields ``[]``.
    """
    import ast
    import json
    import re

    if not isinstance(llm_output_str, str):
        print(f"Error parsing LLM output (Unexpected Error): expected str, got {type(llm_output_str)}")
        return []

    text = llm_output_str.strip()
    # Keep only the body of the first fence (closed or, if truncated, unclosed).
    # Unlike deleting every newline, this keeps word boundaries inside sentences.
    fenced = re.search(r"```(?:json)?\s*(.*?)\s*(?:```|$)", text, flags=re.DOTALL | re.IGNORECASE)
    if fenced:
        text = fenced.group(1)

    try:
        # JSON first so JSON escapes are decoded per JSON rules; strict=False
        # tolerates raw newlines/tabs inside string values.
        parsed_output = json.loads(text, strict=False)
    except ValueError as json_error:
        try:
            # Python-literal fallback (single quotes, True/None, trailing commas).
            # Newlines become spaces so strings wrapped across lines still parse.
            parsed_output = ast.literal_eval(text.replace("\n", " "))
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as literal_error:
            print(f"Error parsing LLM output (JSONDecodeError): {json_error} / {literal_error} Input String: {llm_output_str}")
            return []

    if isinstance(parsed_output, dict):
        parsed_output = [parsed_output]
    if not isinstance(parsed_output, list):
        print(f"Unexpected LLM output format: {type(parsed_output)}")
        return []

    results = []
    for item in parsed_output:
        try:
            results.append(LLMOutput(**item))
        except (KeyError, ValueError, TypeError) as e:
            # pydantic's ValidationError subclasses ValueError, so invalid fields
            # land here; TypeError covers items that are not mappings.
            print(f"Error creating LLMOutput object from: {item}. Error: {e}")
    return results

In [None]:
async_raw_aspects_output = await asyncio.gather(
    async_llm_executer(raw_reviews, batch_size=5)
)

In [None]:
# Parse the raw replies into one flat list of aspect dicts.
aspects_flattened = post_process_results(raw_aspects=async_raw_aspects_output)

In [None]:
aspects_flattened[0:5]  # preview the first few parsed aspects

In [None]:
# One row per extracted aspect; columns are the LLMOutput field names.
df = pd.DataFrame(data=aspects_flattened)
df.head()

In [None]:
# Make AspectId unique across reviews as "<ReviewId>_<index>". Values that
# already carry their review's prefix are kept as-is, so re-running this cell
# does not produce "<ReviewId>_<ReviewId>_<index>".
df['AspectId'] = [
    aspect_id if str(aspect_id).startswith(f"{review_id}_") else f"{review_id}_{aspect_id}"
    for review_id, aspect_id in zip(df['ReviewId'], df['AspectId'])
]
df.head()

In [None]:
# Look up each aspect's product family from the source review table. `map`
# yields NaN for a ReviewId with no match (e.g. an id the LLM did not copy
# verbatim); Series.replace would silently keep the ReviewId itself as the
# family id. map needs a unique index, hence drop_duplicates.
df['ProductFamilyId'] = df['ReviewId'].map(
    review_data.drop_duplicates(subset='review_id').set_index('review_id')['product_family_id']
)
print(f"Aspect rows without a ProductFamilyId: {df['ProductFamilyId'].isna().sum()} of {len(df)}")
df.head()

In [None]:
# Persist the aspect table.
df.to_pickle(path='product_review_aspects.pkl')