# Preparing Environment

## Creating and Setting Variables

In [0]:
# Connection settings for the Kafka brokers and the schema registry, all read
# from the 'DatabricksEmbracon' secret scope (one lookup per key, in order).
(
    KafkaServer,
    KafkaUsername,
    KafkaPassword,
    KafkaSchemaUsername,
    KafkaSchemaPassword,
    KafkaSchemaRegistryUrl,
) = (
    dbutils.secrets.get('DatabricksEmbracon', secret_key)
    for secret_key in (
        'KafkaServer',
        'KafkaUsername',
        'KafkaPassword',
        'KafkaSchemaUsername',
        'KafkaSchemaPassword',
        'KafkaSchemaRegistryUrl',
    )
)

# Options passed to from_avro so it can authenticate against the schema
# registry with "<user>:<password>" basic-auth credentials.
KafkaSchemaRegistryOptions = dict([
    ("confluent.schema.registry.basic.auth.credentials.source", "USER_INFO"),
    (
        "confluent.schema.registry.basic.auth.user.info",
        "{}:{}".format(KafkaSchemaUsername, KafkaSchemaPassword),
    ),
])

# Root folder under which each stream keeps its checkpoint.
CheckpointLocation = "abfss://prod@databricksembracon.dfs.core.windows.net/dados/checkpoints"

# Creating Ingestion Function

