In [41]:
import datetime
import getpass
import json
import os

import pandas as pd
from google.cloud import storage, bigquery
from pyspark.sql import SparkSession

# Initialize GCS & BigQuery Clients
storage_client = storage.Client()
bq_client = bigquery.Client()

# Initialize Spark Session
spark = SparkSession.builder.appName("HospitalAMySQLToLanding").getOrCreate()

# Google Cloud Storage (GCS) Configuration
GCS_BUCKET = "test-project-1857-de"
HOSPITAL_NAME = "hospital-a"
LANDING_PATH = f"gs://{GCS_BUCKET}/landing/{HOSPITAL_NAME}/"
ARCHIVE_PATH = f"gs://{GCS_BUCKET}/landing/{HOSPITAL_NAME}/archive/"
CONFIG_FILE_PATH = f"gs://{GCS_BUCKET}/configs/load_config.csv"

# BigQuery Configuration
BQ_PROJECT = "test-project-1857"
BQ_AUDIT_TABLE = f"{BQ_PROJECT}.temp_dataset.audit_log"
BQ_LOG_TABLE = f"{BQ_PROJECT}.temp_dataset.pipeline_logs"
# NOTE(review): used as the BigQuery connector's `temporaryGcsBucket` option, which expects a
# bucket name; this value also carries a "/temp/" path suffix — confirm this is intended.
BQ_TEMP_PATH = f"{GCS_BUCKET}/temp/"

# MySQL Configuration
# Credentials are not stored in the notebook. Set HOSPITAL_A_MYSQL_USER / HOSPITAL_A_MYSQL_PASSWORD
# in the kernel's environment (e.g. populated from a secret manager). If the password variable is
# unset, the notebook prompts for it interactively instead of falling back to a literal.
# The password that was previously committed in this notebook should be rotated.
MYSQL_CONFIG = {
    "url": "jdbc:mysql://34.60.182.151:3306/hospital_a_db",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": os.environ.get("HOSPITAL_A_MYSQL_USER", "user1"),
    "password": (os.environ.get("HOSPITAL_A_MYSQL_PASSWORD")
                 or getpass.getpass("MySQL password for hospital_a_db: ")),
    # NOTE(review): socketFactory / cloudSqlInstance / useSSL are not passed to the JDBC reader in
    # extract_and_save_to_landing, which connects directly to the public IP in "url".
    "socketFactory": "com.google.cloud.sql.mysql.SocketFactory",
    "cloudSqlInstance": "test-project-1857:us-central1:hospital-a-mysql-db",
    "useSSL": "false",
    "database": "hospital_a_db",
}

##------------------------------------------------------------------------------------------------------------------##
# Logging Mechanism
log_entries = []  # In-memory buffer of log records, flushed by save_logs_to_gcs / save_logs_to_bigquery

def log_event(event_type, message, table=None):
    """Record one pipeline event in ``log_entries`` and echo it to stdout.

    Args:
        event_type: Category label such as "INFO", "SUCCESS" or "ERROR".
        message: Human-readable description of the event.
        table: Optional table the event relates to; ``None`` for pipeline-wide events.
    """
    stamp = datetime.datetime.now().isoformat()
    log_entries.append(
        dict(timestamp=stamp, event_type=event_type, message=message, table=table)
    )
    # Echo immediately so progress is visible in the cell output.
    print(f"[{stamp}] {event_type} - {message}")
    
def save_logs_to_gcs():
    """Serialize the buffered ``log_entries`` as indented JSON and upload it to GCS.

    The object is written to ``temp/pipeline_logs/pipeline_log_<YYYYmmddHHMMSS>.json`` inside
    ``GCS_BUCKET``, and the resulting ``gs://`` location is printed.
    """
    run_stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    object_name = f"temp/pipeline_logs/pipeline_log_{run_stamp}.json"

    payload = json.dumps(log_entries, indent=4)
    target = storage_client.bucket(GCS_BUCKET).blob(object_name)
    target.upload_from_string(payload, content_type="application/json")

    print(f"✅ Logs successfully saved to GCS at gs://{GCS_BUCKET}/{object_name}")

