In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

# Path to the service-account key file handed to the GCS connector; "~" is
# expanded to the current user's home directory.
gcs_credential_path = os.path.expanduser("~/gcs.json")

# JDBC endpoint of the source PostgreSQL database.
POSTGRES_URL = "jdbc:postgresql://localhost:5433/hospital"

# Connection properties passed to the JDBC reader. The credentials may be
# overridden through the POSTGRES_USER / POSTGRES_PASSWORD environment
# variables so real secrets do not have to be committed with the source; when
# the variables are unset the original local defaults are used.
POSTGRES_PROPERTIES = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

# Destination bucket and the tables that are exported into it.
GCS_BUCKET = "hospital_datalake"
TABLES = [
    "doctors",
    "patients",
    "medicines",
    "visits",
    "billing_payments",
    "prescriptions",
]

# Build (or reuse) the Spark session for the PostgreSQL -> GCS export. The
# session is given the PostgreSQL JDBC driver and GCS connector jars, the
# Hadoop filesystem classes for the gs:// scheme, and service-account
# authentication using the key file at gcs_credential_path.
spark = (
    SparkSession.builder
    .appName("PostgresToGCS")
    .config(
        "spark.jars",
        "/usr/lib/jars/postgresql-42.7.1.jar,"
        "/usr/lib/jars/gcs-connector-hadoop3-latest.jar",
    )
    .config(
        "spark.hadoop.fs.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    )
    .config(
        "spark.hadoop.fs.AbstractFileSystem.gs.impl",
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    )
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config(
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
        gcs_credential_path,
    )
    .getOrCreate()
)

# Print the effective filesystem class and key-file path so the configuration
# can be checked in the cell output.
for conf_key in (
    "spark.hadoop.fs.gs.impl",
    "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
):
    print(spark.conf.get(conf_key))

# Export every table listed in TABLES from PostgreSQL to GCS as JSON, one
# folder per table under gs://<GCS_BUCKET>/pyspark/.
#
# The loop runs inside try/finally so the Spark session is always stopped,
# even when a JDBC read or a GCS write raises part-way through; the exception
# still propagates after the cleanup.
try:
    for table in TABLES:
        print(f"Processing table: {table}")

        # Read the table over JDBC using the shared connection settings.
        df = spark.read.jdbc(url=POSTGRES_URL, table=table, properties=POSTGRES_PROPERTIES)

        gcs_path = f"gs://{GCS_BUCKET}/pyspark/{table}/"

        # "overwrite" mode replaces whatever a previous run left at this path.
        df.write.mode("overwrite").json(gcs_path)

        print(f"Uploaded {table} to {gcs_path}")
finally:
    spark.stop()



25/04/14 14:45:00 WARN Utils: Your hostname, Ubuntu-24 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/04/14 14:45:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/04/14 14:45:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/14 14:45:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
/home/vboxuser/gcs.json
Processing table: doctors


                                                                                

Uploaded doctors to gs://hospital_datalake/pyspark/doctors/
Processing table: patients


25/04/14 14:45:12 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Uploaded patients to gs://hospital_datalake/pyspark/patients/
Processing table: medicines


                                                                                

Uploaded medicines to gs://hospital_datalake/pyspark/medicines/
Processing table: visits


                                                                                

Uploaded visits to gs://hospital_datalake/pyspark/visits/
Processing table: billing_payments


                                                                                

Uploaded billing_payments to gs://hospital_datalake/pyspark/billing_payments/
Processing table: prescriptions


                                                                                

Uploaded prescriptions to gs://hospital_datalake/pyspark/prescriptions/
