### **Importation des bibliothèques**

In [1]:
import datetime
import json
import os

import pandas as pd
from google.cloud import storage, bigquery
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

###  **Initialize GCS & BigQuery Clients**

In [2]:

# Shared Google Cloud clients, built with default arguments (no explicit
# project or credentials are passed here).
storage_client, bq_client = storage.Client(), bigquery.Client()

### **Session PySpark**

In [3]:
# Reuse the active Spark session when there is one, otherwise create it
# under this application name.
spark = (
    SparkSession.builder
    .appName("HospitalAMySQLToLanding")
    .getOrCreate()
)

### **Configuration du Google Cloud Storage (GCS)**

In [4]:
# Bucket and per-hospital folder layout used by the landing zone.
GCS_BUCKET = "healthcare-bucket-lte"
HOSPITAL_NAME = "hospital-a"

# Landing folder for this hospital (with trailing slash) and its archive
# sub-folder (without trailing slash).
LANDING_PATH = "gs://{}/landing/{}/".format(GCS_BUCKET, HOSPITAL_NAME)
ARCHIVE_PATH = LANDING_PATH + "archive"

# CSV that lists the tables to load and how to load them.
CONFIG_FILE_PATH = "gs://{}/configs/load_config.csv".format(GCS_BUCKET)

### **Configuration du BigQuery**

In [5]:
# BigQuery project and the fully-qualified tables used for audit rows and
# pipeline logs.
BQ_PROJECT = "gcp-demo-lte"
BQ_AUDIT_TABLE = "{}.temp_dataset.audit_log".format(BQ_PROJECT)
BQ_LOG_TABLE = "{}.temp_dataset.pipeline_logs".format(BQ_PROJECT)

# Bucket-relative location (no gs:// scheme) handed to the BigQuery writer as
# its "temporaryGcsBucket" option when pipeline logs are saved.
BQ_TEMP_PATH = "{}/temp/".format(GCS_BUCKET)

### **MySQL Configuration**

In [6]:
def load_mysql_config(env=None):
    """Build the JDBC connection settings for the hospital A MySQL source.

    Credentials are read from the environment instead of being written in the
    notebook. The password that used to be committed here should be treated
    as exposed and rotated.

    Args:
        env: Mapping to read settings from; defaults to ``os.environ``.
            Recognised keys: ``MYSQL_URL``, ``MYSQL_USER``, ``MYSQL_PASSWORD``.

    Returns:
        dict with the keys ``url``, ``driver``, ``user`` and ``password``.
        ``password`` is an empty string when ``MYSQL_PASSWORD`` is not set.
    """
    env = os.environ if env is None else env
    return {
        # NOTE(review): the default URL disables SSL (useSSL=false) while
        # pointing at a literal IP address - confirm this is acceptable.
        "url": env.get(
            "MYSQL_URL",
            "jdbc:mysql://34.59.213.195:3306/hospital_a_db?useSSL=false&allowPublicKeyRetrieval=true",
        ),
        "driver": "com.mysql.cj.jdbc.Driver",
        "user": env.get("MYSQL_USER", "myuser"),
        "password": env.get("MYSQL_PASSWORD", ""),
    }


MYSQL_CONFIG = load_mysql_config()

if not MYSQL_CONFIG["password"]:
    # Surface the misconfiguration early instead of a late JDBC auth error.
    print("WARNING - MYSQL_PASSWORD is not set; the MySQL extraction will not be able to authenticate")


### **Mécanisme de sauvegarde des logs**

In [7]:

# In-memory buffer of log records (one dict per event), kept here until the
# save_logs_* helpers upload them.
log_entries = []


def log_event(event_type, message, table=None):
    """Append one event to ``log_entries`` and echo it on stdout.

    Args:
        event_type: Severity/category label stored with the record.
        message: Human-readable description of the event.
        table: Optional name of the table the event relates to.
    """
    record = dict(
        timestamp=datetime.datetime.now().isoformat(),
        event_type=event_type,
        message=message,
        table=table,
    )
    log_entries.append(record)
    # Echo the record so progress is visible in the cell output.
    print(f"[{record['timestamp']}] {event_type} - {message}")
    