In [0]:
# Upsert one streaming micro-batch into the matching silver Delta table.
def merge_delta(incremental, t):
  """Merge a micro-batch into ``{t.catalog}.{t.database}_silver.{t.table}``.

  The batch is first reduced to one row per primary key (the row with the
  highest ``__source_ts_ms``). If the target table already exists the rows are
  MERGEd into it (update on match, insert otherwise); if it does not exist it
  is created from the deduplicated batch.

  Args:
    incremental: Micro-batch DataFrame, as handed over by ``foreachBatch``.
      Must contain the primary-key columns and ``__source_ts_ms``.
    t: Table configuration record exposing ``pk`` (comma-separated primary-key
      column names), ``catalog``, ``database`` and ``table``.

  Raises:
    ValueError: If ``t.pk`` yields no primary-key column names.
  """
  # Imported locally so the function does not depend on the cell that does
  # `from pyspark.sql.functions import *` having been run first.
  from pyspark.sql import Window
  from pyspark.sql import functions as F

  # Accept "a,b" as well as "a, b": stray blanks would otherwise end up inside
  # the column names.
  pks = [pk.strip() for pk in t.pk.split(',') if pk.strip()]
  if not pks:
    raise ValueError(f"No primary key configured for table {t.table}")
  on_clause = " AND ".join([f'i.{pk} = t.{pk}' for pk in pks])

  # NOTE: we still need to add the processing-time field that comes from the
  # Kafka topic.
  # Keep only the most recent row per key. A window with row_number() is used
  # because orderBy(...).dropDuplicates(...) does not guarantee which of the
  # duplicate rows survives.
  latest_first = Window.partitionBy(*pks).orderBy(F.col("__source_ts_ms").desc())
  deduplicated = (
    incremental
    .withColumn("__merge_rn", F.row_number().over(latest_first))
    .where(F.col("__merge_rn") == 1)
    .drop("__merge_rn")
  )
  deduplicated.createOrReplaceTempView("incremental")

  if spark.sql(f"SHOW TABLES IN {t.catalog}.{t.database}_silver LIKE '{t.table}'").count() > 0:
    # MERGE records into the target table using the primary-key join.
    # The statement runs on the micro-batch's own session, which is where the
    # "incremental" temp view was registered.
    incremental._jdf.sparkSession().sql(f"""
      MERGE INTO {t.catalog}.{t.database}_silver.{t.table} t
      USING incremental i
      ON {on_clause}--i.ID_Advogado=t.ID_Advogado AND i.window = t.window AND i.deviceId = t.deviceid
      --WHEN MATCHED AND i.op = 'd' THEN DELETE
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  else:
    # If the target table does not exist, create it from the deduplicated
    # batch so the silver table never starts out with duplicate keys.
    deduplicated.writeTo(f"{t.catalog}.{t.database}_silver.{t.table}").createOrReplace()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro

def KafkaIngestion(t):
    """Ingest one Kafka topic into its bronze table, then upsert it into silver.

    Bronze: reads the topic from the earliest offset, decodes the Avro ``value``
    column through the schema registry (subject ``"<topic>-value"``) and appends
    the decoded fields to ``{catalog}.{database}_bronze.{table}``.

    Silver: streams the bronze table and hands every micro-batch to
    ``merge_delta``.

    Both streams use the ``availableNow`` trigger and are awaited, so the call
    returns only after each stream has terminated.

    Args:
        t: Table configuration record exposing ``catalog``, ``database``,
            ``table`` and ``topic`` (plus whatever ``merge_delta`` reads).
    """
    catalog = t.catalog
    database = t.database
    table = t.table
    topic = t.topic

    print(f"Ingesting table {catalog}.{database}_bronze.{table}")

    # Bronze Layer

    # NOTE(review): the "kafka.basic.auth.*" and "kafka.schema.registry.url"
    # options look like schema-registry client settings rather than Kafka
    # consumer settings; from_avro below gets its own registry options.
    # Confirm whether they are still needed here.
    Raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", KafkaServer)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.client.dns.lookup", "use_all_dns_ips")
        .option("kafka.basic.auth.credentials.source", "USER_INFO")
        .option("kafka.sasl.jaas.config",f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{KafkaUsername}" password="{KafkaPassword}";')
        .option("kafka.basic.auth.user.info", f"{KafkaSchemaUsername}:{KafkaSchemaPassword}")
        .option("kafka.schema.registry.url", KafkaSchemaRegistryUrl)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
    )

    # Decode the Avro payload and flatten the resulting struct into columns.
    Bronze = Raw.select(
        from_avro(
            data=col("value"),
            options=KafkaSchemaRegistryOptions,
            subject=f"{topic}-value",
            schemaRegistryAddress=KafkaSchemaRegistryUrl,
        ).alias("value")
    ).selectExpr("value.*")

    # NOTE(review): checkpoints are keyed by table name only, so two entries
    # with the same table name in different databases would share a
    # checkpoint. Confirm table names are unique across the registry.
    (
        Bronze.writeStream.outputMode("append")
        .option("checkpointLocation", f"{CheckpointLocation}/bronze/{table}")
        .trigger(availableNow=True)
        # toTable() is the PySpark API for starting a stream that writes to a
        # table; it returns the StreamingQuery that is awaited below.
        .toTable(f"{catalog}.{database}_bronze.{table}")
        .awaitTermination()
    )

    # Silver Layer

    Silver = (
        spark.readStream.table(f"{catalog}.{database}_bronze.{table}")
    )

    (
        Silver.writeStream.outputMode("append")
        .option("checkpointLocation", f"{CheckpointLocation}/silver/{table}")
        .trigger(availableNow=True)
        # The batch id is not needed; merge_delta only takes the batch and the
        # table configuration.
        .foreachBatch(lambda i, b: merge_delta(i, t))
        .start()
        .awaitTermination()
    )

# Running Ingestion

In [0]:
# Read the table registry and bring every row to the driver; each row
# describes one table to ingest.
table_registry = spark.sql('select * from embracon.admin.tables')
tables = table_registry.collect()

In [0]:
# Ingest every registered table. A failure on one table is reported and the
# loop moves on, so the remaining tables are still processed.
for table in tables:
  try:
    KafkaIngestion(table)
  except Exception as e:
    # Best-effort boundary: say which table failed and with what kind of
    # error, instead of an anonymous "Error: ..." line.
    failed_table = getattr(table, 'table', table)
    print(f'Error ingesting {failed_table}: {type(e).__name__}: {e}')