# Introduction

An end-to-end data pipeline for the M2 project.

# Ingest Brazilian E-Commerce Public Dataset by Olist

Option A: Run this command in a terminal:

curl -L -o "/Users/alexfoo/Documents/NTU_DSAI/5m-data-m2-project/assets/brazilian-ecommerce.zip" https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce

Option B: Run the following in a notebook cell:

import requests, zipfile, os

url = "https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce"
save_path = "/Users/alexfoo/Documents/NTU_DSAI/5m-data-m2-project/assets/"
zip_file = os.path.join(save_path, "brazilian-ecommerce.zip")

# Download the archive in chunks; raise on any HTTP error so that a
# failed download never reaches the unzip step below
response = requests.get(url, stream=True, timeout=60)
response.raise_for_status()
with open(zip_file, "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
print(f"Downloaded zip to: {zip_file}")

# Unzip
with zipfile.ZipFile(zip_file, "r") as zip_ref:
    zip_ref.extractall(save_path)

print(f"Files extracted to: {save_path}")

Downloaded zip to: /Users/alexfoo/Documents/NTU_DSAI/5m-data-m2-project/assets/brazilian-ecommerce.zip
Files extracted to: /Users/alexfoo/Documents/NTU_DSAI/5m-data-m2-project/assets/
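
Before moving on, it can help to confirm that the extracted folder actually contains the CSVs used in the rest of this walkthrough. The following is a minimal sketch, not part of the original notebook; `missing_csvs` is a hypothetical helper, and the file names are the ones referenced in the tap-csv config later in this document.

```python
from pathlib import Path

# File names taken from the tap-csv config shown later in this document.
EXPECTED_CSVS = [
    "olist_customers_dataset.csv",
    "olist_geolocation_dataset.csv",
    "olist_order_items_dataset.csv",
    "olist_order_payments_dataset.csv",
    "olist_order_reviews_dataset.csv",
    "olist_orders_dataset.csv",
    "olist_products_dataset.csv",
    "olist_sellers_dataset.csv",
    "product_category_name_translation.csv",
]


def missing_csvs(folder, expected=EXPECTED_CSVS):
    """Return the expected CSV names that are not present in `folder`."""
    folder = Path(folder)
    return [name for name in expected if not (folder / name).is_file()]
```

Calling `missing_csvs(save_path)` after the unzip step should return an empty list if the extraction went as expected.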


# Create a Meltano Project
We will create a Meltano project and use it to
extract data from CSVs and load it into a BigQuery dataset.

We will treat the BigQuery dataset as our data warehouse. The two tasks above (extract and load) make up a typical data ingestion pipeline, which extracts data from external or internal sources and loads it into a data warehouse.

To create a Meltano project, run:

meltano init meltano-ingestion
cd meltano-ingestion

# Add an Extractor to Pull Data from CSVs
We're going to add an extractor for CSV files to get our data. An extractor is responsible for pulling data out of a data source. We will use the tap-csv extractor to pull the dataset from the downloaded CSVs.

To add the extractor to our project, run:

meltano add extractor tap-csv

Next, configure the extractor by running:

meltano config tap-csv set --interactive

The interactive prompt lists the available settings:

Settings
│ 1. add_metadata_columns: When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_fi...
│ 2. csv_files_definition: Project-relative path to JSON file holding array of objects as described...
│ 3. faker_config.locale: One or more LCID locale strings to produce localized output for: https:/...
│ 4. faker_config.seed: Value to seed the Faker generator for deterministic output: https://fake...
│ 5. files: Array of objects with `entity`, `path`, `keys`, and `encoding` [Optional...
│ 6. flattening_enabled: 'True' to enable schema flattening and automatically expand nested prope... 
│ 7. flattening_max_depth: The max depth to flatten schemas.
│ 8. stream_map_config: User-defined config values to be used within map expressions.
│ 9. stream_maps: Config object for stream maps capability. For more information check out...

You will be prompted for many options; we only need to set the following:

flattening_enabled: true
flattening_max_depth: 1

In Meltano, tap-csv is configured with a list of files entries, each naming an entity and the path to a CSV file or a directory of CSV files. The tap can then ingest multiple CSVs in one run, with each entity treated as a stream.

Configure it in meltano.yml

Example config (simplified):

plugins:
  extractors:
  - name: tap-csv
    variant: meltanolabs
    pip_url: git+https://github.com/MeltanoLabs/tap-csv.git
    config:
      files:
        - entity: customer
          path: ../assets/olist_customers_dataset.csv
          keys: [customer_id]
        - entity: geolocation
          path: ../assets/olist_geolocation_dataset.csv
          keys: [geolocation_zip_code_prefix]
        - entity: order_item
          path: ../assets/olist_order_items_dataset.csv
          keys: [order_item_id]
        - entity: order_payment
          path: ../assets/olist_order_payments_dataset.csv
          keys: [order_id, payment_sequential]
        - entity: order_review
          path: ../assets/olist_order_reviews_dataset.csv
          keys: [review_id]
        - entity: order
          path: ../assets/olist_orders_dataset.csv
          keys: [order_id]   
        - entity: product
          path: ../assets/olist_products_dataset.csv
          keys: [product_id]   
        - entity: seller
          path: ../assets/olist_sellers_dataset.csv
          keys: [seller_id]   
        - entity: product_category_name_translation
          path: ../assets/product_category_name_translation.csv
          keys: [product_category_name]
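
Because every entry names its key columns by hand, a typo in `keys` is easy to miss. The sketch below shows one way to check that each configured key really appears in the header row of its CSV. It is not part of tap-csv: `missing_key_columns` is a hypothetical helper, and it assumes the `files` list has already been loaded into Python dicts with `entity`, `path`, and `keys` fields (for example with a YAML parser).

```python
import csv


def missing_key_columns(files):
    """Map each entity to the configured keys that are absent from its CSV header."""
    problems = {}
    for entry in files:
        # utf-8-sig drops a leading byte order mark, so it cannot hide
        # inside the first column name during the comparison.
        with open(entry["path"], newline="", encoding="utf-8-sig") as f:
            header = next(csv.reader(f), [])
        missing = [key for key in entry["keys"] if key not in header]
        if missing:
            problems[entry["entity"]] = missing
    return problems
```

An empty dict means every configured key was found in its file. Paths are resolved relative to the current working directory, so run the check from the Meltano project folder if the paths are project-relative.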

Test that the extractor settings are valid using meltano config:

meltano config tap-csv test

# Add a Loader to Load Data into BigQuery

Before we can load the data into BigQuery, create a new Google Cloud project called meltano-learn. Then create a BigQuery dataset called m2_ingestion (multi-region: US) in that project.

Finally, create a service account with the BigQuery Admin role and download the JSON key file to your local machine.

We will now add a loader to load the data into BigQuery.

meltano add loader target-bigquery

meltano config target-bigquery set --interactive

Set the following options:

project: <GCP Project ID>
dataset: m2_ingestion
credentials_path: <full path to the service account key file>
method: batch_job
denormalized: true
flattening_enabled: true
flattening_max_depth: 1
batch_size: 5000
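
A wrong `credentials_path` or a key that belongs to a different project tends to surface only once the load starts. As a hedged pre-flight check, the sketch below inspects the key file before running the pipeline. `check_key_file` is a hypothetical helper, and it assumes the standard JSON key format downloaded from the Google Cloud console, which includes the `type`, `project_id`, `private_key`, and `client_email` fields.

```python
import json

# Fields expected in a downloaded service account key (assumed standard format).
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def check_key_file(path, expected_project=None):
    """Return a list of problems found in a service account key file."""
    with open(path, encoding="utf-8") as f:
        key = json.load(f)
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in key]
    if key.get("type") != "service_account":
        problems.append("type is not 'service_account'")
    if expected_project and key.get("project_id") != expected_project:
        problems.append("project_id does not match the configured project")
    return problems
```

An empty list means the file looks structurally sound. It does not prove that the account has the BigQuery Admin role; that is only confirmed by an actual run.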

We can now run the full ingestion (extract-load) pipeline.

meltano run tap-csv target-bigquery

# Job Failed: BrokenPipeError: [Errno 32] Broken pipe

Option A - Increase the file descriptor limit
Temporarily raise the limit for the current shell session:
ulimit -n 4096
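
If Meltano is launched from a notebook rather than from a shell, the same limit can be inspected and raised from Python with the standard `resource` module (Unix only). This is a minimal sketch; `raise_nofile_limit` is a hypothetical helper, and the change applies only to the current Python process and any child processes it starts afterwards.

```python
import resource


def raise_nofile_limit(target=4096):
    """Raise the soft open-file limit of this process to at least `target`.

    Returns the soft limit in effect after the call.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft  # already high enough, leave it alone
    # The soft limit can never exceed the hard limit.
    new_soft = target if hard == resource.RLIM_INFINITY else min(target, hard)
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    return new_soft
```

Like `ulimit -n`, this is temporary: a new process started elsewhere gets the system default again.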

Option B - Batch your CSVs
	•	Instead of reading every file in one run, process them in smaller groups.
	•	In meltano.yml, list only a subset of the CSVs as streams per run.
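
The grouping described in Option B can be sketched in a few lines of Python. This only illustrates how the entries might be split; `batches` is a hypothetical helper, the group size of 3 is an arbitrary choice, and the `files` list is shortened from the example config with the keys left out for brevity. Each group still has to be copied into meltano.yml (or otherwise passed to tap-csv) before its run.

```python
import json


def batches(entries, size):
    """Yield consecutive slices of `entries` with at most `size` items each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


# Entity names and paths copied from the example config; keys left out for brevity.
files = [
    {"entity": "customer", "path": "../assets/olist_customers_dataset.csv"},
    {"entity": "geolocation", "path": "../assets/olist_geolocation_dataset.csv"},
    {"entity": "order_item", "path": "../assets/olist_order_items_dataset.csv"},
    {"entity": "order", "path": "../assets/olist_orders_dataset.csv"},
]

# Print the entity names that would go into each run.
for group in batches(files, 3):
    print(json.dumps([entry["entity"] for entry in group]))
```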

Option C - Provision a GCP VM with more I/O capacity to run the job

Option D - Reduce batch size
	•	In meltano.yml, under the target-bigquery config, reduce how many records are buffered before flushing.

    config:
      batch_size: 500  # default might be 1000+

With a combination of options A, B and D, the local machine completed the jobs successfully, taking about 10 minutes per batch.

# Job Failed 2:

The following error occurred when loading product_category_name_translation.csv:

[info     ] google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/sound-vehicle-468314-q4/datasets/m2_ingestion/tables?prettyPrint=false: Invalid field name "﻿product_category_name". Fields must contain the allowed characters, and be at most 300 characters long. For allowed characters, please refer to https://cloud.google.com/bigquery/docs/schemas#column_names
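
The quoted field name in this error begins with an invisible character, which is consistent with a UTF-8 byte order mark (BOM) at the start of the file. A quick way to confirm this is to look at the first three bytes. The sketch below uses only the standard library; `has_utf8_bom` is a hypothetical helper name.

```python
import codecs


def has_utf8_bom(path):
    """Return True if the file starts with the UTF-8 byte order mark (EF BB BF)."""
    with open(path, "rb") as f:
        return f.read(len(codecs.BOM_UTF8)) == codecs.BOM_UTF8
```

Running `has_utf8_bom` against each CSV in the assets folder shows which files are likely to trigger the same error.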

# Possible Solution 2 (Not working):

In meltano.yml for tap-csv, add:

plugins:
  extractors:
    - name: tap-csv
      config:
        files:
          - entity: product_category_name_translation
            path: ./data/product_category_name_translation.csv
            keys: [product_category_name]
            clean_headers: true


# Fix:
The cause is a UTF-8 BOM (byte order mark) at the start of the CSV header. The fix is to physically strip the BOM from the file before tap-csv hands it off to target-bigquery.

import pandas as pd

df = pd.read_csv("product_category_name_translation.csv", encoding="utf-8-sig")
df.to_csv("product_category_name_translation.csv", index=False)

or, with GNU sed (on macOS, the built-in BSD sed uses a different -i syntax):

sed -i '1s/^\xEF\xBB\xBF//' product_category_name_translation.csv
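
If neither pandas nor GNU sed is available, the BOM can also be removed at the byte level with the standard library. This is a minimal sketch under the assumption that the file fits comfortably in memory, which holds for this small translation file. `strip_utf8_bom` is a hypothetical helper; unlike the pandas round trip, it leaves every other byte of the file untouched.

```python
import codecs


def strip_utf8_bom(path):
    """Remove a leading UTF-8 BOM in place. Return True if one was removed."""
    with open(path, "rb") as f:
        data = f.read()
    if not data.startswith(codecs.BOM_UTF8):
        return False  # nothing to do, file is left unchanged
    with open(path, "wb") as f:
        f.write(data[len(codecs.BOM_UTF8):])
    return True
```

Calling it a second time on the same file returns False, so it is safe to run before every load.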
