In [0]:
import dlt
import json
from pyspark.sql.functions import from_json, expr, lit, col, to_timestamp, regexp_extract, input_file_name
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, IntegerType, DateType

In [0]:
# Pipeline parameters, read from the Spark configuration of the running
# session. No default is passed to spark.conf.get for any of the keys.
catalog_name, schema_name, landing_volume_name = (
    spark.conf.get(conf_key)
    for conf_key in ("catalog_name", "schema_name", "landing_volume_name")
)

In [0]:
@dlt.table(
  name="silver_bike_point",
  comment="Clean bike point data from bronze and load into silver",
  table_properties={
    "quality": "silver"
  }
)
@dlt.expect_all({
  "Positive bike count": "bike_count >= 0",
  "Positive empty dock count": "empty_dock_count >= 0",
  "Positive dock count": "dock_count >= 0"
})
@dlt.expect_all_or_drop({
  "Key column is not null": "bikepoint_id IS NOT NULL",
  "Bike station is not remove": "removal_date IS NULL"
})
def silver_bike_point():
  """Build the silver bike point table from the bronze stream.

  Steps:
    1. Stream rows from the ``bronze_bike_point`` table.
    2. Parse the ``additionalProperties`` JSON string into an array of
       ``{key, value}`` string structs.
    3. Pull the value of each wanted key out of that array into its own
       column (the first matching entry wins; a missing key gives NULL).
    4. Keep only the mapped columns, renaming and casting each one.
    5. Append a ``silver_timestamp`` audit column.

  The decorators attach three count expectations via ``expect_all`` and two
  via ``expect_all_or_drop`` (non-null ``bikepoint_id`` and NULL
  ``removal_date``).

  Returns:
    The streaming DataFrame that backs the ``silver_bike_point`` table.
  """
  df = spark.readStream.table("bronze_bike_point")

  # additionalProperties is stored as a JSON string; every entry is read as a
  # pair of strings, so typed values are cast later in the rename step.
  additional_properties_schema = ArrayType(
      StructType([
          StructField("key", StringType(), True),
          StructField("value", StringType(), True)
      ])
  )

  df = df.withColumn(
    "additionalProperties",
    from_json(col("additionalProperties"), additional_properties_schema)
  )

  # Keys to extract from the parsed additionalProperties array
  fields_to_extract = [
      "TerminalName",
      "Installed",
      "Locked",
      "InstallDate",
      "RemovalDate",
      "Temporary",
      "NbBikes",
      "NbEmptyDocks",
      "NbDocks",
      "NbStandardBikes",
      "NbEBikes"
  ]

  # One SQL expression per key: take the value of the first array entry whose
  # key matches, and expose it as a column named after the key.
  expressions_extract = [
    f"filter(additionalProperties, x -> x.key = '{field}')[0].value as {field}"
    for field in fields_to_extract
  ]

  # Extract every key in a single projection alongside the existing columns.
  df = df.selectExpr("*", *expressions_extract)

  # Columns to keep: source column -> (target column name, target SQL type)
  # NOTE(review): TerminalName is cast to int, which would drop any leading
  # zeros in the source value - confirm that is intended.
  # NOTE(review): InstallDate/RemovalDate are strings at this point; confirm
  # the source format is one the date cast can parse. If a populated
  # RemovalDate cannot be parsed and the cast yields NULL, the row would pass
  # the "removal_date IS NULL" expectation.
  fields_to_rename = {
    "id": ("bikepoint_id", "string"),
    "commonName": ("bikepoint_name", "string"),
    "lat": ("bikepoint_latitude", "double"),
    "lon": ("bikepoint_longitude", "double"),
    "TerminalName": ("terminal_name", "int"),
    "Installed": ("installed", "boolean"),
    "Locked": ("locked", "boolean"),
    "InstallDate": ("install_date", "date"),
    "RemovalDate": ("removal_date", "date"),
    "Temporary": ("temporary", "boolean"),
    "NbBikes": ("bike_count", "int"),
    "NbEmptyDocks": ("empty_dock_count", "int"),
    "NbDocks": ("dock_count", "int"),
    "NbStandardBikes": ("standard_bike_count", "int"),
    "NbEBikes": ("ebike_count", "int"),
    "source_system": ("source_system", "string"),
    "source_file": ("source_file", "string"),
    "ingestion_user": ("ingestion_user", "string"),
    "landing_timestamp": ("landing_timestamp", "timestamp"),
    "bronze_timestamp": ("bronze_timestamp", "timestamp"),
  }

  # One expression per kept column: cast with the '::' operator, then rename.
  expressions_rename = [
    f"{old_name}::{new_type} as {new_name}"
    for old_name, (new_name, new_type) in fields_to_rename.items()
  ]

  # Project down to the mapped columns only; everything else is dropped here,
  # including the parsed additionalProperties array.
  df = df.selectExpr(*expressions_rename)

  # Add audit information to the table
  df = df.withColumn("silver_timestamp", expr("CURRENT_TIMESTAMP()"))
  return df