In [None]:

import os

# Connection settings for the Confluent Cloud source and the Delta sink.
# Endpoints and credentials are read from environment variables so secrets are never
# committed with the notebook; each falls back to "" (the original placeholder) when unset.
# NOTE(review): on Databricks, dbutils.secrets.get(scope, key) is a common alternative source.
confluentClusterName = "databricks"  # NOTE(review): not referenced by any cell shown here
confluentBootstrapServers = os.environ.get("CONFLUENT_BOOTSTRAP_SERVERS", "")
confluentTopicName = "orders"
confluentApiKey = os.environ.get("CONFLUENT_API_KEY", "")
confluentSecret = os.environ.get("CONFLUENT_API_SECRET", "")
schemaRegistryUrl = os.environ.get("CONFLUENT_SCHEMA_REGISTRY_URL", "")
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", "")
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", "")

# Delta table that receives the decoded records, and the checkpoint directory
# used by the writeStream query.
deltaTablePath = '/mnt/databricks/kafka/orders'
checkpointPath = '/mnt/databricks/kafka/_checkpoint/orders'

In [None]:
from confluent_kafka.schema_registry import SchemaRegistryClient
import ssl
# Schema Registry client used by the foreachBatch sink to look up the Avro writer
# schema for each schema id seen in the stream (basic auth = "<api key>:<secret>").
registryBasicAuth = f"{confluentRegistryApiKey}:{confluentRegistrySecret}"
schema_registry_conf = dict(url=schemaRegistryUrl)
schema_registry_conf['basic.auth.user.info'] = registryBasicAuth

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

In [None]:
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
# Per the Confluent wire format, byte 1 of `value` is a magic byte, bytes 2-5 hold the
# big-endian schema id, and the Avro payload starts at byte 6 (Spark substring is 1-based).
def binary_to_string(binaryCol):
  """Decode a big-endian unsigned binary column into its decimal string form.

  Native replacement for the former Python UDF: hex() + conv() run inside the JVM
  (no per-row Python round-trip) and map a null input (e.g. a record with a null
  value) to null instead of raising TypeError from int.from_bytes(None, ...).
  Note: an empty binary input yields null here, where the old UDF returned "0".
  """
  return fn.conv(fn.hex(binaryCol), 16, 10)

streamTestDf = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  # Avro body without the 5-byte header.
  .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
  # Schema id as a decimal string (e.g. "100002").
  .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId','fixedValue')
)


In [None]:
# Preview the stream: Kafka metadata, key cast to string, the schema id taken from the
# message header bytes, and the still-encoded Avro payload (fixedValue).
display(streamTestDf) 

topic,partition,offset,timestamp,timestampType,key,valueSchemaId,fixedValue
orders,0,0,2024-05-13T08:17:30.287Z,0,5,100002,oIqf4uxWCg5JdGVtXzUwhQfI6WRmAUAOQ2l0eV8yMgxTdGF0ZV/Wlwc=
orders,0,1,2024-05-13T08:17:33.374Z,0,11,100002,1revottXFg5JdGVtXzM4uytmuXy3IEAOQ2l0eV8xMQxTdGF0ZV+Y2QM=
orders,0,2,2024-05-13T08:17:34.352Z,0,15,100002,6p+t8cFXHgxJdGVtXzQSDHHBsKYFQApDaXR5Xw5TdGF0ZV8x3sUB
orders,0,3,2024-05-13T08:17:35.955Z,0,17,100002,qtWJ1PVWIgxJdGVtXzEmkc5gv/YdQAxDaXR5XzMMU3RhdGVfnJ0E
orders,0,4,2024-05-13T08:17:38.734Z,0,21,100002,yM67vKZXKhBJdGVtXzU5Nd3dhoKrEA1ADkNpdHlfOTYMU3RhdGVfvKUD
orders,0,5,2024-05-13T08:17:39.803Z,0,23,100002,tt6f655XLhBJdGVtXzczNvkm1ti8mRZACkNpdHlfDlN0YXRlXzP6gQc=
orders,0,6,2024-05-13T08:17:42.339Z,0,27,100002,rvfh6q5XNgxJdGVtXzYgJv5AedsiQApDaXR5XxBTdGF0ZV84Mrz+CQ==
orders,0,7,2024-05-13T08:17:44.627Z,0,34,100002,6ODz6s5WRAxJdGVtXzdgLaI5+eoWQAxDaXR5XzEQU3RhdGVfMjKWiQk=
orders,0,8,2024-05-13T08:17:46.5Z,0,37,100002,nOzx/5NXShBJdGVtXzM5NRw0EUu2BiNACkNpdHlfEFN0YXRlXzQ0iJwD
orders,0,9,2024-05-13T08:17:49.99Z,0,43,100002,lLLwk/pWVgxJdGVtXzEjB40R4KwfQApDaXR5XxBTdGF0ZV8yNbbtBQ==


