## Listing and query Kafka Topics as Iceberg tables

In [1]:
# Start a Spark Session

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Streambased").getOrCreate()

spark

25/07/08 12:51:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# In Streambased a Kafka cluster is equivalent to an Iceberg namespace we only have 1 in this demo but 
# it could operate over many 

In [3]:
%%sql

USE isk.isk

25/07/08 12:51:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# List Kafka topics that are represented as Iceberg tables

In [5]:
%%sql

show tables;

namespace,tableName,isTemporary
isk,payment_terms,False
isk,transactions,False


In [6]:
# Describe a topic/table - these descriptions are driven by Schema Registry but could come from other sources

In [7]:
%%sql

DESCRIBE transactions;

col_name,data_type,comment
storeId,string,
amount,double,
paymentTermCode,string,
itemCode,string,
transactionTime,timestamp_ntz,
kafka_offset,bigint,
kafka_ts,timestamp_ntz,
,,
# Partitioning,,
Part 0,hours(transactionTime),


In [8]:
# Iceberg gives us the ability to inspect the data files that make up a table's population. 
# Note the naming convention that indicates these represent chunks of Kafka offsets 

# Also note the partitioning applied, in this case we are partitioned by hour and have chosen to represent each hour 
# as one file

In [9]:
%%sql

SELECT *
FROM isk.isk.transactions.files;

content,file_path,file_format,spec_id,partition,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id,referenced_data_file,content_offset,content_size_in_bytes,readable_metrics
0,s3://transactions/iceberg_data/10000/0-946000-956000.avro,AVRO,0,Row(transactionTime_hour=484871),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-937000-947000.avro,AVRO,0,Row(transactionTime_hour=484870),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-928000-938000.avro,AVRO,0,Row(transactionTime_hour=484869),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-919000-929000.avro,AVRO,0,Row(transactionTime_hour=484868),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-910000-920000.avro,AVRO,0,Row(transactionTime_hour=484867),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-901000-911000.avro,AVRO,0,Row(transactionTime_hour=484866),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-892000-902000.avro,AVRO,0,Row(transactionTime_hour=484865),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-883000-893000.avro,AVRO,0,Row(transactionTime_hour=484864),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/2000/0-1000000-1002000.avro,AVRO,0,Row(transactionTime_hour=484877),2000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"
0,s3://transactions/iceberg_data/10000/0-991000-1001000.avro,AVRO,0,Row(transactionTime_hour=484876),10000,100024,,,,,,,,,,0,,,,"Row(amount=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), itemCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_offset=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), kafka_ts=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), paymentTermCode=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), storeId=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None), transactionTime=Row(column_size=None, value_count=None, null_value_count=None, nan_value_count=None, lower_bound=None, upper_bound=None))"


In [10]:
# We can now query our Kafka data directly. This query fetches data for a single daya and performs a common aggregation on it.

In [11]:
%%sql
-- Total taken in each currency - for single day - 21/04/2025.    
    
select 
    round(sum(t.amount),2) as total_taken,
    p.currency  
from 
    transactions t 
join 
    payment_terms p 
on 
    t.paymenttermcode=p.termcode
where 
    t.transactiontime between '2025-04-21 00:00:00' AND '2025-04-21 23:59:59'
group by 
    p.currency 
order by 
    p.currency asc;

                                                                                

total_taken,currency
345407.66,ADP
197153.0,AED
413853.0,AFA
209440.63,AFN
72520.84,ALL
613439.15,AMD
551251.97,ANG
135897.28,AOA
204507.53,ARS
349316.79,ATS


In [12]:
# After the above query has executed please navigate to: http://localhost:4041/SQL/ and look at the details for the 
# latest completed query. You should see only 24 files were read for the job "BatchScan isk.isk.transactions". This demonstrates 
# that partitioning is working correctly (1 file per hour for 1 day = 24 files required to be read).

# Now we will remove the time bounds and run the query again

In [13]:
%%sql
-- Total taken in each currency - without time bounds
    
select 
    round(sum(t.amount),2) as total_taken,
    p.currency  
from 
    transactions t 
join 
    payment_terms p 
on 
    t.paymenttermcode=p.termcode
--where 
--    t.transactiontime between '2025-04-21 00:00:00' AND '2025-04-21 23:59:59'
group by 
    p.currency 
order by 
    p.currency asc;

                                                                                

total_taken,currency
1575366.03,ADP
947567.82,AED
1910102.22,AFA
947117.13,AFN
323531.48,ALL
2863976.84,AMD
2509419.46,ANG
631476.39,AOA
954092.39,ARS
1595290.19,ATS


In [None]:
# returning to the latest query http://localhost:4041/SQL/ you will see this query read a far greater number of files.

In [None]:
# Please return to the demo script.