In [1]:
# Load the consolidated silver dimension table. Parquet files carry their own
# schema, so the CSV-only reader options ("header", "inferSchema") are not
# needed here and have been dropped.
dims_df = spark.read.parquet("Files/silver/dims-silver")
dims_df.show()

# One Row per distinct dimension name ("filename") together with its row count;
# the SCD merge cell further down iterates over this list.
dims_groupedby_filename = dims_df.groupBy("filename").count().collect()

StatementMeta(, 636d0a91-a338-41b1-a74f-9b2469d8796d, 3, Finished, Available)

+-----+--------------------+--------------------+--------+------+---+--------+
| Code|         Description|          start_date|end_date|status|key|filename|
+-----+--------------------+--------------------+--------+------+---+--------+
|   R6|ACP (African, Car...|2024-04-05 14:29:...|    null|     1|  1|DIM_AREA|
|   5U|ADC (Andean Devel...|2024-04-05 14:29:...|    null|     1|  2|DIM_AREA|
|   7H|AFREXIMBANK (Afri...|2024-04-05 14:29:...|    null|     1|  3|DIM_AREA|
|   5M|AMF (Arab Monetar...|2024-04-05 14:29:...|    null|     1|  4|DIM_AREA|
|  R16|APEC (Asia-Pacifi...|2024-04-05 14:29:...|    null|     1|  5|DIM_AREA|
|   R4|ASEAN (Countries ...|2024-04-05 14:29:...|    null|     1|  6|DIM_AREA|
|  R45|             ASEAN-5|2024-04-05 14:29:...|    null|     1|  7|DIM_AREA|
|4J842|    ATHENA Mechanism|2024-04-05 14:29:...|    null|     1|  8|DIM_AREA|
| XR29|Advanced Economie...|2024-04-05 14:29:...|    null|     1|  9|DIM_AREA|
| XR23|Advanced Economie...|2024-04-05 14:29:...|   

In [2]:
# Peek at the first silver row (prints None if the table is empty).
print(dims_df.first())

StatementMeta(, 636d0a91-a338-41b1-a74f-9b2469d8796d, 4, Finished, Available)

Row(Code='R6', Description='ACP (African, Caribbean and Pacific countries, Lome convention)', start_date=datetime.datetime(2024, 4, 5, 14, 29, 55, 870160), end_date=None, status=1, key=1, filename='DIM_AREA')


In [3]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lit,current_timestamp, lower, row_number, trim

# Type-2 slowly-changing-dimension refresh, one dimension ("filename") at a time.
#
# For every dimension found in the silver table:
#   * read the matching pipe-separated bronze extract,
#   * keep active silver rows whose (Code, Description) still exists in bronze,
#   * close (status=0, end_date=now) active silver rows that no longer exist,
#   * insert bronze rows with no active silver counterpart under fresh keys,
#   * carry already-closed history rows through untouched,
#   * overwrite the per-dimension Delta output under Files/silver/<name>.
#
# Column layout shared by every partial result so they can be unioned by name.
output_columns = ["Code", "Description", "start_date", "end_date", "status", "key"]

for row in dims_groupedby_filename:
    current_file = row["filename"]
    print(current_file)

    df_area_silver = dims_df.where(dims_df.filename == current_file)

    # New surrogate keys continue after the highest key ever issued for this
    # dimension (history included). Fall back to 0 if every key is NULL.
    max_key_row = df_area_silver.agg({"key": "max"}).collect()[0]
    new_key_seed = max_key_row["max(key)"]
    if new_key_seed is None:
        new_key_seed = 0

    # Only active rows (status == 1) take part in change detection. Rows that
    # were closed on a previous run must keep their original end_date and must
    # not prevent a value that reappears in bronze from being re-inserted.
    active_silver_df = df_area_silver.where(df_area_silver.status == 1)
    closed_silver_df = df_area_silver.where(
        (df_area_silver.status != 1) | df_area_silver.status.isNull()
    )

    # NOTE(review): inferSchema may type all-numeric Code columns as integers
    # before trim() casts them back to string (e.g. "001" -> "1") - confirm
    # this matches how the silver codes were produced.
    df_area_bronze = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .option("sep", "|")
        .csv("Files/bronze/" + current_file + ".txt")
    )

    # Normalise whitespace, keep only the compared columns and drop duplicate
    # pairs so a repeated bronze line cannot produce duplicate dimension rows.
    df_area_bronze = df_area_bronze.select(
        trim(df_area_bronze.Code).alias("Code_updated"),
        trim(df_area_bronze.Description).alias("Description_updated"),
    ).dropDuplicates()

    # Silver is the target, bronze is the source. Null-safe equality lets rows
    # with a NULL description match instead of being closed/re-added each run.
    cond = [
        active_silver_df.Code.eqNullSafe(df_area_bronze.Code_updated),
        active_silver_df.Description.eqNullSafe(df_area_bronze.Description_updated),
    ]

    # Active silver rows still present in bronze: keep as they are.
    unchanged_df = active_silver_df.join(df_area_bronze, cond, "leftsemi").select(
        *output_columns
    )

    # Active silver rows that vanished from bronze (or whose description
    # changed): these are the versions to close.
    updated_df = active_silver_df.join(df_area_bronze, cond, "leftanti").select(
        *output_columns
    )
    obsolete_df = updated_df.withColumn("end_date", current_timestamp()).withColumn(
        "status", lit(0)
    )

    # Order by Description, then Code, so key assignment is deterministic even
    # when two codes share a description. No partitioning: the whole dimension
    # is numbered in one sequence.
    window = Window.orderBy("Description", "Code")

    # Bronze rows without an active silver counterpart become new versions.
    new_updated_records_df = (
        df_area_bronze.join(active_silver_df, cond, "leftanti")
        .withColumnRenamed("Code_updated", "Code")
        .withColumnRenamed("Description_updated", "Description")
        .withColumn("start_date", current_timestamp())
        .withColumn("end_date", lit(None).cast("TIMESTAMP"))
        .withColumn("status", lit(1))
        .withColumn("key", row_number().over(window) + lit(new_key_seed))
        .select(*output_columns)
    )

    # Union by column name rather than by position so the result does not
    # depend on the column order of any individual part.
    union_df = (
        new_updated_records_df.unionByName(obsolete_df)
        .unionByName(unchanged_df)
        .unionByName(closed_silver_df.select(*output_columns))
    )

    union_df.write.mode("overwrite").format("delta").save(
        "Files/silver/" + current_file.lower()
    )



StatementMeta(, 636d0a91-a338-41b1-a74f-9b2469d8796d, 5, Finished, Available)

DIM_UNIT_MEASURE
DIM_AREA
DIM_CURRENCY
DIM_INSTR_ASSET
DIM_SECTOR
DIM_ACCOUNTS_ITEM
DIM_ACCOUNTING
DIM_MATURITY
DIM_VALUATION
DIM_FLOW_STOCK_ENTRY
DIM_COMP_METHOD
DIM_TYPE_ENTITY
