In [1]:
import os
import uuid

from pyspark.sql import SparkSession

def get_spark_session(config):
    """Build the SparkSession for this notebook via ``getOrCreate()``.

    Args:
        config: Mapping that must contain the keys 'app_name', 'store_host',
            'store_port', 'store_user', 'store_pass' and 'spark_uri'.

    Returns:
        The object returned by ``SparkSession.builder ... .getOrCreate()``.

    Raises:
        KeyError: If any of the keys listed above is missing from ``config``.
    """
    # Connector artifacts handed to spark.jars.packages as one comma-separated string.
    connector_packages = ",".join((
        "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    ))

    session_builder = SparkSession.builder
    session_builder = session_builder.appName(config['app_name'])
    session_builder = session_builder.config("spark.streaming.stopGracefullyOnShutdown", True)
    session_builder = session_builder.config("spark.jars.packages", connector_packages)

    # Cassandra connection settings, all taken from the caller's config.
    session_builder = session_builder.config("spark.cassandra.connection.host", config['store_host'])
    session_builder = session_builder.config("spark.cassandra.connection.port", config['store_port'])
    session_builder = session_builder.config("spark.cassandra.auth.username", config['store_user'])
    session_builder = session_builder.config("spark.cassandra.auth.password", config['store_pass'])

    session_builder = session_builder.config("spark.sql.shuffle.partitions", 4)
    session_builder = session_builder.master(config['spark_uri'])
    return session_builder.getOrCreate()

# Settings shared by the cells below: Spark master, Cassandra store and Kafka source.
# The Cassandra credentials are read from the STORE_USER / STORE_PASS environment
# variables so real secrets do not have to be saved in the notebook; when the
# variables are unset the previous literal values are used, so behaviour is unchanged.
config = {
    "app_name": "store events",
    "store_host": "events-db",
    "store_port": "9042",
    "store_user": os.environ.get("STORE_USER", "cassandra"),
    "store_pass": os.environ.get("STORE_PASS", "cassandra"),
    "spark_uri": "spark://spark-master:7077",
    "kafka_bootstrap_servers": "kafka1:19092,kafka2:19093,kafka3:19094",
    "kafka_topic": "events"
}
spark = get_spark_session(config)

In [None]:
import time
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType
from pyspark.sql.functions import from_json, col, to_timestamp, date_format, lit, count

# Schema passed to from_json() when decoding the Kafka message value.
# Every field is declared nullable. Field names are the JSON keys looked up in
# each message, so their spelling is kept exactly as it was.
# NOTE(review): 'correletionID' looks like a misspelling of "correlation" --
# confirm what the producer actually emits before renaming it.
events_schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ('timestamp', StringType),
        ('type', StringType),
        ('appName', StringType),
        ('appInstance', LongType),
        ('appID', StringType),
        ('probeID', StringType),
        ('eventID', StringType),
        ('correletionID', LongType),
        ('locationID', StringType),
        ('transactionStart', LongType),
        ('transactionEnd', LongType),
        ('transactionDuration', LongType),
        ('clientIPAddress', StringType),
        ('clientPort', IntegerType),
        ('serverIPAddress', StringType),
        ('serverPort', IntegerType),
        ('ipProtocol', StringType),
        ('category', StringType),
        ('bytesFromClient', LongType),
        ('bytesToClient', LongType),
        ('bytesFromServer', LongType),
        ('bytesToServer', LongType),
        ('subscriberID', StringType),
        ('applicationProtocol', StringType),
        ('applicationName', StringType),
        ('domain', StringType),
        ('deviceType', StringType),
        ('networkType', StringType),
        ('contentType', StringType),
        ('lostBytesClient', LongType),
        ('lostBytesServer', LongType),
        ('srttMsClient', LongType),
        ('srttMsServer', LongType),
    )
])


def init_stream(spark, config):
    """Create the streaming DataFrame that reads from Kafka.

    Args:
        spark: Object exposing ``readStream`` (the session built earlier in
            this notebook).
        config: Mapping with the keys 'kafka_bootstrap_servers' and
            'kafka_topic'.

    Returns:
        The result of ``load()`` on the configured "kafka" stream reader.

    Raises:
        KeyError: If either Kafka key is missing from ``config``.
    """
    reader_options = (
        ("kafka.bootstrap.servers", config['kafka_bootstrap_servers']),
        ("subscribe", config['kafka_topic']),
        ("startingOffsets", "earliest"),
        ("failOnDataLoss", "FALSE"),
    )

    stream_reader = spark.readStream.format("kafka")
    for option_name, option_value in reader_options:
        stream_reader = stream_reader.option(option_name, option_value)
    return stream_reader.load()

