## Imports

In [1]:
!pip install datasets
!pip install duckdb
!pip install pyarrow

from datasets import Dataset
import duckdb
import pyarrow as pa
import glob
import os

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


# Merging Reviews and Metadata for Product Categories

This script processes product reviews and metadata, merges them per category, and saves each merged result as a .parquet file using DuckDB.

## Overview
The following steps are performed:
1. **Creates DuckDB tables** for storing reviews and metadata
2. **Streams data** from all Arrow files into two DuckDB tables (reviews and metadata)
3. **Joins** the two tables on the product key (`parent_asin`)
4. **Exports the data** to a Parquet file with Zstd compression

## Functions

``` python
def create_tables()
``` 
1. Drops any pre-existing `review_stream` and `meta_stream` tables
2. Initializes two DuckDB tables:
    
     review_stream(rating, title, text, images, asin, parent_asin, user_id, timestamp, helpful_vote, verified_purchase)
     
     meta_stream(main_category, title, average_rating, rating_number, features, description, price, images, videos, store, categories, details, parent_asin, bought_together, subtitle, author)

```python
def stream_to_duckdb(table_name, folder):
```
1. Finds all `.arrow` files in `folder` (in sorted order)
2. Opens each file as a Hugging Face `Dataset` and iterates over its Arrow record batches
3. Inserts each batch into `table_name` by registering it with DuckDB under the temporary name `tmp_batch`

```python
def is_parquet_valid(path):
```
1. Attempts to query the Parquet file at `path` with DuckDB
2. Returns `True` if the query succeeds and `False` otherwise

```python
def merge_category(category):
```
1. Skips the category if a valid `joined_{category}.parquet` already exists
2. Loads the raw review and metadata files for the category using `create_tables` and `stream_to_duckdb`
3. Left-joins the `review_stream` and `meta_stream` tables on `join_key` (`parent_asin`)
4. Exports the merged data to a Zstd-compressed Parquet file
    

In [3]:
def create_tables():
    """(Re)create the empty ``review_stream`` and ``meta_stream`` tables.

    Both tables are first dropped if present (on DuckDB's default connection,
    via ``duckdb.sql``), then recreated with a fixed schema. Numeric fields use
    DOUBLE/BIGINT, ``verified_purchase`` is BOOLEAN, and every other column is
    declared as TEXT.
    """
    # Drop both staging tables first so every call starts from an empty state.
    for stale_table in ("review_stream", "meta_stream"):
        duckdb.sql(f"DROP TABLE IF EXISTS {stale_table}")

    review_ddl = """
    CREATE TABLE review_stream (
        rating DOUBLE,
        title TEXT,
        text TEXT,
        images TEXT,
        asin TEXT,
        parent_asin TEXT,
        user_id TEXT,
        timestamp BIGINT,
        helpful_vote BIGINT,
        verified_purchase BOOLEAN
    );
    """

    meta_ddl = """
    CREATE TABLE meta_stream (
        main_category TEXT,
        title TEXT,
        average_rating DOUBLE,
        rating_number BIGINT,
        features TEXT,
        description TEXT,
        price TEXT,
        images TEXT,
        videos TEXT,
        store TEXT,
        categories TEXT,
        details TEXT,
        parent_asin TEXT,
        bought_together TEXT,
        subtitle TEXT,
        author TEXT
    );
    """

    for ddl in (review_ddl, meta_ddl):
        duckdb.sql(ddl)

def stream_to_duckdb(table_name, folder):
    """Append every ``*.arrow`` file in *folder* to the DuckDB table *table_name*.

    Each file is opened with ``datasets.Dataset.from_file`` and consumed one
    Arrow record batch at a time. Each batch is registered with DuckDB as
    ``tmp_batch``, inserted, and unregistered again.

    Args:
        table_name: Name of an existing DuckDB table (e.g. ``review_stream``).
            It is interpolated into SQL verbatim, so it must be a trusted,
            plain identifier.
        folder: Directory containing the ``.arrow`` files; they are loaded in
            sorted filename order.
    """
    files = sorted(glob.glob(os.path.join(folder, "*.arrow")))
    if not files:
        # Make an empty or misnamed folder visible instead of loading nothing
        # without any indication.
        print(f"Warning: no .arrow files found in {folder}")
        return

    # BY NAME matches source columns to table columns by name rather than by
    # position: a different column order in a file cannot shift values into
    # the wrong columns, and table columns missing from a file (e.g. optional
    # metadata fields) are filled with NULL instead of failing the insert.
    insert_sql = f"INSERT INTO {table_name} BY NAME SELECT * FROM tmp_batch"

    for path in files:
        dataset = Dataset.from_file(path)
        for batch in dataset.data.to_reader():
            duckdb.register("tmp_batch", pa.Table.from_batches([batch]))
            try:
                duckdb.sql(insert_sql)
            finally:
                # Unregister even when the insert fails so no stale
                # tmp_batch remains registered on the default connection.
                duckdb.unregister("tmp_batch")