#-------------------------------------------------------------------------------------------   

def save_logs_to_gcs():
    """Upload the collected log entries to GCS as a timestamped JSON file.

    The object is written under ``temp/pipeline_logs/`` in ``GCS_BUCKET``.
    """
    log_filename = f"pipeline_log_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    log_filepath = f"temp/pipeline_logs/{log_filename}"

    # Serialize every buffered entry as one pretty-printed JSON array.
    payload = json.dumps(log_entries, indent=4)

    # Resolve the target object and push the document in a single call chain.
    storage_client.bucket(GCS_BUCKET).blob(log_filepath).upload_from_string(
        payload,
        content_type="application/json",
    )

    print(f"âœ… Logs successfully saved to GCS at gs://{GCS_BUCKET}/{log_filepath}")

#-------------------------------------------------------------------------------------------    
def save_logs_to_bigquery():
    """Append the collected log entries to the BigQuery log table.

    Does nothing when no entries were collected.
    """
    if not log_entries:
        return

    # Explicit schema: inferring it from the dicts fails when every entry has
    # table=None, because Spark cannot determine a type for that column.
    # Columns are listed alphabetically, which is assumed to match the order
    # schema inference from dicts produced before - verify against the table.
    log_schema = StructType([
        StructField("event_type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("table", StringType(), True),
        StructField("timestamp", StringType(), True),
    ])
    rows = [
        (entry["event_type"], entry["message"], entry["table"], entry["timestamp"])
        for entry in log_entries
    ]

    log_df = spark.createDataFrame(rows, schema=log_schema)
    (log_df.write.format("bigquery")
        .option("table", BQ_LOG_TABLE)
        .option("temporaryGcsBucket", BQ_TEMP_PATH)
        .mode("append")
        .save())
    print("âœ… Logs stored in BigQuery for future analysis")

### **Fonction pour archiver les fichiers déjà traités**

In [8]:

def move_existing_table_to_archive(table):
    """Move the JSON files currently in a table's landing folder to the archive.

    Files are expected to be named ``<name>_<ddmmYYYY>.json``; the date suffix
    decides the ``archive/<table>/<year>/<month>/<day>/`` destination folder.

    Args:
        table: Name of the table whose landing folder is archived.
    """
    bucket = storage_client.bucket(GCS_BUCKET)

    # The trailing slash limits the listing to this table's own folder;
    # without it, "patients" would also match "patients_history/...".
    table_prefix = f"landing/{HOSPITAL_NAME}/{table}/"
    existing_files = [
        blob.name
        for blob in bucket.list_blobs(prefix=table_prefix)
        if blob.name.endswith(".json")
    ]

    if not existing_files:
        log_event("INFO", f"No exiting files for table {table}", table=table)
        return

    for file in existing_files:
        source_blob = bucket.blob(file)
        file_name = file.split("/")[-1]

        # Extract the ddmmYYYY date that follows the last underscore.
        date_part = file.split("_")[-1].split(".")[0]
        year, month, day = date_part[-4:], date_part[2:4], date_part[:2]

        archive_path = f"landing/{HOSPITAL_NAME}/archive/{table}/{year}/{month}/{day}/{file_name}"

        # "Move" = copy into the archive, then delete the original object.
        bucket.copy_blob(source_blob, bucket, archive_path)
        source_blob.delete()

        log_event("INFO", f"Moved {file} to {archive_path}", table=table)

### **Function to Get Latest Watermark from BigQuery Audit Table**

In [9]:
def get_latest_watermark(table_name):
    """Return the most recent load timestamp recorded for a table.

    Queries the audit table for the latest ``load_timestamp`` of ``table_name``
    within the ``hospital_a_db`` data source.

    Args:
        table_name: Source table name as stored in the audit table.

    Returns:
        The value of ``MAX(load_timestamp)`` when one exists, otherwise the
        string ``"1900-01-01 00:00:00"``.
    """
    default_watermark = "1900-01-01 00:00:00"

    # Values are passed as named query parameters rather than interpolated,
    # so a table name containing quotes cannot alter the statement.
    query = f"""
        SELECT MAX(load_timestamp) AS latest_timestamp
        FROM `{BQ_AUDIT_TABLE}`
        WHERE tablename = @table_name AND data_source = @data_source
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
            bigquery.ScalarQueryParameter("data_source", "STRING", "hospital_a_db"),
        ]
    )

    rows = bq_client.query(query, job_config=job_config).result()
    first_row = next(iter(rows), None)
    if first_row is None or not first_row.latest_timestamp:
        return default_watermark
    return first_row.latest_timestamp

### **Function to Extract Data from MySQL and Save to GCS**

In [10]:

def extract_and_save_to_landing(table, load_type, watermark_col):
    """Extract one MySQL table, write it to the landing zone and audit the load.

    The table is read over JDBC (in full, or only the rows whose
    ``watermark_col`` is greater than the last recorded watermark), uploaded
    as newline-delimited JSON to
    ``landing/<hospital>/<table>/<table>_<ddmmYYYY>.json`` and a SUCCESS row
    is appended to the BigQuery audit table.

    Any exception is caught and recorded as an ERROR log entry; nothing is
    re-raised.

    Args:
        table: Name of the MySQL table to extract.
        load_type: ``"full"`` or ``"incremental"`` (case-insensitive).
        watermark_col: Column compared to the watermark for incremental loads.
    """
    try:
        mode = (load_type or "").strip().lower()
        if mode not in ("full", "incremental"):
            # Previously any other value ran "WHERE col > 'None'".
            raise ValueError(
                f"Unsupported load_type {load_type!r} (expected 'full' or 'incremental')"
            )

        # Taken before the source is queried: this value becomes the next
        # watermark, so rows modified while the load runs are picked up again
        # by the next incremental run instead of being skipped.
        load_timestamp = datetime.datetime.now()

        last_watermark = get_latest_watermark(table) if mode == "incremental" else None
        log_event("INFO", f"Latest watermark for {table}: {last_watermark}", table=table)

        if mode == "full":
            query = f"(SELECT * FROM {table}) AS t"
        else:
            query = f"(SELECT * FROM {table} WHERE {watermark_col} > '{last_watermark}') AS t"

        df = (spark.read.format("jdbc")
                .option("url", MYSQL_CONFIG["url"])
                .option("user", MYSQL_CONFIG["user"])
                .option("password", MYSQL_CONFIG["password"])
                .option("driver", MYSQL_CONFIG["driver"])
                .option("dbtable", query)
                .load())

        log_event("SUCCESS", f"âœ… Successfully extracted data from {table}", table=table)

        # Materialize once: the same rows feed both the uploaded file and the
        # audited record count (a separate df.count() would query MySQL again).
        records = df.toPandas()
        record_count = len(records)

        today = datetime.datetime.today().strftime('%d%m%Y')
        JSON_FILE_PATH = f"landing/{HOSPITAL_NAME}/{table}/{table}_{today}.json"

        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(JSON_FILE_PATH)
        blob.upload_from_string(
            records.to_json(orient="records", lines=True),
            content_type="application/json",
        )

        log_event("SUCCESS", f"âœ… JSON file successfully written to gs://{GCS_BUCKET}/{JSON_FILE_PATH}", table=table)

        # Insert Audit Entry
        audit_df = spark.createDataFrame([
            ("hospital_a_db", table, load_type, record_count, load_timestamp, "SUCCESS")],
            ["data_source", "tablename", "load_type", "record_count", "load_timestamp", "status"])

        (audit_df.write.format("bigquery")
            .option("table", BQ_AUDIT_TABLE)
            .option("temporaryGcsBucket", GCS_BUCKET)
            .mode("append")
            .save())

        log_event("SUCCESS", f"âœ… Audit log updated for {table}", table=table)

    except Exception as e:
        log_event("ERROR", f"Error processing {table}: {str(e)}", table=table)

### **Fonction de lecture du fichier de configuration depuis GCS**

In [11]:
def read_config_file():
    """Read the pipeline configuration CSV into a Spark DataFrame.

    The first line of the file is treated as the header.

    Returns:
        Spark DataFrame with one row per line of ``CONFIG_FILE_PATH``.
    """
    config = spark.read.option("header", True).csv(CONFIG_FILE_PATH)
    log_event("INFO", "Successfully read the config file")
    return config

### **Lecture du fichier de configuration + chargement dans landing + archivage des fichiers**

In [12]:
config_df = read_config_file()

try:
    for row in config_df.collect():
        # Only active entries that belong to the hospital A database.
        if row["is_active"] == '1' and row["datasource"] == 'hospital_a_db':
            # Positional unpacking: relies on the config CSV having exactly
            # seven columns in this order. db, src and targetpath are not
            # used below.
            db, src, table, load_type, watermark, _, targetpath = row
            # Archive the previous extract before writing the new one.
            move_existing_table_to_archive(table)
            extract_and_save_to_landing(table, load_type, watermark)
finally:
    # Persist the logs even when a step above raised, so a failed run still
    # leaves a trace.
    save_logs_to_gcs()
    save_logs_to_bigquery()


                                                                                

[2025-05-23T13:09:24.703875] INFO - Successfully read the config file


                                                                                

[2025-05-23T13:09:27.861334] INFO - No exiting files for table encounters
[2025-05-23T13:09:28.707001] INFO - Latest watermark for encounters: 1900-01-01 00:00:00
[2025-05-23T13:09:29.565467] SUCCESS - âœ… Successfully extracted data from encounters


                                                                                

[2025-05-23T13:09:32.495645] SUCCESS - âœ… JSON file successfully written to gs://healthcare-bucket-lte/landing/hospital-a/encounters/encounters_23052025.json


                                                                                

[2025-05-23T13:09:46.916211] SUCCESS - âœ… Audit log updated for encounters
[2025-05-23T13:09:46.942896] INFO - No exiting files for table patients
[2025-05-23T13:09:47.843017] INFO - Latest watermark for patients: 1900-01-01 00:00:00
[2025-05-23T13:09:48.098240] SUCCESS - âœ… Successfully extracted data from patients
[2025-05-23T13:09:49.206460] SUCCESS - âœ… JSON file successfully written to gs://healthcare-bucket-lte/landing/hospital-a/patients/patients_23052025.json


                                                                                

[2025-05-23T13:09:56.961704] SUCCESS - âœ… Audit log updated for patients
[2025-05-23T13:09:56.990217] INFO - No exiting files for table transactions
[2025-05-23T13:09:57.900641] INFO - Latest watermark for transactions: 1900-01-01 00:00:00
[2025-05-23T13:09:58.174219] SUCCESS - âœ… Successfully extracted data from transactions


                                                                                

[2025-05-23T13:10:00.127317] SUCCESS - âœ… JSON file successfully written to gs://healthcare-bucket-lte/landing/hospital-a/transactions/transactions_23052025.json


                                                                                

[2025-05-23T13:10:07.424552] SUCCESS - âœ… Audit log updated for transactions
[2025-05-23T13:10:07.447150] INFO - No exiting files for table providers
[2025-05-23T13:10:07.447236] INFO - Latest watermark for providers: None
[2025-05-23T13:10:07.691462] SUCCESS - âœ… Successfully extracted data from providers
[2025-05-23T13:10:08.264017] SUCCESS - âœ… JSON file successfully written to gs://healthcare-bucket-lte/landing/hospital-a/providers/providers_23052025.json


                                                                                

[2025-05-23T13:10:15.163739] SUCCESS - âœ… Audit log updated for providers
[2025-05-23T13:10:15.191169] INFO - No exiting files for table departments
[2025-05-23T13:10:15.191335] INFO - Latest watermark for departments: None
[2025-05-23T13:10:15.427843] SUCCESS - âœ… Successfully extracted data from departments
[2025-05-23T13:10:15.939792] SUCCESS - âœ… JSON file successfully written to gs://healthcare-bucket-lte/landing/hospital-a/departments/departments_23052025.json


                                                                                

[2025-05-23T13:10:22.819599] SUCCESS - âœ… Audit log updated for departments
âœ… Logs successfully saved to GCS at gs://healthcare-bucket-lte/temp/pipeline_logs/pipeline_log_20250523131022.json


                                                                                

âœ… Logs stored in BigQuery for future analysis


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 58816)
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/miniconda3/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/usr/lib/spark/python/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = read_int(self.rfile