In [3]:
from google.cloud import storage, bigquery
import pandas as pd
from pyspark.sql import SparkSession
import datetime
import json

# Create (or reuse) the Spark session used by every read and write in this script
spark = (
    SparkSession.builder
    .appName("RetailerMySQLToLanding")
    .getOrCreate()
)

# GCS layout: bucket name plus the landing, archive and config-file locations
GCS_BUCKET = "retailer-datalake-project-1172025"
LANDING_PATH = "gs://{}/landing/retailer-db/".format(GCS_BUCKET)
ARCHIVE_PATH = "gs://{}/landing/retailer-db/archive/".format(GCS_BUCKET)
CONFIG_FILE_PATH = "gs://{}/configs/retailer_config.csv".format(GCS_BUCKET)

# BigQuery targets: project, audit/log tables, and the temp location handed to
# the BigQuery writer for pipeline logs
BQ_PROJECT = "expanded-league-477308-m5"
BQ_AUDIT_TABLE = BQ_PROJECT + ".temp_dataset.audit_log"
BQ_LOG_TABLE = BQ_PROJECT + ".temp_dataset.pipeline_logs"
BQ_TEMP_PATH = GCS_BUCKET + "/temp/"

# MySQL Configuration
# NOTE(review): the user name and password are hard-coded in source; consider
# loading them from a secret store or environment variable instead.
MYSQL_CONFIG = {
    # The JDBC URL is assembled from adjacent string literals, so every query
    # parameter after the first must carry its own leading "&". Without it,
    # "useSSL=false" is glued onto the cloudSqlInstance value and never applied.
    "url": (
        "jdbc:mysql://34.57.168.126:3306/retailerDB"
        "?cloudSqlInstance=expanded-league-477308-m5:us-central1:retailer-mysql-db"
        "&useSSL=false"
        "&allowPublicKeyRetrieval=true"
    ),
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "myuser",
    "password": "Mysql@1245",
}



# GCS and BigQuery client handles shared by the helper functions in this script
storage_client, bq_client = storage.Client(), bigquery.Client()

# In-memory buffer of log entries; log_event() appends to it and the
# save_logs_* helpers read from it
log_entries = []
##---------------------------------------------------------------------------------------------------##
def log_event(event_type, message, table=None):
    """Append one event record to ``log_entries`` and echo it to stdout.

    The record holds the current local time (ISO-8601 string), the event type,
    the message and the optional table name.
    """
    stamp = datetime.datetime.now().isoformat()
    log_entries.append(
        dict(timestamp=stamp, event_type=event_type, message=message, table=table)
    )
    # Echo to the console so progress is visible while the job runs
    print(f"[{stamp}] {event_type} - {message}")
##---------------------------------------------------------------------------------------------------##
def save_logs_to_gcs():
    """Serialize the collected log entries to JSON and upload them to GCS.

    The object is written under ``temp/pipeline_logs/`` in ``GCS_BUCKET`` with a
    name carrying the current local time (``YYYYmmddHHMMSS``).
    """
    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    log_filepath = f"temp/pipeline_logs/pipeline_log_{stamp}.json"

    # Serialize once and upload directly from memory (no local temp file)
    payload = json.dumps(log_entries, indent=4)
    target_blob = storage_client.bucket(GCS_BUCKET).blob(log_filepath)
    target_blob.upload_from_string(payload, content_type="application/json")

    print(f"✅ Logs successfully saved to GCS at gs://{GCS_BUCKET}/{log_filepath}")
    