In [None]:
import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro
def parseAvroDataWithSchemaId(df, ephoch_id):
  """foreachBatch sink: decode a micro-batch of Confluent-framed Avro and append it to Delta.

  Rows are grouped by `valueSchemaId`. For each distinct id, the writer schema is fetched
  from the Schema Registry and `fixedValue` is decoded with from_avro (FAILFAST, so a
  malformed record fails the batch). Each group is then appended to `deltaTablePath`
  with mergeSchema enabled.

  Args:
    df: micro-batch DataFrame with the columns selected by the streamTestDf cell
      (topic, partition, offset, timestamp, timestampType, key, valueSchemaId, fixedValue).
    ephoch_id: micro-batch id passed by Structured Streaming; not used here.
  """
  fromAvroOptions = {"mode": "FAILFAST"}
  # The batch is scanned once for distinct ids and once per id, so cache it...
  cachedDf = df.cache()
  try:
    distinctIdRows = (
      cachedDf
      .select(fn.col('valueSchemaId').cast('integer'))
      .distinct()
      .collect()
    )
    # Records with no schema id (e.g. a null Kafka value) carry no Avro payload to
    # decode; they are skipped rather than sent to the registry as id None.
    schemaIds = [row.valueSchemaId for row in distinctIdRows if row.valueSchemaId is not None]

    for schemaId in schemaIds:
      # The schema string is only used on the driver to build the from_avro
      # expression, so no broadcast variable is needed.
      valueSchema = str(schema_registry_client.get_schema(schemaId).schema_str)
      # NOTE(review): one append per schema id makes the batch non-atomic; a retry after
      # a partial failure can duplicate the groups that were already written.
      (
        cachedDf
        .filter(fn.col('valueSchemaId') == schemaId)
        .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key',
                from_avro('fixedValue', valueSchema, fromAvroOptions).alias('parsedValue'))
        .write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(deltaTablePath)
      )
  finally:
    # ...and always release it, otherwise every micro-batch leaks cached blocks.
    cachedDf.unpersist()


In [None]:
# Launch the streaming query: each micro-batch of streamTestDf is passed to
# parseAvroDataWithSchemaId, with progress tracked in checkpointPath.
# The handle is kept so the query can be monitored or stopped later
# (e.g. ordersStreamQuery.stop()); ending the cell with it still renders its repr.
# NOTE(review): the query name mentions "clickStream" although the topic is orders.
ordersStreamQuery = (
  streamTestDf.writeStream
  .option("checkpointLocation", checkpointPath)
  .foreachBatch(parseAvroDataWithSchemaId)
  .queryName("clickStreamTestFromConfluent")
  .start()
)
ordersStreamQuery


<pyspark.sql.streaming.query.StreamingQuery at 0x7f8e547761a0>

In [None]:
# Batch-read the Delta table that the foreachBatch sink appends to, and render it.
deltaTestDf = spark.read.load(deltaTablePath, format="delta")
display(deltaTestDf)


