In [0]:
%sql
-- Bootstrap the Unity Catalog objects this notebook writes to.
-- Every statement is idempotent (IF NOT EXISTS), so the cell can be re-run safely.
CREATE CATALOG IF NOT EXISTS retail_cpg_demo;
CREATE SCHEMA IF NOT EXISTS retail_cpg_demo.brand_manager;
-- The download and load cells use /Volumes/retail_cpg_demo/brand_manager/data/...,
-- so the `data` volume has to exist before any file can be written there.
CREATE VOLUME IF NOT EXISTS retail_cpg_demo.brand_manager.data;

In [0]:
%sql
-- Raw customer reviews: one row per review, clustered on the product keys plus time and rating.
CREATE OR REPLACE TABLE retail_cpg_demo.brand_manager.product_reviews (
  rating            DOUBLE  COMMENT 'Rating of the product (from 1.0 to 5.0)',
  title             STRING  COMMENT 'Title of the user review',
  text              STRING  COMMENT 'Text body of the user review',
  images            VARIANT COMMENT 'Images that users post after they have received the product. Each image has different sizes (small, medium, large), represented by the small_image_url, medium_image_url, and large_image_url respectively',
  asin              STRING  COMMENT 'ID of the product',
  parent_asin       STRING  COMMENT 'Parent ID of the product. Products with different colors, styles, sizes usually belong to the same parent ID. The “asin” in previous Amazon datasets is actually parent ID. Please use parent ID to find product meta',
  user_id           STRING  COMMENT 'ID of the reviewer',
  timestamp         BIGINT  COMMENT 'Time of the review (unix time)',
  verified_purchase BOOLEAN COMMENT 'User purchase verification',
  helpful_vote      BIGINT  COMMENT 'Helpful votes of the review'
)
USING DELTA
CLUSTER BY (parent_asin, asin, timestamp, rating)
COMMENT 'Reviews of products';

In [0]:
%sql
-- Reviews enriched with the LLM extraction produced by the batched ai_query cell.
-- Same columns as product_reviews, preceded by the structured_review payload.
CREATE OR REPLACE TABLE retail_cpg_demo.brand_manager.product_reviews_structured (
  structured_review VARIANT COMMENT 'Structured review extracted by ai_query (sentiment, sentiment_score, positive_feature, negative_feature, missing_feature, unexpected_uses)',
  rating DOUBLE COMMENT 'Rating of the product (from 1.0 to 5.0)',
  title STRING COMMENT 'Title of the user review',
  text STRING COMMENT 'Text body of the user review',
  images VARIANT COMMENT 'Images that users post after they have received the product. Each image has different sizes (small, medium, large), represented by the small_image_url, medium_image_url, and large_image_url respectively',
  asin STRING COMMENT 'ID of the product',
  parent_asin STRING COMMENT 'Parent ID of the product. Products with different colors, styles, sizes usually belong to the same parent ID. The “asin” in previous Amazon datasets is actually parent ID. Please use parent ID to find product meta',
  user_id STRING COMMENT 'ID of the reviewer',
  -- The notebook divides this value by 1000 before calling from_unixtime, i.e. it is handled as milliseconds.
  timestamp BIGINT COMMENT 'Time of the review (unix time, in milliseconds)',
  verified_purchase BOOLEAN COMMENT 'User purchase verification',
  helpful_vote LONG COMMENT 'Helpful votes of the review'
)
USING delta
CLUSTER BY (parent_asin, asin, timestamp, rating)
COMMENT 'Reviews of products enriched with LLM-extracted structured fields';

In [0]:
%sql
-- Product-page metadata: one row per parent product, clustered by product key and category.
CREATE OR REPLACE TABLE retail_cpg_demo.brand_manager.product_details (
  main_category   STRING  COMMENT 'Main category (i.e., domain) of the product',
  title           STRING  COMMENT 'Name of the product',
  average_rating  DOUBLE  COMMENT 'Rating of the product shown on the product page',
  rating_number   BIGINT  COMMENT 'Number of ratings in the product',
  features        VARIANT COMMENT 'Bullet-point format features of the product',
  description     VARIANT COMMENT 'Description of the product',
  price           DOUBLE  COMMENT 'Price in US dollars (at time of crawling)',
  images          VARIANT COMMENT 'Images of the product. Each image has different sizes (thumb, large, hi_res). The “variant” field shows the position of image',
  videos          VARIANT COMMENT 'Videos of the product including title and url',
  store           STRING  COMMENT 'Store name of the product',
  categories      VARIANT COMMENT 'Hierarchical categories of the product',
  details         STRING  COMMENT 'Product details, including materials, brand, sizes, etc.',
  parent_asin     STRING  COMMENT 'Parent ID of the product',
  bought_together VARIANT COMMENT 'Recommended bundles from the websites'
)
USING DELTA
CLUSTER BY (parent_asin, main_category)
COMMENT 'Details of product page';

In [0]:
import os
import urllib.request

