In [None]:
from google.colab import drive

# Mount Google Drive so the project folder under /content/drive/MyDrive is reachable
# from every later cell.
drive.mount(mountpoint="/content/drive")


Mounted at /content/drive


Installing Dependencies

In [None]:
!pip install pyspark==3.3.2 pandas pyarrow fastparquet requests


Collecting pyspark==3.3.2
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastparquet
  Downloading fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.2 kB)
Collecting py4j==0.10.9.5 (from pyspark==3.3.2)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fastparquet-2024.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ..

CSV Landing

In [None]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import date

# Project root in Google Drive; raw input and every layer live underneath it.
PROJECT = "/content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project"

RAW = PROJECT + "/data/raw"
LANDING_ROOT = PROJECT + "/landing"
today = date.today().isoformat()

# Make sure the source and destination roots exist before scanning RAW.
for root_dir in (RAW, LANDING_ROOT):
    os.makedirs(root_dir, exist_ok=True)

# Every *.csv in RAW (case-insensitive extension, alphabetical order) becomes
# landing/<table>/<YYYY-MM-DD>/data.parquet, where <table> is the file stem.
csv_names = [name for name in sorted(os.listdir(RAW)) if name.lower().endswith(".csv")]
for csv_name in csv_names:
    frame = pd.read_csv(os.path.join(RAW, csv_name))

    table, _ext = os.path.splitext(csv_name)
    target_dir = os.path.join(LANDING_ROOT, table, today)
    os.makedirs(target_dir, exist_ok=True)

    target = os.path.join(target_dir, "data.parquet")
    frame.to_parquet(target, index=False)

    print("Saved", target, "rows:", len(frame))



API Landing


In [None]:
import os, requests, pandas as pd
from datetime import date

# Base URL must end with "/" because endpoint paths are appended by plain string concatenation.
API_BASE = "https://demodata.grapecity.com/"

# Project path in Drive (same root as the CSV landing cell).
PROJECT = "/content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project"

LANDING_ROOT = PROJECT + "/landing"
today = date.today().isoformat()

# Landing table name -> API path relative to API_BASE.
# Each one lands as landing/api_<name>/<YYYY-MM-DD>/data.parquet.
endpoints = {
    "businessEntityAddresses": "adventureworks/api/v1/businessEntityAddresses",
    "addressTypes": "adventureworks/api/v1/addressTypes",
    "billOfMaterials": "adventureworks/api/v1/billOfMaterials",
    "productCategories": "adventureworks/api/v1/productCategories",
    "productModels": "adventureworks/api/v1/productModels",
    "products": "adventureworks/api/v1/products",
    "productSubcategories": "adventureworks/api/v1/productSubcategories",
    "productDescriptions": "adventureworks/api/v1/productDescriptions",
    "productReviews": "adventureworks/api/v1/productReviews",
    "salesOrders": "adventureworks/api/v1/salesOrders",
    "salesReasons": "adventureworks/api/v1/salesReasons",
    "salesTaxRates": "adventureworks/api/v1/salesTaxRates",
    "creditCards": "adventureworks/api/v1/creditCards",
    "customers": "adventureworks/api/v1/customers",
    "employees": "adventureworks/api/v1/employees",
    "locations": "adventureworks/api/v1/locations",
}


def _extract_records(payload):
    """Return the list of records contained in a decoded JSON payload.

    - A top-level list is returned unchanged.
    - A dict is treated as a wrapper object: its first list-valued entry is
      returned, or [] if it has no list value.
    - Anything else (string, number, null) yields [], rather than being passed
      to pd.DataFrame, which would raise on a bare scalar.
    """
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        return next((value for value in payload.values() if isinstance(value, list)), [])
    return []


os.makedirs(LANDING_ROOT, exist_ok=True)

# NOTE(review): several endpoints return exactly 100 rows, which looks like a server-side
# page size. If the API paginates, only the first page is landed here. TODO: confirm the
# paging parameters.
# One Session reuses the HTTP connection across all endpoints.
with requests.Session() as session:
    for name, path in endpoints.items():
        url = API_BASE + path
        print("GET", url)

        r = session.get(url, timeout=60)
        r.raise_for_status()  # fail fast: a broken endpoint stops the whole landing run

        arr = _extract_records(r.json())
        if not arr:
            print("No data for", name)
            continue

        df = pd.DataFrame(arr)

        outdir = os.path.join(LANDING_ROOT, f"api_{name}", today)
        os.makedirs(outdir, exist_ok=True)

        outpath = os.path.join(outdir, "data.parquet")
        df.to_parquet(outpath, index=False)

        print("Saved", outpath, "rows:", len(df))