def save_logs_to_bigquery():
    """Append the buffered ``log_entries`` to the BigQuery table ``BQ_LOG_TABLE``.

    Does nothing when no events have been logged.

    An explicit all-string schema is used instead of inferring one from the dicts. Inference fails
    when a column is ``None`` in every entry (for example ``table`` when only pipeline-wide events
    were logged), and inferring from dicts is a deprecated PySpark code path. The field order
    matches the sorted-key order PySpark uses when inferring from dicts, so the written column
    layout is unchanged.
    """
    from pyspark.sql.types import StringType, StructField, StructType

    if not log_entries:
        return

    log_schema = StructType([
        StructField("event_type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("table", StringType(), True),
        StructField("timestamp", StringType(), True),
    ])
    log_df = spark.createDataFrame(log_entries, schema=log_schema)
    (log_df.write.format("bigquery")
        .option("table", BQ_LOG_TABLE)
        # NOTE(review): `temporaryGcsBucket` expects a bucket name; BQ_TEMP_PATH is "<bucket>/temp/".
        .option("temporaryGcsBucket", BQ_TEMP_PATH)
        .mode("append")
        .save())
    print("✅ Logs stored in BigQuery for future analysis")
    
##------------------------------------------------------------------------------------------------------------------##

# Function to Move Existing Files to Archive
def move_existing_files_to_archive(table):
    """Move every ``.json`` file under ``landing/<HOSPITAL_NAME>/<table>/`` into the archive tree.

    Files are expected to be named ``<table>_<DDMMYYYY>.json``, as written by
    extract_and_save_to_landing. Each is moved to
    ``landing/<HOSPITAL_NAME>/archive/<table>/<YYYY>/<MM>/<DD>/<file name>``.

    Files whose name does not end in an 8-digit date are left in place and logged as a WARNING,
    rather than being archived under a meaningless path.

    NOTE(review): an archived object with the same name (e.g. from an earlier run on the same
    day) is overwritten by the copy.

    Args:
        table: Source table name; selects the landing sub-folder.
    """
    bucket = storage_client.bucket(GCS_BUCKET)
    existing_files = [
        blob.name
        for blob in bucket.list_blobs(prefix=f"landing/{HOSPITAL_NAME}/{table}/")
        if blob.name.endswith(".json")
    ]

    if not existing_files:
        log_event("INFO", f"No existing files for table {table}", table=table)
        return

    for file in existing_files:
        # Parse the date from the file name only, not the full object path.
        file_name = file.rsplit("/", 1)[-1]

        # Suffix is DDMMYYYY, e.g. "encounters_16012026.json" -> day 16, month 01, year 2026.
        date_part = file_name.rsplit("_", 1)[-1].split(".")[0]
        if len(date_part) != 8 or not date_part.isdigit():
            log_event("WARNING", f"Skipping {file}: no DDMMYYYY date suffix in file name", table=table)
            continue
        day, month, year = date_part[:2], date_part[2:4], date_part[4:]

        archive_path = f"landing/{HOSPITAL_NAME}/archive/{table}/{year}/{month}/{day}/{file_name}"

        # Move = copy to the archive location, then delete the original.
        source_blob = bucket.blob(file)
        bucket.copy_blob(source_blob, bucket, archive_path)
        source_blob.delete()

        log_event("INFO", f"Moved {file} to {archive_path}", table=table)
        
##------------------------------------------------------------------------------------------------------------------##

# Function to Get Latest Watermark from BigQuery Audit Table
def get_latest_watermark(table_name, data_source="hospital_a_db"):
    """Return the most recent load time recorded for ``table_name`` in the audit table.

    Args:
        table_name: Value matched against the audit table's ``tablename`` column.
        data_source: Value matched against ``data_source``; defaults to this notebook's source DB.

    Returns:
        The latest ``load_timestamp`` formatted as ``"YYYY-MM-DD HH:MM:SS"``, or
        ``"1900-01-01 00:00:00"`` when no audit row exists. Fractional seconds and any UTC offset
        are dropped so the value can be embedded directly in a MySQL comparison.
        NOTE(review): this assumes MySQL compares in the same time zone the audit timestamps were
        written in — confirm.
    """
    default_watermark = "1900-01-01 00:00:00"
    query = f"""
        SELECT MAX(load_timestamp) AS latest_timestamp
        FROM `{BQ_AUDIT_TABLE}`
        WHERE tablename = @table_name AND data_source = @data_source
    """
    # Bind the filter values as query parameters instead of splicing them into the SQL text.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
            bigquery.ScalarQueryParameter("data_source", "STRING", data_source),
        ]
    )
    for row in bq_client.query(query, job_config=job_config).result():
        latest = row.latest_timestamp
        if not latest:
            return default_watermark
        # Truncating to whole seconds only widens the caller's "> watermark" filter (it may
        # re-extract rows from that same second); it never drops rows.
        return latest.strftime("%Y-%m-%d %H:%M:%S") if hasattr(latest, "strftime") else str(latest)
    return default_watermark

##------------------------------------------------------------------------------------------------------------------##

