**STEPS TO FOLLOW**

- Pick the active, eligible tables from the config table where active_flag=1, load_flag=1, bq_to_gcs_status is 'COMPLETED' and gcs_to_bronze_status is 'NOT_STARTED' or 'FAILED'.
- Read the Parquet files from GCS. The GCS path has the form gcs_path/dt=.../*.parquet, i.e. take all the files present under the GCS path for one particular timestamp.
- Use the latest dt (datetime) folder.
- Write the data in Delta format to the bronze path.
- Create the table in the bronze dataset from the bronze path.
- Update the config table.

In [0]:
# Connection settings are exposed as notebook widgets so they can be
# overridden per run without editing the notebook.
dbutils.widgets.text("mysql_host", "34.56.24.205")
mysql_host = dbutils.widgets.get("mysql_host")

dbutils.widgets.text("mysql_port", "3306")
mysql_port = dbutils.widgets.get("mysql_port")

# NOTE: the widget that holds the MySQL user name is registered as
# "mysql_root"; there is no widget called "mysql_user".
dbutils.widgets.text("mysql_root", "root")
mysql_user = dbutils.widgets.get("mysql_root")

dbutils.widgets.text("mysql_db", "GCPMigrationMeta")
mysql_db = dbutils.widgets.get("mysql_db")

# SECURITY: a real password must never be committed as the widget default.
# Supply it at run time (job parameter) or read it from a Databricks secret
# scope. The previously committed password should be treated as leaked and
# rotated.
dbutils.widgets.text("mysql_password", "")
mysql_password = dbutils.widgets.get("mysql_password")

# Path to the GCP service-account key file (JSON).
dbutils.widgets.text("bq_sa_key", '/Volumes/workspace/default/csv/datamigrationproject-483310-739969975183.json')
GOOGLE_APPLICATION_CREDENTIALS = dbutils.widgets.get("bq_sa_key")

# Unity Catalog widgets
dbutils.widgets.text("catalog_name", "bronze")
catalog_name = dbutils.widgets.get("catalog_name")

dbutils.widgets.text("schema_name", "raw")
schema_name = dbutils.widgets.get("schema_name")

In [0]:
%pip install mysql-connector-python google-cloud-bigquery google-cloud-storage

In [0]:
import datetime as dt
import json
import os
import re
import shutil
from contextlib import contextmanager
# contextlib python library help us to setup and clean up the resources automatically
from urllib.parse import urlparse

import mysql.connector as mc
from google.cloud import bigquery
from google.cloud import storage

# MySQL connection settings, read from the notebook widgets.
MYSQL = {
    "host": dbutils.widgets.get("mysql_host").strip(),
    "port": int(dbutils.widgets.get("mysql_port")),
    "db":   dbutils.widgets.get("mysql_db").strip(),
    # The user-name widget is registered as "mysql_root"; reading a
    # non-existent "mysql_user" widget would raise.
    "user": dbutils.widgets.get("mysql_root").strip(),
    "pwd":  dbutils.widgets.get("mysql_password"),
}

catalog_name = dbutils.widgets.get("catalog_name").strip()
schema_name  = dbutils.widgets.get("schema_name").strip()

# GCP service account key path (convert dbfs:/ to /dbfs/).
# The key path lives in the "bq_sa_key" widget.
GCP_KEY = dbutils.widgets.get("bq_sa_key").strip().replace('dbfs:/', '/dbfs/')

# Explicit check instead of `assert`, which is stripped under `python -O`.
if not os.path.exists(GCP_KEY):
    raise FileNotFoundError(f"GCP key not found at {GCP_KEY}")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GCP_KEY  # auth for GCS SDK

In [0]:
@contextmanager
def mysql_conn():
    """Yield an open MySQL connection built from the ``MYSQL`` settings.

    The connection is always closed when the ``with`` block exits. Errors
    raised while connecting, or inside the ``with`` block, propagate to the
    caller: swallowing them here would make a failed query look like a
    success and hide the real cause.

    Yields:
        The connection object returned by ``mysql.connector.connect``.
    """
    conn = mc.connect(
        host=MYSQL['host'],
        port=MYSQL['port'],
        user=MYSQL['user'],
        password=MYSQL['pwd'],
        database=MYSQL['db'],
    )
    try:
        # Execution pauses here while the caller's `with` body runs and
        # resumes afterwards (or when the body raises).
        yield conn
    finally:
        conn.close()

In [0]:
def fetch_eligible_rows():
    """
    Return the config_table rows that still need a GCS -> bronze load.

    A row qualifies when active_flag=1, load_flag=1, the BigQuery -> GCS
    export is 'COMPLETED', and gcs_to_bronze_status is 'NOT_STARTED' or
    'FAILED'. Each row is a dict with the keys table_name, gcs_path and
    target_path, ordered by table_name.
    """
    query = """
          SELECT table_name, gcs_path, target_path
          FROM config_table
          WHERE active_flag=1
            AND load_flag=1
            AND bq_to_gcs_status='COMPLETED'
            AND gcs_to_bronze_status IN ('NOT_STARTED','FAILED')
          ORDER BY table_name
        """
    with mysql_conn() as connection:
        # dictionary=True makes every fetched row a dict keyed by column name
        cursor = connection.cursor(dictionary=True)
        cursor.execute(query)
        eligible = cursor.fetchall()
        cursor.close()
    return eligible

In [0]:
def set_bronze_status(table_name, status, err=None):
    """Record the GCS -> bronze load status of one table in config_table.

    Args:
        table_name: value of the ``table_name`` column identifying the row.
        status: 'IN_PROGRESS' or 'COMPLETED' (case-insensitive). Any other
            value is recorded as 'FAILED'.
        err: optional error (exception or text) stored in ``error_message``
            when the status is 'FAILED'; truncated to 2000 characters.

    For 'IN_PROGRESS' and 'COMPLETED' any previous error message is cleared.
    ``last_run_ts`` is always set to the database's current time.
    """
    normalized = str(status).strip().upper()
    if normalized not in ("IN_PROGRESS", "COMPLETED"):
        normalized = "FAILED"

    if normalized == "FAILED":
        message = str(err)[:2000] if err else "FAILED"
    else:
        message = None  # clears any previous error

    with mysql_conn() as conn:
        cur = conn.cursor()
        try:
            # Values are bound as parameters (never formatted into the SQL
            # text) so quotes in an error message cannot break the statement.
            cur.execute(
                "UPDATE config_table "
                "SET gcs_to_bronze_status=%s, last_run_ts=NOW(), error_message=%s "
                "WHERE table_name=%s",
                (normalized, message, table_name),
            )
            # Without an explicit commit the update is discarded when the
            # connection closes.
            conn.commit()
        finally:
            cur.close()

        

In [0]:
def reset_load_flag(table_name):
    """Set ``load_flag`` to 0 for *table_name* in config_table.

    The table is addressed without a schema prefix so the statement runs
    against the database the connection was opened on.
    """
    with mysql_conn() as conn:
        cur = conn.cursor()
        try:
            # Parameters must be a sequence: note the trailing comma that
            # makes this a one-element tuple.
            cur.execute(
                "UPDATE config_table SET load_flag=0 WHERE table_name=%s",
                (table_name,),
            )
            conn.commit()
        finally:
            cur.close()

In [0]:
# Factory for the Google Cloud Storage client used by the GCS helper functions
def gcs_client():
    """Create and return a new ``storage.Client``.

    NOTE(review): the client is presumably authenticated through the
    GOOGLE_APPLICATION_CREDENTIALS environment variable - confirm.
    """
    client = storage.Client()
    return client

In [0]:
def split_gcs(gcs_uri: str):
    """Split a GCS URI into bucket and path.

    Args:
        gcs_uri: a URI of the form ``gs://bucket/some/path``.

    Returns:
        ``(bucket, path)`` where ``path`` has no leading slash (it is an
        empty string when the URI names only the bucket).

    Raises:
        ValueError: if *gcs_uri* does not start with ``gs://``.
    """
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Expected a gs:// URI, got: {gcs_uri!r}")
    parsed = urlparse(gcs_uri)
    # netloc is the bucket name; the URL path is the object path
    return parsed.netloc, parsed.path.lstrip("/")

In [0]:
# Resolve the URI of the most recent export folder of a table
def latest_dt_uri(gcs_base: str) -> str:
    """
    Find the latest dt=YYYYMMDDTHHMMSSZ folder under gcs_base.
    Return a wildcard URI for all Parquet files in that folder.

    Args:
        gcs_base: gs:// URI of the folder that contains the dt=... folders
            (a trailing slash is ignored).

    Returns:
        ``gs://<bucket>/<prefix>/dt=<latest>/*.parquet``. If no dt folder is
        found, ``gs://<bucket>/<prefix>/dt=*/*.parquet`` is returned instead.
    """
    bucket_name, base_prefix = split_gcs(gcs_base.rstrip("/"))
    cli = gcs_client()

    # List all objects under base_prefix/dt=
    search_prefix = f"{base_prefix}/dt="
    dt_pattern = re.compile(r"dt=([\dT]+Z)/")
    dts = set()
    for blob in cli.list_blobs(bucket_name, prefix=search_prefix):
        # Look for '.../dt=xxxxx/' in blob.name or files under it
        m = dt_pattern.search(blob.name)
        if m:
            dts.add(m.group(1))
    if not dts:
        # fallback: allow wildcard if no dt folders found
        return f"gs://{bucket_name}/{base_prefix}/dt=*/*.parquet"
    # With the fixed-width YYYYMMDDTHHMMSSZ layout, the lexicographically
    # greatest value is also the most recent one.
    latest = max(dts)
    return f"gs://{bucket_name}/{base_prefix}/dt={latest}/*.parquet"

In [0]:
# Example of how latest_dt_uri works: given the base export folder of a
# table (as a gs:// URI), it returns the wildcard URI of the Parquet files
# in the latest dt=... (datetime) folder below it.
latest_dt_uri("gs://bigquerytogcsmigration/exports/ods/ods_order_items")

In [0]:
# Make sure a local directory (e.g. the staging folder of a table) exists
def ensure_dir(local_path: str):
    """Create a local directory, including missing parents, if it does not exist."""
    # exist_ok=True makes the call a no-op when the directory is already there
    os.makedirs(local_path, exist_ok=True)

In [0]:
def prefix_from_wildcard(gcs_uri: str) -> tuple[str, str]:
    """
    Given a GCS URI with wildcard, return (bucket, prefix) for listing.
    Example: gs://bucket/foo/bar/dt=.../*.parquet -> (bucket, 'foo/bar/dt=.../')

    The prefix ends at the last "/" before the first wildcard, so it never
    contains a literal "*" (e.g. '.../dt=*/*.parquet' -> '.../').
    Without a wildcard the path itself is used, with a trailing "/".
    """
    bucket, path = split_gcs(gcs_uri)
    if "*" in path:
        # Cut at the first wildcard, then back up to the enclosing folder.
        literal_part = path[:path.index("*")]
        prefix = literal_part[:literal_part.rfind("/") + 1]
    else:
        prefix = path if path.endswith("/") else path + "/"
    return bucket, prefix

In [0]:
def copy_gcs_prefix_to_dbfs(gcs_uri_wildcard: str, dbfs_dir: str) -> str:
    """
    Copy all objects under the wildcard's parent prefix from GCS to DBFS directory.
    Returns the DBFS directory containing the downloaded Parquet files.

    Args:
        gcs_uri_wildcard: gs:// URI, typically ending in ``/*.parquet``.
        dbfs_dir: destination directory as a ``dbfs:/...`` path. Any existing
            content is removed first so stale files from an earlier run are
            not picked up.

    Raises:
        FileNotFoundError: if no object exists under the prefix.
    """
    bucket, prefix = prefix_from_wildcard(gcs_uri_wildcard)
    cli = gcs_client()

    # The download API needs a local file path, so map dbfs:/ to /dbfs/.
    local_dir = dbfs_dir.replace("dbfs:/", "/dbfs/")
    if os.path.exists(local_dir):
        shutil.rmtree(local_dir)
    ensure_dir(local_dir)

    n = 0
    for blob in cli.list_blobs(bucket, prefix=prefix):
        # Names ending in "/" are folder placeholders, not files.
        if blob.name.endswith("/"):
            continue
        # Files are flattened into local_dir by base name.
        local_file = os.path.join(local_dir, os.path.basename(blob.name))
        blob.download_to_filename(local_file)
        n += 1
    if n == 0:
        raise FileNotFoundError(f"No objects found under gs://{bucket}/{prefix}")
    print(f"↳ Copied {n} file(s) from gs://{bucket}/{prefix} → {dbfs_dir}")
    return dbfs_dir

In [0]:
def to_dbfs(path: str) -> str:
    """
    Return *path* as a ``dbfs:`` path.

    Surrounding whitespace is stripped. A value that already starts with
    ``dbfs:`` is returned as is; otherwise ``dbfs:`` is prepended, with a
    ``/`` inserted when the path does not begin with one.
    """
    cleaned = path.strip()
    if cleaned[:5] == "dbfs:":
        return cleaned
    separator = "" if cleaned[:1] == "/" else "/"
    return f"dbfs:{separator}{cleaned}"

# Main ETL Logic

In [0]:
# Load every eligible table: stage its latest Parquet export from GCS, write
# it as Delta to the table's target path, register the table, and record the
# outcome in the config table.
rows = fetch_eligible_rows()
if not rows:
    print("Nothing to process (No eilgible tables)")
else:
    # Tables are registered as <catalog>.<schema>.<table>, so that schema
    # must exist before the loop starts.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
    print(f"Processing {len(rows)} table(s): {[r['table_name'] for r in rows]}")

    for r in rows:
        t = r['table_name']
        gcs_base = r['gcs_path']
        dst_path = to_dbfs(r['target_path'])
        full_table_name = f"{catalog_name}.{schema_name}.{t}"
        print(f"Processing Started for ----> {t}...")

        # The try block is inside the loop so that every table is processed
        # and one failing table does not stop the others.
        try:
            set_bronze_status(t, 'IN_PROGRESS')

            # Pick the latest dt folder in GCS of table
            src_uri = latest_dt_uri(gcs_base)
            print(f"latest_uri: {src_uri}")

            # Copy the parquet files from GCS to DBFS Directory
            stage_dir = f"dbfs:/tmp/gcs_stage/{t}"
            stage_dir = copy_gcs_prefix_to_dbfs(src_uri, stage_dir)
            print(f"\n{t}: staged to {stage_dir}")

            df = spark.read.parquet(stage_dir)

            # Write the data in Delta format to the destination path. The
            # table registered below points at this location, so the data
            # has to be written there (not into a managed table).
            (
                df.write
                .format("delta")
                .mode("overwrite")
                .option("overwriteSchema", "true")
                .save(dst_path)
            )

            # Register as delta table to databricks using destination path
            spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
            spark.sql(f"CREATE TABLE {full_table_name} USING DELTA LOCATION '{dst_path}'")
            print(f"\n{t}: written to {dst_path}")

            # Mark the load as done before clearing load_flag: if clearing
            # the flag fails, the table stays eligible and is retried.
            set_bronze_status(t, 'COMPLETED')
            reset_load_flag(t)
            print(f"{t} Table has been written at {dst_path}")

        except Exception as e:
            # Keep the error in the config table so the failure can be
            # diagnosed, then continue with the next table.
            set_bronze_status(t, 'FAILED', e)
            print(f"{t} Table has failed to write at {dst_path}")
            print(f"Error: {e}")

# Remove the staging area (second argument: recurse into sub-folders).
dbutils.fs.rm("dbfs:/tmp/gcs_stage", True)