# 2. Amazon Book Review Data Preprocessing for Recommendation System

In [1]:
# Import necessary libraries
from google.cloud import bigquery
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os

# Set GCP project and storage bucket variables
PROJECT_ID = 'review-analysis-456008'
BUCKET_NAME = 'review-data-yu'
# Build the raw-data URI from BUCKET_NAME instead of repeating the bucket
# name, so changing the bucket in one place cannot leave a stale path here.
DATA_FILE = f'gs://{BUCKET_NAME}/raw-data/amazon_reviews_us_Books_v1_02.tsv'

# Define Beam pipeline options (project, scratch location in the bucket, region)
pipeline_options = PipelineOptions(
    project=PROJECT_ID,
    temp_location=f'gs://{BUCKET_NAME}/temp',
    region='us-central1'
)

# Ensure we use DirectRunner (local execution)
pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'

# Initialize the BigQuery client shared by every cell below
client = bigquery.Client(project=PROJECT_ID)

## 1. Create BigQuery Dataset and Raw Data Table

In [2]:
# Dataset that holds every table produced by this notebook.
dataset_id = f"{PROJECT_ID}.amazon_reviews_dataset"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "us-central1"
try:
    # exists_ok=True: do not fail when the dataset is already present
    dataset = client.create_dataset(dataset, exists_ok=True)
except Exception as e:
    print(f"Error creating dataset: {e}")
else:
    print(f"Dataset {dataset_id} created or already exists.")

# Table schema, one (column name, BigQuery type) pair per TSV column,
# listed in the same order as the columns appear in the file.
schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("marketplace", "STRING"),
        ("customer_id", "INTEGER"),  # INTEGER based on the data type
        ("review_id", "STRING"),
        ("product_id", "STRING"),
        ("product_parent", "INTEGER"),
        ("product_title", "STRING"),
        ("product_category", "STRING"),
        ("star_rating", "INTEGER"),
        ("helpful_votes", "INTEGER"),
        ("total_votes", "INTEGER"),
        ("vine", "STRING"),
        ("verified_purchase", "STRING"),
        ("review_headline", "STRING"),
        ("review_body", "STRING"),
        ("review_date", "STRING"),
    )
]

# Empty destination table for the raw reviews.
table_id = f"{dataset_id}.raw_reviews"
table = bigquery.Table(table_id, schema=schema)
try:
    table = client.create_table(table, exists_ok=True)
except Exception as e:
    print(f"Error creating table: {e}")
else:
    print(f"Table {table_id} created or already exists.")

Dataset review-analysis-456008.amazon_reviews_dataset created or already exists.
Table review-analysis-456008.amazon_reviews_dataset.raw_reviews created or already exists.


## 2. Load TSV Data into BigQuery

In [3]:
# Configure the load job that copies the TSV file from GCS into BigQuery.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,            # first line of the file is the header
    field_delimiter='\t',
    quote_character='',             # Disable quote handling for TSV
    # NOTE(review): with quoting disabled there are no quoted fields, so this
    # flag is presumably a no-op; kept as in the original - confirm and drop.
    allow_quoted_newlines=True,
    max_bad_records=10000,          # tolerate (and silently skip) up to 10k malformed rows
    # Load jobs append by default, so re-running this cell would duplicate
    # every review. Truncate first to keep the cell idempotent.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    schema=schema
)

# Start the data loading job
try:
    load_job = client.load_table_from_uri(
        DATA_FILE,  # GCS TSV file path
        table_id,   # Target table
        job_config=job_config
    )

    # Block until the job finishes; raises if the job failed
    load_job.result()

    print(f"Loaded {load_job.output_rows} rows into {table_id}")

    # Report non-fatal errors (e.g. skipped bad records), if any
    if load_job.errors:
        print(f"Encountered {len(load_job.errors)} errors:")
        for error in load_job.errors[:5]:  # Show only first 5 errors
            print(f"  - {error}")
except Exception as e:
    print(f"Error loading data: {e}")
    # Every later cell depends on this table; stop here rather than letting
    # the rest of the notebook run against an empty or stale table.
    raise

Loaded 3105520 rows into review-analysis-456008.amazon_reviews_dataset.raw_reviews


## 3. Exploratory Data Analysis using BigQuery

In [11]:
# Headline numbers for the raw table: row count, distinct users,
# distinct products and the mean star rating.
basic_stats_query = f"""
SELECT 
  COUNT(*) as total_records,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products,
  AVG(star_rating) as avg_rating
FROM `{table_id}`
"""