# Function to Extract Data from MySQL and Save to GCS
def extract_and_save_to_landing(table, load_type, watermark_col):
    """Extract ``table`` from MySQL, land it as JSON Lines in GCS and append an audit row to BigQuery.

    Args:
        table: Source MySQL table name (from the load config).
        load_type: "full" reads the whole table. "incremental" reads only rows whose
            ``watermark_col`` is greater than the latest audited load time. Matching ignores
            case and surrounding whitespace; any other value is logged as an ERROR.
        watermark_col: Column used for the incremental filter; ignored for full loads.

    Exceptions are not raised. Any failure is recorded via ``log_event("ERROR", ...)`` so the
    remaining tables in the config can still be processed.
    """
    try:
        mode = (load_type or "").strip().lower()
        if mode not in ("full", "incremental"):
            raise ValueError(f"Unsupported load_type {load_type!r} (expected 'full' or 'incremental')")

        last_watermark = get_latest_watermark(table) if mode == "incremental" else None
        log_event("INFO", f"Latest watermark for {table}: {last_watermark}", table=table)

        query = f"(SELECT * FROM {table}) AS t" if mode == "full" else \
                f"(SELECT * FROM {table} WHERE {watermark_col} > '{last_watermark}') AS t"

        # Captured BEFORE the source is queried. It becomes the next incremental watermark, so rows
        # modified while this extract runs are picked up next time instead of being skipped.
        load_timestamp = datetime.datetime.now()

        df = (spark.read.format("jdbc")
                .option("url", MYSQL_CONFIG["url"])
                .option("user", MYSQL_CONFIG["user"])
                .option("password", MYSQL_CONFIG["password"])
                .option("driver", MYSQL_CONFIG["driver"])
                .option("dbtable", query)
                .option("database", MYSQL_CONFIG["database"])
                .load())

        # The JDBC read is lazy, so materialize it exactly once. A later df.count() would re-run the
        # query against MySQL and could report a different row count than the file contains.
        records = df.toPandas()
        record_count = len(records)
        log_event("SUCCESS", f"✅ Successfully extracted data from {table}", table=table)

        today = load_timestamp.strftime('%d%m%Y')
        JSON_FILE_PATH = f"landing/{HOSPITAL_NAME}/{table}/{table}_{today}.json"

        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(JSON_FILE_PATH)
        blob.upload_from_string(records.to_json(orient="records", lines=True), content_type="application/json")

        log_event("SUCCESS", f"✅ JSON file successfully written to gs://{GCS_BUCKET}/{JSON_FILE_PATH}", table=table)

        # Insert Audit Entry (load_type is stored exactly as given in the config)
        audit_df = spark.createDataFrame([
            ("hospital_a_db", table, load_type, record_count, load_timestamp, "SUCCESS")],
            ["data_source", "tablename", "load_type", "record_count", "load_timestamp", "status"])

        (audit_df.write.format("bigquery")
            .option("table", BQ_AUDIT_TABLE)
            .option("temporaryGcsBucket", GCS_BUCKET)
            .mode("append")
            .save())

        log_event("SUCCESS", f"✅ Audit log updated for {table}", table=table)

    except Exception as e:
        log_event("ERROR", f"Error processing {table}: {str(e)}", table=table)
##------------------------------------------------------------------------------------------------------------------##

# Function to Read Config File from GCS
def read_config_file():
    """Load the pipeline's load-config CSV from ``CONFIG_FILE_PATH``, treating the first row as the header.

    Returns:
        A Spark DataFrame with one row per configured table. Schema inference is not enabled, so
        columns come back as strings.
    """
    config = spark.read.option("header", True).csv(CONFIG_FILE_PATH)
    log_event("INFO", "✅ Successfully read the config file")
    return config

# read config file
# Process every active hospital_a_db table listed in the config.
# Logs are persisted in `finally`, so a failure outside extract_and_save_to_landing (reading the
# config, archiving files) still leaves a record in GCS and BigQuery.
try:
    config_df = read_config_file()

    for row in config_df.collect():
        if row["is_active"] == '1' and row["datasource"] == "hospital_a_db":
            # NOTE(review): positional unpacking assumes load_config.csv has exactly these 7 columns
            # in this order; adding or reordering a column breaks it.
            db, src, table, load_type, watermark, _, targetpath = row
            move_existing_files_to_archive(table)
            extract_and_save_to_landing(table, load_type, watermark)
except Exception as e:
    log_event("ERROR", f"Pipeline aborted: {e}")
    raise
finally:
    save_logs_to_gcs()
    save_logs_to_bigquery()

