# Aggregator Verification

This benchmark verifies the aggregation pipelines for accuracy, consistency and read/write speed.

**Accuracy Verification**   
As new events are generated and sent to the engine, a ground-truth ledger counts the expected metrics in memory. Once ingestion is complete, we query the aggregated data and compare it against the ledger values. Any difference between the ground truth and the aggregation is reported as an error.
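A minimal sketch of such a ledger follows. It assumes each metric can be identified by a string key (for example a hypothetical `source:eventType:bucket` key) and counted as a number. `GroundTruthLedger` and `Mismatch` are illustrative names, not part of the engine's API. The comparison checks keys from both sides, so a metric the engine never wrote is reported as well as one with a wrong value.

```typescript
// Hypothetical shape of one reported difference.
interface Mismatch {
  key: string;
  expected?: number; // value in the ledger (undefined if the ledger never saw the key)
  actual?: number; // value read back from the database (undefined if missing)
}

// Hypothetical in-memory ground-truth ledger: metric key -> expected count.
class GroundTruthLedger {
  private counts = new Map<string, number>();

  // Record a metric increment at the moment the event is generated and sent.
  increment(key: string, by = 1): void {
    this.counts.set(key, (this.counts.get(key) ?? 0) + by);
  }

  // Compare aggregated values queried after ingestion against the ledger.
  diff(aggregated: Record<string, number>): Mismatch[] {
    const keys = new Set([...this.counts.keys(), ...Object.keys(aggregated)]);
    const mismatches: Mismatch[] = [];
    for (const key of keys) {
      const expected = this.counts.get(key);
      const actual = aggregated[key];
      if (expected !== actual) mismatches.push({ key, expected, actual });
    }
    return mismatches;
  }
}
```

An empty result from `diff` means the aggregation matches the ground truth exactly.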

**Consistency**  
The aggregated metrics in the database and the ledger must have the same values; a difference may indicate an engine bug that leads to inconsistent writes. *Because the data is generated on the same machine, the engine may experience backpressure (the queue may need some time to finish pending jobs). In this case, give it a couple of minutes after the run.*
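Rather than waiting a fixed couple of minutes, the comparison can be retried until it settles. The helper below is a hypothetical sketch, not part of the engine: `check` is any async function that returns the current number of mismatched metrics, and the default timeout (2 minutes) and poll interval (5 seconds) are assumptions chosen to match the advice above.

```typescript
// Hypothetical helper: re-run `check` until it reports zero mismatches
// or the timeout is reached. Returns the last mismatch count observed.
async function waitUntilConsistent(
  check: () => Promise<number>,
  { timeoutMs = 120_000, intervalMs = 5_000 }: { timeoutMs?: number; intervalMs?: number } = {},
): Promise<number> {
  const deadline = Date.now() + timeoutMs;
  let mismatches = await check();
  // The deadline is checked before each sleep, so the total wait can
  // overshoot timeoutMs by up to one interval plus one check.
  while (mismatches > 0 && Date.now() < deadline) {
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
    mismatches = await check();
  }
  return mismatches;
}
```

In this notebook, `check` could re-run `result.getEngineReport("hour")` and count the keys whose values differ from `result.groundTruthResults`. A non-zero return value after the timeout points to a real inconsistency rather than a queue that is still draining.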

**Read/Write speed**  
DB write speed matters less for this engine: real-time data is accumulated in an in-memory buffer, which in most cases is enough to absorb the write load.
DB read speed, on the other hand, can be crucial if you need to aggregate different reports. Aggregation currently takes ~700 ms per 1M records across granularities (100k events generate ~1M to 15M aggregated metrics, depending on the event payload). *This is a good speed for the core engine without caching layers; the API offers precaching for time-series data, which gives a 50x speed improvement.*
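The quoted rate turns into a quick back-of-the-envelope estimate. The sketch below assumes query time scales linearly with the number of aggregated records read, which is a simplification; `estimateQueryMs`, `msPerMillion`, and `timed` are hypothetical helpers, not engine APIs.

```typescript
// Rate quoted above: ~700 ms per 1M aggregated records (assumed linear).
const QUOTED_MS_PER_MILLION = 700;

// Estimated query time for a given number of aggregated records.
function estimateQueryMs(records: number, rate = QUOTED_MS_PER_MILLION): number {
  return (records / 1_000_000) * rate;
}

// Convert a measured query (elapsed ms + records read) into ms per 1M records,
// so a run can be compared against the quoted figure.
function msPerMillion(elapsedMs: number, records: number): number {
  if (records <= 0) throw new RangeError("records must be positive");
  return (elapsedMs * 1_000_000) / records;
}

// Measure the wall-clock duration of an async call.
async function timed<T>(fn: () => Promise<T>): Promise<{ value: T; elapsedMs: number }> {
  const start = performance.now();
  const value = await fn();
  return { value, elapsedMs: performance.now() - start };
}

// 100k events -> ~1M to 15M aggregated metrics (range quoted above).
for (const records of [1_000_000, 15_000_000]) {
  console.log(`${records} records ≈ ${estimateQueryMs(records)} ms`);
}
// prints "1000000 records ≈ 700 ms" and "15000000 records ≈ 10500 ms"
```

So the same 100k-event run can cost anywhere from about 0.7 s to 10.5 s to read back. If the 50x precaching improvement applied uniformly, the upper bound would drop to roughly 210 ms.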

In [1]:
[Deno.env.get("OTEL_DENO"), Deno.env.get("OTEL_SERVICE_NAME")];

[ [32m"true"[39m, [32m"bench"[39m ]

## Run the benchmark

In [None]:
import { Engine } from "@/core/mod.ts";
import { BenchConfig, runBench } from "./agg_verification.ts";

const MONGODB_URI =
  "mongodb://root:example@localhost:27017/quant_bench?authSource=admin";

const engine = new Engine({
  mongoUri: MONGODB_URI,
  bufferAgeMs: 1000 * 60 * 5,
});

const playground: BenchConfig = {
  numSources: 15,
  eventTypesPerSource: 200,
  numEvents: 100000,
  partition: {
    granularity: "second",
    length: 100000, // Number of spans per partition
  },
  attributions: {
    // Seven distinct values for the "user" attribution: user0 ... user6
    user: Array.from({ length: 7 }, (_, i) => `user${i}`),
  },
  payloadSchema: {
    "price": "number",
    "volume": "number",
    "asset": "string",
  },
};

const result = await runBench(engine, playground);

Starting aggregator service...
Starting stats service for instance 312b5624-bede-4c5a-aa11-6da07d2819dd
Starting LifecycleManager service...
LifecycleManager: Running retention policy checks...
---= Starting Bench =---
Setting up event sources and report...
Generating 100000 events...
  ... 10000 / 100000 events generated.
  ... 20000 / 100000 events generated.
  ... 30000 / 100000 events generated.
  ... 40000 / 100000 events generated.
  ... 50000 / 100000 events generated.
  ... 60000 / 100000 events generated.
  ... 70000 / 100000 events generated.
  ... 80000 / 100000 events generated.
  ... 90000 / 100000 events generated.
  ... 100000 / 100000 events generated.
Event generation finished in 482.61 seconds.

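The `partition` block in the config sets `granularity: "second"` and `length: 100000` spans per partition. Assuming one span equals one unit of the granularity, each partition covers 100,000 seconds, roughly 27.8 hours. The sketch below makes that arithmetic explicit. The span table, the epoch-aligned partition index, and all names here are hypothetical illustrations, not the engine's actual partitioning logic.

```typescript
// Assumed span length per granularity, in milliseconds.
const SPAN_MS = {
  second: 1_000,
  minute: 60_000,
  hour: 3_600_000,
  day: 86_400_000,
} as const;

type Granularity = keyof typeof SPAN_MS;

interface PartitionConfig {
  granularity: Granularity;
  length: number; // number of spans per partition
}

// Wall-clock time covered by one partition.
function partitionDurationMs(p: PartitionConfig): number {
  return SPAN_MS[p.granularity] * p.length;
}

// Index of the partition a timestamp (ms since the Unix epoch) falls into,
// assuming partitions are aligned to the epoch.
function partitionIndex(timestampMs: number, p: PartitionConfig): number {
  return Math.floor(timestampMs / partitionDurationMs(p));
}

const playgroundPartition: PartitionConfig = { granularity: "second", length: 100_000 };
// Hours covered by one partition (≈ 27.8 for this config).
console.log(partitionDurationMs(playgroundPartition) / SPAN_MS.hour);
```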

### Verify Event Writes

In [3]:
await engine.getTotalRawEventCount();

[33m100000[39m

### Verify Aggregated Metrics
*Should be equal to the ground truth.*

In [None]:
const engineReport = await result.getEngineReport("hour");
const expected = result.groundTruthResults;
// Check keys from both sides so metrics missing from either one are reported too.
const keys = new Set([...Object.keys(engineReport), ...Object.keys(expected)]);
for (const key of keys) {
  const value = engineReport[key];
  if (value !== expected[key]) {
    console.log(key, "❌ KO", value, "!=", expected[key]);
  }
}

Query time: 2.368s


LifecycleManager: Running retention policy checks...
