In [4]:
# Complete Data Lakehouse Pipeline with BigQuery
# Data Lake -> ETL -> Data Quality -> Data Warehouse

# For Kaggle or Colab T4
# Installs the third-party packages in one shell call; `--quiet` reduces pip's output.
# NOTE(review): no versions are pinned here, so a fresh runtime may resolve different
# releases than the ones this notebook was last run against.
!pip install openpyxl boto3 confluent_kafka google-cloud-bigquery pandas pyarrow great-expectations datasets --quiet


In [7]:
!pip uninstall numpy pandas
!pip install numpy==1.24.3 pandas==2.0.3

Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
  Would remove:
    /usr/local/bin/f2py
    /usr/local/lib/python3.11/dist-packages/numpy-1.26.4.dist-info/*
    /usr/local/lib/python3.11/dist-packages/numpy.libs/libgfortran-040039e1.so.5.0.0
    /usr/local/lib/python3.11/dist-packages/numpy.libs/libopenblas64_p-r0-0cf96a72.3.23.dev.so
    /usr/local/lib/python3.11/dist-packages/numpy.libs/libquadmath-96973f99.so.0.0.0
    /usr/local/lib/python3.11/dist-packages/numpy/*
Proceed (Y/n)? Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pip/_internal/cli/base_command.py", line 179, in exc_logging_wrapper
    status = run_func(*args)
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pip/_internal/commands/uninstall.py", line 106, in run
    uninstall_pathset = req.uninstall(
                        ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pip/_internal/req/req_install.py", line 722, in uni

In [4]:
# Authenticate the Colab session before any Google Cloud client is created.
# NOTE(review): presumably this supplies the credentials later picked up by
# `bigquery.Client` — confirm; this import only exists inside Google Colab runtimes.
from google.colab import auth
auth.authenticate_user()

import os
import requests
import pandas as pd
import io
import re
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import zipfile
import json

# ========================================
# CONFIGURATION
# ========================================

# Target BigQuery project/dataset and the table-name suffixes that separate
# raw "data lake" tables from cleaned "data warehouse" tables.
project_id = "able-balm-454718-n8"
dataset_id = "superset_sandbox"
data_lake_suffix = "_raw_db"
data_warehouse_suffix = "_cleaned_db"

# Single shared client used by every function below.
client = bigquery.Client(project=project_id)

# Bootstrap the dataset: look it up first and create it only when it is missing.
try:
    client.get_dataset(dataset_id)
except NotFound:
    dataset = client.create_dataset(bigquery.Dataset(f"{project_id}.{dataset_id}"))
    print(f"✅ Created dataset `{dataset_id}`")
else:
    print(f"✅ Dataset `{dataset_id}` already exists.")

# ========================================
# COLUMN SANITIZER for BigQuery
# ========================================

def sanitize_col(col):
    """Turn an arbitrary column label into a name accepted as a BigQuery column.

    Steps: every character outside ``[a-zA-Z0-9_]`` becomes ``_``, runs of
    underscores collapse to one, leading/trailing underscores are stripped, and
    the result is truncated to 128 characters. Case is preserved (the name is
    NOT lower-cased).

    A name that would start with a digit is prefixed with ``_`` because standard
    BigQuery column names must begin with a letter or underscore. The function
    is idempotent: sanitizing an already sanitized name returns it unchanged.

    Args:
        col: Any object; it is converted with ``str()`` first.

    Returns:
        str: The sanitized name, or ``"col"`` when nothing usable remains.
    """
    name = re.sub(r'[^a-zA-Z0-9_]', '_', str(col))
    name = re.sub(r'_+', '_', name).strip('_')
    if not name:
        return "col"
    if name[0].isdigit():
        # Prefix before truncating so the length cap still holds.
        name = f"_{name}"
    # 128 is the cap this notebook has always used.
    # NOTE(review): BigQuery's documented limit is believed to be higher — confirm before raising.
    return name[:128]

def sanitize_columns(df):
    """Rename every column of ``df`` to a BigQuery-safe, unique name.

    Each label goes through ``sanitize_col``. Different labels can sanitize to
    the same name (for example ``"a b"`` and ``"a-b"``), so collisions are
    resolved by appending ``_1``, ``_2``, ... while staying within the
    128-character cap. Collisions are compared case-insensitively because
    BigQuery treats ``Col`` and ``col`` as the same column.

    Args:
        df: pandas DataFrame; its ``columns`` attribute is replaced in place.

    Returns:
        The same DataFrame object, for call chaining.
    """
    taken = set()
    renamed = []
    for label in df.columns:
        base = sanitize_col(label)
        candidate = base
        counter = 1
        while candidate.lower() in taken:
            suffix = f"_{counter}"
            # Trim the base so the suffixed name never exceeds the length cap.
            candidate = f"{base[:128 - len(suffix)]}{suffix}"
            counter += 1
        taken.add(candidate.lower())
        renamed.append(candidate)
    df.columns = renamed
    return df

# ========================================
# DATA INGESTION - JAPANESE SENTIMENT DATA TO DATA LAKE
# ========================================

def ingest_japanese_sentiment_to_lake():
    """Load the WRIME TSV into the raw data-lake table ``jp_sentiment<data_lake_suffix>``.

    The file is downloaded once and cached as ``wrime-ver1.tsv`` in the working
    directory. Rows without a ``Sentence`` value are dropped, column names are
    sanitized, lake metadata columns (``id``, ``loaded_at``, ``data_source``,
    ``ingestion_batch``) are added, and the frame replaces the target table
    (``WRITE_TRUNCATE``).

    Missing values in text columns are kept as NULLs rather than being turned
    into the literal string ``"nan"``; those columns are declared as STRING in
    a partial schema so that an all-NULL column still gets a type.

    Raises:
        Exception: any download, parsing or upload error is logged and re-raised.
    """
    print("\n🇯🇵 INGESTING JAPANESE SENTIMENT DATA TO DATA LAKE")
    wrime_url = "https://raw.githubusercontent.com/ids-cv/wrime/refs/heads/master/wrime-ver1.tsv"
    wrime_path = "wrime-ver1.tsv"

    try:
        if not os.path.exists(wrime_path):
            print("📥 Downloading WRIME dataset...")
            # A timeout keeps a stalled connection from hanging the notebook forever.
            r = requests.get(wrime_url, timeout=60)
            r.raise_for_status()
            with open(wrime_path, "wb") as f:
                f.write(r.content)

        df_wrime = pd.read_csv(wrime_path, sep="\t")
        df_wrime = df_wrime.dropna(subset=["Sentence"])
        print(f"📊 Loaded {len(df_wrime)} Japanese sentiment records")

        # Sanitize columns
        df_wrime = sanitize_columns(df_wrime)

        # Add data lake metadata; one clock read so loaded_at and the batch id agree.
        ingested_at = pd.Timestamp.now(tz="UTC")
        df_wrime["id"] = df_wrime.index
        df_wrime["loaded_at"] = ingested_at
        df_wrime["data_source"] = "wrime_sentiment"
        df_wrime["ingestion_batch"] = f"batch_{ingested_at.strftime('%Y%m%d_%H%M%S')}"

        # Stringify object columns but leave missing values as real nulls.
        string_fields = []
        for col in df_wrime.columns:
            if df_wrime[col].dtype == "object":
                values = df_wrime[col]
                df_wrime[col] = values.where(values.isna(), values.astype(str))
                string_fields.append(bigquery.SchemaField(col, "STRING"))

        table_id = f"{project_id}.{dataset_id}.jp_sentiment{data_lake_suffix}"
        print(f"🏗️ Uploading Japanese Sentiment to DATA LAKE: {table_id}")
        job_config = bigquery.LoadJobConfig(
            schema=string_fields,  # partial schema: remaining columns are inferred
            autodetect=True,
            write_disposition="WRITE_TRUNCATE"
        )
        job = client.load_table_from_dataframe(df_wrime, table_id, job_config=job_config)
        job.result()
        print(f"✅ Uploaded to DATA LAKE: {table_id}")

    except Exception as e:
        print(f"❌ Error ingesting Japanese sentiment data: {str(e)}")
        raise

# ========================================
# DATA INGESTION - AMAZON REVIEWS TO DATA LAKE (Alternative to HuggingFace)
# ========================================

def ingest_amazon_reviews_to_lake():
    """Load the Amazon reviews CSV into ``amazon_reviews<data_lake_suffix>``.

    The CSV is read straight from its URL, column names are sanitized, lake
    metadata columns are added and the frame replaces the target table
    (``WRITE_TRUNCATE``). Missing values in text columns stay NULL instead of
    becoming the string ``"nan"``.

    On ANY failure (download, parsing or upload) the error is logged and
    ``ingest_alternative_reviews_to_lake()`` is called as a fallback, so callers
    do not need a fallback of their own. An exception escapes only if that
    fallback fails too.
    """
    print("\n🛍️ INGESTING AMAZON REVIEWS DATA TO DATA LAKE")

    try:
        # Using Amazon product reviews dataset from a direct CSV source
        amazon_url = "https://raw.githubusercontent.com/pycaret/pycaret/master/datasets/amazon.csv"

        print("📥 Downloading Amazon reviews dataset...")
        df_amazon = pd.read_csv(amazon_url)
        print(f"📊 Loaded {len(df_amazon)} Amazon review records")

        # Sanitize columns
        df_amazon = sanitize_columns(df_amazon)

        # Add data lake metadata; one clock read so loaded_at and the batch id agree.
        ingested_at = pd.Timestamp.now(tz="UTC")
        df_amazon["id"] = df_amazon.index
        df_amazon["loaded_at"] = ingested_at
        df_amazon["data_source"] = "amazon_reviews"
        df_amazon["ingestion_batch"] = f"batch_{ingested_at.strftime('%Y%m%d_%H%M%S')}"

        # Stringify object columns but leave missing values as real nulls.
        string_fields = []
        for col in df_amazon.columns:
            if df_amazon[col].dtype == "object":
                values = df_amazon[col]
                df_amazon[col] = values.where(values.isna(), values.astype(str))
                string_fields.append(bigquery.SchemaField(col, "STRING"))

        table_id = f"{project_id}.{dataset_id}.amazon_reviews{data_lake_suffix}"
        print(f"🏗️ Uploading Amazon Reviews to DATA LAKE: {table_id}")
        job_config = bigquery.LoadJobConfig(
            schema=string_fields,  # partial schema: remaining columns are inferred
            autodetect=True,
            write_disposition="WRITE_TRUNCATE"
        )
        job = client.load_table_from_dataframe(df_amazon, table_id, job_config=job_config)
        job.result()
        print(f"✅ Uploaded to DATA LAKE: {table_id}")

    except Exception as e:
        print(f"❌ Error ingesting Amazon reviews data: {str(e)}")
        print("🔄 Trying alternative dataset source...")
        ingest_alternative_reviews_to_lake()

def ingest_alternative_reviews_to_lake():
    """Fallback review source: load SST-2 movie reviews into ``movie_reviews<data_lake_suffix>``.

    The TSV has two unnamed columns (a 0/1 label and a sentence). Their order is
    detected from the data instead of being assumed: whichever column parses as
    fully numeric is treated as the label. A wrong assumption would put the
    sentences in ``sentiment`` and make every derived ``rating`` equal to 1.

    Synthetic e-commerce style columns are added (``product_category`` cycling
    through five names, ``rating`` of 5 for label 1 else 1, ``verified_purchase``
    alternating Yes/No). They are plain strings/ints rather than pandas
    categoricals so they go through the same string handling as every other
    text column. Missing text values stay NULL.

    Raises:
        Exception: any download, parsing or upload error is logged and re-raised.
    """
    print("\n🛍️ INGESTING ALTERNATIVE REVIEWS DATA TO DATA LAKE")

    try:
        # Create a sample dataset from publicly available movie reviews
        movie_reviews_url = "https://raw.githubusercontent.com/clairett/pytorch-sentiment-classification/master/data/SST2/train.tsv"

        print("📥 Downloading movie reviews dataset...")
        raw = pd.read_csv(movie_reviews_url, sep='\t', header=None)
        if raw.shape[1] < 2:
            raise ValueError(f"Expected 2 tab-separated columns, found {raw.shape[1]}")

        # Decide which of the two columns holds the numeric label.
        first_is_label = pd.to_numeric(raw[0], errors='coerce').notna().all()
        labels, texts = (raw[0], raw[1]) if first_is_label else (raw[1], raw[0])
        df_reviews = pd.DataFrame({'sentiment': labels, 'review_text': texts})
        print(f"📊 Loaded {len(df_reviews)} movie review records")

        # Add additional columns to simulate e-commerce data
        n_rows = len(df_reviews)
        categories = ['Electronics', 'Books', 'Clothing', 'Home', 'Sports']
        purchase_flags = ['Yes', 'No']
        df_reviews['product_category'] = [categories[i % len(categories)] for i in range(n_rows)]
        df_reviews['rating'] = df_reviews['sentiment'].eq(1).map({True: 5, False: 1})
        df_reviews['verified_purchase'] = [purchase_flags[i % len(purchase_flags)] for i in range(n_rows)]

        # Sanitize columns
        df_reviews = sanitize_columns(df_reviews)

        # Add data lake metadata; one clock read so loaded_at and the batch id agree.
        ingested_at = pd.Timestamp.now(tz="UTC")
        df_reviews["id"] = df_reviews.index
        df_reviews["loaded_at"] = ingested_at
        df_reviews["data_source"] = "movie_reviews"
        df_reviews["ingestion_batch"] = f"batch_{ingested_at.strftime('%Y%m%d_%H%M%S')}"

        # Stringify object columns but leave missing values as real nulls.
        string_fields = []
        for col in df_reviews.columns:
            if df_reviews[col].dtype == "object":
                values = df_reviews[col]
                df_reviews[col] = values.where(values.isna(), values.astype(str))
                string_fields.append(bigquery.SchemaField(col, "STRING"))

        table_id = f"{project_id}.{dataset_id}.movie_reviews{data_lake_suffix}"
        print(f"🏗️ Uploading Movie Reviews to DATA LAKE: {table_id}")
        job_config = bigquery.LoadJobConfig(
            schema=string_fields,  # partial schema: remaining columns are inferred
            autodetect=True,
            write_disposition="WRITE_TRUNCATE"
        )
        job = client.load_table_from_dataframe(df_reviews, table_id, job_config=job_config)
        job.result()
        print(f"✅ Uploaded to DATA LAKE: {table_id}")

    except Exception as e:
        print(f"❌ Error ingesting alternative reviews data: {str(e)}")
        raise

# ========================================
# GEN-Z SLANG TRANSLATION SETUP
# ========================================

def _download_mapping(url, key_col, value_col, lowercase_keys=False):
    """Download a CSV and return ``{key_col value: value_col value}`` with blanks and nulls dropped."""
    resp = requests.get(url, timeout=30)  # bounded wait; a stall triggers the caller's fallback
    resp.raise_for_status()
    table = pd.read_csv(io.StringIO(resp.text))
    pairs = table[[key_col, value_col]].dropna()

    mapping = {}
    for raw_key, raw_value in zip(pairs[key_col], pairs[value_col]):
        key = str(raw_key).strip()
        if lowercase_keys:
            key = key.lower()
        if key:  # an empty key would later match at every word boundary
            mapping[key] = str(raw_value).strip()
    return mapping


def setup_genz_translation():
    """Build the slang/emoji -> plain-English dictionary used by the review ETL.

    Slang and emoji tables are downloaded from GitHub; if either download or
    parse fails for any reason, BOTH are replaced by the built-in fallback
    dictionaries (best effort, never raises for network problems).

    Two more layers are merged on top of the slang/emoji data:

    * variant phrases derived from a base word ("no cap" from "cap"), and
    * hand-written phrase definitions.

    Precedence, lowest to highest: derived variants, hand-written phrases,
    slang table, emoji table. Derived variants rank lowest because they merely
    copy the base word's meaning, which can be the opposite of the phrase
    ("no cap" is not "lie"); an explicit definition must always win over them.

    Returns:
        dict[str, str]: mapping from slang term / emoji to its description.
    """
    print("\n🔥 SETTING UP GEN-Z TRANSLATION MAPPINGS")

    try:
        # Try to get the Gen-Z slang dataset
        slang_map = _download_mapping(
            "https://raw.githubusercontent.com/kaspercools/genz-dataset/refs/heads/main/genz_slang.csv",
            'keyword', 'description', lowercase_keys=True,
        )

        # Try to get emoji dataset
        emoji_map = _download_mapping(
            "https://raw.githubusercontent.com/kaspercools/genz-dataset/refs/heads/main/genz_emojis.csv",
            'emoji', 'Description',
        )

    except Exception as e:
        print(f"⚠️ Could not load external slang data: {str(e)}")
        print("🔄 Using built-in translation mappings...")

        # Fallback to built-in mappings
        slang_map = {
            "lit": "amazing, exciting, excellent",
            "salty": "bitter, angry, upset",
            "flex": "show off, boast",
            "ghost": "suddenly stop communicating",
            "vibe": "feeling, atmosphere",
            "stan": "be a big fan of",
            "periodt": "period, end of discussion",
            "bet": "okay, sure, yes",
            "cap": "lie, false statement",
            "fire": "excellent, amazing",
            "sus": "suspicious, questionable",
            "slaps": "is really good",
            "hits different": "is exceptionally good",
            "bussin": "really good, excellent",
            "no cap": "no lie, for real"
        }

        emoji_map = {
            "💯": "one hundred percent, perfect",
            "🔥": "fire, excellent, amazing",
            "😤": "frustrated, determined",
            "💀": "dead, dying of laughter",
            "🤡": "clown, foolish",
            "👑": "queen, king, royalty",
            "✨": "sparkle, magical, special"
        }

    # Add variant patterns
    variant_patterns = {
        "fleek": ["on fleek"],
        "cap": ["no cap"],
        "shade": ["throw shade"],
        "tea": ["spill the tea"],
        "key": ["low key", "high key"],
        "bestie": ["bestie vibes"],
        "grass": ["touch grass"]
    }

    # A variant inherits its base word's description when the base is known.
    custom_phrase_map = {
        phrase: slang_map[base]
        for base, phrases in variant_patterns.items()
        if base in slang_map
        for phrase in phrases
    }

    # Manual phrase mappings
    manual_phrase_map = {
        "on fleek": "something that is perfect or done really well",
        "no cap": "no lie, for real, telling the truth",
        "throw shade": "to publicly criticize or express contempt",
        "spill the tea": "to gossip or share juicy information",
        "low key": "somewhat, slightly, or secretly",
        "high key": "very much, obviously, or openly",
        "hits different": "is exceptionally good in a unique way",
        "periodt": "period, end of discussion, final word"
    }

    # Later dicts win: derived variants first so every explicit definition overrides them.
    translation_map = {**custom_phrase_map, **manual_phrase_map, **slang_map, **emoji_map}
    print(f"✅ Loaded {len(translation_map)} translation mappings")
    return translation_map

def replace_slang_and_emoji(text, translation_map):
    """Replace slang terms and emoji in ``text`` with their descriptions.

    All keys are combined into ONE regular expression and applied in a single
    left-to-right pass, so text inserted by a replacement is never translated a
    second time. At any given position longer keys are tried before shorter
    ones ("no cap" beats "cap").

    * Keys made only of non-word characters (emoji, symbols) match anywhere,
      exactly as written.
    * All other keys match case-insensitively and only as whole terms: not
      preceded by a word character and followed by a non-word character or the
      end of the text.

    Replacement values are inserted literally; backslashes in a description are
    not interpreted as regex escapes. Empty keys are ignored.

    Args:
        text: Value to translate; null-like values (``None``/``NaN``) give ``""``.
        translation_map: ``dict[str, str]`` of term -> description.

    Returns:
        str: the translated text.
    """
    if pd.isna(text):
        return ""
    text = str(text)

    alternatives = []
    replacements = []
    # Longest keys first so multi-word phrases win over the words they contain.
    for key in sorted(translation_map.keys(), key=lambda k: -len(k)):
        if not key:
            continue
        escaped = re.escape(key)
        if re.match(r'^\W+$', key):
            alternatives.append(f"({escaped})")
        else:
            alternatives.append(rf"((?<!\w)(?i:{escaped})(?=\W|$))")
        replacements.append(translation_map[key])

    if not alternatives:
        return text

    # Each alternative is its own capturing group, so lastindex identifies the key that matched.
    combined = re.compile("|".join(alternatives))
    return combined.sub(lambda match: replacements[match.lastindex - 1], text)

# ========================================
# ETL PIPELINE
# ========================================

def etl_japanese_sentiment():
    """Read the raw Japanese-sentiment lake table and derive cleaned features.

    Derived columns:

    * ``sentence_cleaned`` / ``sentence_length`` / ``word_count`` from the first
      column whose name contains "sentence" or "text" (if any).
    * For every column named exactly like one of the eight emotions
      (case-insensitive): a numeric version and ``<emotion>_normalized``
      (value / column max, or 0 when the max is missing or not positive).
    * ``emotion_intensity`` (row sum) and ``dominant_emotion`` (column holding
      the row maximum; ``None`` for rows where every emotion score is missing).
      Both default to ``0`` / ``None`` when no emotion column exists.
    * ``processed_at``, ``etl_version`` and a placeholder ``quality_score`` of 1.0.

    Returns:
        pandas.DataFrame: the enriched frame.
    """
    print("\n🔄 ETL PIPELINE - JAPANESE SENTIMENT")
    source_table = f"{project_id}.{dataset_id}.jp_sentiment{data_lake_suffix}"
    query = f"SELECT * FROM `{source_table}`"
    df = client.query(query).to_dataframe()
    print(f"📊 Loaded {len(df)} rows from data lake")

    # Sanitize columns again (in case query returns non-standard)
    df = sanitize_columns(df)

    # Text processing
    text_cols = [col for col in df.columns if 'sentence' in col.lower() or 'text' in col.lower()]
    if text_cols:
        main_text_col = text_cols[0]
        df['sentence_cleaned'] = df[main_text_col].str.strip()
        df['sentence_length'] = df['sentence_cleaned'].str.len()
        df['word_count'] = df['sentence_cleaned'].str.split().str.len()

    # Emotion processing
    emotion_names = ['joy', 'sadness', 'anticipation', 'surprise', 'anger', 'fear', 'disgust', 'trust']
    emotion_cols = [col for col in df.columns if col.lower() in emotion_names]
    if emotion_cols:
        for col in emotion_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce')
            col_max = df[col].max()
            # notna guard: the max of an all-missing column is NA, which cannot be compared.
            if pd.notna(col_max) and col_max > 0:
                df[f'{col.lower()}_normalized'] = df[col] / col_max
            else:
                df[f'{col.lower()}_normalized'] = 0

        scores = df[emotion_cols]
        df['emotion_intensity'] = scores.sum(axis=1)

        # idxmax is undefined for rows where every score is missing, so apply it
        # only to rows that have at least one value.
        has_score = scores.notna().any(axis=1)
        df['dominant_emotion'] = None
        if has_score.any():
            df.loc[has_score, 'dominant_emotion'] = scores.loc[has_score].idxmax(axis=1)
    else:
        df['emotion_intensity'] = 0
        df['dominant_emotion'] = None

    # Metadata
    df['processed_at'] = pd.Timestamp.now(tz="UTC")
    df['etl_version'] = '1.0'
    df['quality_score'] = 1.0

    print(f"✅ ETL completed. {len(df)} rows processed")
    return df

def etl_reviews():
    """Read the newest raw review table from the lake and derive cleaned features.

    Table choice: every table whose id contains "review" and ends with the
    data-lake suffix is a candidate. When several exist (for example a stale
    table left by an earlier run that used the other source), the most recently
    modified one is used.

    Source columns are detected by keyword among the columns read from the lake
    only, so columns derived here (e.g. ``rating_category``) are never mistaken
    for inputs:

    * text: first column containing text/review/comment/content
    * rating: a column containing "rating" is preferred, then "score", then
      "sentiment"
    * category: first column containing category/department/type/class

    Derived columns: ``review_text_cleaned``, ``review_text_translated``,
    ``review_length``, ``word_count``, ``rating``, ``rating_normalized`` (0-1),
    ``rating_category``, ``category_cleaned``, ``has_review_text``,
    ``review_sentiment`` and processing metadata. ``review_sentiment`` is
    derived from the NORMALIZED rating (>= 0.6 positive, <= 0.4 negative,
    otherwise neutral, including missing ratings).

    Returns:
        pandas.DataFrame: the enriched frame, or an empty frame if no review
        table exists.
    """
    print("\n🔄 ETL PIPELINE - REVIEWS")
    translation_map = setup_genz_translation()

    # Try to find the reviews table
    tables = client.list_tables(dataset_id)
    review_tables = [t for t in tables if 'review' in t.table_id and t.table_id.endswith(data_lake_suffix)]

    if not review_tables:
        print("❌ No review tables found in data lake")
        return pd.DataFrame()

    chosen = review_tables[0]
    if len(review_tables) > 1:
        def _freshness(item):
            modified = client.get_table(item).modified
            # Tables without a timestamp sort before any table that has one.
            return (modified is not None, modified)

        chosen = max(review_tables, key=_freshness)

    source_table = f"{project_id}.{dataset_id}.{chosen.table_id}"
    query = f"SELECT * FROM `{source_table}`"
    df = client.query(query).to_dataframe()
    print(f"📊 Loaded {len(df)} rows from data lake")

    # Sanitize columns again
    df = sanitize_columns(df)

    # Snapshot of the lake columns; all keyword detection runs against this list.
    source_cols = list(df.columns)

    def _columns_matching(keywords):
        return [col for col in source_cols if any(keyword in col.lower() for keyword in keywords)]

    # Text processing - find text columns
    text_cols = _columns_matching(['text', 'review', 'comment', 'content'])
    if text_cols:
        main_text_col = text_cols[0]
        df['review_text_cleaned'] = df[main_text_col].str.strip()
        df['review_text_translated'] = df['review_text_cleaned'].apply(
            lambda x: replace_slang_and_emoji(x, translation_map)
        )
        df['review_length'] = df['review_text_cleaned'].str.len()
        df['word_count'] = df['review_text_cleaned'].str.split().str.len()

    # Rating processing - keyword priority keeps a 0/1 "sentiment" label from
    # shadowing a real "rating" column that appears later in the table.
    main_rating_col = None
    for keyword in ['rating', 'score', 'sentiment']:
        matches = _columns_matching([keyword])
        if matches:
            main_rating_col = matches[0]
            break

    if main_rating_col is not None:
        df['rating'] = pd.to_numeric(df[main_rating_col], errors='coerce')

        # Normalize rating to 0-1 scale
        rating_max = df['rating'].max()
        if pd.notna(rating_max) and rating_max > 1:
            df['rating_normalized'] = df['rating'] / rating_max
        else:
            df['rating_normalized'] = df['rating']

        # Create rating categories (pd.cut cannot bin a column with no values)
        if df['rating'].notna().any():
            df['rating_category'] = pd.cut(df['rating'], bins=3, labels=['Poor', 'Fair', 'Good'])
        else:
            df['rating_category'] = None

    # Category processing - find category columns
    category_cols = _columns_matching(['category', 'department', 'type', 'class'])
    if category_cols:
        main_category_col = category_cols[0]
        df['category_cleaned'] = df[main_category_col].str.strip().str.title()

    # Feature engineering
    df['has_review_text'] = df[text_cols[0]].notna() if text_cols else False

    df['review_sentiment'] = 'neutral'
    if main_rating_col is not None:
        # Thresholds are on the 0-1 scale, so compare the normalized rating.
        # Casting to float turns missing values into NaN, which fails both
        # comparisons and therefore stays "neutral".
        normalized = df['rating_normalized'].astype('float64')
        df.loc[normalized >= 0.6, 'review_sentiment'] = 'positive'
        df.loc[normalized <= 0.4, 'review_sentiment'] = 'negative'

    # Metadata
    df['processed_at'] = pd.Timestamp.now(tz="UTC")
    df['etl_version'] = '1.0'
    df['quality_score'] = 1.0

    print(f"✅ ETL completed. {len(df)} rows processed")
    return df

# ========================================
# DATA QUALITY WITH BASIC CHECKS
# ========================================

def run_data_quality_checks(df, dataset_name, max_rating=None):
    """Compute basic quality metrics for ``df`` and stamp an overall score on it.

    Generic metrics: row count, share of null cells, duplicated rows and number
    of empty-string cells in object columns. Additional metrics are added when
    ``dataset_name`` contains ``"jp_sentiment"`` or ``"review"``.

    The overall score starts at 1.0 and loses 0.2 when more than 10% of cells
    are null, 0.1 when more than 5% of rows are duplicates and 0.1 when the
    empty-string count exceeds 5% of the row count; it never drops below 0.5.

    Args:
        df: pandas DataFrame; a ``quality_score`` column is written to it in place.
        dataset_name: Label used for logging and to select dataset-specific checks.
        max_rating: Optional upper bound for valid ratings. When given, ratings
            above it count as out of range. When ``None`` (default) only
            negative ratings are counted, because the data itself cannot
            provide an upper bound.

    Returns:
        tuple: ``(df, quality_checks)`` where ``quality_checks`` is a dict of metrics.
    """
    print(f"\n🔍 DATA QUALITY CHECKS - {dataset_name.upper()}")

    n_rows = len(df)
    total_cells = n_rows * len(df.columns)

    quality_checks = {
        'total_records': n_rows,
        # An empty frame has no cells: report 0% instead of dividing by zero.
        'null_percentage': df.isnull().sum().sum() / total_cells if total_cells else 0.0,
        'duplicate_records': df.duplicated().sum(),
        'empty_strings': 0
    }

    # Check for empty strings in text columns
    for col in df.select_dtypes(include=['object']).columns:
        quality_checks['empty_strings'] += (df[col] == '').sum()

    # Dataset-specific checks
    if 'jp_sentiment' in dataset_name:
        if 'sentence_cleaned' in df.columns:
            sentence_lengths = df['sentence_cleaned'].str.len()
            quality_checks['avg_sentence_length'] = sentence_lengths.mean()
            quality_checks['sentences_too_short'] = (sentence_lengths < 5).sum()
        if 'emotion_intensity' in df.columns:
            quality_checks['avg_emotion_intensity'] = df['emotion_intensity'].mean()

    elif 'review' in dataset_name:
        if 'rating' in df.columns:
            quality_checks['avg_rating'] = df['rating'].mean()
            out_of_range = df['rating'] < 0
            if max_rating is not None:
                out_of_range = out_of_range | (df['rating'] > max_rating)
            quality_checks['ratings_out_of_range'] = out_of_range.sum()
        if 'review_length' in df.columns:
            quality_checks['avg_review_length'] = df['review_length'].mean()

    # Calculate overall quality score
    quality_score = 1.0
    if quality_checks['null_percentage'] > 0.1:
        quality_score -= 0.2
    if quality_checks['duplicate_records'] > n_rows * 0.05:
        quality_score -= 0.1
    if quality_checks['empty_strings'] > n_rows * 0.05:
        quality_score -= 0.1

    quality_score = max(0.5, quality_score)  # Minimum score of 0.5
    df['quality_score'] = quality_score

    print(f"✅ Data quality validation completed")
    print(f"📊 Total records: {quality_checks['total_records']}")
    print(f"📊 Null percentage: {quality_checks['null_percentage']:.2%}")
    print(f"📊 Duplicate records: {quality_checks['duplicate_records']}")
    print(f"📈 Quality score: {quality_score:.3f}")

    return df, quality_checks

# ========================================
# DATA WAREHOUSE STORAGE
# ========================================

def store_in_data_warehouse(df, table_name):
    """Upload ``df`` to ``<table_name><data_warehouse_suffix>``, replacing the table.

    Works on a copy, so the caller's frame is left untouched. Object and
    categorical columns are converted to strings, but missing values remain
    NULL instead of being written as the text ``"nan"`` / ``"None"``. Those
    columns are declared as STRING in a partial schema so a column holding only
    NULLs still gets a type; all other column types are inferred.

    Args:
        df: pandas DataFrame to upload.
        table_name: Table name without project, dataset or suffix.
    """
    print(f"\n🏪 STORING IN DATA WAREHOUSE - {table_name}")
    warehouse_table = f"{project_id}.{dataset_id}.{table_name}{data_warehouse_suffix}"

    upload_df = df.copy()
    string_fields = []
    for col in upload_df.columns:
        values = upload_df[col]
        if isinstance(values.dtype, pd.CategoricalDtype):
            # Treat category labels like ordinary text values.
            values = values.astype(object)
        if values.dtype == "object":
            upload_df[col] = values.where(values.isna(), values.astype(str))
            string_fields.append(bigquery.SchemaField(col, "STRING"))

    print(f"🚀 Uploading to DATA WAREHOUSE: {warehouse_table}")
    job_config = bigquery.LoadJobConfig(
        schema=string_fields,  # partial schema: remaining columns are inferred
        autodetect=True,
        write_disposition="WRITE_TRUNCATE"
    )
    job = client.load_table_from_dataframe(upload_df, warehouse_table, job_config=job_config)
    job.result()
    print(f"✅ Uploaded to DATA WAREHOUSE: {warehouse_table}")

# ========================================
# MAIN EXECUTION PIPELINE
# ========================================

def main():
    """Run the pipeline end to end: ingest -> ETL -> quality checks -> warehouse.

    Phases:
        1. Ingest the Japanese sentiment data and one review dataset into the lake.
        2. Run both ETL functions.
        3. Run quality checks; the Japanese data is always checked, the review
           data only when the review ETL returned rows.
        4. Store the results in the warehouse tables.

    Any exception is logged and re-raised.
    """
    print("🚀 STARTING DATA LAKEHOUSE PIPELINE")
    print("="*50)

    try:
        print("\n📥 PHASE 1: DATA INGESTION TO DATA LAKE")
        ingest_japanese_sentiment_to_lake()

        # ingest_amazon_reviews_to_lake() already falls back to the alternative
        # dataset on failure. Retrying that fallback here would run it a second
        # time after it has just failed, and a bare `except:` would also swallow
        # KeyboardInterrupt.
        ingest_amazon_reviews_to_lake()

        print("\n🔄 PHASE 2: ETL PROCESSING")
        df_jp_sentiment = etl_japanese_sentiment()
        df_reviews = etl_reviews()
        has_reviews = not df_reviews.empty

        print("\n🔍 PHASE 3: DATA QUALITY CHECKS")
        df_jp_sentiment, _ = run_data_quality_checks(df_jp_sentiment, "jp_sentiment")
        if has_reviews:
            df_reviews, _ = run_data_quality_checks(df_reviews, "reviews")
            review_quality_score = df_reviews['quality_score'].iloc[0]
        else:
            print("⚠️ No review data processed, skipping review pipeline")
            review_quality_score = 0.0

        print("\n🏪 PHASE 4: DATA WAREHOUSE STORAGE")
        store_in_data_warehouse(df_jp_sentiment, "jp_sentiment")

        if has_reviews:
            store_in_data_warehouse(df_reviews, "reviews")

        # The lake table is named after the ingested source (the ingesters use
        # the same word for the table prefix and the data_source column), so
        # report that name when the column is present.
        review_lake_name = "reviews"
        if has_reviews and 'data_source' in df_reviews.columns:
            review_lake_name = str(df_reviews['data_source'].iloc[0])

        print("\n📊 PIPELINE SUMMARY")
        print("="*50)
        print(f"✅ Data Lake Tables Created:")
        print(f"   - jp_sentiment{data_lake_suffix}")
        if has_reviews:
            print(f"   - {review_lake_name}{data_lake_suffix}")

        print(f"✅ Data Warehouse Tables Created:")
        print(f"   - jp_sentiment{data_warehouse_suffix}")
        if has_reviews:
            print(f"   - reviews{data_warehouse_suffix}")

        print(f"🔍 Quality Scores:")
        if not df_jp_sentiment.empty:
            print(f"   - Japanese Sentiment: {df_jp_sentiment['quality_score'].iloc[0]:.3f}")
        if has_reviews:
            print(f"   - Reviews: {review_quality_score:.3f}")

        print("🎉 DATA LAKEHOUSE PIPELINE COMPLETED SUCCESSFULLY!")

    except Exception as e:
        print(f"❌ Pipeline failed with error: {str(e)}")
        raise

# ========================================
# UTILITY FUNCTIONS
# ========================================

def list_all_tables():
    """Print the id of every table found in the configured dataset, one per line."""
    print(f"\n📋 ALL TABLES IN {dataset_id}:")
    table_ids = (item.table_id for item in client.list_tables(dataset_id))
    for table_id in table_ids:
        print(f"  - {table_id}")

def query_data_warehouse(table_suffix="_cleaned_db", limit=5):
    """Print a small sample from every table whose id ends with ``table_suffix``.

    Args:
        table_suffix: Suffix that identifies warehouse tables.
        limit: Number of rows requested per table (used in the SQL ``LIMIT``).

    A failure while querying one table is printed and the remaining tables are
    still processed.
    """
    print(f"\n🔍 QUERYING DATA WAREHOUSE TABLES")
    matching_ids = [
        item.table_id
        for item in client.list_tables(dataset_id)
        if item.table_id.endswith(table_suffix)
    ]

    for table_name in matching_ids:
        print(f"\n📊 Sample data from {table_name}:")
        sql = f"SELECT * FROM `{project_id}.{dataset_id}.{table_name}` LIMIT {limit}"
        try:
            sample = client.query(sql).to_dataframe()
        except Exception as e:
            print(f"❌ Error querying {table_name}: {str(e)}")
        else:
            try:
                print(sample.head())
            except Exception as e:
                print(f"❌ Error querying {table_name}: {str(e)}")

# ========================================
# EXECUTION
# ========================================

# Entry point: run the full pipeline, then list the dataset's tables and print
# a sample from each warehouse table.
# If main() raises, the two inspection calls below are not reached.
if __name__ == "__main__":
    main()
    list_all_tables()
    query_data_warehouse()

✅ Dataset `superset_sandbox` already exists.
🚀 STARTING DATA LAKEHOUSE PIPELINE

📥 PHASE 1: DATA INGESTION TO DATA LAKE

🇯🇵 INGESTING JAPANESE SENTIMENT DATA TO DATA LAKE
📊 Loaded 43200 Japanese sentiment records
🏗️ Uploading Japanese Sentiment to DATA LAKE: able-balm-454718-n8.superset_sandbox.jp_sentiment_raw_db
✅ Uploaded to DATA LAKE: able-balm-454718-n8.superset_sandbox.jp_sentiment_raw_db

🛍️ INGESTING AMAZON REVIEWS DATA TO DATA LAKE
📥 Downloading Amazon reviews dataset...
📊 Loaded 20000 Amazon review records
🏗️ Uploading Amazon Reviews to DATA LAKE: able-balm-454718-n8.superset_sandbox.amazon_reviews_raw_db
✅ Uploaded to DATA LAKE: able-balm-454718-n8.superset_sandbox.amazon_reviews_raw_db

🔄 PHASE 2: ETL PROCESSING

🔄 ETL PIPELINE - JAPANESE SENTIMENT
📊 Loaded 43200 rows from data lake
✅ ETL completed. 43200 rows processed

🔄 ETL PIPELINE - REVIEWS

🔥 SETTING UP GEN-Z TRANSLATION MAPPINGS
✅ Loaded 223 translation mappings
📊 Loaded 20000 rows from data lake
✅ ETL completed. 200