[2026-01-16T08:48:07.977114] INFO - ✅ Successfully read the config file
[2026-01-16T08:48:08.537314] INFO - Moved landing/hospital-a/encounters/encounters_16012026.json to landing/hospital-a/archive/encounters/2026/01/16/encounters_16012026.json
[2026-01-16T08:48:09.468282] INFO - Latest watermark for encounters: 1900-01-01 00:00:00
[2026-01-16T08:48:09.515090] SUCCESS - ✅ Successfully extracted data from encounters
[2026-01-16T08:48:10.202256] SUCCESS - ✅ JSON file successfully written to gs://test-project-1857-de/landing/hospital-a/encounters/encounters_16012026.json


                                                                                

[2026-01-16T08:48:18.574601] SUCCESS - ✅ Audit log updated for encounters
[2026-01-16T08:48:18.917212] INFO - Moved landing/hospital-a/patients/patients_16012026.json to landing/hospital-a/archive/patients/2026/01/16/patients_16012026.json
[2026-01-16T08:48:19.558869] INFO - Latest watermark for patients: 1900-01-01 00:00:00
[2026-01-16T08:48:19.599528] SUCCESS - ✅ Successfully extracted data from patients
[2026-01-16T08:48:20.136785] SUCCESS - ✅ JSON file successfully written to gs://test-project-1857-de/landing/hospital-a/patients/patients_16012026.json


                                                                                

[2026-01-16T08:48:26.587449] SUCCESS - ✅ Audit log updated for patients
[2026-01-16T08:48:26.926399] INFO - Moved landing/hospital-a/transactions/transactions_16012026.json to landing/hospital-a/archive/transactions/2026/01/16/transactions_16012026.json
[2026-01-16T08:48:27.509417] INFO - Latest watermark for transactions: 1900-01-01 00:00:00
[2026-01-16T08:48:27.539990] SUCCESS - ✅ Successfully extracted data from transactions
[2026-01-16T08:48:28.711869] SUCCESS - ✅ JSON file successfully written to gs://test-project-1857-de/landing/hospital-a/transactions/transactions_16012026.json


                                                                                

[2026-01-16T08:48:35.037513] SUCCESS - ✅ Audit log updated for transactions
[2026-01-16T08:48:35.379835] INFO - Moved landing/hospital-a/providers/providers_16012026.json to landing/hospital-a/archive/providers/2026/01/16/providers_16012026.json
[2026-01-16T08:48:35.380041] INFO - Latest watermark for providers: None
[2026-01-16T08:48:35.410394] SUCCESS - ✅ Successfully extracted data from providers
[2026-01-16T08:48:35.613147] SUCCESS - ✅ JSON file successfully written to gs://test-project-1857-de/landing/hospital-a/providers/providers_16012026.json


                                                                                

[2026-01-16T08:48:41.834801] SUCCESS - ✅ Audit log updated for providers
[2026-01-16T08:48:42.165934] INFO - Moved landing/hospital-a/departments/departments_16012026.json to landing/hospital-a/archive/departments/2026/01/16/departments_16012026.json
[2026-01-16T08:48:42.166105] INFO - Latest watermark for departments: None
[2026-01-16T08:48:42.200072] SUCCESS - ✅ Successfully extracted data from departments


                                                                                

[2026-01-16T08:48:45.467480] SUCCESS - ✅ JSON file successfully written to gs://test-project-1857-de/landing/hospital-a/departments/departments_16012026.json


                                                                                

[2026-01-16T08:48:55.860333] SUCCESS - ✅ Audit log updated for departments
✅ Logs successfully saved to GCS at gs://test-project-1857-de/temp/pipeline_logs/pipeline_log_20260116084855.json


                                                                                

✅ Logs stored in BigQuery for future analysis


In [6]:
# Debug: print the public IP address of the machine running this kernel, as reported by the
# ifconfig.me service. Presumably used to check which address must be authorized on the MySQL host.
!curl ifconfig.me

35.226.25.250

In [8]:
# Connectivity smoke test: run a trivial query through the MySQL JDBC driver.
# Reuses the pipeline's MYSQL_CONFIG (defined in the first cell) instead of repeating the URL and
# credentials here.
(spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("query", "SELECT 1")
    .load()
    .show())

+---+
|  1|
+---+
|  1|
+---+



                                                                                

In [11]:
!mysql -h 34.60.182.151 -u user1 -pUser1-1234 -e "select 1;"

+---+
| 1 |
+---+
| 1 |
+---+


In [28]:
# MySQL Configuration
# Debug: confirm which MySQL account the JDBC connection authenticates as.
# Reuses the pipeline's MYSQL_CONFIG rather than reassigning it. Redefining MYSQL_CONFIG here
# without the "database" key made any later extract_and_save_to_landing call fail with
# KeyError: 'database'.
current_user_df = (spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("query", "SELECT CURRENT_USER()")
    .load())