# Dataset categories to fetch; each name is substituted into the download URLs and volume paths.
categories = [
  "All_Beauty",
  "Cell_Phones_and_Accessories",
  "Electronics",
  "Handmade_Products",
  "Industrial_and_Scientific",
  "Musical_Instruments",
  "Toys_and_Games",
]

def _download_if_missing(url, local_path):
  """Download `url` to `local_path` unless a file already exists at that path.

  The parent directory is created first because urlretrieve does not create it.
  If the download raises, any partially written file is removed so that the
  existence check does not mistake it for a finished download on the next run.
  """
  if os.path.exists(local_path):
    return

  os.makedirs(os.path.dirname(local_path), exist_ok=True)
  print("Downloading file...")
  try:
    urllib.request.urlretrieve(url, local_path)
  except BaseException:
    # Covers interruptions as well as network errors; the error is re-raised after cleanup.
    if os.path.exists(local_path):
      os.remove(local_path)
    raise
  print("Download complete.")


def download_reviews(category):
  """Fetch the reviews file for `category` into the volume's reviews/ folder.

  Args:
    category: Dataset category name, used in both the source URL and the target file name.

  Returns:
    True once the file is present (freshly downloaded or already there).

  Raises:
    Whatever urllib/os raise if the download or the directory creation fails.
  """
  # URL of the dataset
  url = f"https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/review_categories/{category}.jsonl.gz"

  # Path to Volume
  local_path = f"/Volumes/retail_cpg_demo/brand_manager/data/reviews/{category}.jsonl.gz"

  _download_if_missing(url, local_path)
  return True

def download_items(category):
  """Fetch the item-metadata file for `category` into the volume's items/ folder.

  The remote file is named meta_<category>.jsonl.gz; it is stored locally as <category>.jsonl.gz.

  Args:
    category: Dataset category name, used in both the source URL and the target file name.

  Returns:
    True once the file is present (freshly downloaded or already there).

  Raises:
    Whatever urllib/os raise if the download or the directory creation fails.
  """
  # URL of the dataset
  url = f"https://mcauleylab.ucsd.edu/public_datasets/data/amazon_2023/raw/meta_categories/meta_{category}.jsonl.gz"

  # Path to Volume
  local_path = f"/Volumes/retail_cpg_demo/brand_manager/data/items/{category}.jsonl.gz"

  _download_if_missing(url, local_path)
  return True


# Fetch both raw files for every category: reviews first, then item metadata.
for category in categories:
  print(f"Loading {category}")
  for _fetch in (download_reviews, download_items):
    _fetch(category)

  

In [0]:
%sql
-- Remove both raw tables so the load cell below can recreate them with saveAsTable.
-- IF EXISTS keeps this cell re-runnable (e.g. when retrying after a failed load).
DROP TABLE IF EXISTS retail_cpg_demo.brand_manager.product_details;
DROP TABLE IF EXISTS retail_cpg_demo.brand_manager.product_reviews;

In [0]:
from pyspark.sql.functions import concat_ws, col, to_json

# Dataset categories to ingest; one items file and one reviews file is read per category.
categories = ["All_Beauty", "Cell_Phones_and_Accessories", "Electronics", "Handmade_Products", "Industrial_and_Scientific", "Musical_Instruments", "Toys_and_Games"]

# NOTE: both writes use mode("append"), so re-running this cell without dropping the
# tables first adds every row a second time.
for category in categories:
  # Load the items JSONL.GZ file into a Spark DataFrame and write to a table.
  # The files are JSON Lines (one JSON object per line), which is the layout spark.read.json
  # expects by default. multiLine must stay off: with multiLine=True each file is parsed as a
  # single JSON document instead of one record per line.
  df_items = spark.read.json(f"/Volumes/retail_cpg_demo/brand_manager/data/items/{category}.jsonl.gz")
  # Nested columns are serialised to JSON strings and price is forced to double; the select
  # fixes the column set and order written to the table.
  df_items = (df_items.withColumn("details", to_json(col("details")))
                      .withColumn("categories", to_json(col("categories")))
                      .withColumn("description", to_json(col("description")))
                      .withColumn("features", to_json(col("features")))
                      .withColumn("images", to_json(col("images")))
                      .withColumn("videos", to_json(col("videos")))
                      .withColumn("price", col("price").cast("double"))
                      .select("parent_asin","main_category","average_rating","rating_number","price","title","description","details","features","bought_together","categories","images","store","videos")
  )
  df_items.write.mode("append").saveAsTable("retail_cpg_demo.brand_manager.product_details")

  # Load the reviews JSONL.GZ file the same way (line-delimited, so no multiLine) and write to a table
  df_reviews = spark.read.json(f"/Volumes/retail_cpg_demo/brand_manager/data/reviews/{category}.jsonl.gz")
  df_reviews = df_reviews.withColumn("images", to_json(col("images")))
  df_reviews.write.mode("append").saveAsTable("retail_cpg_demo.brand_manager.product_reviews")