GET https://demodata.grapecity.com/adventureworks/api/v1/businessEntityAddresses
Saved /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project/landing/api_businessEntityAddresses/2025-11-30/data.parquet rows: 100
GET https://demodata.grapecity.com/adventureworks/api/v1/addressTypes
Saved /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project/landing/api_addressTypes/2025-11-30/data.parquet rows: 6
GET https://demodata.grapecity.com/adventureworks/api/v1/billOfMaterials
Saved /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project/landing/api_billOfMaterials/2025-11-30/data.parquet rows: 100
GET https://demodata.grapecity.com/adventureworks/api/v1/productCategories
Saved /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project/landing/api_productCategories/2025-11-30/data.parquet rows: 4
GET https://demodata.grapecity.com/adventureworks/api/v1/productModels
Saved /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneeri

Landing to Bronze (Spark Setup)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, to_date

# One SparkSession for the notebook; getOrCreate() returns the existing session on re-run.
spark = (
    SparkSession.builder
    .appName("AdventureWorks_Bronze")
    .getOrCreate()
)

# Layer roots (PROJECT and date come from the landing cells above) and today's partition key.
today = date.today().isoformat()
LANDING_ROOT = f"{PROJECT}/landing"
BRONZE_ROOT = f"{PROJECT}/bronze"


Products Bronze Transform

In [None]:
# Products may have landed through either ingestion path:
#   CSV landing -> landing/products/<today>/data.parquet
#   API landing -> landing/api_products/<today>/data.parquet
# The CSV copy wins when both exist.
candidate_paths = [
    f"{LANDING_ROOT}/products/{today}/data.parquet",
    f"{LANDING_ROOT}/api_products/{today}/data.parquet",
]
p_path = next((candidate for candidate in candidate_paths if os.path.exists(candidate)), None)
if p_path is None:
    # Fail with an actionable message instead of Spark's generic "path does not exist".
    raise FileNotFoundError(
        f"No landing parquet for products on {today}; looked in: " + ", ".join(candidate_paths)
    )

df = spark.read.parquet(p_path)
# TODO: enforce a per-table schema (casts) here. For now, only de-duplicate exact rows
# and stamp ingestion metadata. Spark evaluates current_timestamp() once per query,
# so both columns come from the same instant.
df_clean = (
    df.dropDuplicates()
    .withColumn("load_timestamp", current_timestamp())
    .withColumn("ingestion_date", to_date(current_timestamp()))
)

out_dir = f"{BRONZE_ROOT}/products/{today}"
df_clean.write.mode("overwrite").parquet(out_dir)
print("Bronze products written to", out_dir)


Bronze products written to /content/drive/MyDrive/AdventureWorks_DE_Project/dataengneering-project/bronze/products/2025-11-30


Download Landing Parquet Files for the Bronze Layer


In [None]:
import shutil

from google.colab import files

# NOTE(review): this ships the *landing* folder (CSV + API parquet drops), although the
# archive is named bronze.zip. The Spark bronze output lives under BRONZE_ROOT.
# Confirm which folder is meant to be downloaded.
src = LANDING_ROOT
dst = "/content/bronze.zip"

# make_archive takes the destination path *without* the extension and returns the
# full path of the archive it wrote; download exactly that file.
archive_path = shutil.make_archive(os.path.splitext(dst)[0], "zip", src)
files.download(archive_path)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
import glob

# Sanity-check which parquet files exist in the bronze layer on Drive. A Windows path
# such as C:\Users\... does not exist on the Colab VM and would always report 0 files.
# To inspect a local extraction of the zip instead, point BRONZE_CHECK_DIR at it.
BRONZE_CHECK_DIR = BRONZE_ROOT

# Named parquet_files rather than `files` so google.colab.files is not shadowed.
parquet_files = sorted(
    glob.glob(os.path.join(BRONZE_CHECK_DIR, "**", "*.parquet"), recursive=True)
)
print(parquet_files)
print("Total:", len(parquet_files))


[]
Total: 0
