In [1]:
import os
import glob
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, DoubleType, TimestampType

import utils.data_processing_bronze_table
import utils.data_processing_silver_table
import utils.data_processing_gold_table

In [2]:
# Start (or reuse, via getOrCreate) a local SparkSession that runs on all available cores.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Spark starts at log level WARN; only surface ERROR messages so cell output stays readable.
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/22 11:46:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Build Bronze Table

In [4]:
# Folder with the raw source files, and root folder of the Bronze layer.
raw_base = "data/raw"
bronze_base = "datamart/bronze"

# Bronze table name -> raw source file name (relative to raw_base).
tables = dict(
    feature_clickstream="feature_clickstream.csv",
    features_attributes="features_attributes.csv",
    features_financials="features_financials.csv",
    lms_loan_daily="lms_loan_daily.csv",
)

In [7]:
# Extensions this cell knows how to read (matched case-insensitively).
BRONZE_SOURCE_FORMATS = (".csv", ".json", ".parquet")

# Resolve every source file's format BEFORE writing anything. An unsupported file
# then fails the cell up front instead of after earlier Bronze tables have already
# been overwritten (which would leave the Bronze layer partially refreshed).
source_formats = {}
for tbl_name, filename in tables.items():
    lowered = filename.lower()
    fmt = next((ext for ext in BRONZE_SOURCE_FORMATS if lowered.endswith(ext)), None)
    if fmt is None:
        raise ValueError(f"Unsupported file format: {filename}")
    source_formats[tbl_name] = fmt

for tbl_name, filename in tables.items():
    # 1) raw path
    raw_path = os.path.join(raw_base, filename)

    # 2) read with the reader matching the (already validated) format
    fmt = source_formats[tbl_name]
    if fmt == ".csv":
        # First line holds column names; let Spark infer column types.
        df = (
            spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(raw_path)
        )
    elif fmt == ".json":
        df = spark.read.json(raw_path)
    else:  # ".parquet" -- the only remaining validated format
        df = spark.read.parquet(raw_path)

    # 3) stamp the rows with the current timestamp as ingest_time
    df = df.withColumn("ingest_time", F.current_timestamp())

    # 4) write to the Bronze table, replacing any previous contents
    out_path = os.path.join(bronze_base, tbl_name)
    df.write.mode("overwrite").parquet(out_path)

    print(f"[Bronze] is writed  {tbl_name} to {out_path}")


                                                                                

[Bronze] is writed  feature_clickstream to datamart/bronze/feature_clickstream


                                                                                

[Bronze] is writed  features_attributes to datamart/bronze/features_attributes


                                                                                

[Bronze] is writed  features_financials to datamart/bronze/features_financials
[Bronze] is writed  lms_loan_daily to datamart/bronze/lms_loan_daily


In [12]:
# utils/data_processing_bronze_table.py

import os
import pyspark.sql.functions as F

def process_bronze(spark, raw_base, bronze_base, tables):
    """Copy each raw source file into the Bronze layer as Parquet.

    Each table is read from ``raw_base``. The reader is chosen from the file
    extension (.csv / .json / .parquet, matched case-insensitively). A column
    ``ingest_time`` holding ``F.current_timestamp()`` is added, and the result
    is written to ``os.path.join(bronze_base, name)`` with mode "overwrite".

    Args:
        spark: SparkSession used to read the raw files.
        raw_base: Folder containing the raw source files.
        bronze_base: Root folder of the Bronze layer.
        tables: dict mapping Bronze table name -> raw file name.

    Raises:
        ValueError: if any file name has an unsupported extension. All names
            are checked before anything is read or written, so a bad entry
            cannot leave the Bronze layer partially refreshed.
    """
    # Resolve every table's format up front (fail fast, no partial writes).
    formats = {}
    for tbl_name, filename in tables.items():
        fmt = _detect_format(filename)
        if fmt is None:
            raise ValueError(f"Unsupported file format: {filename}")
        formats[tbl_name] = fmt

    for tbl_name, filename in tables.items():
        raw_path = os.path.join(raw_base, filename)
        df = _read_raw(spark, raw_path, formats[tbl_name])
        # Add the ingestion timestamp column.
        df = df.withColumn("ingest_time", F.current_timestamp())
        # Write out, replacing any previous contents of this Bronze table.
        out_path = os.path.join(bronze_base, tbl_name)
        df.write.mode("overwrite").parquet(out_path)
        print(f"[Bronze] 写入 {tbl_name} → {out_path}")
    # If needed, the written tables could be returned as a dict of DataFrames:
    # return {name: spark.read.parquet(os.path.join(bronze_base,name)) for name in tables}


# Extensions process_bronze can read, checked case-insensitively.
_SUPPORTED_EXTENSIONS = (".csv", ".json", ".parquet")


def _detect_format(filename):
    """Return the supported extension ``filename`` ends with (case-insensitive), or None."""
    lowered = filename.lower()
    return next((ext for ext in _SUPPORTED_EXTENSIONS if lowered.endswith(ext)), None)


def _read_raw(spark, raw_path, fmt):
    """Read ``raw_path`` with the Spark reader for ``fmt``.

    ``fmt`` must be a value returned by ``_detect_format`` (never None).
    """
    if fmt == ".csv":
        # First line holds column names; let Spark infer column types.
        return (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(raw_path))
    if fmt == ".json":
        return spark.read.json(raw_path)
    return spark.read.parquet(raw_path)