current_user_df.show()

+--------------+
|CURRENT_USER()|
+--------------+
|       user1@%|
+--------------+



In [12]:
# Debug: confirm which MySQL account the JDBC connection authenticates as, using the pipeline's
# MYSQL_CONFIG instead of hardcoded URL and credentials.
# NOTE(review): duplicates the In [28] CURRENT_USER() check; one of the two can be removed.
current_user_df = (spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("query", "SELECT CURRENT_USER()")
    .load())

current_user_df.show()


+--------------+
|CURRENT_USER()|
+--------------+
|       user1@%|
+--------------+



In [31]:
# Debug: CURRENT_USER() check using the pipeline's MYSQL_CONFIG, without reassigning it.
# The earlier version redefined MYSQL_CONFIG without the "database" key, which broke later
# extract_and_save_to_landing calls. The socketFactory / cloudSqlInstance / useSSL entries it added
# were never passed to the reader, so it exercised the same plain JDBC connection as the other
# checks.
current_user_df = (spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("query", "SELECT CURRENT_USER()")
    .load())
current_user_df.show()

+--------------+
|CURRENT_USER()|
+--------------+
|       user1@%|
+--------------+



In [38]:
# Debug: read the encounters table with the same JDBC options extract_and_save_to_landing uses
# (including "database"). Reuses the pipeline's MYSQL_CONFIG instead of redefining it with
# hardcoded credentials.
encounters_df = (spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("dbtable", "encounters")
    .option("database", MYSQL_CONFIG["database"])
    .load())

# NOTE(review): rows include patient / provider identifiers; clear this output before sharing.
encounters_df.show()

+-----------+------------+-------------+---------------+----------+------------+-------------+------------+------------+
|EncounterID|   PatientID|EncounterDate|  EncounterType|ProviderID|DepartmentID|ProcedureCode|InsertedDate|ModifiedDate|
+-----------+------------+-------------+---------------+----------+------------+-------------+------------+------------+
|  ENC000001|HOSP1-001127|   2020-01-14|      Inpatient|  PROV0133|     DEPT019|        97099|  2023-03-08|  2020-03-01|
|  ENC000002|HOSP1-003842|   2021-01-21|     Outpatient|  PROV0147|     DEPT013|        19272|  2022-05-01|  2022-02-05|
|  ENC000003|HOSP1-001372|   2021-10-29|   Telemedicine|  PROV0444|     DEPT004|        65512|  2023-06-13|  2022-08-11|
|  ENC000004|HOSP1-002649|   2023-05-05|Routine Checkup|  PROV0169|     DEPT010|        38334|  2020-06-14|  2023-06-30|
|  ENC000005|HOSP1-001709|   2020-04-05|Routine Checkup|  PROV0479|     DEPT014|        10594|  2021-02-08|  2020-10-19|
|  ENC000006|HOSP1-004896|   202

                                                                                

In [None]:
# Experiment (never executed): additionally pass the Cloud SQL socket-factory settings from
# MYSQL_CONFIG to the JDBC reader. Reuses the pipeline's MYSQL_CONFIG instead of redefining it with
# hardcoded credentials.
# NOTE(review): unverified — the socket-factory class must be loadable on the Spark classpath;
# confirm before relying on it.
encounters_df = (spark.read.format("jdbc")
    .option("url", MYSQL_CONFIG["url"])
    .option("driver", MYSQL_CONFIG["driver"])
    .option("user", MYSQL_CONFIG["user"])
    .option("password", MYSQL_CONFIG["password"])
    .option("socketFactory", MYSQL_CONFIG["socketFactory"])
    .option("cloudSqlInstance", MYSQL_CONFIG["cloudSqlInstance"])
    .option("useSSL", MYSQL_CONFIG["useSSL"])
    .option("dbtable", "encounters")
    .option("database", MYSQL_CONFIG["database"])
    .load())

encounters_df.show(20, False)


In [13]:
# Debug: list the IP addresses of the node this kernel (the Spark driver) runs on.
!hostname -I

10.128.0.15 


In [14]:
# Debug: show the configured number of Spark executor instances.
executor_instances_setting = "spark.executor.instances"
spark.conf.get(executor_instances_setting)

'2'

In [15]:
print("Driver IP:")
!hostname -I

print("Executor IP seen by Spark:")
spark.range(1).rdd.map(lambda x: __import__('socket').gethostbyname(__import__('socket').gethostname())).collect()


Driver IP:
10.128.0.15 
Executor IP seen by Spark:


['10.128.0.16']