Step 1: Extract from Kaggle
- Set up environment variables for the Kaggle credentials.
- Store raw CSVs in a structured folder (data/raw/olist_orders_dataset.csv, etc.) so Meltano can ingest them cleanly.

Step 2: Load into BigQuery with Meltano
- Configure Meltano extractor (tap-kaggle or tap-csv) to read raw CSVs.
- Configure loader (target-bigquery) with your GCP project credentials.
- Define a schema mapping in Meltano so each CSV → BigQuery table:
  - olist_customers_dataset.csv → raw_customers
  - olist_orders_dataset.csv → raw_orders
  - olist_order_items_dataset.csv → raw_order_items
  - etc.
This keeps the raw layer consistent before the dbt transformations.




# Olist Brazilian E‑Commerce

This notebook implements the following plan:

- **Step 1: Extract from Kaggle** — set environment variables and download raw CSVs into `data/raw/`.
- **Step 2: Load into BigQuery with Meltano** — configure a CSV extractor and `target-bigquery` loader, map CSVs → raw tables.
- **Step 3: Transform with dbt** — create staging models, facts, dimensions, and tests.

Edit the environment variables and file paths in the first code cell before running.

### Notes before running

- This notebook assumes you have Python, `pip`, and access to the internet from the environment where you run it.
- You must **not** commit secrets. Use environment variables for credentials.
- The notebook writes configuration files (Meltano, dbt `profiles.yml`, and sample SQL models) into the working directory so you can iterate locally or in a CI/CD pipeline.
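
One way to act on the "no hardcoded secrets" note is to fail fast when a required variable is missing, rather than falling back to a placeholder. The following is a minimal sketch: the helper name `missing_env_vars` is hypothetical, and the variable names are the ones this notebook reads.

```python
import os

# Environment variables this notebook relies on
REQUIRED_VARS = [
    "KAGGLE_USERNAME",
    "KAGGLE_KEY",
    "GCP_PROJECT",
    "GOOGLE_APPLICATION_CREDENTIALS",
]

def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```

Passing `env` explicitly makes the check easy to exercise without touching the real environment.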


In [None]:
# Step 0: Edit these variables before running
import os

# Kaggle credentials (set as env vars; do NOT hardcode in shared repos)
os.environ['KAGGLE_USERNAME'] = os.getenv('KAGGLE_USERNAME', 'kiessxxxxxx')  # replace with your Kaggle username
os.environ['KAGGLE_KEY'] = os.getenv('KAGGLE_KEY', 'AAAA_AAA9999999999999') # replace with your Kaggle API key

# GCP project and service account JSON path
os.environ['GCP_PROJECT'] = os.getenv('GCP_PROJECT', 'aaaaaaaa') # replace with your GCP project ID
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS', '/home/dsai/5m-data-2.6-data-pipelines-orchestration/aaaaaaaaa.json')

# Local paths used by this notebook
BASE_DIR = os.path.abspath('.')
RAW_DIR = os.path.join(BASE_DIR, 'data', 'raw')
os.makedirs(RAW_DIR, exist_ok=True)

print('KAGGLE_USERNAME:', os.environ['KAGGLE_USERNAME'])
print('GCP_PROJECT:', os.environ['GCP_PROJECT'])
print('RAW_DIR:', RAW_DIR)


KAGGLE_USERNAME: kieronsiriban
GCP_PROJECT: algebraic-road-478012-u9
RAW_DIR: /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw


## Step 1: Extract from Kaggle

This cell installs the Kaggle API client, downloads the Olist dataset, and places CSVs into `data/raw/` with predictable filenames so Meltano can ingest them.

In [24]:
# Install Kaggle client if needed
!pip install --quiet kaggle pandas

import glob
import os
import shutil

import pandas as pd
from kaggle.api.kaggle_api_extended import KaggleApi

# Authenticate with the Kaggle credentials configured for this environment
api = KaggleApi()
api.authenticate()

dataset = 'olistbr/brazilian-ecommerce'
download_path = '/tmp/olist_kaggle_download'
os.makedirs(download_path, exist_ok=True)

print('Downloading dataset...')
api.dataset_download_files(dataset, path=download_path, unzip=True)
print('Download complete. Listing CSVs:')
csvs = glob.glob(os.path.join(download_path, '*.csv'))
for f in csvs:
    print('-', os.path.basename(f))

# Copy CSVs to data/raw, keeping the original Kaggle filenames
for f in csvs:
    dest = os.path.join(RAW_DIR, os.path.basename(f))
    shutil.copyfile(f, dest)
print('\nCopied CSVs to', RAW_DIR)

# Quick peek at one file
pd.read_csv(os.path.join(RAW_DIR, os.path.basename(csvs[0]))).head()

Downloading dataset...
Dataset URL: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce
Download complete. Listing CSVs:
- olist_order_payments_dataset.csv
- olist_orders_dataset.csv
- olist_products_dataset.csv
- olist_order_reviews_dataset.csv
- olist_customers_dataset.csv
- olist_sellers_dataset.csv
- olist_geolocation_dataset.csv
- olist_order_items_dataset.csv
- product_category_name_translation.csv

Copied CSVs to /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw


Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45
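
Before moving on to the load step, it can help to confirm that every file from the listing above actually landed in `data/raw/`. A minimal sketch, assuming the nine filenames printed by the download cell; the helper name `missing_files` is hypothetical.

```python
import os

# Filenames taken from the download listing above
EXPECTED_FILES = [
    "olist_customers_dataset.csv",
    "olist_geolocation_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_payments_dataset.csv",
    "olist_order_reviews_dataset.csv",
    "olist_orders_dataset.csv",
    "olist_products_dataset.csv",
    "olist_sellers_dataset.csv",
    "product_category_name_translation.csv",
]

def missing_files(raw_dir, expected):
    """Return the expected filenames that are not present in raw_dir."""
    return [name for name in expected
            if not os.path.isfile(os.path.join(raw_dir, name))]

missing = missing_files(os.path.join("data", "raw"), EXPECTED_FILES)
print("Missing files:", missing if missing else "none")
```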


In [None]:
# Run these commands in a terminal, not as Python in a notebook cell

# ensure meltano CLI is available
pip install --upgrade meltano

# initialize project if you haven't
meltano init olist_pipeline
cd olist_pipeline   

# add plugins (this updates meltano.yml and installs the correct packages)
meltano add extractor tap-csv
meltano add loader target-bigquery

# verify plugins
meltano plugins list


## Step 2: Load into BigQuery with Meltano

We will scaffold a Meltano project that uses a CSV extractor (tap-csv) and `target-bigquery` as the loader. The idea: Meltano reads the CSVs from `data/raw/` and writes them to `raw_`-prefixed tables in the `olist_raw` BigQuery dataset.

Below are commands and configuration snippets. The notebook will write a minimal `meltano.yml` and show how to run `meltano elt` for each CSV. In production, prefer a single pipeline or orchestrator (Airflow, Cloud Composer, Cloud Workflows) to run Meltano jobs.

In [None]:
# Run in a terminal: sets up Application Default Credentials for the Google client libraries
gcloud auth application-default login

In [1]:
# Install Meltano and the published BigQuery target package
!pip install --quiet meltano z3-target-bigquery

import os, textwrap

# Meltano expects `environments` to be a list of named entries,
# with environment variables under each entry's `env` key.
meltano_yml = textwrap.dedent(f"""
version: 1
default_environment: local
plugins:
  extractors:
    - name: tap-csv
      pip_url: tap-csv
  loaders:
    - name: target-bigquery
      pip_url: z3-target-bigquery
  orchestrators: []
  utilities: []
environments:
  - name: local
    env:
      GOOGLE_APPLICATION_CREDENTIALS: "{os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')}"
      GCP_PROJECT: "{os.environ.get('GCP_PROJECT')}"
""")

with open('meltano.yml', 'w') as f:
    f.write(meltano_yml)

print('Wrote meltano.yml with z3-target-bigquery')
print()
print('Next steps (run in a shell or notebook cells):')
print('  1) Initialize Meltano project if needed: meltano init olist_project --no-input')
print('  2) Register plugins: meltano add extractor tap-csv && meltano add loader target-bigquery')
print('  3) Verify plugins: meltano plugins list')
print('  4) Run an ELT for a single CSV (example):')
print('     meltano elt tap-csv target-bigquery --job_id=olist_customers \\')
print('       --config tap-csv:input_path=data/raw/customers.csv \\')
print('       --config target-bigquery:dataset=olist_raw')

Wrote meltano.yml with z3-target-bigquery

Next steps (run in a shell or notebook cells):
  1) Initialize Meltano project if needed: meltano init olist_project --no-input
  2) Register plugins: meltano add extractor tap-csv && meltano add loader target-bigquery
  3) Verify plugins: meltano plugins list
  4) Run an ELT for a single CSV (example):
     meltano elt tap-csv target-bigquery --job_id=olist_customers \
       --config tap-csv:input_path=data/raw/customers.csv \
       --config target-bigquery:dataset=olist_raw


### Example mapping (CSV → BigQuery table)

- `olist_customers_dataset.csv` → `olist_raw.raw_customers`
- `olist_orders_dataset.csv` → `olist_raw.raw_orders`
- `olist_order_items_dataset.csv` → `olist_raw.raw_order_items`
- `olist_products_dataset.csv` → `olist_raw.raw_products`
- `olist_sellers_dataset.csv` → `olist_raw.raw_sellers`
- `olist_order_payments_dataset.csv` → `olist_raw.raw_payments`
- `olist_order_reviews_dataset.csv` → `olist_raw.raw_order_reviews`
- `olist_geolocation_dataset.csv` → `olist_raw.raw_geolocation`
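
The mapping above can also be kept as data and turned into one file definition per CSV for the extractor. The sketch below is an assumption about how the tap-csv `files` setting is shaped (a list of entries with `entity`, `path`, and `keys`); check the field names against the variant of tap-csv you install. The helper `build_tap_csv_files` and the example key column are illustrative only.

```python
import json
import os

# CSV -> raw table mapping, copied from the list above
CSV_TO_TABLE = {
    "olist_customers_dataset.csv": "raw_customers",
    "olist_orders_dataset.csv": "raw_orders",
    "olist_order_items_dataset.csv": "raw_order_items",
    "olist_products_dataset.csv": "raw_products",
    "olist_sellers_dataset.csv": "raw_sellers",
    "olist_order_payments_dataset.csv": "raw_payments",
    "olist_order_reviews_dataset.csv": "raw_order_reviews",
    "olist_geolocation_dataset.csv": "raw_geolocation",
}

def build_tap_csv_files(raw_dir, mapping, keys_by_table=None):
    """Build one file definition per CSV.

    The entity/path/keys field names are an assumption about tap-csv's
    `files` setting, not something this notebook has verified.
    """
    keys_by_table = keys_by_table or {}
    return [
        {
            "entity": table,
            "path": os.path.join(raw_dir, fname),
            "keys": keys_by_table.get(table, []),
        }
        for fname, table in mapping.items()
    ]

# Example: declare a primary key for one table only (illustrative)
files = build_tap_csv_files("data/raw", CSV_TO_TABLE,
                            {"raw_customers": ["customer_id"]})
print(json.dumps(files[:2], indent=2))
```

Keeping the mapping in one dictionary means the Meltano configuration and any direct-upload script can share the same source of truth for table names.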

Below is a sample `target-bigquery` config snippet you can set via `meltano config` or in the Meltano UI. It uses the service account JSON via `GOOGLE_APPLICATION_CREDENTIALS`.

In [12]:
target_bigquery_config = {
    'project_id': os.environ['GCP_PROJECT'],
    'dataset': 'olist_raw',
    # keyfile is optional if GOOGLE_APPLICATION_CREDENTIALS is set
    'keyfile': os.environ['GOOGLE_APPLICATION_CREDENTIALS']
}
import json
print('Sample target-bigquery config (for reference):')
print(json.dumps(target_bigquery_config, indent=2))

Sample target-bigquery config (for reference):
{
  "project_id": "algebraic-road-478012-u9",
  "dataset": "olist_raw",
  "keyfile": "/home/dsai/5m-data-2.6-data-pipelines-orchestration/algebraic-road-478012-u9-fe73ba0332c3.json"
}


## Step 2b: Alternative — direct Python BigQuery upload (useful for quick testing)

To load the CSVs into BigQuery quickly without Meltano, for example to validate the data, use the Python BigQuery client. This is **not** the Meltano path, but it is useful for quick checks.

In [None]:
# Robust CSV -> BigQuery loader
!pip install --quiet google-cloud-bigquery pandas pyarrow

import os
import pandas as pd
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError, NotFound

# Ensure these are set in your environment
GCP_PROJECT = os.environ.get('GCP_PROJECT')
GCP_CRED = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
RAW_DIR = os.path.join(os.getcwd(), 'data', 'raw')  # adjust if different

if not GCP_PROJECT or not GCP_CRED:
    raise RuntimeError("Set GCP_PROJECT and GOOGLE_APPLICATION_CREDENTIALS environment variables before running.")

client = bigquery.Client(project=GCP_PROJECT)

def ensure_dataset(dataset_id: str):
    """Create the dataset if it does not exist yet."""
    full_id = f"{GCP_PROJECT}.{dataset_id}"
    try:
        client.get_dataset(full_id)
        print(f"Dataset {dataset_id} exists.")
    except NotFound:
        # Only a missing dataset triggers creation; other errors propagate
        print(f"Dataset {dataset_id} not found, creating it.")
        dataset = bigquery.Dataset(full_id)
        dataset.location = "US"  # change region if needed
        client.create_dataset(dataset, exists_ok=True)
        print(f"Created dataset {dataset_id}.")

def _load_in_chunks(local_csv_path, table_id, parse_dates, chunk_size):
    """Load a CSV chunk by chunk: the first chunk replaces the table, later chunks append."""
    chunks = pd.read_csv(local_csv_path, chunksize=chunk_size, parse_dates=parse_dates, low_memory=False)
    for i, chunk in enumerate(chunks):
        disposition = (
            bigquery.WriteDisposition.WRITE_TRUNCATE if i == 0
            else bigquery.WriteDisposition.WRITE_APPEND
        )
        job_config = bigquery.LoadJobConfig(write_disposition=disposition)
        client.load_table_from_dataframe(chunk, table_id, job_config=job_config).result()

def load_csv_to_bq(local_csv_path, table_name, dataset='olist_raw', chunk_size=200_000):
    table_id = f"{GCP_PROJECT}.{dataset}.{table_name}"
    print(f"\nLoading {local_csv_path} -> {table_id}")
    # Read a small sample to inspect the column names
    try:
        sample = pd.read_csv(local_csv_path, nrows=1000, low_memory=False)
    except Exception as e:
        print("Failed to read CSV sample:", e)
        raise

    # Heuristic: parse columns whose names contain 'date' or 'timestamp'
    date_cols = [c for c in sample.columns if 'date' in c.lower() or 'timestamp' in c.lower()]
    parse_dates = date_cols if date_cols else None

    # Try a full read first; fall back to a chunked load if it fails
    try:
        df = pd.read_csv(local_csv_path, parse_dates=parse_dates, low_memory=False)
    except Exception as e:
        print("Full read failed, attempting chunked read:", e)
        try:
            _load_in_chunks(local_csv_path, table_id, parse_dates, chunk_size)
            print(f"Chunked load complete for {local_csv_path}")
            return
        except GoogleAPIError as ge:
            print("BigQuery chunked load failed:", ge)
            raise

    # The full read succeeded: attempt a single load that replaces the table
    try:
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
        )
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()
        print(f"Loaded {local_csv_path} -> {table_id} ({df.shape[0]} rows)")
    except GoogleAPIError as ge:
        print("BigQuery load failed:", ge)
        # As a fallback, retry with a chunked upload
        try:
            print("Retrying with chunked upload...")
            _load_in_chunks(local_csv_path, table_id, parse_dates, chunk_size)
            print("Chunked retry succeeded.")
        except Exception as e:
            print("Chunked retry also failed:", e)
            raise

# Ensure dataset exists
ensure_dataset('olist_raw')

# Files to load (adjust list/order as you like)
files_and_tables = {
    'olist_customers_dataset.csv': 'raw_customers',
    'olist_geolocation_dataset.csv': 'raw_geolocation',
    'olist_order_items_dataset.csv': 'raw_order_items',
    'olist_order_payments_dataset.csv': 'raw_payments',
    'olist_order_reviews_dataset.csv': 'raw_order_reviews',
    'olist_orders_dataset.csv': 'raw_orders',
    'olist_products_dataset.csv': 'raw_products',
    'olist_sellers_dataset.csv': 'raw_sellers',
    'product_category_name_translation.csv': 'raw_product_category_name_translation'
}

for fname, table in files_and_tables.items():
    path = os.path.join(RAW_DIR, fname)
    if os.path.exists(path):
        try:
            load_csv_to_bq(path, table)
        except Exception as e:
            print(f"Failed to load {fname}: {e}")
    else:
        print(f"{fname} not found in {RAW_DIR}")

# List loaded tables
print("\nLoaded tables in dataset olist_raw:")
try:
    tables = client.list_tables('olist_raw')
    for table in tables:
        print('-', table.table_id)
except Exception as e:
    print("Failed to list tables:", e)


Dataset olist_raw exists.

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/customers.csv -> algebraic-road-478012-u9.olist_raw.raw_customers




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/customers.csv -> algebraic-road-478012-u9.olist_raw.raw_customers (99441 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/geolocation.csv -> algebraic-road-478012-u9.olist_raw.raw_geolocation




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/geolocation.csv -> algebraic-road-478012-u9.olist_raw.raw_geolocation (1000163 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_items.csv -> algebraic-road-478012-u9.olist_raw.raw_order_items




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_items.csv -> algebraic-road-478012-u9.olist_raw.raw_order_items (112650 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_payments.csv -> algebraic-road-478012-u9.olist_raw.raw_payments




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_payments.csv -> algebraic-road-478012-u9.olist_raw.raw_payments (103886 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_reviews.csv -> algebraic-road-478012-u9.olist_raw.raw_order_reviews




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/order_reviews.csv -> algebraic-road-478012-u9.olist_raw.raw_order_reviews (99224 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/orders.csv -> algebraic-road-478012-u9.olist_raw.raw_orders




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/orders.csv -> algebraic-road-478012-u9.olist_raw.raw_orders (99441 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/products.csv -> algebraic-road-478012-u9.olist_raw.raw_products




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/products.csv -> algebraic-road-478012-u9.olist_raw.raw_products (32951 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/sellers.csv -> algebraic-road-478012-u9.olist_raw.raw_sellers




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/sellers.csv -> algebraic-road-478012-u9.olist_raw.raw_sellers (3095 rows)

Loading /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/product_category_name_translation.csv -> algebraic-road-478012-u9.olist_raw.raw_product_category_name_translation




Loaded /home/dsai/5m-data-2.6-data-pipelines-orchestration/data/raw/product_category_name_translation.csv -> algebraic-road-478012-u9.olist_raw.raw_product_category_name_translation (71 rows)

Loaded tables in dataset olist_raw:
- raw_customers
- raw_geolocation
- raw_order_items
- raw_order_reviews
- raw_orders
- raw_payments
- raw_product_category_name_translation
- raw_products
- raw_sellers
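
The row counts in the load log above can be cross-checked against the local CSVs. The following is a minimal sketch of that comparison; the helper names are hypothetical, and fetching the live counts from BigQuery is left out (one option, as an assumption about your setup, is `client.get_table(table_id).num_rows`).

```python
import csv

def count_csv_rows(path):
    """Count data rows, excluding the header.

    Uses the csv module so a quoted field that spans several lines
    still counts as a single row. Blank lines are ignored.
    """
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row
        return sum(1 for row in reader if row)

def find_mismatches(expected, actual):
    """Return {table: (expected, actual)} for tables whose counts differ or are missing."""
    return {
        table: (count, actual.get(table))
        for table, count in expected.items()
        if actual.get(table) != count
    }

# Illustrative values: the "actual" counts are copied from the load log above
expected = {"raw_sellers": 3095, "raw_product_category_name_translation": 71}
actual = {"raw_sellers": 3095, "raw_product_category_name_translation": 71}
print(find_mismatches(expected, actual))
```

In practice `expected` would be built by calling `count_csv_rows` on each file in `data/raw/`, and an empty result means every table matches its source file.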