# Run the query and materialise the rows as a plain list (no to_dataframe())
basic_stats_job = client.query(basic_stats_query)
basic_stats_result = [stats_row for stats_row in basic_stats_job.result()]

print("\n===== BASIC STATISTICS =====")
# The aggregate yields a single row; the slice keeps the empty case silent.
for row in basic_stats_result[:1]:
    print(f"Total Records: {row['total_records']}")
    print(f"Unique Users: {row['unique_users']}")
    print(f"Unique Products: {row['unique_products']}")
    print(f"Average Rating: {row['avg_rating']}")

# How many reviews fall on each star rating, and what share of the
# whole table that is (the scalar subquery supplies the total row count).
rating_query = f"""
SELECT 
  star_rating,
  COUNT(*) as count,
  ROUND((COUNT(*) / (SELECT COUNT(*) FROM `{table_id}`)) * 100, 2) as percentage
FROM `{table_id}`
GROUP BY star_rating
ORDER BY star_rating
"""

rating_job = client.query(rating_query)
rating_results = list(rating_job.result())

# Banner and table header emitted in one call, one item per line
print(
    "\n===== RATING DISTRIBUTION =====",
    "star_rating | count | percentage",
    "------------------------------",
    sep="\n",
)
for row in rating_results:
    print(f"{row['star_rating']} | {row['count']} | {row['percentage']}%")

# Per-customer review counts, summarised: number of customers, mean and
# max reviews per customer, and how many customers wrote more than one review.
user_query = f"""
WITH user_reviews AS (
  SELECT 
    customer_id,
    COUNT(*) as review_count
  FROM `{table_id}`
  GROUP BY customer_id
)
SELECT 
  COUNT(*) as total_unique_customers,
  AVG(review_count) as avg_reviews_per_customer,
  MAX(review_count) as max_reviews_by_customer,
  COUNT(CASE WHEN review_count > 1 THEN 1 END) as customers_with_multiple_reviews,
  ROUND(COUNT(CASE WHEN review_count > 1 THEN 1 END) / COUNT(*) * 100, 2) as multiple_reviews_percentage
FROM user_reviews
"""

user_job = client.query(user_query)
user_results = list(user_job.result())

print("\n===== USER ENGAGEMENT ANALYSIS =====")
# Single summary row expected; nothing is printed if the result is empty.
for row in user_results[:1]:
    print(f"Total Unique Customers: {row['total_unique_customers']}")
    print(f"Average Reviews per Customer: {row['avg_reviews_per_customer']}")
    print(f"Max Reviews by a Single Customer: {row['max_reviews_by_customer']}")
    print(f"Customers with Multiple Reviews: {row['customers_with_multiple_reviews']} ({row['multiple_reviews_percentage']}%)")

# Per-product review counts, summarised: number of products, mean and
# max reviews per product, and how many products have more than one review.
product_query = f"""
WITH product_reviews AS (
  SELECT 
    product_id,
    COUNT(*) as review_count
  FROM `{table_id}`
  GROUP BY product_id
)
SELECT 
  COUNT(*) as total_unique_products,
  AVG(review_count) as avg_reviews_per_product,
  MAX(review_count) as max_reviews_by_product,
  COUNT(CASE WHEN review_count > 1 THEN 1 END) as products_with_multiple_reviews,
  ROUND(COUNT(CASE WHEN review_count > 1 THEN 1 END) / COUNT(*) * 100, 2) as multiple_reviews_percentage
FROM product_reviews
"""

product_job = client.query(product_query)
product_results = list(product_job.result())

print("\n===== PRODUCT ANALYSIS =====")
# Single summary row expected; nothing is printed if the result is empty.
for row in product_results[:1]:
    print(f"Total Unique Products: {row['total_unique_products']}")
    print(f"Average Reviews per Product: {row['avg_reviews_per_product']}")
    print(f"Max Reviews for a Single Product: {row['max_reviews_by_product']}")
    print(f"Products with Multiple Reviews: {row['products_with_multiple_reviews']} ({row['multiple_reviews_percentage']}%)")


===== BASIC STATISTICS =====
Total Records: 3105520
Unique Users: 1502380
Unique Products: 779733
Average Rating: 4.182723022231384

