In [None]:
import argparse, os, logging, boto3
from pathlib import Path
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import *
from textwrap import dedent

# Postgres connection settings for the scholarships database.
# Every value can be overridden through an environment variable so real credentials
# do not have to live in the notebook; the fallbacks are the previously hard-coded
# values (presumably a local/dev instance — port 5436 is non-default).
PG = dict(
    host=os.getenv("PG_HOST", "localhost"),
    port=int(os.getenv("PG_PORT", "5436")),
    dbname=os.getenv("PG_DBNAME", "scholarships"),
    user=os.getenv("PG_USER", "scholarships"),
    # TODO: drop this fallback once PG_PASSWORD is provisioned in every environment.
    password=os.getenv("PG_PASSWORD", "scholar_pw"),
)

In [2]:
# JDBC endpoint assembled from the PG settings (str.format ignores the extra keys).
url = "jdbc:postgresql://{host}:{port}/{dbname}".format(**PG)

# Connection properties passed to every spark.read.jdbc / DataFrame.write.jdbc call.
pg_props = dict(
    user=PG["user"],
    password=PG["password"],
    driver="org.postgresql.Driver",
    # pgjdbc sends string parameters as untyped values and lets the server infer the
    # column type, so JSON text can be inserted into json/jsonb columns without a cast.
    stringtype="unspecified",
)

# Compact, timestamped log lines: "HH:MM:SS | LEVEL | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

In [3]:
# Target schema: explicit read schema for the standardized parquet dataset so every
# column is loaded with exactly these types. Dates/timestamps are kept as strings here
# and parsed downstream; only scholarship_id is non-nullable.
_TARGET_FIELDS = [
    # (column name,           Spark type,                              nullable)
    ("scholarship_id",        StringType(),                            False),
    ("scholarship_name",      StringType(),                            True),
    ("description",           StringType(),                            True),
    ("program_country",       ArrayType(StringType()),                 True),
    ("origin_country",        ArrayType(StringType()),                 True),
    ("program_level",         StringType(),                            True),
    ("required_level",        StringType(),                            True),
    ("intake",                StringType(),                            True),
    ("documents",             StringType(),                            True),
    ("website",               StringType(),                            True),
    ("last_updated_utc",      StringType(),                            True),
    ("funding_category",      StringType(),                            True),
    ("deadline",              StringType(),                            True),
    ("scholarship_coverage",  MapType(StringType(), StringType()),     True),
    ("language_certificates", MapType(StringType(), StringType()),     True),
    ("fields_of_study_code",  ArrayType(StringType()),                 True),
    ("study_duration",        StringType(),                            True),
    ("gpa",                   StringType(),                            True),
    ("list_of_universities",  ArrayType(StringType()),                 True),
    ("status",                StringType(),                            True),
    ("other_information",     MapType(StringType(), StringType()),     True),
]

TARGET_SCHEMA = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in _TARGET_FIELDS
])


In [4]:
# Named AWS profile used for S3 access; also exported into this process's environment.
AWS_PROFILE = "bdm-2025"
os.environ.update(AWS_PROFILE=AWS_PROFILE)

In [5]:
# Spark session for the S3 -> Postgres load. The master comes from SPARK_MASTER so the
# same code can later point at a cluster; the local[*] fallback uses one thread per core.
_SPARK_PACKAGES = ",".join([
    "org.apache.hadoop:hadoop-aws:3.3.4",          # s3a:// filesystem
    "com.amazonaws:aws-java-sdk-bundle:1.12.665",  # AWS SDK used by hadoop-aws
    "org.postgresql:postgresql:42.7.3",            # Postgres JDBC driver
])

_SPARK_CONF = {
    # In local mode everything runs in the driver JVM, so only driver memory matters.
    "spark.driver.memory": "4g",
    "spark.jars.packages": _SPARK_PACKAGES,
    # Resolve S3 credentials from the shared AWS profile files (~/.aws/...).
    "spark.hadoop.fs.s3a.aws.credentials.provider":
        "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    # NOTE(review): unsure S3A reads this key at all; the provider's default constructor
    # usually takes the profile from the AWS_PROFILE env var — confirm.
    "spark.hadoop.fs.s3a.aws.profile": AWS_PROFILE,
}

_builder = (SparkSession.builder
            .appName("LoadParquetToPG")
            .master(os.getenv("SPARK_MASTER", "local[*]")))
for _conf_key, _conf_value in _SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)

spark = _builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

