In [0]:
import os

import pyspark.sql.functions as F
from pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType

In [0]:
# Small in-memory sample of (name, occupation) pairs used as the Kafka payload.
# Despite its name, `dataDictionary` is a list of tuples, one tuple per row.
dataDictionary = [
    ("James", "driver"),
    ("Michael", "teacher"),
    ("Robert", "engineer"),
    ("Washington", "architect"),
    ("Jefferson", "CEO"),
]
df = spark.createDataFrame(dataDictionary, ["name", "occupation"])
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- occupation: string (nullable = true)

+----------+----------+
|name      |occupation|
+----------+----------+
|James     |driver    |
|Michael   |teacher   |
|Robert    |engineer  |
|Washington|architect |
|Jefferson |CEO       |
+----------+----------+



In [0]:
# Preview the exact (key, value) records the write cell sends to Kafka:
#   key   -> the person's name
#   value -> the whole row serialised as a JSON object string
df2 = df.selectExpr(
    "name AS key",
    "to_json(struct(*)) AS value",
)
display(df2)


key,value
James,"{""name"":""James"",""occupation"":""driver""}"
Michael,"{""name"":""Michael"",""occupation"":""teacher""}"
Robert,"{""name"":""Robert"",""occupation"":""engineer""}"
Washington,"{""name"":""Washington"",""occupation"":""architect""}"
Jefferson,"{""name"":""Jefferson"",""occupation"":""CEO""}"


In [0]:
# Write each row to the Kafka topic `Basic_0` as a (key, value) record:
#   key   -> the person's name
#   value -> the whole row serialised as a JSON object string
# Connection settings are read from the environment so no secrets live in the notebook:
#   KAFKA_BOOTSTRAP_SERVERS -> the cluster's bootstrap endpoint
#                              (defaults to the cluster this notebook was built against)
#   KAFKA_API_KEY           -> the cluster API key (SASL username)
#   KAFKA_API_SECRET        -> the cluster API secret (SASL password)
# A missing KAFKA_API_KEY / KAFKA_API_SECRET raises KeyError instead of silently connecting unauthenticated.
# SECURITY: an API key/secret pair was previously hard-coded in this cell;
# treat it as leaked and revoke/rotate it in Confluent Cloud.
# NOTE: this is a plain batch write, so re-running the cell sends every row again.
# The read-back output shows each name twice, consistent with two runs.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "pkc-12576z.us-west2.gcp.confluent.cloud:9092"
)
KAFKA_JAAS_CONFIG = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{os.environ["KAFKA_API_KEY"]}" '
    f'password="{os.environ["KAFKA_API_SECRET"]}";'
)
(
    df.selectExpr("name AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", "Basic_0")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
    .save()
)


In [0]:
# Batch-read every message currently in the Kafka topic `Basic_0`,
# from the earliest offset up to the latest one.
# Connection settings are read from the environment so no secrets live in the notebook:
#   KAFKA_BOOTSTRAP_SERVERS -> the cluster's bootstrap endpoint
#                              (defaults to the cluster this notebook was built against)
#   KAFKA_API_KEY           -> the cluster API key (SASL username)
#   KAFKA_API_SECRET        -> the cluster API secret (SASL password)
# A missing KAFKA_API_KEY / KAFKA_API_SECRET raises KeyError instead of silently connecting unauthenticated.
# SECURITY: an API key/secret pair was previously hard-coded in this cell;
# treat it as leaked and revoke/rotate it in Confluent Cloud.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "pkc-12576z.us-west2.gcp.confluent.cloud:9092"
)
KAFKA_JAAS_CONFIG = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{os.environ["KAFKA_API_KEY"]}" '
    f'password="{os.environ["KAFKA_API_SECRET"]}";'
)
dfread = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", "Basic_0")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
    .load()
)
display(dfread)

key,value,topic,partition,offset,timestamp,timestampType
SmVmZmVyc29u,eyJuYW1lIjoiSmVmZmVyc29uIiwib2NjdXBhdGlvbiI6IkNFTyJ9,Basic_0,4,0,2024-07-14T23:47:25.652+0000,0
SmVmZmVyc29u,eyJuYW1lIjoiSmVmZmVyc29uIiwib2NjdXBhdGlvbiI6IkNFTyJ9,Basic_0,4,1,2024-07-14T23:48:07.764+0000,0
TWljaGFlbA==,eyJuYW1lIjoiTWljaGFlbCIsIm9jY3VwYXRpb24iOiJ0ZWFjaGVyIn0=,Basic_0,1,0,2024-07-14T23:47:25.652+0000,0
TWljaGFlbA==,eyJuYW1lIjoiTWljaGFlbCIsIm9jY3VwYXRpb24iOiJ0ZWFjaGVyIn0=,Basic_0,1,1,2024-07-14T23:48:07.868+0000,0
SmFtZXM=,eyJuYW1lIjoiSmFtZXMiLCJvY2N1cGF0aW9uIjoiZHJpdmVyIn0=,Basic_0,0,0,2024-07-14T23:47:25.655+0000,0
V2FzaGluZ3Rvbg==,eyJuYW1lIjoiV2FzaGluZ3RvbiIsIm9jY3VwYXRpb24iOiJhcmNoaXRlY3QifQ==,Basic_0,0,1,2024-07-14T23:47:25.655+0000,0
Um9iZXJ0,eyJuYW1lIjoiUm9iZXJ0Iiwib2NjdXBhdGlvbiI6ImVuZ2luZWVyIn0=,Basic_0,0,2,2024-07-14T23:47:25.656+0000,0
V2FzaGluZ3Rvbg==,eyJuYW1lIjoiV2FzaGluZ3RvbiIsIm9jY3VwYXRpb24iOiJhcmNoaXRlY3QifQ==,Basic_0,0,3,2024-07-14T23:48:07.681+0000,0
SmFtZXM=,eyJuYW1lIjoiSmFtZXMiLCJvY2N1cGF0aW9uIjoiZHJpdmVyIn0=,Basic_0,0,4,2024-07-14T23:48:07.768+0000,0
Um9iZXJ0,eyJuYW1lIjoiUm9iZXJ0Iiwib2NjdXBhdGlvbiI6ImVuZ2luZWVyIn0=,Basic_0,0,5,2024-07-14T23:48:07.789+0000,0


In [0]:
# Schema of the JSON object stored in each Kafka message value:
# two nullable string fields, mirroring the columns of the source DataFrame.
json_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in ("name", "occupation")]
)


In [0]:
# Decode the messages read back from Kafka: cast the `value` column to a
# string, parse that JSON with `json_schema`, then pull the two fields out of
# the resulting struct as top-level `name` / `occupation` columns.
df3 = (
    dfread
    .withColumn("value", F.from_json(F.col("value").cast("string"), json_schema))
    .select(F.col("value.name"), F.col("value.occupation"))
)
display(df3)

name,occupation
Jefferson,CEO
Jefferson,CEO
Michael,teacher
Michael,teacher
James,driver
Washington,architect
Robert,engineer
Washington,architect
James,driver
Robert,engineer
