In [0]:
import json
import os

from dotenv import load_dotenv
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, parse_json, udf

from src.avro.decode import deserialize_data, deserialize_data_id
# Load variables from a local .env file (if one is found) into os.environ,
# leaving already-set variables untouched; later cells read the
# SCHEMA_REGISTRY_* settings from the environment.
load_dotenv(override=False)

In [0]:
@udf
def deserialize_avro_with_schema(schema_json, value):
    return json.dumps(deserialize_data(schema_json, value))

@udf
def deserialize_avro_with_schema_id(schema_id, value):
    return json.dumps(deserialize_data_id(schema_id, value))

In [0]:

# Kafka connection settings for the batch read below. Broker and topic can be
# overridden through environment variables; the defaults are the values this
# notebook was written against.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "kafka-2e83401f-premgowda.e.aivencloud.com:21292"
)
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "medallion")
KAFKA_CERT_DIR = "/Volumes/nonprod/landing/data/certs/java"

kafka_options = {
    "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "subscribe": KAFKA_TOPIC,

    # SSL (mTLS): PEM-encoded CA truststore and client keystore.
    "kafka.security.protocol": "SSL",
    "kafka.ssl.truststore.location": f"{KAFKA_CERT_DIR}/ca.pem",
    "kafka.ssl.keystore.location": f"{KAFKA_CERT_DIR}/svc.pem",
    "kafka.ssl.truststore.type": "PEM",
    "kafka.ssl.keystore.type": "PEM",

    # Read the topic from its earliest available offsets.
    "startingOffsets": "earliest",
}


def _require_env(name):
    """Return the environment variable `name`, failing with a hint if unset."""
    try:
        return os.environ[name]
    except KeyError:
        raise KeyError(
            f"Environment variable {name} is not set; define it in .env or the cluster environment."
        ) from None


# Schema Registry basic-auth options passed to from_avro in a later cell.
# NOTE: the name keeps its original spelling ("schmea") because later cells
# reference it.
schmea_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": (
        f"{_require_env('SCHEMA_REGISTRY_USERNAME')}:{_require_env('SCHEMA_REGISTRY_PASSWORD')}"
    ),
}


In [0]:
# Batch-read the Kafka topic configured in kafka_options.
kafka_reader = spark.read.format("kafka").options(**kafka_options)
df = kafka_reader.load()

display(df)

In [0]:
# Cast the Kafka key to a string and decode the outer Avro envelope through
# the Schema Registry into the column "Product".
envelope_df = df.withColumn("d_key", col("key").cast("string"))
envelope_df = envelope_df.withColumn(
    "Product",
    from_avro(
        data="value",
        schemaRegistryAddress=os.environ["SCHEMA_REGISTRY_URL"],
        subject="premgowda.in.products.genric",
        options=schmea_options,
    ),
)

# Decode the nested payload four ways to compare approaches: schema id vs.
# embedded schema, and the `valueBytes` field vs. the `value` field.
# Each entry: (output column, decoding UDF, schema field, payload field).
inner_decoders = [
    ("InnerRecord", deserialize_avro_with_schema_id,
     "Product.body.data.schemaId", "Product.body.data.valueBytes"),
    ("InnerRecordUsingBase64", deserialize_avro_with_schema_id,
     "Product.body.data.schemaId", "Product.body.data.value"),
    ("InnerRecordSchema", deserialize_avro_with_schema,
     "Product.body.data.schema", "Product.body.data.valueBytes"),
    ("InnerRecordSchemaBase64", deserialize_avro_with_schema,
     "Product.body.data.schema", "Product.body.data.value"),
]

# parse_json turns each UDF's JSON string into a VARIANT column.
decoded_df = envelope_df
for column_name, decoder, schema_field, payload_field in inner_decoders:
    decoded_df = decoded_df.withColumn(
        column_name, parse_json(decoder(schema_field, payload_field))
    )

decoded_df.display()