In [0]:
# Run the LLM extraction in fixed-size batches to avoid hitting API limits.
# Note - 100k inferences = ~40min in benchmarks.  Only reviews dated 2022-01-01 or later are
# processed to avoid needing to process too much.  We can change this later if needed.
# NOTE: batches are appended, so re-running this cell adds the same reviews again.

import time

batch_size = 100000
batch_number = 0
done = False

# Reviews eligible for extraction. `timestamp` is divided by 1000 before from_unixtime,
# i.e. it is treated as epoch milliseconds.
eligible_reviews = """
      FROM retail_cpg_demo.brand_manager.product_reviews 
      Where from_unixtime(`timestamp`/1000)::date >= '2022-01-01'
"""

# Count the eligible reviews once, with a cheap query. The loop is bounded by this number
# rather than by counting each batch result, which would execute the batch query a second
# time and would end the loop early if one batch happened to have no matching product.
total_rows = spark.sql("SELECT count(*) AS n" + eligible_reviews).first()["n"]

# Batch query without its rn range. Kept as a plain (non-f) string so the JSON braces in
# responseFormat need no escaping.
# - asin/title/text are tie-breakers so row numbers stay stable from one batch query to the next.
# - coalesce() keeps CONCAT from returning NULL when a title/text/description is missing.
# - try_parse_json() and the explicit column list make the result match the VARIANT columns
#   and column set declared for product_reviews_structured (no helper `rn`/`date` columns).
query_head = """
    WITH numbered_rows AS (
      Select
        *
        ,row_number() over (order by parent_asin, user_id, `timestamp`, asin, title, text) as rn
""" + eligible_reviews + """
    )
    SELECT try_parse_json(ai_query(
            "databricks-meta-llama-3-3-70b-instruct",
            CONCAT(
              'Extract the following information from the review: ',
              'Sentiment (how does the customer feel about the product mentioned in the review in 2 words or less), Sentiment Score (1-5), Positive Feature (what are the positive features mentioned in the review) (comma separate if multiple), Negative Feature (what are the negative features mentioned in the review) (comma separate if multiple), Missing Feature (what are the features wished for or wanted mentioned in the review) (comma separate if multiple), Unexpected Uses (what are the unexpected uses mentioned in the review that are not like the description) (comma separate if multiple).',
              'If the review doesnt contain an element, leave it blank or set it to zero. For instance, if the review does not mention specific features, then set feature = null. All scores should be 1-5 (if they are not null), with 1 being the worst and 5 being the best.',
              'Review: ', coalesce(r.title, ''), '. ', coalesce(r.text, ''),
              'Description: ', coalesce(pd.title, ''), '. ', coalesce(pd.description, '')
            ),
            responseFormat => '{
                "type": "json_schema",
                "json_schema": {
                    "name": "review_extraction",
                    "schema": {
                        "type": "object",
                        "properties": {
                            "sentiment": { "type": "string" },
                            "sentiment_score": { "type": "integer" },
                            "positive_feature": { "type": "string" },
                            "negative_feature": { "type": "string" },
                            "missing_feature": { "type": "string" },
                            "unexpected_uses": { "type": "string" }
                        }
                    },
                    "strict": true
                }
            }'
        )) AS structured_review,
        r.rating,
        r.title,
        r.text,
        try_parse_json(r.images) AS images,
        r.asin,
        r.parent_asin,
        r.user_id,
        r.`timestamp`,
        r.verified_purchase,
        r.helpful_vote
    FROM numbered_rows r
    INNER JOIN retail_cpg_demo.brand_manager.product_details pd
      ON r.parent_asin = pd.parent_asin """

while not done:
    first_rn = batch_size * batch_number
    if first_rn >= total_rows:
        # Every eligible review has been assigned to a batch.
        done = True
        continue
    last_rn = first_rn + batch_size

    query = query_head + f"""
    WHERE rn > {first_rn}
      AND rn <= {last_rn}
    """

    # Insert batch (the write is the only action, so the query runs once per batch)
    df_batch = spark.sql(query)
    print(f"Processing batch {batch_number + 1}: review rows {first_rn + 1}-{min(last_rn, total_rows)} of {total_rows}")
    df_batch.write.mode("append").saveAsTable("retail_cpg_demo.brand_manager.product_reviews_structured")
    batch_number += 1
    time.sleep(5)  # Wait 5 seconds

In [0]:
%sql
-- Flatten the LLM extraction into typed columns and attach the product details (plus Brand)
-- for every structured review, followed by all columns of both joined tables.
SELECT
  lower(structured_review:sentiment::string) AS sentiment,
  structured_review:sentiment_score::int AS sentiment_score,
  structured_review:positive_feature::string AS positive_feature,
  structured_review:negative_feature::string AS negative_feature,
  structured_review:missing_feature::string AS missing_feature,
  structured_review:unexpected_uses::string AS unexpected_uses,
  details:Brand,
  *
FROM retail_cpg_demo.brand_manager.product_reviews_structured r
INNER JOIN retail_cpg_demo.brand_manager.product_details pd
  ON r.parent_asin = pd.parent_asin