25/06/08 22:34:36 WARN Utils: Your hostname, MacBook-Pro-Olha.local resolves to a loopback address: 127.0.0.1; using 192.168.1.156 instead (on interface en0)
25/06/08 22:34:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/olhabaliasina/.ivy2/cache
The jars for the packages stored in: /Users/olhabaliasina/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-03c82112-54f2-462f-96a9-fb2b318ec219;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/olhabaliasina/Documents/BDMA/2nd_semester/Big_Data_Management/Project/ScholAmigo/.venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.665 in central
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in central
:: resolution report :: resolve 134ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.665 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.checkerframework#checker-qual;3.42.0 from central in [default]
	org.postgresql#postgresql;42.7.3 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 by [com.amazonaws#aws-java-sdk-bundle;1.12.665] in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number|

In [6]:
def latest_dataset_prefix(s3, bucket, prefix):
    """Return the key prefix of the newest committed dataset under ``prefix``.

    Lists every object under ``s3://bucket/prefix``. It keeps only the ``_SUCCESS``
    marker objects that Spark/Hadoop write into a dataset directory. It then returns
    the directory of the marker with the latest ``LastModified``. The result is a
    dataset directory such as ``".../2025-06_erasmus_scholarship_data_std.parquet"``,
    never an individual part-file.

    Args:
        s3: boto3 S3 client (anything exposing ``get_paginator("list_objects_v2")``).
        bucket: bucket name.
        prefix: key prefix to search under.

    Returns:
        The dataset directory key without a trailing slash. Returns ``""`` when the
        marker sits directly at the bucket root.

    Raises:
        FileNotFoundError: no ``_SUCCESS`` marker exists under ``prefix``.
    """
    # list_objects_v2 returns at most 1000 keys per call; paginate so markers past the
    # first page (e.g. datasets with many part-files) are not silently missed.
    paginator = s3.get_paginator("list_objects_v2")
    markers = [
        obj
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])
        # Match the marker file name exactly, not any key that merely ends in "_SUCCESS".
        if obj["Key"].rpartition("/")[2] == "_SUCCESS"
    ]
    if not markers:
        raise FileNotFoundError(f"No parquet datasets under {prefix}")
    # The marker's last-modified time is when the dataset write finished.
    newest = max(markers, key=lambda obj: obj["LastModified"])
    return newest["Key"].rpartition("/")[0]


In [7]:
# Plain dict standing in for parsed command-line arguments in notebook mode.
args = dict()

In [8]:
# Source location of the standardized Erasmus dataset in the trusted zone.
args.update(
    bucket="scholarship-data-trusted",
    prefix="erasmus_scholarships_standardized/",
)

In [9]:
s3     = boto3.Session(profile_name=AWS_PROFILE).client("s3")
# Treat the configured prefix as a "directory": "foo" -> "foo/" so it cannot also match
# sibling keys like "foobar/...". An empty prefix (whole bucket) stays empty instead of
# becoming "/", which would only match keys that literally start with a slash.
prefix = args["prefix"]
if prefix and not prefix.endswith("/"):
    prefix += "/"
key    = latest_dataset_prefix(s3, args["bucket"], prefix)
# Single quotes inside the f-string: reusing the outer quote type is only legal from
# Python 3.12 on (PEP 701) and is a SyntaxError on older interpreters.
src    = f"s3a://{args['bucket']}/{key}"
logging.info("Reading dataset %s", src)


22:34:39 | INFO | Found credentials in shared credentials file: ~/.aws/credentials
22:34:40 | INFO | Reading dataset s3a://scholarship-data-trusted/erasmus_scholarships_standardized/2025-06_erasmus_scholarship_data_std.parquet


In [10]:
df_raw = spark.read.schema(TARGET_SCHEMA).parquet(src)

# Categorical filter columns are normalised to lower case.
_LOWERCASE_COLUMNS = ["funding_category", "program_level", "required_level", "intake", "status"]

# Every MapType column must be serialised to a JSON string before the JDBC write.
# Spark's JDBC writer has no SQL type mapping for maps, so leaving other_information
# as a map made the scholarship_info write fail. With stringtype=unspecified in the
# connection properties, Postgres coerces the JSON text to the target column type
# (e.g. JSONB).
_JSON_COLUMNS = ["scholarship_coverage", "language_certificates", "other_information"]

df = df_raw
for _col in _LOWERCASE_COLUMNS:
    df = df.withColumn(_col, F.lower(_col))