def save_logs_to_bigquery():
    """Append the collected log entries to the BigQuery table ``BQ_LOG_TABLE``.

    Does nothing when no events have been logged. The DataFrame is built with
    an explicit all-string schema rather than letting Spark infer one from the
    dicts: inference cannot determine a type for ``table`` when it is ``None``
    in every entry (e.g. a run where no config row is active).
    """
    if not log_entries:
        return

    # Local import keeps this block self-contained; pyspark is already a
    # dependency of this script.
    from pyspark.sql.types import StringType, StructField, StructType

    log_schema = StructType([
        StructField("event_type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("table", StringType(), True),  # None for non-table events
        StructField("timestamp", StringType(), True),  # ISO-8601 string from log_event
    ])
    log_df = spark.createDataFrame(log_entries, schema=log_schema)

    (log_df.write.format("bigquery")
        .option("table", BQ_LOG_TABLE)
        .option("temporaryGcsBucket", BQ_TEMP_PATH)
        .mode("append")
        .save())
    print("✅ Logs stored in BigQuery for future analysis")


##---------------------------------------------------------------------------------------------------##
# Function to Read Config File from GCS
def read_config_file():
    """Load the pipeline config CSV at ``CONFIG_FILE_PATH`` as a Spark DataFrame.

    The first line of the file is treated as the header row.
    """
    config_frame = spark.read.csv(CONFIG_FILE_PATH, header=True)
    log_event("INFO", "✅ Successfully read the config file")
    return config_frame
##---------------------------------------------------------------------------------------------------##
# Function to Move Existing Files to Archive
def move_existing_files_to_archive(table):
    """Move every ``.json`` object in the table's landing folder to the archive.

    Each object under ``landing/retailer-db/<table>/`` is copied to
    ``landing/retailer-db/archive/<table>/<year>/<month>/<day>/<file name>`` and
    then deleted from landing. The date parts are sliced out of the file name,
    which is expected to end in ``_DDMMYYYY.json`` (e.g. products_27032025.json).
    """
    bucket = storage_client.bucket(GCS_BUCKET)
    landing_prefix = f"landing/retailer-db/{table}/"
    json_names = [
        candidate.name
        for candidate in bucket.list_blobs(prefix=landing_prefix)
        if candidate.name.endswith(".json")
    ]

    if not json_names:
        log_event("INFO", f"No existing files for table {table}")
        return

    for blob_name in json_names:
        base_name = blob_name.split("/")[-1]

        # Date stamp = text after the last underscore, minus the extension
        stamp = blob_name.split("_")[-1].split(".")[0]
        day, month, year = stamp[:2], stamp[2:4], stamp[-4:]
        archive_path = f"landing/retailer-db/archive/{table}/{year}/{month}/{day}/{base_name}"

        # "Move" = copy within the same bucket, then delete the original
        source_blob = bucket.blob(blob_name)
        bucket.copy_blob(source_blob, bucket, archive_path)
        source_blob.delete()

        log_event("INFO", f"✅ Moved {blob_name} to {archive_path}", table=table)
        
##---------------------------------------------------------------------------------------------------##

# Function to Get Latest Watermark from BigQuery Audit Table
def get_latest_watermark(table_name):
    """Return the newest ``load_timestamp`` recorded for ``table_name`` in the audit table.

    Falls back to the string ``"1900-01-01 00:00:00"`` when the query yields no
    row or the maximum is NULL/falsy.
    """
    default_watermark = "1900-01-01 00:00:00"
    query = f"""
        SELECT MAX(load_timestamp) AS latest_timestamp
        FROM `{BQ_AUDIT_TABLE}`
        WHERE tablename = '{table_name}'
    """
    rows = bq_client.query(query).result()

    # Only the first row matters; the query is a single aggregate
    first_row = next(iter(rows), None)
    if first_row is None:
        return default_watermark
    return first_row.latest_timestamp or default_watermark
        
##---------------------------------------------------------------------------------------------------##

# Function to Extract Data from MySQL and Save to GCS
def extract_and_save_to_landing(table, load_type, watermark_col):
    """Extract one MySQL table and land it in GCS as newline-delimited JSON.

    Steps: resolve the watermark (incremental loads only), read the table over
    JDBC, upload ``landing/retailer-db/<table>/<table>_<DDMMYYYY>.json`` to
    ``GCS_BUCKET``, then append a SUCCESS row to ``BQ_AUDIT_TABLE``.

    Args:
        table: Source table name in MySQL; also used for the landing folder.
        load_type: ``"full load"`` or ``"incremental"`` (case-insensitive,
            surrounding whitespace ignored). Any other value is reported as an
            error instead of silently building a ``> 'None'`` filter.
        watermark_col: Column compared against the last watermark; required
            for incremental loads.

    Returns:
        None. Any exception is logged as an ERROR event and not re-raised, so
        one failing table does not stop the remaining tables.
    """
    try:
        # Decide the load mode once so the watermark lookup and the SQL agree
        mode = (load_type or "").strip().lower()
        if mode not in ("full load", "incremental"):
            raise ValueError(f"Unsupported load_type {load_type!r}")
        is_incremental = mode == "incremental"
        if is_incremental and not watermark_col:
            raise ValueError("Incremental load requires a watermark column")

        # Get Latest Watermark
        last_watermark = get_latest_watermark(table) if is_incremental else None
        log_event("INFO", f"Latest watermark for {table}: {last_watermark}", table=table)

        # Captured BEFORE the extract: this value becomes the next run's
        # watermark, so stamping it after the read would skip rows modified
        # while the extract was running.
        load_started_at = datetime.datetime.now()

        # Generate SQL Query
        if is_incremental:
            query = f"(SELECT * FROM {table} WHERE {watermark_col} > '{last_watermark}') AS t"
        else:
            query = f"(SELECT * FROM {table}) AS t"

        # Read Data from MySQL (lazy: nothing is fetched until toPandas below)
        df = (spark.read
                .format("jdbc")
                .option("url", MYSQL_CONFIG["url"])
                .option("user", MYSQL_CONFIG["user"])
                .option("password", MYSQL_CONFIG["password"])
                .option("driver", MYSQL_CONFIG["driver"])
                .option("dbtable", query)
                .load())

        # toPandas() is the action that actually runs the JDBC query, so the
        # success event is logged only after it returns.
        pandas_df = df.toPandas()
        log_event("SUCCESS", f"✅ Successfully extracted data from {table}", table=table)

        # Convert to newline-delimited JSON
        json_data = pandas_df.to_json(orient="records", lines=True)

        # Generate File Path in GCS
        today = datetime.datetime.today().strftime('%d%m%Y')
        json_file_path = f"landing/retailer-db/{table}/{table}_{today}.json"

        # Upload JSON to GCS
        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(json_file_path)
        blob.upload_from_string(json_data, content_type="application/json")

        log_event("SUCCESS", f"✅ JSON file successfully written to gs://{GCS_BUCKET}/{json_file_path}", table=table)

        # Insert Audit Entry. The count comes from the data already fetched:
        # df.count() would run the JDBC query a second time and could disagree
        # with what was written to GCS.
        record_count = len(pandas_df)
        audit_df = spark.createDataFrame(
            [(table, load_type, record_count, load_started_at, "SUCCESS")],
            ["tablename", "load_type", "record_count", "load_timestamp", "status"],
        )

        (audit_df.write.format("bigquery")
            .option("table", BQ_AUDIT_TABLE)
            .option("temporaryGcsBucket", GCS_BUCKET)
            .mode("append")
            .save())

        log_event("SUCCESS", f"✅ Audit log updated for {table}", table=table)

    except Exception as e:
        # Per-table boundary: record the failure and let the caller continue
        log_event("ERROR", f"Error processing {table}: {str(e)}", table=table)
        
##---------------------------------------------------------------------------------------------------##

# Main Execution
# Reads the config, processes every active row, then persists the collected
# logs. Logs are saved in a ``finally`` so they are not lost if a step outside
# extract_and_save_to_landing's own error handling raises.
config_df = read_config_file()

try:
    for row in config_df.collect():
        # Config is read without schema inference, so is_active is a string
        if row["is_active"] == '1':
            # Positional unpack: relies on the config CSV having exactly these
            # seven columns in this order
            db, src, table, load_type, watermark, _, targetpath = row
            move_existing_files_to_archive(table)
            extract_and_save_to_landing(table, load_type, watermark)
except Exception as exc:
    log_event("ERROR", f"Pipeline aborted: {exc}")
    raise
finally:
    save_logs_to_gcs()
    save_logs_to_bigquery()

# Per-table failures are logged rather than raised, so inspect the log before
# claiming success
error_count = sum(1 for entry in log_entries if entry["event_type"] == "ERROR")
if error_count:
    print(f"❌ Pipeline completed with {error_count} error(s) - check the pipeline logs")
else:
    print("✅ Pipeline completed successfully!")

[2025-12-23T10:27:41.768572] INFO - ✅ Successfully read the config file
[2025-12-23T10:27:42.524478] INFO - ✅ Moved landing/retailer-db/products/products_23122025.json to landing/retailer-db/archive/products/2025/12/23/products_23122025.json
[2025-12-23T10:27:42.524600] INFO - Latest watermark for products: None
[2025-12-23T10:27:42.547115] SUCCESS - ✅ Successfully extracted data from products
[2025-12-23T10:27:42.793359] SUCCESS - ✅ JSON file successfully written to gs://retailer-datalake-project-1172025/landing/retailer-db/products/products_23122025.json


                                                                                

[2025-12-23T10:27:49.543306] SUCCESS - ✅ Audit log updated for products
[2025-12-23T10:27:49.873825] INFO - ✅ Moved landing/retailer-db/categories/categories_23122025.json to landing/retailer-db/archive/categories/2025/12/23/categories_23122025.json
[2025-12-23T10:27:49.873979] INFO - Latest watermark for categories: None
[2025-12-23T10:27:49.896327] SUCCESS - ✅ Successfully extracted data from categories
[2025-12-23T10:27:50.093939] SUCCESS - ✅ JSON file successfully written to gs://retailer-datalake-project-1172025/landing/retailer-db/categories/categories_23122025.json


                                                                                

[2025-12-23T10:28:01.067254] SUCCESS - ✅ Audit log updated for categories
[2025-12-23T10:28:01.429415] INFO - ✅ Moved landing/retailer-db/customers/customers_23122025.json to landing/retailer-db/archive/customers/2025/12/23/customers_23122025.json
[2025-12-23T10:28:02.084148] INFO - Latest watermark for customers: 1900-01-01 00:00:00
[2025-12-23T10:28:02.105812] SUCCESS - ✅ Successfully extracted data from customers
[2025-12-23T10:28:02.327507] SUCCESS - ✅ JSON file successfully written to gs://retailer-datalake-project-1172025/landing/retailer-db/customers/customers_23122025.json


                                                                                

[2025-12-23T10:28:09.249374] SUCCESS - ✅ Audit log updated for customers
[2025-12-23T10:28:09.679735] INFO - ✅ Moved landing/retailer-db/orders/orders_23122025.json to landing/retailer-db/archive/orders/2025/12/23/orders_23122025.json
[2025-12-23T10:28:10.456353] INFO - Latest watermark for orders: 1900-01-01 00:00:00
[2025-12-23T10:28:10.476822] SUCCESS - ✅ Successfully extracted data from orders
[2025-12-23T10:28:10.687881] SUCCESS - ✅ JSON file successfully written to gs://retailer-datalake-project-1172025/landing/retailer-db/orders/orders_23122025.json


                                                                                

[2025-12-23T10:28:15.959590] SUCCESS - ✅ Audit log updated for orders
[2025-12-23T10:28:16.323469] INFO - ✅ Moved landing/retailer-db/order_items/order_items_23122025.json to landing/retailer-db/archive/order_items/2025/12/23/order_items_23122025.json
[2025-12-23T10:28:16.955675] INFO - Latest watermark for order_items: 1900-01-01 00:00:00
[2025-12-23T10:28:16.978551] SUCCESS - ✅ Successfully extracted data from order_items
[2025-12-23T10:28:17.249923] SUCCESS - ✅ JSON file successfully written to gs://retailer-datalake-project-1172025/landing/retailer-db/order_items/order_items_23122025.json


                                                                                

[2025-12-23T10:28:22.687035] SUCCESS - ✅ Audit log updated for order_items
✅ Logs successfully saved to GCS at gs://retailer-datalake-project-1172025/temp/pipeline_logs/pipeline_log_20251223102822.json


                                                                                

✅ Logs stored in BigQuery for future analysis
✅ Pipeline completed successfully!
