In [2]:
from pyspark.sql import SparkSession



spark = (
    SparkSession.builder.master("local[1]") 
    .appName("read-postgres-jdbc") 
    .getOrCreate()
)
    # PostgreSQL connection details
jdbc_url = "jdbc:postgresql://plaid-assessment:5432/bookings"
table_name = "bookable_flights"
username = "postgres"
password = "plaid"
# Connection properties
connection_properties = {
    "user": username,
    "password": password,
    "driver": "org.postgresql.Driver"
}
# Read data from PostgreSQL into a Spark DataFrame
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .options(**connection_properties) \
    .load()
# Show the DataFrame schema and some data
df.printSchema()
df.show()
df.show(1)

root
 |-- departure_airport: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- arrival_airport: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- transit_airport: string (nullable = true)
 |-- transit_airport_arrival_time: string (nullable = true)
 |-- transit_airport_departure_time: string (nullable = true)
 |-- flights: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-----------------+--------------+---------------+-------------+---------------+----------------------------+------------------------------+--------+
|departure_airport|departure_time|arrival_airport| arrival_time|transit_airport|transit_airport_arrival_time|transit_airport_departure_time| flights|
+-----------------+--------------+---------------+-------------+---------------+----------------------------+------------------------------+--------+
|              UIK| Fri  12:15:00|            SGC|Fri  14:35:00|           NULL|                       

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Hudi Event Time Example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
    .getOrCreate()

# Define table name and base path
tableName = "iot_sensor_data"
basePath = "file:///tmp/iot_sensor_data"

# Sample data without event_time
columns = ["sensor_id", "temperature", "city"]
data = [
    ("sensor_1", 34, "Patiala"),
    ("sensor_2", 31, "Gurugram")
]
inserts = spark.createDataFrame(data).toDF(*columns)

# Add current timestamp as event_time
inserts_with_event_time = inserts.withColumn("event_time", current_timestamp())

# Reorder columns to match the original schema
inserts_with_event_time = inserts_with_event_time.select("event_time", "sensor_id", "temperature", "city")

# Hudi options
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'sensor_id',
    'hoodie.datasource.write.partitionpath.field': 'city',
    'hoodie.datasource.write.precombine.field': 'event_time',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE'
}

# Write data to Hudi table
inserts_with_event_time.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save(basePath)

# Verify the data
tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show()

+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---------+-----------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|          event_time|sensor_id|temperature|    city|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---------+-----------+--------+
|  20251116020227821|20251116020227821...|          sensor_2|              Gurugram|5b75dd93-1f18-4ce...|2025-11-16 02:02:...| sensor_2|         31|Gurugram|
|  20251116020227821|20251116020227821...|          sensor_1|               Patiala|d92fa921-8320-41f...|2025-11-16 02:02:...| sensor_1|         34| Patiala|
+-------------------+--------------------+------------------+----------------------+--------------------+--------------------+---------+-----------+--------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53900)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =

In [2]:


from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("KafkaBatchReadExample")
    .getOrCreate()
)

df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafkabroker:9092")
    .option("subscribe", "stock_ticks")  # or "earliest"
    .option("endingOffsets", "latest")
    .load()
)

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.show(truncate=False)

Py4JJavaError: An error occurred while calling o49.showString.
: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.fetchPartitionOffsets(KafkaOffsetReaderAdmin.scala:128)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.getOffsetRangesFromUnresolvedOffsets(KafkaOffsetReaderAdmin.scala:374)
	at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:67)
	at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:349)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
	at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:496)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:186)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:186)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:284)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:252)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:117)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