topic,partition,offset,timestamp,timestampType,key,parsedValue
orders,4,0,2024-05-13T08:17:30.751Z,0,6,"List(1518002446426, 6, Item_9, 8.14590658485713, List(City_, State_6, 80417))"
orders,4,1,2024-05-13T08:17:32.701Z,0,10,"List(1493963283738, 10, Item_52, 3.5504099159029696, List(City_8, State_7, 37576))"
orders,4,2,2024-05-13T08:17:34.051Z,0,12,"List(1506057818258, 12, Item_93, 5.543583069523574, List(City_, State_17, 38308))"
orders,4,3,2024-05-13T08:17:36.846Z,0,18,"List(1496082951426, 18, Item_106, 6.4721068009937515, List(City_, State_39, 98738))"
orders,4,4,2024-05-13T08:17:38.196Z,0,20,"List(1489944276120, 20, Item_2, 0.5269117031370012, List(City_46, State_27, 53593))"
orders,4,5,2024-05-13T08:17:42.273Z,0,26,"List(1509501558019, 26, Item_390, 3.3149731852020925, List(City_31, State_, 63405))"
orders,4,6,2024-05-13T08:17:47.055Z,0,38,"List(1492206703294, 38, Item_918, 8.567666420273746, List(City_27, State_, 42624))"
orders,4,7,2024-05-13T08:17:49.165Z,0,42,"List(1509710766248, 42, Item_870, 6.014364951247247, List(City_, State_4, 31400))"
orders,4,8,2024-05-13T08:17:51.837Z,0,51,"List(1515777735371, 51, Item_803, 9.344389822133117, List(City_85, State_14, 49772))"
orders,4,9,2024-05-13T08:17:55.181Z,0,58,"List(1510907303808, 58, Item_1, 4.311036386746782, List(City_, State_38, 34110))"


In [None]:
# Rows appended to the Delta table so far (the streaming query started above is still running).
deltaTestDf.count()

1709

In [None]:
# Inspect the table schema; parsedValue is the nested struct produced by from_avro.
deltaTestDf.printSchema()

root
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- key: string (nullable = true)
 |-- parsedValue: struct (nullable = true)
 |    |-- ordertime: long (nullable = true)
 |    |-- orderid: integer (nullable = true)
 |    |-- itemid: string (nullable = true)
 |    |-- orderunits: double (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zipcode: long (nullable = true)



In [None]:
from pyspark.sql.functions import col

# Flatten the nested Avro struct read from the Delta table (deltaTestDf): keep the Kafka
# metadata columns and promote the listed parsedValue / parsedValue.address fields to
# top-level columns. Nothing is exploded; row count is unchanged.
kafkaMetadataCols = ['topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key']
orderFields = ['ordertime', 'orderid', 'itemid', 'orderunits']
addressFields = ['city', 'state', 'zipcode']

new_df = deltaTestDf.select(
    *[col(name) for name in kafkaMetadataCols],
    *[col(f'parsedValue.{name}').alias(name) for name in orderFields],
    *[col(f'parsedValue.address.{name}').alias(name) for name in addressFields],
)

new_df.display()

topic,partition,offset,timestamp,timestampType,key,ordertime,orderid,itemid,orderunits,city,state,zipcode
orders,4,0,2024-05-13T08:17:30.751Z,0,6,1518002446426,6,Item_9,8.14590658485713,City_,State_6,80417
orders,4,1,2024-05-13T08:17:32.701Z,0,10,1493963283738,10,Item_52,3.5504099159029696,City_8,State_7,37576
orders,4,2,2024-05-13T08:17:34.051Z,0,12,1506057818258,12,Item_93,5.543583069523574,City_,State_17,38308
orders,4,3,2024-05-13T08:17:36.846Z,0,18,1496082951426,18,Item_106,6.472106800993752,City_,State_39,98738
orders,4,4,2024-05-13T08:17:38.196Z,0,20,1489944276120,20,Item_2,0.5269117031370012,City_46,State_27,53593
orders,4,5,2024-05-13T08:17:42.273Z,0,26,1509501558019,26,Item_390,3.3149731852020925,City_31,State_,63405
orders,4,6,2024-05-13T08:17:47.055Z,0,38,1492206703294,38,Item_918,8.567666420273746,City_27,State_,42624
orders,4,7,2024-05-13T08:17:49.165Z,0,42,1509710766248,42,Item_870,6.014364951247247,City_,State_4,31400
orders,4,8,2024-05-13T08:17:51.837Z,0,51,1515777735371,51,Item_803,9.344389822133117,City_85,State_14,49772
orders,4,9,2024-05-13T08:17:55.181Z,0,58,1510907303808,58,Item_1,4.311036386746782,City_,State_38,34110