def is_parquet_valid(path):
    """Return True if DuckDB can open and query the Parquet file at *path*.

    ``duckdb.sql`` returns a lazily evaluated relation for SELECT queries, so
    the result is fetched to make DuckDB actually execute the scan rather than
    only bind it.

    Args:
        path: Filesystem path of the Parquet file to check.

    Returns:
        True when the query runs successfully, False if DuckDB raises an error
        (missing file, truncated/corrupt Parquet, etc.).
    """
    # Double embedded single quotes so paths such as C:\Users\O'Neil\... form
    # a valid SQL string literal instead of always failing the check.
    path_literal = path.replace("'", "''")
    try:
        duckdb.sql(f"SELECT COUNT(*) FROM read_parquet('{path_literal}')").fetchone()
    except duckdb.Error:
        return False
    return True

def merge_category(category, base_dir=None, key=None):
    """Join one category's reviews with its metadata and export them as Parquet.

    Loads ``raw_review_<category>/full`` and ``raw_meta_<category>/full`` under
    *base_dir* into freshly created DuckDB tables. Reviews are then LEFT JOINed
    to metadata on *key*, and the selected columns are written to
    ``joined_<category>.parquet`` (zstd) in *base_dir*. Missing brand,
    main_category, store and price values are reported as ``'Unknown'``. If the
    output file already exists and is readable, the category is skipped.

    Args:
        category: Category suffix used in folder names and the output name.
        base_dir: Root dataset directory. Defaults to the module-level
            ``base_path``, looked up at call time.
        key: Column present in both tables to join on. Defaults to the
            module-level ``join_key``, looked up at call time.
    """
    base_dir = base_path if base_dir is None else base_dir
    key = join_key if key is None else key

    output_path = os.path.join(base_dir, f"joined_{category}.parquet")

    if os.path.exists(output_path) and is_parquet_valid(output_path):
        print(f"Skipping {category}, already processed.\n")
        return

    print(f"Processing category: {category}")

    review_folder = os.path.join(base_dir, f"raw_review_{category}", "full")
    meta_folder = os.path.join(base_dir, f"raw_meta_{category}", "full")

    create_tables()
    stream_to_duckdb("review_stream", review_folder)
    stream_to_duckdb("meta_stream", meta_folder)

    # These values are spliced into SQL text, so escape embedded quotes:
    # single quotes inside string literals, double quotes inside identifiers.
    # Without this, a base path like C:\Users\O'Neil\... breaks the COPY.
    category_sql = category.replace("'", "''")
    output_sql = output_path.replace("'", "''")
    key_sql = key.replace('"', '""')

    duckdb.sql(f"""
        COPY (
            SELECT 
                '{category_sql}' AS category,
                r.rating,
                r.title,
                r.text,
                r.user_id,
                r.asin,
                r.parent_asin,
                r.timestamp,
                r.verified_purchase,
                r.helpful_vote,
                COALESCE(m.details ->> 'brand', 'Unknown') AS brand,
                COALESCE(m.main_category, 'Unknown') AS main_category,
                COALESCE(m.store, 'Unknown') AS store,
                COALESCE(m.price, 'Unknown') AS price
            FROM review_stream r
            LEFT JOIN meta_stream m ON r."{key_sql}" = m."{key_sql}"
        ) TO '{output_sql}' (FORMAT PARQUET, COMPRESSION 'zstd');
    """)

    print(f"Exported: {output_path}\n")

# Process Data
This code merges the raw reviews and metadata for every category found under `base_path`.

## Variables
* **base_path**: Root directory where datasets are stored
* **join_key**:  The column used to join review and metadata tables (parent_asin)
* **review_folders**: A list of paths to folders matching the pattern `raw_review_*`.
* **categories**: A filtered list of valid category names derived from the folder names.

## Function
1. Identifies categories by scanning `base_path` for `raw_review_*` folders
2. Checks that `raw_review_{category}/full` and `raw_meta_{category}/full` both exist
3. If they do, the category is added to `categories`
4. If no categories are found, a message is printed
5. Otherwise, `merge_category` is called for each category to produce its merged Parquet file

In [13]:
# Root directory holding the raw_review_<category>/ and raw_meta_<category>/
# folders; joined_<category>.parquet outputs are written here as well.
base_path = r"C:\Users\anees\Desktop\datasets"
# Column shared by the review and metadata tables, used for the join.
join_key = "parent_asin"

review_prefix = "raw_review_"
# Sorted so categories are processed in a deterministic order.
review_folders = sorted(glob.glob(os.path.join(base_path, review_prefix + "*")))
categories = []

for path in review_folders:
    # Strip only the leading prefix; str.replace would remove every occurrence.
    cat = os.path.basename(path)[len(review_prefix):]
    review_dir = os.path.join(path, "full")
    meta_dir = os.path.join(base_path, f"raw_meta_{cat}", "full")
    # A category is usable only when both "full" folders are real directories.
    if os.path.isdir(review_dir) and os.path.isdir(meta_dir):
        categories.append(cat)

if not categories:
    print("No files found")
else:
    for category in categories:
        merge_category(category)