df = (
    df
      # Source strings are day-first. NOTE(review): to_timestamp interprets the value in
      # the Spark session time zone, not UTC — set spark.sql.session.timeZone=UTC if
      # last_updated_utc really is UTC wall-clock time.
      .withColumn("last_updated_utc", F.to_timestamp("last_updated_utc", "dd/MM/yyyy HH:mm"))
      .withColumn("deadline",         F.to_date("deadline", "dd/MM/yyyy"))
)

for _col in _JSON_COLUMNS:
    df = df.withColumn(_col, F.to_json(_col))

In [11]:
# Narrow projection holding only the categorical columns used for filtering.
_FILTER_COLUMNS = [
    "scholarship_id",
    "funding_category",
    "program_level",
    "required_level",
    "intake",
    "status",
]
filter_df = df.select(*_FILTER_COLUMNS)

In [12]:
# Descriptive / detail columns written to the scholarship_info table.
_INFO_COLUMNS = [
    "scholarship_id",
    "scholarship_name",
    "description",
    "program_country",
    "origin_country",
    "documents",
    "website",
    "last_updated_utc",
    "deadline",
    "scholarship_coverage",
    "language_certificates",
    "fields_of_study_code",
    "study_duration",
    "gpa",
    "list_of_universities",
    "other_information",
]
info_df = df.select(*_INFO_COLUMNS)


In [13]:
# Sanity check: which master the session attached to and the driver's advertised host.
print(f"Spark master : {spark.sparkContext.master}")
print(f"Driver host  : {spark.conf.get('spark.driver.host')}")


Spark master : local[*]
Driver host  : 192.168.1.156


In [None]:
# Country lookup (country_id, lower-cased name), read through a subquery so Postgres
# does the lower-casing server-side.
# NOTE(review): the scholarship side also trims names before joining; consider
# lower(trim(name)) here too if stored names may carry whitespace.
_COUNTRIES_SUBQUERY = "(SELECT country_id, lower(name) AS name FROM countries) AS c"
countries_dim = spark.read.jdbc(url, _COUNTRIES_SUBQUERY, properties=pg_props)


In [None]:
def _country_bridge(source_df, array_col, countries):
    """Build distinct (scholarship_id, country_id) pairs from a country-name array column.

    Steps:
        - Explodes ``array_col`` of ``source_df`` into one row per name.
        - Normalises each name with ``lower(trim(...))``.
        - Inner-joins on ``countries.name``, which is expected to be lower-cased already.

    Names with no match in ``countries`` are dropped silently by the inner join.

    Args:
        source_df: DataFrame with ``scholarship_id`` and the array column.
        array_col: name of the array<string> column holding country names.
        countries: lookup DataFrame with ``country_id`` and ``name`` columns.

    Returns:
        DataFrame with distinct ``scholarship_id``, ``country_id`` rows.
    """
    return (
        source_df.select("scholarship_id", F.explode(array_col).alias("name"))
          .withColumn("name", F.lower(F.trim("name")))
          .join(countries, "name")
          .select("scholarship_id", "country_id")
          .distinct()
    )


prog_country_df = _country_bridge(df, "program_country", countries_dim)
orig_country_df = _country_bridge(df, "origin_country", countries_dim)

In [None]:
# One row per (scholarship, field-of-study code) pair, duplicates removed.
fos_df = (
    df.selectExpr("scholarship_id", "explode(fields_of_study_code) AS field_code")
      .dropDuplicates()
)


In [None]:
def save(df, table, mode="append", batch_size=1000):
    """Write ``df`` into Postgres table ``table`` over JDBC.

    Uses the module-level ``url`` and ``pg_props`` connection settings.

    Args:
        df: Spark DataFrame whose columns match the target table.
        table: target table name.
        mode: Spark save mode. The default "append" adds rows without truncating,
            so re-running the load inserts the same rows again (or fails on unique
            constraints).
        batch_size: number of rows per JDBC insert batch.
    """
    logging.info("Writing table %s (mode=%s)", table, mode)
    (df.write
       .mode(mode)
       .option("batchsize", batch_size)
       .jdbc(url, table, properties=pg_props))


# Written in this order — presumably parent tables before the bridge tables, in case
# foreign keys exist; confirm against the DDL.
for _frame, _table_name in [
    (filter_df,       "scholarship_filter"),
    (info_df,         "scholarship_info"),
    (prog_country_df, "program_country"),
    (orig_country_df, "origin_country"),
    (fos_df,          "scholarship_field_of_study"),
]:
    save(_frame, _table_name)


In [None]:
# Stop the Spark session now that every table has been written, then log completion.
spark.stop()
logging.info("PostgreSQL load finished")


----