# Event Processing Analysis

### What's the goal?
Understand where Kalium spends most of its time during event processing, in order to find possible performance improvements.


### Current context
- We've added [logs that measure how much time we're taking to process individual events](https://github.com/wireapp/kalium/pull/2827).
- The average time to process events oscillates from day to day, between 368ms and 1280ms;
- 97.3% of the time spent processing events goes to end-to-end encrypted messages;
- 51% of the time spent processing messages goes to Text messages, even though they account for only 24.04% of the messages;
- `LastRead` is another interesting one: 18.57% of the count, but 24.3% of the time.

## What even is measured within "Event Processing"?
We measure all the time taken from the beginning of `EventReceiver<EventType>.onEvent` until the event is handled completely.
For some event types, this can be super quick. Handling an in-conversation typing indication, for example, consists of just emitting to an in-memory flow, and the event is fully processed.
For other events, it gets trickier. All end-to-end encrypted messages require decrypting the encrypted blob, parsing the Protobuf, and then "actually" handling the different message types. Different message types often need completely different logic, so they can be as diverse as different event types.
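
To make the measurement boundary concrete, here is a minimal sketch of timing a handler from the start of `onEvent` until it returns. `EventHandler` and `TimedEventHandler` are hypothetical stand-ins I made up for illustration, not Kalium's actual `EventReceiver`, and the real handlers are likely suspending.

```kotlin
import kotlin.time.Duration
import kotlin.time.measureTime

// Hypothetical stand-in for an event receiver; NOT Kalium's real interface.
fun interface EventHandler<E> {
    fun onEvent(event: E)
}

// Wraps another handler and reports how long each onEvent call took,
// from the moment it starts until the delegate has fully returned.
class TimedEventHandler<E>(
    private val delegate: EventHandler<E>,
    private val report: (event: E, elapsed: Duration) -> Unit,
) : EventHandler<E> {
    override fun onEvent(event: E) {
        val elapsed = measureTime { delegate.onEvent(event) }
        report(event, elapsed)
    }
}

fun main() {
    // Simulated handler that just sleeps for a few milliseconds.
    val slowHandler = EventHandler<String> { _ -> Thread.sleep(5) }
    val timed = TimedEventHandler(slowHandler) { event, elapsed ->
        println("$event took $elapsed")
    }
    timed.onEvent("typing-indicator")
}
```

Wrapping the handler keeps the timing code out of the handling logic itself, so the same wrapper could be reused for every event type.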

## Starting plan of action
Collect data locally, by adding more logs and generating events.

My first mistake was using an emulator. Running the debug build of the Android app on the emulator produced numbers that made no sense: DataDog was reporting hundreds of milliseconds as the average for event processing, while my emulator was taking about 20~30ms.

How could that be? Running on an actual device – with the screen locked – gave numbers in line with DataDog. An emulator has almost no other apps installed competing for resources with the app being tested, and isn't subject to battery saving measures like CPU speed reduction, etc.
  
## Lesson #1
We can focus on "background performance": receive a push notification, sync, decrypt, show notifications.
Or on "foreground performance": open the app, sync, decrypt, show notifications and unread indicators.

The main difference is how the app might be subject to battery saving measures taken by the OS of a real device, and the time to perform the same actions might be completely different between the two scenarios.
Proposed action:
- Investigate whether we can add battery state, background/foreground information, etc. to the logs. This way we can track performance indicators for each scenario separately.
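
As a rough sketch of what that could enable, assuming each measurement were tagged with the app state when it was taken: the averages can then be computed per scenario instead of being mixed into one number. `AppState`, `TaggedMeasurement` and the sample values below are hypothetical; none of this exists in the logs today.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical context captured with each measurement (not in the current logs).
enum class AppState { FOREGROUND, BACKGROUND }

data class TaggedMeasurement(
    val total: Duration,
    val appState: AppState,
    val powerSaveMode: Boolean,
)

// Average processing time per app state, so foreground and background
// numbers are never blended together.
fun averageByAppState(measurements: List<TaggedMeasurement>): Map<AppState, Duration> =
    measurements
        .groupBy { it.appState }
        .mapValues { (_, group) ->
            group.fold(Duration.ZERO) { acc, m -> acc + m.total } / group.size
        }

fun main() {
    // Made-up values, only to show the shape of the result.
    val measurements = listOf(
        TaggedMeasurement(20.milliseconds, AppState.FOREGROUND, powerSaveMode = false),
        TaggedMeasurement(30.milliseconds, AppState.FOREGROUND, powerSaveMode = false),
        TaggedMeasurement(300.milliseconds, AppState.BACKGROUND, powerSaveMode = true),
        TaggedMeasurement(500.milliseconds, AppState.BACKGROUND, powerSaveMode = true),
    )
    averageByAppState(measurements).forEach { (state, average) ->
        println("$state: $average")
    }
}
```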

## Keep digging 

Considering how there are multiple steps to "event processing", I've added logs to gather more information about these steps and their "substeps".
Also, considering 97.3% of the time is spent on message processing, I focused on message events.

The time spent processing message events is modelled below as `TimeTaken`.
 

First, some dependencies for manipulating data and for Kotlin's datetime:

In [13]:
%use kandy
%use dataframe

In [14]:
@file:DependsOn("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1")

Defining a way of representing event processing measurements and how to import data from the `logs`:

In [15]:

import java.io.File
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

data class ProteusDecryption(
    val checkingSession: Duration,
    val decrypting: Duration,
    val savingSession: Duration,
) {
    val total: Duration = checkingSession + decrypting + savingSession
}

data class ProcessingMessage(
    /**
     * Dealing with possibility of LegalHold
     */
    val checkingLegalhold: Duration,
    /**
     * Actually handling the content of the message,
     * if LastRead, marking as read in the DB,
     * if Text, inserting into the DB, emitting notifications,
     * ...
     */
    val handlingMessageContent: Duration,
    /**
     * Other actions we take after the message is handled, like enqueueing delivery receipt
     * and enqueueing message deletion if the message is SelfDeleting
     */
    val postHandling: Duration
) {
    val total: Duration = checkingLegalhold + handlingMessageContent + postHandling
}

data class TimeTaken(
    val messageType: String,
    val total: Duration,
    val proteus: ProteusDecryption,
    val processingMessage: ProcessingMessage
) {
    /**
     * Unknown time can be things like parsing the Protobuf, converting the blob from Base64, etc.
     * In general: it shouldn't be slow, and it's less likely we can speed it up.
     */
    val unknownTime = total - proteus.total - processingMessage.total
}

fun getDataFromLogs(logFileName: String): List<TimeTaken> {
    val file = File("logs/$logFileName")
    val groups = file.readText().split("""^$""".toRegex(RegexOption.MULTILINE))

    val allData = groups.map {
        it.trim().split("\n").filter { it.isNotBlank() }
    }.filter {
        it.isNotEmpty()
    }.map { lines ->
        val logLine = lines[0]
        val measurements = logLine.split(";").map { it.split(" ").last() }
        val proteusDuration = ProteusDecryption(
            Duration.parse(measurements[0].trim()),
            Duration.parse(measurements[1].trim()),
            Duration.parse(measurements[2].trim()),
        )

        val processingLine = lines[1]
        val processingMeasurements = processingLine.split(";").map { it.split(" ").last() }
        val processingDuration = ProcessingMessage(
            Duration.parse(processingMeasurements[0].trim()),
            Duration.parse(processingMeasurements[1].trim()),
            Duration.parse(processingMeasurements[2].trim()),
        )
        val mainInfo = lines[2].split(" ")
        val timeTaken = mainInfo[0].toLong().milliseconds
        val messageType = mainInfo[1]
        TimeTaken(messageType, timeTaken, proteusDuration, processingDuration)
    }
    return allData
}
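
Judging only from what `getDataFromLogs` reads, each group in a log file is three lines (Proteus measurements, message processing measurements, then the total in milliseconds plus the message type), with groups separated by blank lines. The sample below is a hypothetical group I made up to illustrate that shape; the labels and numbers are invented, and only the last space-separated token of each `;`-separated part matters to the parser.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical log group. The labels and numbers are invented; only the
// three-line shape is inferred from what getDataFromLogs reads.
val sampleGroup = """
    checkingSession 1.2ms; decrypting 8.5ms; savingSession 3.1ms
    checkingLegalhold 2ms; handlingContent 40ms; postHandling 5ms
    62 Text
""".trimIndent()

// Same extraction rule as the parser: split on ';', keep the last
// space-separated token of each part, and let Duration.parse read it.
fun parseDurations(line: String): List<Duration> =
    line.split(";").map { Duration.parse(it.trim().split(" ").last()) }

// Reads the final line of a group: total time in whole milliseconds,
// followed by the message type.
fun parseMainInfo(line: String): Pair<Duration, String> {
    val parts = line.split(" ")
    return parts[0].toLong().milliseconds to parts[1]
}

fun main() {
    val lines = sampleGroup.lines()
    println(parseDurations(lines[0]))
    println(parseDurations(lines[1]))
    println(parseMainInfo(lines[2]))
}
```

`Duration.parse` accepts the format produced by `Duration.toString()`, which is why values like `8.5ms` can be read back without any custom parsing.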

Ok, so let's do a simple analysis of how much time is spent on Proteus, on message handling, and on everything else.

In [16]:
@file:DependsOn("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1")

val initialData = getDataFromLogs("InitialData.txt")

val pairs =
    initialData.map { it.proteus.total.inWholeMicroseconds to "Proteus" } +
            initialData.map { it.processingMessage.total.inWholeMicroseconds to "Processing" } +
            initialData.map { it.unknownTime.inWholeMicroseconds to "Other" }

val value by column(pairs.map { it.first })
val stepName by column(pairs.map { it.second })
val dataSet = dataFrameOf(value, stepName)
dataSet.groupBy(stepName).aggregate {
    sum(value) into "total"
}.plot {
    pie {
        slice("total")
        fillColor("stepName")
    }
    layout {
        title = "Time spent unpacking and processing message events"
        style(Style.Void)
    }
}

While Processing clearly is the slowest part, the Proteus part is still interesting because it affects _all_ message types.
Processing can be very different between messages and requires an investigation for _each_ message type to find bottlenecks and improvement opportunities.

With that in mind, let's dive a bit deeper into Proteus. Let's split it into smaller chunks and see what we can improve there.

In [17]:
val initialProteusMeasurements = initialData.map { it.proteus }
val checkingSession by column(initialProteusMeasurements.map { it.checkingSession.inWholeMilliseconds })
val decrypting by column(initialProteusMeasurements.map { it.decrypting.inWholeMilliseconds })
val savingSession by column(initialProteusMeasurements.map { it.savingSession.inWholeMilliseconds })
val eventNumber by column(buildList { repeat(initialProteusMeasurements.size) { i -> add(i + 1)} })
val detailedFrame = dataFrameOf(eventNumber, checkingSession, decrypting, savingSession)
    .gather(checkingSession, decrypting, savingSession)
    .into("measurements", "timeSpent")

detailedFrame.plot {
    layout.title = "Time taken in each phase of Proteus Unpacking, for different events"
    points {
        x("measurements")
        y("timeSpent")
        color("measurements")
        position = Position.jitter()
    }
}


The colours are hard on the eyes, but the plot goes to show how random the timings are. Sometimes saving the session is slow, sometimes checking the session is slow, and sometimes decrypting is slow.

One thing that can definitely be seen from this is that decrypting is the slowest part.

Unfortunately, decrypting is something we can't solve on our end. It's embedded within CoreCrypto / CryptoBox, and we are unable to change it.

Here's a simpler visualization:

In [18]:
val steps by columnOf("Checking Session", "Decrypting", "Saving Session")
val values by columnOf(checkingSession.sum(), decrypting.sum(), savingSession.sum())

val dataFrame = dataFrameOf(steps, values)
dataFrame.plot {
    pie {
        slice(values)
        fillColor(steps)
    }
    layout {
        title = "Time share in different Proteus phases"
        style(Style.Void)
    }
}

Clearly decrypting is the biggest offender. And as mentioned before, we can't do much about it.

However, we might find quick wins by reducing `Saving Session` and `Checking Session`, which together account for almost 50% of the total Proteus time.

## Idea #1
Remove `Saving Session` completely. It's unnecessary, as CoreCrypto already saves the session on every `decrypt`.

## Idea #2
Cache existing sessions in-memory. Instead of using IO and relying solely on CoreCrypto to know if sessions exist or not, we can store known sessions in memory.
If sessions are deleted, we can remove them from the memory cache.
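
A minimal sketch of the caching idea, under a few assumptions: `SessionStore` is a hypothetical abstraction I introduced for illustration (not the CoreCrypto or Kalium API), the cache is JVM-only because it uses `ConcurrentHashMap`, and only positive answers are cached, since a missing session could be created later without going through this class.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical abstraction over the crypto layer; NOT the real CoreCrypto API.
interface SessionStore {
    fun doesSessionExist(sessionId: String): Boolean
    fun deleteSession(sessionId: String)
}

// Remembers sessions that are known to exist, so repeated checks skip IO.
class CachingSessionStore(private val delegate: SessionStore) : SessionStore {
    private val knownSessions = ConcurrentHashMap.newKeySet<String>()

    override fun doesSessionExist(sessionId: String): Boolean {
        if (sessionId in knownSessions) return true // answered from memory
        val exists = delegate.doesSessionExist(sessionId)
        if (exists) knownSessions.add(sessionId) // cache positive answers only
        return exists
    }

    override fun deleteSession(sessionId: String) {
        knownSessions.remove(sessionId) // keep the cache in sync with deletions
        delegate.deleteSession(sessionId)
    }
}

// Fake store that counts lookups, standing in for the slow IO-backed check.
class CountingSessionStore(initial: Set<String>) : SessionStore {
    private val sessions = initial.toMutableSet()
    var lookups = 0
        private set

    override fun doesSessionExist(sessionId: String): Boolean {
        lookups++
        return sessionId in sessions
    }

    override fun deleteSession(sessionId: String) {
        sessions.remove(sessionId)
    }
}

fun main() {
    val slowStore = CountingSessionStore(setOf("alice-device-1"))
    val cached = CachingSessionStore(slowStore)
    repeat(3) { cached.doesSessionExist("alice-device-1") }
    println("Lookups that reached the slow store: ${slowStore.lookups}")
}
```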

## Results
After implementing the first idea, I collected data and stored it in `NotSavingSession.txt`. After that, I also implemented the second idea and collected data again in `WithExistingSessionCache.txt`, which reflects both improvements combined.

In [19]:
val withoutSessionSavingData = getDataFromLogs("NotSavingSession.txt")
val withSessionCacheData = getDataFromLogs("WithExistingSessionCache.txt")

val initialTime = initialData.map { it.total.inWholeMilliseconds to "Initial" }
val withoutSessionSaving =
    withoutSessionSavingData.map { it.total.inWholeMilliseconds to "NoSessionSaving" }
val withCache = withSessionCacheData.map { it.total.inWholeMilliseconds to "WithSessionCache" }
val allData = (initialTime + withoutSessionSaving + withCache)
val timeTaken by column(allData.map { it.first })
val approach by column(allData.map { it.second })

val averages = listOf(initialTime.map { it.first }.average(), withoutSessionSaving.map { it.first }.average(), withCache.map { it.first }.average())
val dataFrame = dataFrameOf(timeTaken, approach)

dataFrame.plot {
    layout.title = "Average time taken to fully process events across different approaches"
    points {
        y(timeTaken)
        x(approach)
        color(approach)
        position = Position.jitter()
    }
    line {
        y(averages) {
            axis.name = "Time Taken (ms)"
        }
        x(approach.distinct()) {
            axis.name = "Approach"
        }
        color = Color.PURPLE
        this.tooltips(true)
    }
}

## First win
Clearly we're onto something. Processing events is noticeably faster with the changes in Proteus. The average time went from 247ms to 196ms without saving the session, and then down to 147ms without saving the session and with caching of existing sessions.
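
To put those averages in relative terms, here is the arithmetic as a small sketch. `reductionPercent` is just a helper name I picked for illustration.

```kotlin
// Percentage reduction of `after` relative to `before`.
fun reductionPercent(before: Double, after: Double): Double =
    (before - after) / before * 100

// Rounds to one decimal place for display.
fun oneDecimal(value: Double): Double = Math.round(value * 10) / 10.0

fun main() {
    val initial = 247.0          // ms, initial average
    val noSessionSaving = 196.0  // ms, without saving the session
    val withCache = 147.0        // ms, without saving + session cache

    println(oneDecimal(reductionPercent(initial, noSessionSaving)))   // 20.6
    println(oneDecimal(reductionPercent(noSessionSaving, withCache))) // 25.0
    println(oneDecimal(reductionPercent(initial, withCache)))         // 40.5
}
```

So dropping session saving alone cut the average by roughly a fifth, and both changes together cut it by roughly 40%.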

## But we're not over yet
Maybe we can also analyse the different parts of `ProcessingMessage` and identify what is taking more time? For that, we can join all the logs taken so far and ignore the Proteus part.

In [20]:
val allMeasurements = initialData + withoutSessionSavingData + withSessionCacheData
val allMessageProcessing = allMeasurements.map { 
    it.processingMessage
}
val checkingLegalhold by column(allMessageProcessing.map { it.checkingLegalhold.inWholeMilliseconds })
val handlingContent by column(allMessageProcessing.map { it.handlingMessageContent.inWholeMilliseconds })
val postHandling by column(allMessageProcessing.map { it.postHandling.inWholeMilliseconds })
val eventNumber by column(buildList { repeat(allMessageProcessing.size) { i -> add(i + 1)} })
val detailedFrame = dataFrameOf(eventNumber, checkingLegalhold, handlingContent, postHandling)
    .gather(checkingLegalhold, handlingContent, postHandling)
    .into("measurements", "timeSpent")

detailedFrame.plot { 
    points { 
        x("measurements")
        y("timeSpent")
        position = Position.jitter()
        color("measurements")
    }
}


## Bad news

While checking legalhold is definitely a bit more random (probably due to IO operations), handling the content of each message is definitely the biggest offender.
Here's a comparison across all different steps using the most improved data.

In [21]:
import org.jetbrains.kotlinx.kandy.ir.feature.FeatureName

val pairs =
    withSessionCacheData.map { it.proteus.checkingSession.inWholeMicroseconds to "Checking Session" } +
            withSessionCacheData.map { it.proteus.decrypting.inWholeMicroseconds to "Decrypting" } +
            withSessionCacheData.map { it.proteus.savingSession.inWholeMicroseconds to "Saving Session" } +
            withSessionCacheData.map { it.processingMessage.checkingLegalhold.inWholeMicroseconds to "Checking Legalhold" } +
            withSessionCacheData.map { it.processingMessage.handlingMessageContent.inWholeMicroseconds to "Handling Content" } +
            withSessionCacheData.map { it.processingMessage.postHandling.inWholeMicroseconds to "Post Handling" } +
            withSessionCacheData.map { it.unknownTime.inWholeMicroseconds to "Other" }

val value by column(pairs.map { it.first })
val stepName by column(pairs.map { it.second })
val dataSet = dataFrameOf(value, stepName)
dataSet.groupBy(stepName).aggregate {
    sum(value) into "total"
}.plot {
    pie {
        slice("total")
        fillColor("stepName")
    }
    layout {
        title = "Time spent unpacking and processing message events after improvements"
        style(Style.Void)
    }
}

## What's next?

### CoreCrypto transactions
CoreCrypto has plans to create a transaction API so we can batch-decrypt messages, which should speed up the `decrypting` part.

### Check "Other" slice
Perhaps there is still a bit of time to gain in the "Other" part of the code, which is related to Protobuf parsing, Base64 decoding, etc.

### Dive deeper into each message type
Keep in mind that most of the events collected for the experiments above were Text events, which seem to be among the worst performing messages we process.
This is definitely the most demanding job, as it requires pretty much one investigation for Text, one for LastRead, one for Asset, one for Receipt, etc.
But it might be one of the most fruitful ones, as it strikes a good balance between a major refactor (which adopting CoreCrypto transactions would be) and small, segmented improvements across the processing pipeline.
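
One way to decide which message type to investigate first is to compare each type's share of the event count with its share of the handling time, as was done for Text and `LastRead` at the top. The sketch below assumes a trimmed-down, hypothetical `Sample` type rather than the notebook's `TimeTaken`, and the sample values are made up.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical, trimmed-down measurement: message type plus handling time.
data class Sample(val messageType: String, val handling: Duration)

data class TypeShare(
    val messageType: String,
    val countShare: Double, // fraction of all events
    val timeShare: Double,  // fraction of all handling time
)

// Types that take a larger share of time than of count come first.
// Assumes a non-empty list with a non-zero total handling time.
fun sharesByType(samples: List<Sample>): List<TypeShare> {
    val totalMicros = samples.sumOf { it.handling.inWholeMicroseconds }.toDouble()
    return samples
        .groupBy { it.messageType }
        .map { (type, group) ->
            TypeShare(
                messageType = type,
                countShare = group.size.toDouble() / samples.size,
                timeShare = group.sumOf { it.handling.inWholeMicroseconds } / totalMicros,
            )
        }
        .sortedByDescending { it.timeShare - it.countShare }
}

fun main() {
    // Made-up values, only to show the shape of the result.
    val samples = listOf(
        Sample("Text", 90.milliseconds),
        Sample("LastRead", 30.milliseconds),
        Sample("Receipt", 10.milliseconds),
        Sample("Receipt", 10.milliseconds),
    )
    sharesByType(samples).forEach(::println)
}
```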