# Kafka Real-time Transactions Source

Simulation of real-time data by feeding transaction data from a file into a Kafka message queue.

---

### 1. Import the required libraries

In [1]:
import $ivy.`org.apache.kafka:kafka-clients:3.4.0`
import $ivy.`org.scalanlp::breeze-viz:2.1.0`
import $ivy.`log4j:log4j:1.2.17`
//import $ivy.`org.slf4j:slf4j-jdk14:2.0.7`

[32mimport [39m[36m$ivy.$                                     
[39m
[32mimport [39m[36m$ivy.$                               
[39m
[32mimport [39m[36m$ivy.$                   
//import $ivy.`org.slf4j:slf4j-jdk14:2.0.7`[39m

In [2]:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer,
                                              LongDeserializer, LongSerializer,
                                              StringDeserializer, StringSerializer}
import scala.jdk.CollectionConverters._

[32mimport [39m[36morg.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
[39m
[32mimport [39m[36morg.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer,
                                              LongDeserializer, LongSerializer,
                                              StringDeserializer, StringSerializer}
[39m
[32mimport [39m[36mscala.jdk.CollectionConverters._[39m

In [3]:
import scala.io.Source
import scala.util.control.Breaks._
import java.io.File
import java.util.Properties
import java.nio.ByteBuffer
import java.sql.Timestamp

[32mimport [39m[36mscala.io.Source
[39m
[32mimport [39m[36mscala.util.control.Breaks._
[39m
[32mimport [39m[36mjava.io.File
[39m
[32mimport [39m[36mjava.util.Properties
[39m
[32mimport [39m[36mjava.nio.ByteBuffer
[39m
[32mimport [39m[36mjava.sql.Timestamp[39m

In [4]:
import breeze.linalg._
import breeze.plot._
import org.apache.log4j.{Level, Logger}

[32mimport [39m[36mbreeze.linalg._
[39m
[32mimport [39m[36mbreeze.plot._
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m

## 2. Initialise the Environment


In [5]:
val logger: Logger = Logger.getLogger("Kafka_Realtime_Source")
logger.setLevel(Level.INFO)

[36mlogger[39m: [32mLogger[39m = org.apache.log4j.Logger@106032

In [6]:
var topic:String = "credit_card_txns";
var broker:String = "localhost:9092";

[36mtopic[39m: [32mString[39m = [32m"credit_card_txns"[39m
[36mbroker[39m: [32mString[39m = [32m"localhost:9092"[39m

In [7]:
var inputFileName:String = "/mnt/shared/datasets/creditcard.csv";

[36minputFileName[39m: [32mString[39m = [32m"/mnt/shared/datasets/creditcard.csv"[39m

### 2.1 Configure the Producer

In [8]:
val props = new Properties()
props.put("bootstrap.servers", broker)
props.put("key.serializer", classOf[LongSerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

[36mprops[39m: [32mProperties[39m = {value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, bootstrap.servers=localhost:9092, key.serializer=org.apache.kafka.common.serialization.LongSerializer}
[36mres7_1[39m: [32mObject[39m = [32mnull[39m
[36mres7_2[39m: [32mObject[39m = [32mnull[39m
[36mres7_3[39m: [32mObject[39m = [32mnull[39m

In [9]:
case class TransactionRecord(
    eventTime: Timestamp,
    txnID: Long,
    Amount: Double,
    IsFraud: Int,
    V4: Double,
    V9: Double,
    V11: Double,
    V14: Double,
    V16: Double,
    LOCATION_ID: Long,
    ACC_NO: Long,
    IsRedemption: Int=0
){
    override def toString():String = {
        val sb = new StringBuilder("txnID: ")
        sb.append(this.txnID)
        sb.append(", Event time=")
        sb.append(this.eventTime)
        sb.append(", Amount=")
        sb.append(this.Amount)
        sb.append(", IsRedemption? ")
        sb.append(this.IsRedemption)
        sb.append(", IsFraud? ")
        sb.append(this.IsFraud)
        sb.append(", Location=")
        sb.append(this.LOCATION_ID)
        sb.append(", Account No=")
        sb.append(this.ACC_NO)
        sb.append(", V4=")
        sb.append(this.V4)
        sb.append(", V9=")
        sb.append(this.V9)
        sb.append(", V11=")
        sb.append(this.V11)
        sb.append(", V14=")
        sb.append(this.V14)
        sb.append(", V16=")
        sb.append(this.V16)
        return sb.toString
    }
}

defined [32mclass[39m [36mTransactionRecord[39m

In [10]:
def serializeTransactionRecord(record: TransactionRecord): Array[Byte] = {

    val buffer = ByteBuffer.allocate(
        java.lang.Long.BYTES  // event time
        + java.lang.Double.BYTES  // V4
        + java.lang.Double.BYTES  // V9
        + java.lang.Double.BYTES  // V11
        + java.lang.Double.BYTES  // V14
        + java.lang.Double.BYTES  // V16
        + java.lang.Double.BYTES  // Amount
        + java.lang.Integer.BYTES  // is Fraud?
        + java.lang.Long.BYTES  // location ID
        + java.lang.Long.BYTES  // Account No.
        + java.lang.Integer.BYTES  // Is Redemption?
    )

    buffer.putLong(record.eventTime.getTime)
    buffer.putDouble(record.V4)
    buffer.putDouble(record.V9)
    buffer.putDouble(record.V11)
    buffer.putDouble(record.V14)
    buffer.putDouble(record.V16)
    buffer.putDouble(record.Amount)
    buffer.putInt(record.IsFraud)
    buffer.putLong(record.LOCATION_ID)
    buffer.putLong(record.ACC_NO)
    buffer.putInt(record.IsRedemption)
    
    // return the serialised data
    buffer.array()
}

defined [32mfunction[39m [36mserializeTransactionRecord[39m

## Read transactions file

In [11]:
import java.nio.file.{Paths, Files}

if(Files.exists(Paths.get(inputFileName))){
    println(s"Opening file $inputFileName")
    
}else{
    throw new Exception(s"Error: file $inputFileName does not exist")
}

val bufferedSource = Source.fromFile(inputFileName)
val lineIterator = bufferedSource.getLines()

Opening file /mnt/shared/datasets/creditcard.csv


[32mimport [39m[36mjava.nio.file.{Paths, Files}

[39m
[36mbufferedSource[39m: [32mscala[39m.[32mio[39m.[32mBufferedSource[39m = [32mnon-empty iterator[39m
[36mlineIterator[39m: [32mIterator[39m[[32mString[39m] = [32mnon-empty iterator[39m

In [12]:
val header = lineIterator.next();

val header_fields: Array[String] = header.split(',').map(
    x => x.stripPrefix("\"").stripSuffix("\"").trim
)

[36mheader[39m: [32mString[39m = [32m"ime,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,TXN_AMT,IS_FRAUD,TXN_ID,LOCATION_ID,ACC_NO,IS_REDEMPTION"[39m
[36mheader_fields[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"ime"[39m,
  [32m"V1"[39m,
  [32m"V2"[39m,
  [32m"V3"[39m,
  [32m"V4"[39m,
  [32m"V5"[39m,
  [32m"V6"[39m,
  [32m"V7"[39m,
  [32m"V8"[39m,
  [32m"V9"[39m,
  [32m"V10"[39m,
  [32m"V11"[39m,
  [32m"V12"[39m,
  [32m"V13"[39m,
  [32m"V14"[39m,
  [32m"V15"[39m,
  [32m"V16"[39m,
  [32m"V17"[39m,
  [32m"V18"[39m,
  [32m"V19"[39m,
  [32m"V20"[39m,
  [32m"V21"[39m,
  [32m"V22"[39m,
  [32m"V23"[39m,
  [32m"V24"[39m,
  [32m"V25"[39m,
  [32m"V26"[39m,
  [32m"V27"[39m,
  [32m"V28"[39m,
  [32m"TXN_AMT"[39m,
  [32m"IS_FRAUD"[39m,
  [32m"TXN_ID"[39m,
  [32m"LOCATION_ID"[39m,
  [32m"ACC_NO"[39m,
  [32m"IS_REDEMPTION"[39m
)

In [13]:
val cols_to_keep = Array(0, 29, 30, 4, 9, 11, 14, 16, 31, 32, 33, 34);
// Time, Amount, Class, V4, V9, V11, V14, V16, TXN_ID, LOCATION_ID, ACC_NO

println("The following columns will be retained:");
cols_to_keep.foreach( x => println(s"\tColumn #$x: " + header_fields(x)) );

val start_time_ms = System.currentTimeMillis() - 70000000000L;
println("\nStarting time: " + new Timestamp(start_time_ms))

The following columns will be retained:
	Column #0: ime
	Column #29: TXN_AMT
	Column #30: IS_FRAUD
	Column #4: V4
	Column #9: V9
	Column #11: V11
	Column #14: V14
	Column #16: V16
	Column #31: TXN_ID
	Column #32: LOCATION_ID
	Column #33: ACC_NO
	Column #34: IS_REDEMPTION

Starting time: 2021-04-05 14:00:55.691


[36mcols_to_keep[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m([32m0[39m, [32m29[39m, [32m30[39m, [32m4[39m, [32m9[39m, [32m11[39m, [32m14[39m, [32m16[39m, [32m31[39m, [32m32[39m, [32m33[39m, [32m34[39m)
[36mstart_time_ms[39m: [32mLong[39m = [32m1617611455691L[39m

In [14]:
val producer:KafkaProducer[Long, Array[Byte]] = new KafkaProducer[Long, Array[Byte]](props)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


[36mproducer[39m: [32mKafkaProducer[39m[[32mLong[39m, [32mArray[39m[[32mByte[39m]] = org.apache.kafka.clients.producer.KafkaProducer@b37759e

In [23]:
println("Partitions for this topic:")

producer.partitionsFor(topic).forEach(
    x => println(
        s"Number ${x.partition}, topic=${x.topic}, leader=${x.leader.host}:${x.leader.port}, broker id=${x.leader.id}"
    )
)

Partitions for this topic:
Number 2, topic=credit_card_txns, leader=localhost:9092, broker id=0
Number 3, topic=credit_card_txns, leader=localhost:9092, broker id=0
Number 0, topic=credit_card_txns, leader=localhost:9092, broker id=0
Number 1, topic=credit_card_txns, leader=localhost:9092, broker id=0
Number 4, topic=credit_card_txns, leader=localhost:9092, broker id=0
Number 5, topic=credit_card_txns, leader=localhost:9092, broker id=0


In [44]:
var counter:Int = 0
val maxCount = 50

val sleep_start = 100
val sleep_end   = 400
val rnd = new scala.util.Random

try {
    
    println("Waiting for 3 seconds...")
    Thread.sleep(3000)
    
    for (line <- lineIterator) {

        // Split each line in the CSV file to create the message to be sent:
        val fields: Array[String] = line.split(',')
        
        // extract each field after converting these into the correct datatypes
        val new_time = new Timestamp(start_time_ms + fields(0).toLong * 100000)
        val txn_amount = fields(29).toDouble
        val is_fraud = fields(30).stripPrefix("\"").stripSuffix("\"").trim.toInt
        val v4 = fields(4).toDouble
        val v9 = fields(9).toDouble
        val v11 = fields(11).toDouble
        val v14 = fields(14).toDouble
        val v16 = fields(16).toDouble
        val txn_id = fields(31).toLong
        val location_id = fields(32).toLong
        val acc_no = fields(33).toLong
        val is_redemption = fields(34).stripPrefix("\"").stripSuffix("\"").trim.toInt

        // encapsulate the fields into a transaction record
        val txnRec = TransactionRecord(
            new_time,
            txn_id,
            txn_amount,
            is_fraud,
            v4,
            v9,
            v11,
            v14,
            v16,
            location_id,
            acc_no,
            is_redemption
            )
        println(txnRec)
        
        val valueBytes = serializeTransactionRecord(txnRec)
        val serializedRecord = new ProducerRecord[Long, Array[Byte]](
          topic,
          txn_id,
          valueBytes
        )

        producer.send(serializedRecord)

        counter = counter + 1
        
        Thread.sleep(sleep_start + rnd.nextInt( (sleep_end - sleep_start) + 1 ))

        if(counter==maxCount){
            println(s"*** Published $counter messages to Kafka topic. ***")
            break
        }
    }

} catch {
    case e: Exception => {
        logger.error(s"Error caught when sending messages: $e.")
        e.printStackTrace()
    }
}

Waiting for 3 seconds...
txnID: 1591, Event time=2021-04-07 00:24:15.691, Amount=16000.63, IsRedemption? 0, IsFraud? 0, Location=16, Account No=32, V4=-1.211770949, V9=0.94658441, V11=0.640589733, V14=0.565934946, V16=-1.180130432
txnID: 1592, Event time=2021-04-07 00:25:55.691, Amount=100.86, IsRedemption? 0, IsFraud? 0, Location=21, Account No=15, V4=-2.216486564, V9=0.319188599, V11=1.103497388, V14=1.00359516, V16=-0.614525499
txnID: 1593, Event time=2021-04-07 00:27:35.691, Amount=17450.82, IsRedemption? 0, IsFraud? 0, Location=14, Account No=10, V4=1.5190271, V9=0.020568174, V11=0.561427081, V14=0.243707244, V16=-0.381760871
txnID: 1594, Event time=2021-04-07 00:27:35.691, Amount=100.02, IsRedemption? 0, IsFraud? 0, Location=5, Account No=65, V4=-1.900352243, V9=0.454187909, V11=0.249596658, V14=0.772317363, V16=-1.083015925
txnID: 1595, Event time=2021-04-07 00:29:15.691, Amount=1500.63, IsRedemption? 0, IsFraud? 0, Location=24, Account No=22, V4=-2.368229278, V9=-2.348430359, V

: 

In [22]:
producer.close()

In [23]:
bufferedSource.close()