In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from delta.tables import DeltaTable
import random

In [0]:
# Shared pipeline configuration published by the upstream 'bronze_task'.
# Bound to `layer_vars` (not `vars`) so the Python builtin vars() is not shadowed.
# NOTE(review): the recorded cell output shows this call raising a TypeError inside
# TaskValuesHandler.get, presumably from an interactive run outside a job; passing a
# `debugValue` may be needed for interactive runs - confirm.
layer_vars = dbutils.jobs.taskValues.get(taskKey='bronze_task', key='layer_vars_key')

# extracting required vars from dict
bronze_reports = layer_vars['bronze_reports']   # table read as input of this notebook
silver_reports = layer_vars['silver_reports']   # table this notebook merges into / creates
silver_path = layer_vars['silver_path']         # storage path used when the silver table is first created
gold_path = layer_vars['gold_path']             # not used here, forwarded to the next task
gold_reports = layer_vars['gold_reports']       # not used here, forwarded to the next task

# static lookup table for geocoding
coords_lookup_table = 'gemeente.lookup.coords'

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5477239898229023>, line 1[0m
[0;32m----> 1[0m [38;5;28mvars[39m [38;5;241m=[39m dbutils[38;5;241m.[39mjobs[38;5;241m.[39mtaskValues[38;5;241m.[39mget(taskKey[38;5;241m=[39m[38;5;124m'[39m[38;5;124mbronze_task[39m[38;5;124m'[39m, key[38;5;241m=[39m[38;5;124m'[39m[38;5;124mlayer_vars_key[39m[38;5;124m'[39m)
[1;32m      3[0m bronze_path [38;5;241m=[39m [38;5;28mvars[39m[[38;5;124m'[39m[38;5;124mbronze_path[39m[38;5;124m'[39m]
[1;32m      4[0m silver_path [38;5;241m=[39m [38;5;28mvars[39m[[38;5;124m'[39m[38;5;124msilver_path[39m[38;5;124m'[39m]

File [0;32m/databricks/python_shell/lib/dbruntime/dbutils.py:254[0m, in [0;36mDBUtils.JobsHandler.TaskValuesHandler.get[0;34m(self, taskKey, key, default, debugValue)[0m
[1;32m    252[0m [38;5;28;

In [0]:
# Load the bronze reports table as a regular (batch) DataFrame.
# This is not a streaming read: the whole table is read on every run, and
# reprocessing is made safe by the MERGE into the silver table further down.
df = spark.table(bronze_reports)

In [0]:
# Keep a single row per report id.
# NOTE(review): which of several rows sharing an id survives is not controlled here.
df = df.drop_duplicates(subset=['id'])

In [0]:
# Validate mandatory fields. A report needs an id (the MERGE key of the silver
# table), a street_name (the geocoding join key) and a problem description.
required_cols = ['street_name', 'problem', 'id']

# Build a single "any mandatory column is NULL" predicate from the same list that
# is passed to dropna(), so the logged count always equals the rows actually dropped.
# Previously 'id' was dropped on but not counted, so a NULL-id row was only removed
# when some other row happened to fail the street_name/problem check.
has_null_required = F.col(required_cols[0]).isNull()
for col_name in required_cols[1:]:
    has_null_required = has_null_required | F.col(col_name).isNull()

null_counts = df.filter(has_null_required).count()
if null_counts > 0:
    df = df.dropna(subset=required_cols)
    print(f'Dropped {null_counts} records with null values')

# A missing house number is not fatal: it is replaced by 0 so the distance
# expression in the geocoding query still evaluates for these rows.
house_num_nulls = df.filter(F.col('house_number').isNull()).count()
if house_num_nulls > 0:
    df = df.withColumn('house_number', F.when(F.col('house_number').isNull(), F.lit(0)).otherwise(F.col('house_number')))
    print(f'Changed {house_num_nulls} records with an invalid house number to 0')

In [0]:
# Derive a normalized copy of the problem text: surrounding whitespace removed,
# then lower-cased. The original 'problem' column is kept untouched.
problem_normalized = F.lower(F.trim(F.col('problem')))
df = df.withColumn('problem_norm', problem_normalized)

In [0]:
# Convert the unix-epoch 'report_date' into a proper timestamp column.
# NOTE(review): from_unixtime() goes through a formatted string before the cast;
# presumably 'report_date' holds seconds since epoch - confirm against bronze schema.
reported_on_ts = F.from_unixtime(F.col('report_date')).cast(TimestampType())
df = df.withColumn('reported_on', reported_on_ts)

In [0]:
# Add an empty (all-NULL) timestamp column that is reserved for SCD handling
# of status changes; it is typed explicitly so the silver schema is stable.
empty_timestamp = F.lit(None).cast(TimestampType())
df = df.withColumn('status_updated_at', empty_timestamp)

In [0]:
# Expose the cleaned DataFrame to Spark SQL under the name 'reports_view',
# which the enrichment query below joins against the coordinates lookup table.
df.createOrReplaceTempView(name='reports_view')

In [0]:
# Enrich the dataframe with postcode and lat/lon coordinates from the static lookup table.
# Every report is joined to all lookup addresses on the same street; a window function
# then keeps the address whose house number is closest to the reported one (rn = 1).
# Reports whose street is not in the lookup table are kept with NULL coordinates
# because of the LEFT JOIN.
#
# The ordering is made fully deterministic so that reprocessing the same input yields
# the same coordinates (the result is MERGEd into the silver table):
#   * NULLS LAST  - Spark sorts NULLs first for ascending order by default, which would
#                   let a lookup row without house number win over a real match.
#   * tie-breakers - equidistant house numbers (e.g. 3 and 7 for a reported 5) or several
#                   lookup rows with the same house number are resolved by
#                   house number, postcode, lon and lat instead of arbitrarily.
df_enriched = spark.sql(f'''
          WITH enriched AS (
            SELECT
                l.id,
                l.problem,
                l.problem_norm,
                l.street_name,
                l.house_number,
                l.reported_on,
                r.postcode,
                r.lon,
                r.lat,
                l.status,
                l.status_updated_at,
                ABS(r.huisnummer - l.house_number) AS distance,
                ROW_NUMBER() OVER (
                    PARTITION BY l.id
                    ORDER BY
                        ABS(r.huisnummer - l.house_number) ASC NULLS LAST,
                        r.huisnummer ASC NULLS LAST,
                        r.postcode ASC NULLS LAST,
                        r.lon ASC NULLS LAST,
                        r.lat ASC NULLS LAST
                ) AS rn
            FROM reports_view AS l
            LEFT JOIN {coords_lookup_table} AS r
                ON l.street_name = r.openbare_ruimte_naam
        )
        SELECT *
        FROM enriched
        WHERE rn = 1
        ''')

In [0]:
# Columns that make up the silver table, in their final order.
# The helper column 'rn' produced by the enrichment query is intentionally left out.
silver_columns = [
    'id',
    'problem',
    'problem_norm',
    'street_name',
    'house_number',
    'reported_on',
    'postcode',
    'lon',
    'lat',
    'status',
    'status_updated_at',
    'distance',
]
df_enriched = df_enriched.select(*silver_columns)

In [0]:
# Upsert the enriched reports into the silver table.
# A MERGE on 'id' is used so that an unexpected re-run of this task updates the
# existing rows instead of appending duplicates. The target is opened as a DeltaTable
# because the merge API lives there, not on a plain Spark DataFrame.
# On the very first run the table does not exist yet and is created from the DataFrame.

if spark.catalog.tableExists(silver_reports):
    print('Silver table exists → started merge')
    silver_table = DeltaTable.forName(spark, silver_reports)

    # 'status_updated_at' is always NULL in the incoming data (it is only a typed
    # placeholder added earlier in this notebook). Updating it on a match would wipe
    # any value already stored in the silver table on every re-run, so matched rows
    # get every column refreshed except that one.
    update_set = {
        col_name: f'new.`{col_name}`'
        for col_name in df_enriched.columns
        if col_name != 'status_updated_at'
    }

    (
        silver_table.alias('existing')
        .merge(
            df_enriched.alias('new'),
            'existing.id = new.id',
        )
        .whenMatchedUpdate(set=update_set)
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    print('Silver table does not exist → creating new table')
    # 'overwrite' is only reached when the table is absent; it initialises the table
    # at the configured storage path.
    (df_enriched.write.mode('overwrite').format('delta').option('path', silver_path).saveAsTable(silver_reports))

In [0]:
# Output vars required by the next notebook in the job pipeline.
# Bound to `layer_vars` (not `vars`) so the Python builtin vars() is not shadowed.
# Only the values the downstream task needs are forwarded.
layer_vars = {
    'gold_path': gold_path,
    'silver_reports': silver_reports,
    'gold_reports': gold_reports,
}

# Published under the same key this notebook reads its own configuration from.
dbutils.jobs.taskValues.set(key='layer_vars_key', value=layer_vars)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-5715372267379488>, line 5[0m
[1;32m      1[0m [38;5;28mvars[39m [38;5;241m=[39m {
[1;32m      2[0m     [38;5;66;03m#'source_path':source_path,[39;00m
[1;32m      3[0m     [38;5;66;03m#'bronze_path':bronze_path,[39;00m
[1;32m      4[0m     [38;5;66;03m#'silver_path':silver_path,[39;00m
[0;32m----> 5[0m     [38;5;124m'[39m[38;5;124mgold_path[39m[38;5;124m'[39m:gold_path,
[1;32m      6[0m     [38;5;124m'[39m[38;5;124msilver_reports[39m[38;5;124m'[39m:silver_reports,
[1;32m      7[0m     [38;5;124m'[39m[38;5;124mgold_reports[39m[38;5;124m'[39m:gold_reports
[1;32m      8[0m     [38;5;66;03m#'coords_lookup_table':'gemeente.lookup.coords'          # static lookup table managed through UC[39;00m
[1;32m      9[0m }
[1;32m     11[0m dbutils[38;5;241m.[