# Spark Stream Analytics Example - Detecting Fraudulent Credit Card Transactions

This is a demo of stream analytics implemented with Apache Spark Structured Streaming.

This notebook covers the following topics:

  1. Initialising the environment
  2. Starting a Spark session
  3. Loading data
  4. Basic stream calculations
  5. Updating stream data
  6. Writing streams to a database


---
## 1. Initialising the environment

### 1.1 Source the libraries for Apache Spark

When running in a Jupyter notebook, the required libraries may not be on the classpath.

Load the essential Spark libraries from public Maven repositories at runtime like this:

In [1]:
import $ivy.`org.apache.spark::spark-core:3.4.0`
import $ivy.`org.apache.spark::spark-mllib-local:3.4.0`
import $ivy.`org.apache.spark::spark-mllib:3.4.0`
import $ivy.`org.apache.spark::spark-graphx:3.4.0`
import $ivy.`org.apache.spark::spark-streaming:3.4.0`
import $ivy.`org.apache.spark::spark-tags:3.4.0`

In [2]:
import $ivy.`org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.0`
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
import $ivy.`org.apache.kafka:kafka-clients:3.4.0`

In [3]:
import $ivy.`com.redislabs:spark-redis_2.12:3.1.0`

In [4]:
import $ivy.`io.lettuce:lettuce-core:6.2.4.RELEASE`

In [5]:
//import $ivy.`org.scoverage::scalac-scoverage-plugin:1.4.2:Test`
//import $ivy.`org.scalatest::scalatest:3.2.15:Test`
import $ivy.`org.scalactic::scalactic:3.2.15`
import $ivy.`com.typesafe:config:1.4.2`
import $ivy.`org.postgresql:postgresql:42.5.1`
import $ivy.`com.microsoft.sqlserver:mssql-jdbc:11.2.3.jre11`

In [6]:
import $ivy.`org.scalanlp::breeze-viz:2.1.0`
import $ivy.`org.jfree:jfreechart:1.5.4`
import $ivy.`org.creativescala::doodle-core:0.18.0`
import $ivy.`com.fasterxml.jackson.core:jackson-databind:2.15.1`

In [6]:
//import $ivy.`org.slf4j:slf4j-jdk14:2.0.7`
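
In Ammonite-style `$ivy` imports (the syntax the Almond kernel uses), a double colon between the organisation and the artifact asks the resolver to append the kernel's Scala binary version to the artifact name, while a single colon uses the name exactly as written. The helper below is only an illustration of that rule: `expandCoordinate` and its `2.13` default are assumptions made for this example and are not part of Almond or Coursier.

```scala
// Hypothetical helper: mimics how a `::` coordinate is expanded.
// It is NOT an Almond or Coursier API, only an illustration of the rule.
def expandCoordinate(coord: String, scalaBinaryVersion: String = "2.13"): String =
  coord.split("::", 2) match {
    // "org::name:version" becomes "org:name_<scalaBinaryVersion>:version"
    case Array(org, rest) =>
      rest.split(":", 2) match {
        case Array(name, version) => s"$org:${name}_$scalaBinaryVersion:$version"
        case _                    => coord // malformed input is returned unchanged
      }
    // No "::" present: the artifact name is used verbatim.
    case _ => coord
  }

println(expandCoordinate("org.apache.spark::spark-core:3.4.0"))
println(expandCoordinate("org.apache.kafka:kafka-clients:3.4.0"))
```

This is also why it is worth double-checking the hard-coded suffixes in the cells above: `spark-sql-kafka-0-10_2.13` and `spark-redis_2.12` name different Scala binary versions, and mixing binary versions on one classpath is generally unsafe.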

---

### 1.2 Import the Spark Libraries

In [7]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, udf, _}
//import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.graphx._

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{LongType, IntegerType, BinaryType, DoubleType, StructType}
import org.apache.spark.sql.kafka010._
import org.apache.spark.sql.expressions.scalalang.typed

In [8]:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

In [9]:
import com.redislabs.provider.redis.streaming._

In [10]:
import java.nio.ByteBuffer
import java.sql.Timestamp
import scala.io.Source
import scala.jdk.CollectionConverters._

In [11]:
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import breeze.linalg._
import breeze.plot._
import com.typesafe.config.{Config, ConfigFactory}
import scala.annotation.tailrec

In [12]:
val appName = "Spark_Streaming_1"

appName: String = "Spark_Streaming_1"

### 1.3 Setup the Logger

To control the volume of log messages, change the log4j configuration programmatically like this:

In [13]:
import org.apache.log4j.{Level, Logger}
//Logger.getLogger("org").setLevel(Level.INFO)

val logger: Logger = Logger.getLogger(appName)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
logger.setLevel(Level.INFO)

logger: Logger = org.apache.log4j.Logger@5da67448

---
## 2. Starting a Spark session

### 2.1 Initialise Spark Session

In [13]:
// close the spark session and spark context before starting a new one, if re-executing the notebook.

//spark.stop()
//sc.stop()

In [14]:
val sparkConf = new SparkConf()
    .setAppName(appName)
    .setMaster("local[6]")
    //.setMaster("spark://sparkmaster320:7077")
    .set("spark.driver.extraClassPath", "/mnt/shared/lib/spark-sql-kafka-0-10_2.13-3.4.0.jar")
    .set("spark.executor.extraClassPath", "/mnt/shared/lib/spark-sql-kafka-0-10_2.13-3.4.0.jar")
    .set("spark.redis.host", "localhost")
    .set("spark.redis.port", "6379")
    .set("spark.default.parallelism", "6")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
23/06/26 00:24:43 WARN Utils: Your hostname, icy resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/06/26 00:24:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

sparkConf: SparkConf = org.apache.spark.SparkConf@4c4cc1ef

In [15]:
// Apply the config to start a spark session:
val spark = org.apache.spark.sql.SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

23/06/26 00:24:44 INFO SparkContext: Running Spark version 3.4.0
23/06/26 00:24:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/26 00:24:44 INFO ResourceUtils: No custom resources configured for spark.driver.
23/06/26 00:24:44 INFO SparkContext: Submitted application: Spark_Streaming_1
23/06/26 00:24:44 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/06/26 00:24:44 INFO ResourceProfile: Limiting resource is cpu
23/06/26 00:24:44 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/06/26 00:24:44 INFO SecurityManager: Changing view acls to: notebooker
23/06/26 00:24:44 INFO SecurityManager: Changing modify acls to: notebooker
23/06/26 00:24:44

spark: SparkSession = org.apache.spark.sql.SparkSession@478ff477

### 2.2 Set logging preferences

In [16]:
Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.BlockManagerMaster").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.BlockManagerMasterEndpoint").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.BlockManagerInfo").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.DiskBlockManager").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.memory.MemoryStore").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.storage.ShuffleBlockFetcherIterator").setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.scheduler.TaskSchedulerImpl").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.scheduler.TaskSetManager").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.executor.Executor").setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark.SparkContext").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.ui.JettyUtils").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.network.netty.NettyBlockTransferService").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.SparkEnv").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.util.Utils").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.rdd.HadoopRDD").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.MapOutputTrackerMasterEndpoint").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.mapred.SparkHadoopMapRedUtil").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.internal.io.HadoopMapRedCommitProtocol").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.internal.io.SparkHadoopWriter").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.rdd.MapPartitionsRDD").setLevel(Level.ERROR)
Logger.getLogger("org.apache.hadoop.mapred.FileOutputCommitter").setLevel(Level.ERROR)

Logger.getLogger("org.apache.spark.sql.internal.SharedState").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.types.Metadata").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.aggregate.HashAggregateExec").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.kafka010.KafkaMicroBatchStream").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.state.StateStore").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.ResolveWriteToStream").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.CheckpointFileManager").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.MicroBatchExecution").setLevel(Level.ERROR)
Logger.getLogger("org.apache.spark.sql.execution.streaming.OffsetSeqLog").setLevel(Level.ERROR)

Logger.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.consumer.ConsumerRecord").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.consumer.internals.SubscriptionState").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.common.metrics.Metrics").setLevel(Level.ERROR)
Logger.getLogger("org.apache.kafka.clients.admin.AdminClientConfig").setLevel(Level.ERROR)
// reset log level for specific objects:
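
The cell above repeats one call per logger. The same intent could be expressed as data plus a loop, which makes the list easier to maintain. The sketch below builds the fully qualified logger names from grouped package prefixes; `loggerGroups` and `qualifiedNames` are hypothetical names, and only a handful of the loggers listed above are reproduced here.

```scala
// Hypothetical grouping: package prefix -> class names taken from the list above.
val loggerGroups: Seq[(String, Seq[String])] = Seq(
  "org.apache.spark.storage"   -> Seq("BlockManager", "BlockManagerMaster", "DiskBlockManager"),
  "org.apache.spark.scheduler" -> Seq("DAGScheduler", "TaskSchedulerImpl", "TaskSetManager"),
  "org.apache.kafka.clients"   -> Seq("Metadata", "NetworkClient")
)

// Expand every (prefix, names) pair into fully qualified logger names.
def qualifiedNames(groups: Seq[(String, Seq[String])]): Seq[String] =
  for {
    (prefix, names) <- groups
    name            <- names
  } yield s"$prefix.$name"

qualifiedNames(loggerGroups).foreach(println)
```

In this notebook, where `Level` and `Logger` are already imported from `org.apache.log4j`, the levels could then be applied with `qualifiedNames(loggerGroups).foreach(n => Logger.getLogger(n).setLevel(Level.ERROR))`.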

In [17]:
val sc = spark.sparkContext

sc: SparkContext = org.apache.spark.SparkContext@2c2ab944

## Connect to Text Stream

Run netcat in a terminal and type lines of text; each line is fed to the stream:

> nc -lk 9999

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

//query.awaitTermination()

---

## Connect to Kafka stream

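Data can be loaded into a stream from Kafka.
While testing, the following commands can be used to publish simple text messages from the console. Note that they use a topic named `topic1`, whereas the notebook cell further down subscribes to `credit_card_txns`.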

First create a topic in Kafka server:
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic topic1 --create

Send some messages to the topic:
> /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Enter some text to send as messages to this topic.
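
Kafka hands each record to Spark as raw bytes, so text typed into the console producer arrives in the `value` column as UTF-8 encoded binary, and the `key` is null unless the producer is configured to send one. A minimal sketch of the decoding step in plain Scala follows; `decodeText` is a hypothetical helper, and the byte arrays only simulate what a record would carry.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: turn a Kafka key or value (possibly null) into text.
def decodeText(bytes: Array[Byte]): Option[String] =
  Option(bytes).map(b => new String(b, StandardCharsets.UTF_8))

// Simulate the payload for the console line "hello kafka".
val textValue: Array[Byte] = "hello kafka".getBytes(StandardCharsets.UTF_8)
// The console producer sends no key unless told to parse one.
val textKey: Array[Byte] = null

println(decodeText(textValue))
println(decodeText(textKey))
```

On a streaming DataFrame the usual equivalent is `selectExpr("CAST(value AS STRING)")`. Text records like these do not match the binary layout decoded later in this notebook, so they are probably best kept on a separate test topic.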

In [18]:
// Subscribe to 1 topic
val df_src = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "credit_card_txns")
    //.option("includeHeaders", "true")  // Whether to include the Kafka headers in the row.
    .option("minPartitions", 6)
    .option("startingOffsets", "earliest") // Read from earliest starting offset
    //.option("startingOffsets", "latest") // Read from latest starting offset
    .load()

df_src: DataFrame = [key: binary, value: binary ... 5 more fields]

In [18]:
// df_src.printSchema()

// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)

In [19]:
case class sparkMsgKV(partition: Integer, offset: Long, timestamp: Timestamp, key: Array[Byte], value: Array[Byte])

val df_binary_kv = df_src.selectExpr("partition", "offset", "timestamp", "key", "value").as[sparkMsgKV]

df_binary_kv.printSchema()

root
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)



defined class sparkMsgKV
df_binary_kv: Dataset[sparkMsgKV] = [partition: int, offset: bigint ... 3 more fields]

## Extract individual fields from the binary message

In [20]:
def kvExpand(row: sparkMsgKV) = {
    
    //apply transformation on these columns and derive multiple columns
    val buffer1 = ByteBuffer.wrap(row.key)
    val txnID = buffer1.getLong
    
    val buffer2 = ByteBuffer.wrap(row.value)
    val EventTime = new java.sql.Timestamp(buffer2.getLong)
    val V4 = buffer2.getDouble
    val V9 = buffer2.getDouble
    val V11 = buffer2.getDouble
    val V14 = buffer2.getDouble
    val V16 = buffer2.getDouble
    val Amount = buffer2.getDouble
    val IsFraud = buffer2.getInt
    val LocationID = buffer2.getLong
    val AccNo = buffer2.getLong
    val IsRedemption = buffer2.getInt
    
    (txnID, EventTime, V4, V9, V11, V14, V16, Amount, IsFraud, LocationID, AccNo, IsRedemption, row.partition, row.timestamp)
}

defined function kvExpand
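
The reads in `kvExpand` imply a fixed binary layout: an 8-byte key holding the transaction ID, and an 80-byte value made of one long, six doubles, an int, two longs and a final int, all in `ByteBuffer`'s default big-endian order. The sketch below encodes a record in that layout and decodes it again, which can be handy for producing test messages. `Txn`, `encodeValue` and `decodeValue` are hypothetical names and the sample values are made up; that the real producer writes exactly this layout is an assumption inferred from `kvExpand`.

```scala
import java.nio.ByteBuffer

// Hypothetical record type mirroring the fields read by kvExpand.
final case class Txn(eventTimeMs: Long, v4: Double, v9: Double, v11: Double,
                     v14: Double, v16: Double, amount: Double, isFraud: Int,
                     locationId: Long, accNo: Long, isRedemption: Int)

// 1 long + 6 doubles + int + 2 longs + int = 80 bytes.
val ValueSize: Int = 8 + 6 * 8 + 4 + 8 + 8 + 4

// Write the fields in the same order kvExpand reads them (big-endian by default).
def encodeValue(t: Txn): Array[Byte] = {
  val buf = ByteBuffer.allocate(ValueSize)
  buf.putLong(t.eventTimeMs)
  Seq(t.v4, t.v9, t.v11, t.v14, t.v16, t.amount).foreach(d => buf.putDouble(d))
  buf.putInt(t.isFraud).putLong(t.locationId).putLong(t.accNo).putInt(t.isRedemption)
  buf.array()
}

// Read the fields back; returns None for null or wrongly sized payloads.
def decodeValue(bytes: Array[Byte]): Option[Txn] =
  Option(bytes).filter(_.length == ValueSize).map { b =>
    val buf = ByteBuffer.wrap(b)
    // Scala evaluates constructor arguments left to right, so the reads stay in order.
    Txn(buf.getLong, buf.getDouble, buf.getDouble, buf.getDouble, buf.getDouble,
        buf.getDouble, buf.getDouble, buf.getInt, buf.getLong, buf.getLong, buf.getInt)
  }

// Made-up sample record and key (transaction ID 7).
val sample = Txn(1617631620000L, 1.2, 0.46, -1.41, 0.16, -0.44, 499.73, 0, 11L, 47L, 0)
val sampleKey: Array[Byte] = ByteBuffer.allocate(8).putLong(7L).array()

println(decodeValue(encodeValue(sample)) == Some(sample))
```

Returning an `Option` instead of reading blindly is a design choice for this sketch: a malformed message is skipped rather than raising an exception inside the streaming query.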

In [21]:
val expandedColNames = Seq(
    "TxnID", "EventTime", "V4", "V9", "V11", "V14", "V16", "Amount", "IsFraud", "LocationID", "AccNo", "IsRedemption", "partition", "msgTime"
)

val df_expanded = df_binary_kv.map( row => kvExpand(row) ).toDF(expandedColNames: _*)

expandedColNames: Seq[String] = List(
  "TxnID",
  "EventTime",
  "V4",
  "V9",
  "V11",
  "V14",
  "V16",
  "Amount",
  "IsFraud",
  "LocationID",
  "AccNo",
  "IsRedemption",
  "partition",
  "msgTime"
)
df_expanded: DataFrame = [TxnID: bigint, EventTime: timestamp ... 12 more fields]

## Write to Redis Server

In [26]:
val redisURI:io.lettuce.core.RedisURI = io.lettuce.core.RedisURI.Builder.redis("localhost", 6379)
    //.auth("password")
    //.database(1)
    .build();

val redisClient: io.lettuce.core.RedisClient = io.lettuce.core.RedisClient.create(redisURI);

val connection: io.lettuce.core.api.StatefulRedisConnection[String, String] = redisClient.connect();
val asyncCommands: io.lettuce.core.api.async.RedisAsyncCommands[String, String] = connection.async();

In [42]:
val syncCommands: io.lettuce.core.api.sync.RedisCommands[String, String] = connection.sync();

syncCommands: io.lettuce.core.api.sync.RedisCommands[String, String] = io.lettuce.core.FutureSyncInvocationHandler@7fbe709e

In [36]:
val result: io.lettuce.core.RedisFuture[String] = asyncCommands.get("mycounter");

result: io.lettuce.core.RedisFuture[String] = AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]

In [37]:
if (result != null && result.isDone() && !result.isCancelled()) {
    println(result.get())
}

10


In [54]:
case class Person(name: String, age: Int)
val personSeq = Seq(Person("John", 30), Person("Peter", 45))
val df = spark.createDataFrame(personSeq).as[Person]
df.show()

+-----+---+
| name|age|
+-----+---+
| John| 30|
|Peter| 45|
+-----+---+



defined class Person
personSeq: Seq[Person] = List(
  Person(name = "John", age = 30),
  Person(name = "Peter", age = 45)
)
df: Dataset[Person] = [name: string, age: int]

In [53]:
df.collect().foreach(x => asyncCommands.set(x.name, x.age.toString))

23/06/26 01:01:37 INFO CodeGenerator: Code generated in 9.923091 ms


In [55]:
syncCommands.get("Peter")

res54: String = "45"

In [55]:
//df.map(x => {asyncCommands.set(x.name, x.age.toString)} )

## Query to view Transactions

In [None]:
val listAllQuery:streaming.StreamingQuery = df_expanded.writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .outputMode("append")
    .start()

listAllQuery.awaitTermination()

23/06/24 22:03:25 INFO CodeGenerator: Code generated in 384.424493 ms
23/06/24 22:03:27 INFO CodeGenerator: Code generated in 55.177078 ms
23/06/24 22:03:27 INFO CodeGenerator: Code generated in 48.721723 ms


-------------------------------------------
Batch: 0
-------------------------------------------


23/06/24 22:03:28 INFO CodeGenerator: Code generated in 17.518396 ms
23/06/24 22:03:28 INFO CodeGenerator: Code generated in 30.702781 ms


+-----+--------------------+------------+------------+------------+------------+------------+-------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16| Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+-------+-------+----------+-----+------------+---------+--------------------+
|    7|2021-04-05 14:07:...| 1.202612737| 0.464959995|-1.416907243| 0.167371963|-0.443586798| 499.73|      0|        11|   47|           0|        5|2023-06-24 18:51:...|
|   17|2021-04-05 14:20:...|  1.28909147| 0.782332892| -0.45031128|-0.468647288|-0.246634656|1299.55|      0|         5|   65|           0|        5|2023-06-24 19:13:...|
|   26|2021-04-05 14:37:...| 0.410007514| 0.475664018|-0.856566364|-0.279796856| -0.33332061|2643.27|      0|         7|   53|           0|      

23/06/24 22:03:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 5907 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-----+--------------------+------------+------------+------------+------------+------------+-------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16| Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+-------+-------+----------+-----+------------+---------+--------------------+
| 1291|2021-04-06 17:47:...| 1.346392066| 0.203681958| 1.132442591| 0.524115414|-0.526159908|2043.13|      0|        16|   33|           0|        5|2023-06-24 22:03:...|
| 1293|2021-04-06 17:49:...|-0.003437134|-0.214680727|-0.095228433| 0.104814624|  0.08652948| 495.97|      0|         3|   91|           0|        5|2023-06-24 22:03:...|
| 1294|2021-04-06 17:49:...| 0.559108426|-0.0885

23/06/24 22:03:56 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1231 milliseconds


-------------------------------------------
Batch: 2
-------------------------------------------
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16|  Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
| 1297|2021-04-06 18:02:...|-0.048508381| 0.092823192|-0.011986416|-0.915538603| 0.199460912|  999.38|      0|        20|    7|           0|        0|2023-06-24 22:03:...|
| 1298|2021-04-06 18:12:...| 0.863117413| 0.834693947| 1.631862587|-4.440848307| 1.060899727|  145.02|      0|         9|   50|           0|        0|2023-06-24 22:03:...|
| 1295|2021-04-06 17:54:...|-1.024341985| -

23/06/24 22:03:57 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1018 milliseconds


-------------------------------------------
Batch: 3
-------------------------------------------
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16|  Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
| 1300|2021-04-06 18:17:...|-0.750239813|-0.876722259| 0.988920253|-0.028895529|-0.613106906| 3297.07|      0|         5|   65|           0|        0|2023-06-24 22:03:...|
| 1301|2021-04-06 18:20:...| 1.707263662| -0.08975873|-0.529581256| 0.223668547|-0.348941992|43100.52|      0|        19|   23|           0|        4|2023-06-24 22:03:...|
| 1302|2021-04-06 18:22:...|-0.932665861| 1

23/06/24 22:03:58 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1008 milliseconds


-------------------------------------------
Batch: 4
-------------------------------------------
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16|  Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
| 1305|2021-04-06 18:24:...|-0.161764886|-0.406495333| 1.153530606| 0.081534199| 0.455390916|  951.84|      0|        10|   47|           0|        5|2023-06-24 22:03:...|
| 1304|2021-04-06 18:24:...|-0.589063455|-0.228442576|-0.746476542| 0.816854008| 0.734554783|15800.34|      0|         7|   53|           0|        0|2023-06-24 22:03:...|
| 1307|2021-04-06 18:29:...|-2.858890227| 1

23/06/24 22:04:01 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1218 milliseconds


-------------------------------------------
Batch: 7
-------------------------------------------
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|          V9|         V11|         V14|         V16|  Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+------------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
| 1320|2021-04-06 18:44:...|-0.866310473| -0.67428956|-0.663034949| 0.294923938| 0.108927849|23315.46|      0|        21|   33|           0|        5|2023-06-24 22:04:...|
| 1321|2021-04-06 18:45:...|  1.30531683|-0.363527291| 1.403111135|-0.057040989|-1.074233627|15575.91|      0|        14|   34|           0|        5|2023-06-24 22:04:...|
| 1322|2021-04-06 18:47:...| 0.677032585|-0

23/06/24 22:04:02 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1074 milliseconds


-------------------------------------------
Batch: 8
-------------------------------------------
+-----+--------------------+------------+-----------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|          V4|         V9|         V11|         V14|         V16|  Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+------------+-----------+------------+------------+------------+--------+-------+----------+-----+------------+---------+--------------------+
| 1327|2021-04-06 18:52:...| -0.30209026|0.082100546|-1.101503606|  0.51978435| 0.176891631|  100.42|      0|         8|   50|           0|        4|2023-06-24 22:04:...|
| 1325|2021-04-06 18:50:...| 0.569119287|0.598569781|  -0.6990663| 0.335024861|-0.021687251|15800.21|      0|         1|   30|           0|        3|2023-06-24 22:04:...|
| 1326|2021-04-06 18:50:...|-2.427066642|1.21257

23/06/24 22:04:44 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1065 milliseconds


-------------------------------------------
Batch: 14
-------------------------------------------
+-----+--------------------+-----------+------------+------------+------------+-----------+------+-------+----------+-----+------------+---------+--------------------+
|TxnID|           EventTime|         V4|          V9|         V11|         V14|        V16|Amount|IsFraud|LocationID|AccNo|IsRedemption|partition|             msgTime|
+-----+--------------------+-----------+------------+------------+------------+-----------+------+-------+----------+-----+------------+---------+--------------------+
| 1344|2021-04-06 19:05:...|0.074710601|-0.371528843|-0.239423103|-0.400583308|0.389658544|499.96|      0|         6|   57|           0|        1|2023-06-24 22:04:...|
| 1345|2021-04-06 19:07:...|-0.51667359|-0.684477828| 1.252563733| -0.21592856|0.722707777|950.83|      0|        10|   47|           0|        1|2023-06-24 22:04:...|
| 1346|2021-04-06 19:09:...|2.360856353|-1.007299665| 1.610349

23/06/24 22:06:40 INFO SparkUI: Stopped Spark web UI at http://ip-10-0-2-15.ap-south-1.compute.internal:4040
23/06/24 22:06:40 INFO ShutdownHookManager: Shutdown hook called
23/06/24 22:06:40 INFO ShutdownHookManager: Deleting directory /tmp/temporary-d26256b7-63c1-4367-b45c-358948cb98e6
23/06/24 22:06:40 INFO ShutdownHookManager: Deleting directory /tmp/spark-4a6869a4-81fb-4580-a16e-548e37491f82


In [None]:
// Running total of Amount for each LocationID:
val df_locationwise_amounts = df_expanded.groupBy("LocationID").sum("Amount")  // using untyped API

val locationAmtQuery:streaming.StreamingQuery = df_locationwise_amounts.writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .outputMode("update")
    .start()

locationAmtQuery.awaitTermination()

23/06/24 22:08:48 INFO CodeGenerator: Code generated in 1267.533806 ms
23/06/24 22:08:49 INFO CodeGenerator: Code generated in 833.924088 ms
23/06/24 22:08:49 INFO CodeGenerator: Code generated in 165.669036 ms
23/06/24 22:08:49 INFO CodeGenerator: Code generated in 257.223396 ms
23/06/24 22:08:54 INFO CodeGenerator: Code generated in 137.789653 ms
23/06/24 22:08:55 INFO CodeGenerator: Code generated in 40.407025 ms
23/06/24 22:08:55 INFO CodeGenerator: Code generated in 25.8959 ms
23/06/24 22:08:55 INFO CodeGenerator: Code generated in 90.836712 ms
23/06/24 22:08:55 INFO CodeGenerator: Code generated in 58.123798 ms
23/06/24 22:08:55 INFO CodeGenerator: Code generated in 90.184955 ms
23/06/24 22:09:03 INFO CodeGenerator: Code generated in 70.489564 ms
23/06/24 22:09:03 INFO CodeGenerator: Code generated in 49.211957 ms


-------------------------------------------
Batch: 0
-------------------------------------------


23/06/24 22:10:29 INFO CodeGenerator: Code generated in 36.181735 ms
23/06/24 22:10:29 INFO CodeGenerator: Code generated in 64.409983 ms


+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        19|         393746.01|
|        22|         407314.14|
|         7|418591.16000000003|
|         6| 718748.6799999999|
|         9|          396136.6|
|        17|         527596.05|
|         5|493735.07999999996|
|         1|          200071.7|
|        10|         261547.69|
|         3|         139025.88|
|        12|398008.47000000003|
|         8|         392269.83|
|        11|368336.17999999993|
|         2|         410700.12|
|         4|         277946.38|
|        13|         777811.23|
|        18|         558775.06|
|        14|358935.87000000005|
|        21|361317.04999999993|
|        15|         367797.02|
+----------+------------------+
only showing top 20 rows



23/06/24 22:10:30 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 109805 milliseconds
23/06/24 22:20:18 INFO CodeGenerator: Code generated in 73.36491 ms


-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-----------------+
|LocationID|      sum(Amount)|
+----------+-----------------+
|         6|719235.0199999999|
|         9|        396214.18|
|        17|        542316.15|
|         1|        203651.07|
+----------+-----------------+



23/06/24 22:20:31 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 17820 milliseconds


-------------------------------------------
Batch: 2
-------------------------------------------
+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        19|          401549.4|
|        22|         413007.99|
|         7|         441085.63|
|         9|         461999.48|
|        17|         558316.17|
|         5|497364.99999999994|
|         1|          207532.2|
|        10|         266548.41|
|         3|         155126.98|
|        12|         414078.78|
|         8|         425308.64|
|        11| 379013.3399999999|
|         2|         553464.13|
|         4|         278147.93|
|        13|         801119.69|
|        18| 562560.9900000001|
|        14|399357.00000000006|
|        15|         383797.09|
|        23|         550820.87|
|        20|         218824.05|
+----------+------------------+
only showing top 20 rows



23/06/24 22:21:06 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 35062 milliseconds


-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         4|  293948.19|
+----------+-----------+



23/06/24 22:21:40 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 27515 milliseconds


-------------------------------------------
Batch: 4
-------------------------------------------
+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        19|404044.16000000003|
|        22|         414881.18|
|         7|521949.16000000003|
|         6| 736262.8899999999|
|         9|         492562.85|
|        17| 563339.9800000001|
|         5|513165.01999999996|
|         1|209581.83000000002|
|        10|         271549.37|
|         3|         162345.35|
|        12|         433911.01|
|         8|         429009.28|
|        11|390533.54999999993|
|         2|         562464.62|
|        13|         824094.57|
|        18| 576975.7500000001|
|        14| 415357.0200000001|
|        21|385658.19999999995|
|        15|         402374.08|
|        20|         250624.33|
+----------+------------------+
only showing top 20 rows



23/06/24 22:22:10 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 30093 milliseconds


-------------------------------------------
Batch: 5
-------------------------------------------
+----------+-----------------+
|LocationID|      sum(Amount)|
+----------+-----------------+
|         6|743910.9799999999|
|         2|        563864.53|
+----------+-----------------+



23/06/24 22:23:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 22104 milliseconds


-------------------------------------------
Batch: 6
-------------------------------------------
+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        22|         430835.17|
|         7|         564262.37|
|         9|         512526.67|
|         5|         528608.25|
|        10|         278720.08|
|         3|          171876.4|
|        12|452278.22000000003|
|         8|478680.99000000005|
|        11| 390732.3399999999|
|         2| 578696.2000000001|
|         4|         298568.35|
|        13| 827107.8999999999|
|        18| 603058.4300000002|
|        14|431157.94000000006|
|        21|          409939.1|
|        15|         414767.02|
|        23|         551221.75|
|        20|251721.88999999998|
|        16|         352080.97|
|        24|308359.20000000007|
+----------+------------------+



23/06/24 22:23:36 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 28534 milliseconds
23/06/24 22:24:39 INFO SparkUI: Stopped Spark web UI at http://ip-10-0-2-15.ap-south-1.compute.internal:4040
23/06/24 22:24:39 INFO ShutdownHookManager: Shutdown hook called
23/06/24 22:24:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-147c199b-3d33-4e31-bbdb-ebb7f2ca9585
23/06/24 22:24:39 INFO ShutdownHookManager: Deleting directory /tmp/temporary-99d84ef0-5b6b-4f2b-aadf-bf896d6480ca
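
A note on reading the console output above: in `update` output mode, each batch prints only the `LocationID` groups whose sum changed in that micro-batch, and the value printed is the running total since the query started. Location 6, for example, appears in batches 0, 1, 4 and 5 with a steadily growing total. The plain-Scala sketch below imitates that behaviour without Spark, as a rough mental model only. `Txn` and `applyBatch` are hypothetical names introduced for this illustration, and the amounts are made up.

```scala
// Plain-Scala illustration of "update" output mode semantics (no Spark needed).
// Txn and applyBatch are hypothetical names, not part of any Spark API.
final case class Txn(locationId: Int, amount: Double)

// Folds one micro-batch into the running state and returns
// (new state, rows that would be emitted for this batch).
def applyBatch(state: Map[Int, Double], batch: Seq[Txn]): (Map[Int, Double], Map[Int, Double]) = {
  // Add every transaction amount to the running total of its location
  val newState = batch.foldLeft(state) { (acc, t) =>
    acc.updated(t.locationId, acc.getOrElse(t.locationId, 0.0) + t.amount)
  }
  // Only locations touched by this batch are emitted, with their cumulative totals
  val touched = batch.map(_.locationId).toSet
  val emitted = newState.filter { case (k, _) => touched.contains(k) }
  (newState, emitted)
}

val batch0 = Seq(Txn(6, 100.0), Txn(9, 50.0), Txn(6, 25.0)) // made-up amounts
val batch1 = Seq(Txn(6, 10.0))

val (state0, out0) = applyBatch(Map.empty, batch0)
val (state1, out1) = applyBatch(state0, batch1)

println(out0) // locations 6 and 9, both new in this batch
println(out1) // only location 6, carrying its cumulative total of 135.0
```

Location 9 is absent from the second result because no transaction touched it, which mirrors the short tables printed for batches 1, 3 and 5 above.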


In [21]:
import org.apache.spark.util.LongAccumulator

val rowsProcCounter: org.apache.spark.util.LongAccumulator = sc.longAccumulator("Rows processed Count")
rowsProcCounter.reset()

import org.apache.spark.util.LongAccumulator

rowsProcCounter: LongAccumulator = LongAccumulator(id: 1, name: Some(Rows processed Count), value: 0)

In [22]:
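def batchOpDB(batchDF: DataFrame, batchId: scala.Long): Unit = {
    // Cache the micro-batch: it is scanned twice below (count, then aggregation)
    batchDF.persist()

    // Track the number of rows in this batch and across all batches so far
    val rowCount = batchDF.count()
    rowsProcCounter.add(rowCount)
    println(s"Processing batch id $batchId of $rowCount rows, total = ${rowsProcCounter.value}")

    // Per-batch totals: these sums cover this micro-batch only, not the whole stream
    batchDF.groupBy("LocationID").sum("Amount").show(5)

    // Release the cached batch
    batchDF.unpersist()
}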

defined function batchOpDB

In [None]:
val batchOpsQuery:streaming.StreamingQuery = df_expanded.writeStream
    .foreachBatch {
        (batchDF, batchId) => batchOpDB(batchDF, batchId)
    }
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .outputMode("update")
    .start()

batchOpsQuery.awaitTermination()

23/06/24 22:36:43 INFO CodeGenerator: Code generated in 421.617479 ms
23/06/24 22:36:43 INFO CodeGenerator: Code generated in 15.700915 ms
23/06/24 22:36:43 INFO CodeGenerator: Code generated in 16.511406 ms
23/06/24 22:36:43 INFO CodeGenerator: Code generated in 23.531089 ms
23/06/24 22:36:45 INFO CodeGenerator: Code generated in 70.05361 ms
23/06/24 22:36:45 INFO CodeGenerator: Code generated in 51.903098 ms
23/06/24 22:36:46 INFO CodeGenerator: Code generated in 18.397388 ms
23/06/24 22:36:46 INFO CodeGenerator: Code generated in 34.705005 ms


Processing batch id 0 of 1545 rows, total = 1545


23/06/24 22:36:47 INFO CodeGenerator: Code generated in 49.408046 ms
23/06/24 22:36:47 INFO CodeGenerator: Code generated in 67.330538 ms
23/06/24 22:36:47 INFO CodeGenerator: Code generated in 45.15933 ms
23/06/24 22:36:47 INFO CodeGenerator: Code generated in 24.804373 ms
23/06/24 22:36:47 INFO CodeGenerator: Code generated in 101.418869 ms
23/06/24 22:36:48 INFO CodeGenerator: Code generated in 69.59375 ms
23/06/24 22:36:48 INFO CodeGenerator: Code generated in 13.004723 ms
23/06/24 22:36:49 INFO CodeGenerator: Code generated in 10.964333 ms


+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        19|404044.16000000003|
|        22| 430835.1699999999|
|         7| 564262.3700000001|
|         6| 743910.9800000001|
|         9| 512526.6699999999|
+----------+------------------+
only showing top 5 rows



23/06/24 22:36:50 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 10135 milliseconds


Processing batch id 1 of 2 rows, total = 1547
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         7|     377.56|
|         8|   16000.87|
+----------+-----------+



23/06/24 22:37:12 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 3827 milliseconds


Processing batch id 2 of 13 rows, total = 1560
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         7|     865.14|
|         6|   15499.42|
|         5|   16000.64|
|        12|     999.52|
|        11|   31600.25|
+----------+-----------+
only showing top 5 rows



23/06/24 22:37:15 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 3052 milliseconds


Processing batch id 3 of 13 rows, total = 1573
+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|         6|            400.67|
|         9|           7900.68|
|         5|            100.86|
|         1|          15800.63|
|         8|3138.7599999999998|
+----------+------------------+
only showing top 5 rows



23/06/24 22:37:18 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2869 milliseconds


Processing batch id 4 of 10 rows, total = 1583
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         6|     319.65|
|         9|      75.72|
|         1|   27993.06|
|         3|    1189.95|
|         4|     555.52|
+----------+-----------+
only showing top 5 rows



23/06/24 22:37:21 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2502 milliseconds


Processing batch id 5 of 8 rows, total = 1591
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|        22|   22328.64|
|         3|     269.41|
|         8|    2621.68|
|         4|   11500.93|
|        23|   18796.38|
+----------+-----------+
only showing top 5 rows



23/06/24 22:37:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2195 milliseconds


Processing batch id 6 of 4 rows, total = 1595
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|        17|    3569.66|
|         1|    9300.71|
|         2|     6621.8|
|        23|   16000.45|
+----------+-----------+



23/06/24 22:37:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2242 milliseconds
23/06/24 22:37:55 INFO CodeGenerator: Code generated in 14.82251 ms


Processing batch id 7 of 1 rows, total = 1596


23/06/24 22:37:55 INFO CodeGenerator: Code generated in 82.342261 ms


+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|        16|   16000.63|
+----------+-----------+

Processing batch id 8 of 4 rows, total = 1600
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         5|     100.02|
|        14|   17450.82|
|        21|     100.86|
|        24|    1500.63|
+----------+-----------+



23/06/24 22:37:58 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2182 milliseconds


Processing batch id 9 of 7 rows, total = 1607
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         6|     654.29|
|        11|    1513.93|
|         2|     100.76|
|         4|    1777.62|
|        14|   16000.03|
+----------+-----------+
only showing top 5 rows



23/06/24 22:38:00 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2349 milliseconds


Processing batch id 10 of 10 rows, total = 1617
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|        22|    4354.02|
|         6|     100.54|
|         5|    2999.94|
|         1|     7187.9|
|        10|   50000.76|
+----------+-----------+
only showing top 5 rows



23/06/24 22:38:02 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1854 milliseconds


Processing batch id 11 of 7 rows, total = 1624
+----------+------------------+
|LocationID|       sum(Amount)|
+----------+------------------+
|        19|            100.75|
|         7|            100.88|
|        17|16180.099999999999|
|         5|            628.63|
|        21|            129.13|
+----------+------------------+
only showing top 5 rows



23/06/24 22:38:04 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2414 milliseconds


Processing batch id 12 of 10 rows, total = 1634
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|        19|    3415.44|
|         7|    4990.41|
|         9|     198.43|
|         5|     429.34|
|         3|   16000.05|
+----------+-----------+
only showing top 5 rows



23/06/24 22:38:07 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2393 milliseconds


Processing batch id 13 of 8 rows, total = 1642
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         5|    7366.47|
|        11|   17800.39|
|         2|      99.78|
|        13|     2152.4|
|        18|    8579.58|
+----------+-----------+
only showing top 5 rows



23/06/24 22:38:09 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2036 milliseconds


Processing batch id 14 of 3 rows, total = 1645
+----------+-----------+
|LocationID|sum(Amount)|
+----------+-----------+
|         7|    1493.03|
|         9|    16000.7|
|        17|     200.82|
+----------+-----------+



23/06/24 22:38:11 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 1871 milliseconds
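
Some of the sums above print with long tails, such as `3138.7599999999998` in batch 3 and `16180.099999999999` in batch 11. This is ordinary binary floating-point rounding from adding `Double` values, not a data problem. The sketch below reproduces the effect in plain Scala and shows two possible ways to get clean two-decimal totals; the input amounts are made up for illustration. On the Spark side, one option (an assumption about what suits this dataset, not something the notebook does) would be to cast `Amount` to a decimal type before aggregating.

```scala
// Plain-Scala illustration of why Double sums print with long tails.
// The amounts are made up; they are not taken from the transaction stream.
val amounts = Seq("0.10", "0.20")

// Summing as Double accumulates binary rounding error
val asDouble: Double = amounts.map(s => s.toDouble).sum

// Option 1: sum as exact decimals from the start
val asDecimal: BigDecimal = amounts.map(s => BigDecimal(s)).sum

// Option 2: keep the Double sum but round it for display
val rounded: BigDecimal = BigDecimal(asDouble).setScale(2, BigDecimal.RoundingMode.HALF_UP)

println(asDouble)  // 0.30000000000000004
println(asDecimal) // 0.30
println(rounded)   // 0.30
```

Rounding for display is the lighter change, while exact decimals avoid the error building up over a long-running aggregation.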


---

### Stop the Spark Session

In [56]:
spark.stop()

23/06/26 01:16:46 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040


In [57]:
sc.stop()

### End of file