===== RATING DISTRIBUTION =====
star_rating | count | percentage
------------------------------
1 | 238221 | 7.67%
2 | 166384 | 5.36%
3 | 249926 | 8.05%
4 | 586182 | 18.88%
5 | 1864807 | 60.05%

===== USER ENGAGEMENT ANALYSIS =====
Total Unique Customers: 1502380
Average Reviews per Customer: 2.067066920486164
Max Reviews by a Single Customer: 21922
Customers with Multiple Reviews: 370478 (24.66%)

===== PRODUCT ANALYSIS =====
Total Unique Products: 779733
Average Reviews per Product: 3.9827992402527257
Max Reviews for a Single Product: 4625
Products with Multiple Reviews: 380957 (48.86%)


## 4. Data Preprocessing and Filtering

In [12]:
# Create tables for high-frequency users and products
print("\n===== FILTERING HIGH-FREQUENCY USERS AND PRODUCTS =====")

# Minimum number of reviews a user / product needs to be kept.
# Defined once so both queries and both messages always agree.
MIN_REVIEWS = 30

# Create table for high-frequency users (>= MIN_REVIEWS reviews)
high_freq_users_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.high_freq_users` AS
SELECT customer_id, COUNT(*) as review_count
FROM `{table_id}`
GROUP BY customer_id
HAVING COUNT(*) >= {MIN_REVIEWS}
"""

# Create table for high-frequency products (>= MIN_REVIEWS reviews)
high_freq_products_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.high_freq_products` AS
SELECT product_id, COUNT(*) as review_count
FROM `{table_id}`
GROUP BY product_id
HAVING COUNT(*) >= {MIN_REVIEWS}
"""

# Execute the queries
try:
    client.query(high_freq_users_query).result()
    print("High-frequency users table created.")

    client.query(high_freq_products_query).result()
    print("High-frequency products table created.")
except Exception as e:
    print(f"Error creating frequency tables: {e}")
    # Do not fall through: the count below (and every later cell) would
    # otherwise read missing or stale tables from a previous run.
    raise

# Count how many users and products meet the criteria
count_query = f"""
SELECT
  (SELECT COUNT(*) FROM `{dataset_id}.high_freq_users`) as high_freq_users,
  (SELECT COUNT(*) FROM `{dataset_id}.high_freq_products`) as high_freq_products
"""

count_job = client.query(count_query)
count_results = list(count_job)

if count_results:
    row = count_results[0]
    print(f"Found {row['high_freq_users']} users with ≥{MIN_REVIEWS} reviews")
    print(f"Found {row['high_freq_products']} products with ≥{MIN_REVIEWS} reviews")


===== FILTERING HIGH-FREQUENCY USERS AND PRODUCTS =====
High-frequency users table created.
High-frequency products table created.
Found 6521 users with ≥30 reviews
Found 11607 products with ≥30 reviews


## 5. Create Filtered Dataset

In [13]:
# Keep only reviews whose author AND product both appear in the
# high-frequency tables, and that carry a star rating.
# NOTE(review): the thresholds were computed on the raw table, so after this
# joint filter a user or product may retain fewer reviews than the threshold.
filtered_reviews_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.filtered_reviews` AS
SELECT r.*
FROM `{table_id}` r
JOIN `{dataset_id}.high_freq_users` u ON r.customer_id = u.customer_id
JOIN `{dataset_id}.high_freq_products` p ON r.product_id = p.product_id
WHERE r.star_rating IS NOT NULL
"""

try:
    client.query(filtered_reviews_query).result()
except Exception as e:
    print(f"Error creating filtered table: {e}")
else:
    print("Filtered reviews table created.")

# Report the size of the filtered table
filtered_count_query = f"""
SELECT COUNT(*) as filtered_count
FROM `{dataset_id}.filtered_reviews`
"""

filtered_count_job = client.query(filtered_count_query)
filtered_count_results = list(filtered_count_job.result())

# COUNT(*) yields a single row; the slice keeps the empty case silent.
for row in filtered_count_results[:1]:
    print(f"Filtered dataset contains {row['filtered_count']} reviews")


Filtered reviews table created.
Filtered dataset contains 143479 reviews


## 6. Create ID Mappings for Users and Products