def process_df(df):
    """Decode the Kafka ``value`` column and project the event columns.

    The value is cast to a string, parsed with ``from_json`` using
    ``events_schema`` into a struct named ``data``, tagged with a ``batchID``
    literal, and finally reduced to snake_case columns plus a ``bucket_id``
    formatted as "yyyyMMddHHmm" from ``data.transactionEnd``.

    Args:
        df: DataFrame exposing a ``value`` column (here, the Kafka stream).

    Returns:
        DataFrame with columns bucket_id, batch_id, probe_id, event_id,
        location_id, transaction_start, transaction_end,
        transaction_duration, client_ip_address, client_port and
        server_ip_address.
    """
    # The UUID is generated once per call of this function and attached as a
    # literal, so every row produced from the returned DataFrame carries the
    # same value.
    # NOTE(review): if a distinct id per micro-batch is intended, this does
    # not provide it -- confirm the requirement.
    batch_id = str(uuid.uuid4())

    parsed = df.selectExpr("CAST(value AS STRING)").select(
        from_json(col("value"), events_schema).alias("data")
    )
    tagged = parsed.withColumn("batchID", lit(batch_id))

    # NOTE(review): transactionEnd is declared LongType in events_schema;
    # confirm that the unit to_timestamp() assumes for a numeric input matches
    # the unit the producer sends (seconds vs. milliseconds).
    bucket_column = date_format(
        to_timestamp("data.transactionEnd"), "yyyyMMddHHmm"
    ).alias("bucket_id")

    # (source column, output name) pairs, in output order.
    renames = (
        ("batchID", "batch_id"),
        ("data.probeID", "probe_id"),
        ("data.eventID", "event_id"),
        ("data.locationID", "location_id"),
        ("data.transactionStart", "transaction_start"),
        ("data.transactionEnd", "transaction_end"),
        ("data.transactionDuration", "transaction_duration"),
        ("data.clientIPAddress", "client_ip_address"),
        ("data.clientPort", "client_port"),
        ("data.serverIPAddress", "server_ip_address"),
    )
    renamed_columns = [col(source).alias(target) for source, target in renames]

    return tagged.select(bucket_column, *renamed_columns)
 
# Wire the pipeline together: Kafka source -> parsed/projected events -> console sink.
init_df = init_stream(spark, config)
df = process_df(init_df)
# Disabled experiment (row count per batch_id), kept for reference:
#df1 = df.groupBy("batch_id").agg(count("*").alias("cnt"))

query = (df.writeStream\
            .outputMode("append")\
            .format("console")\
            .start())

#query1 = (df1.writeStream\
#            .outputMode("append")\
#            .format("console")\
#            .start())


# Poll the query every 10 seconds and print its latest progress report
# (None is printed until a first report is available).
try:
    while query.isActive:
        print (query.lastProgress)
        time.sleep(10)
    # Per the PySpark docs, this raises if the query terminated with an error,
    # so a failed stream is not silently treated as finished.
    query.awaitTermination()
    #query1.awaitTermination()
finally:
    # Interrupting the cell (or any error above) would otherwise leave the
    # streaming query running in the background; stop it explicitly.
    query.stop()

None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
{'id': '70e524ad-41d7-4fd6-b083-1655eaab66e9', 'runId': 'aa0257fb-32c8-4569-b56b-f47bce79de45', 'name': None, 'timestamp': '2023-11-27T01:22:30.991Z', 'batchId': 0, 'numInputRows': 197000, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 1343.36195080704, 'durationMs': {'addBatch': 136465, 'commitOffsets': 292, 'getBatch': 101, 'latestOffset': 6255, 'queryPlanning': 2797, 'triggerExecution': 146626, 'walCommit': 447}, 'stateOperators': [], 'sources': [{'description': 'KafkaV2[Subscribe[events]]', 'startOffset': None, 'endOffset': {'events': {'2': 66585, '1': 63124, '0': 67291}}, 'latestOffset': {'events': {'2': 66585, '1': 63124, '0': 67291}}, 'numInputRows': 197000, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 1343.36195080704, 'metrics': {'avgOffsetsBehindLatest': '0.0', 'maxOffsetsBehindLatest': '0', 'minOffsetsBehindLatest': '0'}}], 'sink': {'description': 'org.apache.spark.sql.execution.streamin