Skip to content

Commit

Permalink
fix: composite measures should be correctly exported to CSV (#309)
Browse files Browse the repository at this point in the history
* refactor: add acceleration measure to the DummyTemp Decoder

* fix: test name

* test: implement failing tests

* feat: add an ask event to retrieve the content of a measure

* feat: generate a measure column for each property of a measure

* fix: position needs to be the first property for consistency purposes

* fix: failing test

* refactor: more proper way of computing header string

---------

Co-authored-by: Théo Dislay <tdislay@kuzzle.io>
  • Loading branch information
2 people authored and sebtiz13 committed Jul 24, 2023
1 parent 8ce57cb commit 487c1e8
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 55 deletions.
81 changes: 72 additions & 9 deletions lib/modules/measure/MeasureExporter.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import { randomUUID } from "node:crypto";
import { PassThrough } from "node:stream";

import _ from "lodash";
import { stringify } from "csv-stringify/sync";
import {
InternalError,
JSONObject,
KDocument,
NotFoundError,
User,
} from "kuzzle";
import { stringify } from "csv-stringify/sync";
import _ from "lodash";

import { AskModelAssetGet, AskModelDeviceGet } from "../model";
import {
AskModelAssetGet,
AskModelDeviceGet,
AskModelMeasureGet,
} from "../model";
import { DeviceManagerPlugin, InternalCollection } from "../plugin";
import { ask, DigitalTwinContent } from "../shared";
import { DigitalTwinContent, ask, flattenObject } from "../shared";

import { NamedMeasures } from "../decoder";
import { MeasureContent } from "./exports";

type ExportOptions = {
Expand Down Expand Up @@ -177,6 +182,10 @@ export class MeasureExporter {
}
);

const measureColumns = await this.generateMeasureColumns(
modelDocument[target].measures
);

const columns: Column[] = [
{ header: "Payload Id", path: "_id" },
{ header: "Measured At", path: "_source.measuredAt" },
Expand All @@ -185,11 +194,7 @@ export class MeasureExporter {
{ header: "Device Model", path: "_source.origin.deviceModel" },
{ header: "Asset Id", path: "_source.asset._id" },
{ header: "Asset Model", path: "_source.asset.model" },
...modelDocument[target].measures.map((measure) => ({
header: measure.name,
isMeasure: true,
path: `_source.values.${measure.type}`,
})),
...measureColumns,
];

stream.write(stringify([columns.map((column) => column.header)]));
Expand Down Expand Up @@ -224,6 +229,64 @@ export class MeasureExporter {
}
}

private async generateMeasureColumns(
documentMeasures: NamedMeasures
): Promise<Array<Column & { isMeasure: true }>> {
/**
* @example
* {
* temperature: ['temperature.type'],
* acceleration: [
* 'acceleration.properties.x.type',
* 'acceleration.properties.y.type',
* 'acceleration.properties.z.type',
* 'accuracy.type'
* ]
* }
*/
const mappingsByMeasureType: Record<string, string[]> = {};
const measures: Array<Column & { isMeasure: true }> = [];

for (const { name, type } of documentMeasures) {
if (!(type in mappingsByMeasureType)) {
const { measure } = await ask<AskModelMeasureGet>(
"ask:device-manager:model:measure:get",
{ type }
);

mappingsByMeasureType[type] = Object.keys(
flattenObject(measure.valuesMappings)
);
}

for (const measure of mappingsByMeasureType[type]) {
/**
* The path to be used to retrieve the measure values in a Measure Document
*
* @example
* 'temperature', 'acceleration.x', 'accuracy'
*/
const path = measure.replace(".type", "").replace(".properties", "");

/**
* All of this is here to avoid a long header name like 'accelerationSensor.acceleration.x' (we don't need the 'acceleration' part)
*
* @example
* 'temperatureInt', 'accelerationSensor.x', 'accelerationSensor.accuracy'
*/
const header = `${name}.${path}`.replace(`${name}.${type}`, name);

measures.push({
header,
isMeasure: true,
path: `_source.values.${path}`,
});
}
}

return measures;
}

async getExport(exportId: string) {
const exportParams = await this.sdk.ms.get(this.exportRedisKey(exportId));

Expand Down
2 changes: 1 addition & 1 deletion lib/modules/measure/measures/PositionMeasure.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export type PositionMeasurement = {

export const positionMeasureDefinition: MeasureDefinition = {
valuesMappings: {
position: { type: "geo_point" },
accuracy: { type: "float" },
altitude: { type: "float" },
position: { type: "geo_point" },
},
};
14 changes: 13 additions & 1 deletion lib/modules/model/ModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import {
DeviceModelContent,
MeasureModelContent,
} from "./types/ModelContent";
import { AskModelAssetGet, AskModelDeviceGet } from "./types/ModelEvents";
import {
AskModelAssetGet,
AskModelDeviceGet,
AskModelMeasureGet,
} from "./types/ModelEvents";

export class ModelService {
private config: DeviceManagerConfiguration;
Expand Down Expand Up @@ -56,6 +60,14 @@ export class ModelService {
return deviceModel._source;
}
);
onAsk<AskModelMeasureGet>(
"ask:device-manager:model:measure:get",
async ({ type }) => {
const measureModel = await this.getMeasure(type);

return measureModel._source;
}
);
}

async writeAsset(
Expand Down
14 changes: 13 additions & 1 deletion lib/modules/model/types/ModelEvents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { AssetModelContent, DeviceModelContent } from "./ModelContent";
import {
AssetModelContent,
DeviceModelContent,
MeasureModelContent,
} from "./ModelContent";

export type AskModelAssetGet = {
name: "ask:device-manager:model:asset:get";
Expand All @@ -15,3 +19,11 @@ export type AskModelDeviceGet = {

result: DeviceModelContent;
};

export type AskModelMeasureGet = {
name: "ask:device-manager:model:measure:get";

payload: { type: string };

result: MeasureModelContent;
};
41 changes: 6 additions & 35 deletions tests/application/app.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import util from "node:util";

import { Backend, HttpStream, KuzzleRequest } from "kuzzle";
import { Backend, KuzzleRequest } from "kuzzle";

import { DeviceManagerPlugin } from "../../index";

import { DummyTempDecoder, DummyTempPositionDecoder } from "./decoders";
import { registerTestPipes } from "./tests/pipes";
import { TestsController } from "./tests/controller";
import { containerAssetDefinition } from "./assets/Container";
import { warehouseAssetDefinition } from "./assets/Warehouse";
import { PassThrough } from "node:stream";
import { DummyTempDecoder, DummyTempPositionDecoder } from "./decoders";
import { TestsController } from "./tests/controller";
import { registerTestPipes } from "./tests/pipes";
import { accelerationMeasureDefinition } from "./measures/AccelerationMeasure";

const app = new Backend("kuzzle");

Expand Down Expand Up @@ -43,13 +43,7 @@ deviceManager.models.registerAsset(
warehouseAssetDefinition
);

deviceManager.models.registerMeasure("acceleration", {
valuesMappings: {
x: { type: "float" },
y: { type: "float" },
z: { type: "float" },
},
});
deviceManager.models.registerMeasure("acceleration", accelerationMeasureDefinition);

registerTestPipes(app);

Expand All @@ -70,29 +64,6 @@ app.config.content.plugins["kuzzle-plugin-logger"].services.stdout.level =
// @ts-ignore
app.config.content.limits.documentsWriteCount = 5000;

let searchQuery;

async function sendResult(stream, searchQuery) {
let result = await app.sdk.document.search(
"device-manager",
"payloads",
{
query: searchQuery,
},
{ scroll: "5s" }
);

while (result) {
for (const hit of result.hits) {
stream.write(JSON.stringify(hit));
}

result = await result.next();
}

stream.end();
}

app
.start()
.then(() => {
Expand Down
21 changes: 21 additions & 0 deletions tests/application/decoders/DummyTempDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import {
TemperatureMeasurement,
BatteryMeasurement,
} from "../../../index";
import { AccelerationMeasurement } from "../measures/AccelerationMeasure";

export class DummyTempDecoder extends Decoder {
public measures = [
{ name: "temperature", type: "temperature" },
{ name: "accelerationSensor", type: "acceleration" },
{ name: "battery", type: "battery" },
] as const;

Expand Down Expand Up @@ -73,6 +75,25 @@ export class DummyTempDecoder extends Decoder {
}
);

if (payload.acceleration !== undefined) {
decodedPayload.addMeasurement<AccelerationMeasurement>(
payload.deviceEUI,
"accelerationSensor",
{
measuredAt: payload.measuredAt || Date.now(),
type: "acceleration",
values: {
acceleration: {
x: payload.acceleration.x,
y: payload.acceleration.y,
z: payload.acceleration.z,
},
accuracy: payload.acceleration.accuracy,
},
}
);
}

decodedPayload.addMeasurement<BatteryMeasurement>(
payload.deviceEUI,
"battery",
Expand Down
25 changes: 25 additions & 0 deletions tests/application/measures/AccelerationMeasure.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { MeasureDefinition } from "lib/modules/measure";

/* eslint-disable sort-keys */

export type AccelerationMeasurement = {
acceleration: {
x: number;
y: number;
z: number;
};
accuracy: number;
};

export const accelerationMeasureDefinition: MeasureDefinition = {
valuesMappings: {
acceleration: {
properties: {
x: { type: "float" },
y: { type: "float" },
z: { type: "float" },
},
},
accuracy: { type: "float" },
},
};
1 change: 1 addition & 0 deletions tests/scenario/migrated/model-controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ describe("features/Model/Controller", () => {
metadataMappings: { color: { type: "keyword" } },
measures: [
{ name: "temperature", type: "temperature" },
{ name: "accelerationSensor", type: "acceleration" },
{ name: "battery", type: "battery" },
],
},
Expand Down
4 changes: 2 additions & 2 deletions tests/scenario/modules/assets/action-export-measures.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.setTimeout(10000);
describe("AssetsController:exportMeasures", () => {
const sdk = setupHooks();

it("should prepare export of position measures and return a CSV as stream", async () => {
it("should prepare export of temperature measures and return a CSV as stream", async () => {
await sendPayloads(sdk, "dummy-temp-position", [
{
deviceEUI: "linked2",
Expand Down Expand Up @@ -67,7 +67,7 @@ describe("AssetsController:exportMeasures", () => {

expect(csv).toHaveLength(5);
expect(csv[0]).toBe(
"Payload Id,Measured At,Measure Type,Device Id,Device Model,Asset Id,Asset Model,temperatureExt,temperatureInt,position,temperatureWeather\n"
"Payload Id,Measured At,Measure Type,Device Id,Device Model,Asset Id,Asset Model,temperatureExt,temperatureInt,position,position.accuracy,position.altitude,temperatureWeather\n"
);
const [
payloadId,
Expand Down

0 comments on commit 487c1e8

Please sign in to comment.