In [15]:
# Map each distinct customer_id in the filtered data to a dense,
# zero-based index (ordered by customer_id).
user_mapping_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.user_id_mapping` AS
SELECT 
  customer_id,
  ROW_NUMBER() OVER (ORDER BY customer_id) - 1 as user_idx
FROM (
  SELECT DISTINCT customer_id
  FROM `{dataset_id}.filtered_reviews`
)
"""

# Same idea for products: distinct product_id -> zero-based index.
product_mapping_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.product_id_mapping` AS
SELECT 
  product_id,
  ROW_NUMBER() OVER (ORDER BY product_id) - 1 as product_idx
FROM (
  SELECT DISTINCT product_id
  FROM `{dataset_id}.filtered_reviews`
)
"""


# Run both statements in order; the first failure skips whatever remains.
try:
    for mapping_sql, done_message in (
        (user_mapping_query, "User ID mapping table created."),
        (product_mapping_query, "Product ID mapping table created."),
    ):
        client.query(mapping_sql).result()
        print(done_message)
except Exception as e:
    print(f"Error creating mapping tables: {e}")

# Report how many ids were mapped in each table
mapping_count_query = f"""
SELECT
  (SELECT COUNT(*) FROM `{dataset_id}.user_id_mapping`) as user_count,
  (SELECT COUNT(*) FROM `{dataset_id}.product_id_mapping`) as product_count
"""

mapping_count_job = client.query(mapping_count_query)
mapping_count_results = list(mapping_count_job.result())

for row in mapping_count_results[:1]:
    print(f"Created mappings for {row['user_count']} users and {row['product_count']} products")

User ID mapping table created.
Product ID mapping table created.
Created mappings for 6238 users and 11079 products


## 7. Apply ID Mappings to Create Processed Dataset

In [16]:
# Attach the numeric user_idx / product_idx to every filtered review by
# joining against the two mapping tables.
processed_reviews_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.processed_reviews` AS
SELECT 
  f.*,
  u.user_idx,
  p.product_idx
FROM `{dataset_id}.filtered_reviews` f
JOIN `{dataset_id}.user_id_mapping` u ON f.customer_id = u.customer_id
JOIN `{dataset_id}.product_id_mapping` p ON f.product_id = p.product_id
"""

try:
    client.query(processed_reviews_query).result()
except Exception as e:
    print(f"Error creating processed table: {e}")
else:
    print("Processed reviews table created with ID mappings applied.")

# Row count of the processed table, for comparison with the filtered table
processed_count_query = f"""
SELECT COUNT(*) as processed_count
FROM `{dataset_id}.processed_reviews`
"""

processed_count_job = client.query(processed_count_query)
processed_count_results = list(processed_count_job.result())

# COUNT(*) yields a single row; the slice keeps the empty case silent.
for row in processed_count_results[:1]:
    print(f"Processed dataset contains {row['processed_count']} reviews")


Processed reviews table created with ID mappings applied.
Processed dataset contains 143479 reviews


## 8. Split Data into Training and Testing Sets

In [20]:
# Per-user train/test split of the processed reviews.
#
# Each user's reviews are ranked in a pseudo-random order and the first
# CEIL(TRAIN_FRACTION * n) go to the training set; the rest form the test set.
# The order comes from a seeded hash of review_id rather than RAND(), because
# BigQuery's RAND() takes no seed and would give a different split every run.
TRAIN_FRACTION = 0.8  # share of each user's reviews that goes to training
SPLIT_SEED = 42       # change to draw a different (but still repeatable) split

