Skip to content

Commit

Permalink
Refactor to use mReplace
Browse files Browse the repository at this point in the history
  • Loading branch information
fmauNeko committed Apr 7, 2023
1 parent 658c288 commit 453276a
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 117 deletions.
203 changes: 119 additions & 84 deletions lib/modules/asset/AssetService.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,42 @@
import _ from "lodash";
import { Backend, BadRequestError, PluginContext, User } from "kuzzle";
import { JSONObject, KDocument, KHit, SearchResult } from "kuzzle-sdk";
import {
BaseRequest,
JSONObject,
KDocument,
KHit,
SearchResult,
mReplaceResponse,
} from "kuzzle-sdk";
import _ from "lodash";

import { MeasureContent } from "../measure/";
import { AskDeviceUnlinkAsset } from "../device";
import { MeasureContent } from "../measure/";
import { AskModelAssetGet } from "../model";
import {
AskEngineList,
DeviceManagerConfiguration,
DeviceManagerPlugin,
InternalCollection,
} from "../plugin";
import {
EmbeddedMeasure,
Metadata,
lock,
ask,
onAsk,
flattenObject,
lock,
onAsk,
} from "../shared";
import {
DeviceManagerConfiguration,
InternalCollection,
DeviceManagerPlugin,
} from "../plugin";
import { AskModelAssetGet } from "../model";

import { AssetContent } from "./types/AssetContent";
import { AssetHistoryService } from "./AssetHistoryService";
import { ApiAssetGetMeasuresResult } from "./exports";
import { AssetSerializer } from "./model/AssetSerializer";
import { AssetContent } from "./types/AssetContent";
import {
AskAssetRefreshModel,
EventAssetUpdateAfter,
EventAssetUpdateBefore,
} from "./types/AssetEvents";
import { AssetHistoryService } from "./AssetHistoryService";
import { AssetHistoryEventMetadata } from "./types/AssetHistoryContent";
import { ApiAssetGetMeasuresResult } from "./types/AssetApi";