# Step 1 query: temporary table holding each review's rank within its user
# and that user's total review count. review_id is a tie-breaker on the hash.
random_values_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.temp_reviews_random` AS
SELECT
  *,
  ROW_NUMBER() OVER(
    PARTITION BY customer_id
    ORDER BY FARM_FINGERPRINT(CONCAT('{SPLIT_SEED}:', review_id)), review_id
  ) as user_review_rank,
  COUNT(*) OVER(PARTITION BY customer_id) as user_review_count
FROM `{dataset_id}.processed_reviews`
"""

# Step 2 query: training set = the leading TRAIN_FRACTION of each user's reviews
train_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.train_reviews` AS
SELECT * EXCEPT(user_review_rank, user_review_count)
FROM `{dataset_id}.temp_reviews_random`
WHERE user_review_rank <= CEIL({TRAIN_FRACTION} * user_review_count)
"""

# Step 3 query: test set = the exact complement, taken from the same ranked
# table. This replaces a `review_id NOT IN (SELECT ...)` anti-join, which
# would return no rows at all if any training review_id were NULL.
test_query = f"""
CREATE OR REPLACE TABLE `{dataset_id}.test_reviews` AS
SELECT * EXCEPT(user_review_rank, user_review_count)
FROM `{dataset_id}.temp_reviews_random`
WHERE user_review_rank > CEIL({TRAIN_FRACTION} * user_review_count)
"""

try:
    # Step 1: Create temporary table with per-user ranks
    client.query(random_values_query).result()
    print("Temporary table with random values created.")

    # Step 2: Create training set
    client.query(train_query).result()
    print("Training set created.")

    # Step 3: Create test set
    client.query(test_query).result()
    print("Test set created.")

    print("Train/test split completed.")
except Exception as e:
    print(f"Error creating train/test split: {e}")
    # A partial split (e.g. new train table, old test table) would overlap
    # or drop reviews, so stop instead of continuing to validation.
    raise

Temporary table with random values created.
Training set created.
Test set created.
Train/test split completed.


## 9. Validate Data Processing

In [21]:
# One summary row per pipeline stage (raw -> filtered -> processed ->
# train -> test): row count, distinct users and distinct products.
# The CASE in ORDER BY lists the stages in pipeline order.
validation_query = f"""
SELECT 
  'raw' as stage, COUNT(*) as count,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products
FROM `{table_id}`
UNION ALL
SELECT 
  'filtered' as stage, COUNT(*) as count,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products
FROM `{dataset_id}.filtered_reviews`
UNION ALL
SELECT 
  'processed' as stage, COUNT(*) as count,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products  
FROM `{dataset_id}.processed_reviews`
UNION ALL
SELECT 
  'train' as stage, COUNT(*) as count,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products  
FROM `{dataset_id}.train_reviews`
UNION ALL
SELECT 
  'test' as stage, COUNT(*) as count,
  COUNT(DISTINCT customer_id) as unique_users,
  COUNT(DISTINCT product_id) as unique_products  
FROM `{dataset_id}.test_reviews`
ORDER BY 
  CASE 
    WHEN stage = 'raw' THEN 1
    WHEN stage = 'filtered' THEN 2
    WHEN stage = 'processed' THEN 3
    WHEN stage = 'train' THEN 4
    WHEN stage = 'test' THEN 5
  END
"""

validation_job = client.query(validation_query)
validation_results = list(validation_job.result())

# Banner and table header emitted in one call, one item per line
print(
    "\n===== DATA PROCESSING VALIDATION =====",
    "Stage | Count | Unique Users | Unique Products",
    "------------------------------------------",
    sep="\n",
)
for row in validation_results:
    print(f"{row['stage']} | {row['count']} | {row['unique_users']} | {row['unique_products']}")

# Sanity check on the numeric indices in the processed table: min, max and
# number of distinct values for user_idx and product_idx. For a dense
# zero-based mapping the range should be 0 .. (distinct count - 1).
idx_check_query = f"""
SELECT 
  MIN(user_idx) as min_user_idx,
  MAX(user_idx) as max_user_idx,
  MIN(product_idx) as min_product_idx,
  MAX(product_idx) as max_product_idx,
  COUNT(DISTINCT user_idx) as unique_user_idx,
  COUNT(DISTINCT product_idx) as unique_product_idx
FROM `{dataset_id}.processed_reviews`
"""

idx_check_job = client.query(idx_check_query)
idx_check_results = list(idx_check_job.result())

print("\n===== INDEX MAPPING VALIDATION =====")
# The aggregate yields a single row; the slice keeps the empty case silent.
for row in idx_check_results[:1]:
    print(f"User index range: {row['min_user_idx']} to {row['max_user_idx']} ({row['unique_user_idx']} unique)")
    print(f"Product index range: {row['min_product_idx']} to {row['max_product_idx']} ({row['unique_product_idx']} unique)")

print("\nData preprocessing complete! The processed data is available in BigQuery.")



===== DATA PROCESSING VALIDATION =====
Stage | Count | Unique Users | Unique Products
------------------------------------------
raw | 3105520 | 1502380 | 779733
filtered | 143479 | 6238 | 11079
processed | 143479 | 6238 | 11079
train | 117283 | 6238 | 10957
test | 26196 | 5415 | 8302

===== INDEX MAPPING VALIDATION =====
User index range: 0 to 6237 (6238 unique)
Product index range: 0 to 11078 (11079 unique)

Data preprocessing complete! The processed data is available in BigQuery.