export class AssetService {
private context: PluginContext;
Expand Down Expand Up @@ -67,34 +75,84 @@ export class AssetService {
registerAskEvents() {
onAsk<AskAssetRefreshModel>(
"ask:device-manager:asset:refresh-model",
async ({ assetModel, assetId, engineId }) => {
const asset = await this.get(engineId, assetId);
async ({ assetModel }) => {
const engines = await ask<AskEngineList>(
"ask:device-manager:engine:list",
{
group: assetModel.engineGroup,
}
);

const targets = engines.map((engine) => ({
collections: [InternalCollection.ASSETS],
index: engine.index,
}));

const assetSearch = await this.sdk.query<
BaseRequest,
JSONObject // TODO: switch to DocumentSearchResult<AssetContent> once KHit<> has index and collection properties
>({
action: "search",
body: { query: { equals: { model: assetModel.asset.model } } },
controller: "document",
lang: "koncorde",
targets,
});

const modelMetadata = {};

for (const metadataName of Object.keys(
assetModel.asset.metadataMappings
)) {
modelMetadata[metadataName] = null;
}
for (const [metadataName, metadataValue] of Object.entries(
assetModel.asset.defaultMetadata
)) {
_.set(modelMetadata, metadataName, metadataValue);
const defaultMetadata =
assetModel.asset.defaultMetadata[metadataName];
modelMetadata[metadataName] = defaultMetadata ?? null;
}

const removedMetadata = _.difference(
Object.keys(asset._source.metadata),
Object.keys(modelMetadata)
const removedMetadata: string[] = [];

const updatedAssetsPerIndex: {
[x: string]: KDocument<AssetContent>[];
} = assetSearch.result.hits.reduce(
(
acc: { [x: string]: KDocument<AssetContent>[] },
asset: JSONObject
) => {
const assetMetadata = Object.fromEntries(
Object.entries(asset._source.metadata).filter(([key]) => {
if (!(key in modelMetadata)) {
removedMetadata.push(key);
return false;
}

return true;
})
);

asset._source.metadata = {
...modelMetadata,
...assetMetadata,
};

acc[asset.index].push(asset as KDocument<AssetContent>);

return acc;
},
Object.fromEntries(
engines.map((engine) => [
engine.index,
[] as KDocument<AssetContent>[],
])
)
);

const metadata = {
...modelMetadata,
..._.omit(asset._source.metadata, removedMetadata),
};

await this.replace(null, engineId, assetId, metadata, {
refresh: "wait_for",
});
for (const [index, updatedAssets] of Object.entries(
updatedAssetsPerIndex
)) {
await this.mReplace(null, index, updatedAssets, removedMetadata, {
refresh: "wait_for",
});
}
}
);
}
Expand Down Expand Up @@ -354,63 +412,40 @@ export class AssetService {
/**
* Replace an asset metadata
*/
public async replace(
public async mReplace(
user: User,
engineId: string,
assetId: string,
metadata: Metadata,
assets: KDocument<AssetContent>[],
removedMetadata: string[],
{ refresh }: { refresh: any }
): Promise<KDocument<AssetContent>> {
return lock(`asset:${engineId}:${assetId}`, async () => {
const asset = await this.get(engineId, assetId);

const updatedPayload = await this.app.trigger<EventAssetUpdateBefore>(
"device-manager:asset:update:before",
{ asset, metadata }
);

const updatedAsset = await this.impersonatedSdk(
user
).document.replace<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetId,
{
...asset._source,
metadata: updatedPayload.metadata,
},
{ refresh, source: true }
);

const removedMetadata = _.difference(
Object.keys(asset._source.metadata),
Object.keys(updatedPayload.metadata)
);
): Promise<mReplaceResponse> {
const replacedAssets = await this.impersonatedSdk(
user
).document.mReplace<AssetContent>(
engineId,
InternalCollection.ASSETS,
assets.map((asset) => ({ _id: asset._id, body: asset._source })),
{ refresh, source: true }
);

// @todo fix the metadata path for nested metadata
await this.assetHistoryService.add<AssetHistoryEventMetadata>(
engineId,
{
metadata: {
names: Object.keys(updatedPayload.metadata).concat(
removedMetadata.map((name) => `-${name}`)
),
await Promise.all(
replacedAssets.successes.map((asset) =>
this.assetHistoryService.add<AssetHistoryEventMetadata>(
engineId,
{
metadata: {
names: Object.keys(flattenObject(asset._source.metadata)).concat(
removedMetadata.map((name) => `-${name}`)
),
},
name: "metadata",
},
name: "metadata",
},
updatedAsset
);

await this.app.trigger<EventAssetUpdateAfter>(
"device-manager:asset:update:after",
{
asset: updatedAsset,
metadata: updatedPayload.metadata,
}
);
asset as KDocument<AssetContent>
)
)
);

return updatedAsset;
});
return replacedAssets;
}

private async getEngine(engineId: string): Promise<JSONObject> {
Expand Down
2 changes: 0 additions & 2 deletions lib/modules/asset/types/AssetEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ export type AskAssetRefreshModel = {

payload: {
assetModel: AssetModelContent;
assetId: string;
engineId: string;
};

result: void;
Expand Down
34 changes: 3 additions & 31 deletions lib/modules/model/ModelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,14 @@ import {
import { JSONObject, KDocument } from "kuzzle-sdk";

import {
AskEngineList,
AskEngineUpdateAll,
DeviceManagerConfiguration,
DeviceManagerPlugin,
InternalCollection,
} from "../plugin";
import { ask, onAsk } from "../shared/utils/ask";

import {
ApiAssetSearchRequest,
ApiAssetSearchResult,
AskAssetRefreshModel,
} from "../asset";
import { AskAssetRefreshModel } from "../asset";
import { flattenObject } from "../shared/utils/flattenObject";
import { ModelSerializer } from "./ModelSerializer";
import {
Expand Down Expand Up @@ -96,33 +91,10 @@ export class ModelService {
);
await ask<AskEngineUpdateAll>("ask:device-manager:engine:updateAll");

const engines = await ask<AskEngineList>("ask:device-manager:engine:list", {
group: engineGroup,
await ask<AskAssetRefreshModel>("ask:device-manager:asset:refresh-model", {
assetModel: modelContent,
});

for (const engine of engines) {
const assets = await this.sdk.query<
ApiAssetSearchRequest,
ApiAssetSearchResult
>({
action: "search",
body: { query: { equals: { model } } },
controller: "device-manager/assets",
engineId: engine.index,
lang: "koncorde",
});

await Promise.all(
assets.result.hits.map((asset) =>
ask<AskAssetRefreshModel>("ask:device-manager:asset:refresh-model", {
assetId: asset._id,
assetModel: modelContent,
engineId: engine.index,
})
)
);
}

return assetModel;
}

Expand Down

0 comments on commit 453276a

Please sign in to comment.