Skip to content

Commit

Permalink
feat(assets): assets migrate tenant
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-mauran committed Sep 28, 2023
1 parent f31f45b commit 5f05101
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 1 deletion.
197 changes: 196 additions & 1 deletion lib/modules/asset/AssetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ import {
} from "kuzzle-sdk";
import _ from "lodash";

import { AskDeviceUnlinkAsset } from "../device";
import {
AskDeviceAttachEngine,
AskDeviceDetachEngine,
AskDeviceLinkAsset,
AskDeviceUnlinkAsset,
} from "../device";
import { AskModelAssetGet, AssetModelContent } from "../model";
import {
AskEngineList,
Expand Down Expand Up @@ -39,6 +44,7 @@ import {
AssetHistoryContent,
AssetHistoryEventMetadata,
} from "./types/AssetHistoryContent";
import { RecoveryQueue } from "../shared/utils/recoveryQueue";

export class AssetService {
private context: PluginContext;
Expand Down Expand Up @@ -268,6 +274,195 @@ export class AssetService {
return result;
}

public async migrateTenant(
user: User,
assetsList: string[],
engineId: string,
newEngineId: string
) {
return lock(`engine:${engineId}:${newEngineId}`, async () => {
const recovery = new RecoveryQueue();

try {
// check if tenant destination of the the same group
const engine = await this.getEngine(engineId);
const newEngine = await this.getEngine(newEngineId);

if (engine.group !== newEngine.group) {
throw new BadRequestError(
`Tenant ${newEngineId} is not in the same group as ${engineId}`
);
}

const assets = await this.sdk.document.mGet<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetsList
);

// check if the assets exists in the other engine
const existingAssets = await this.sdk.document.mGet<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList
);

if (existingAssets.successes.length > 0) {
throw new BadRequestError(
`Assets ${existingAssets.successes
.map((asset) => asset._id)
.join(", ")} already exists in engine ${newEngineId}`
);
}
const assetsToMigrate = assets.successes.map((asset) => ({
_id: asset._id,
body: asset._source,
}));

const devices = await this.sdk.document.search<AssetContent>(
engineId,
InternalCollection.DEVICES,
{
query: {
bool: {
filter: {
terms: {
assetId: assetsList,
},
},
},
},
}
);

// Map linked devices for assets.
const assetLinkedDevices = assets.successes
.filter((asset) => asset._source.linkedDevices.length > 0)
.map((asset) => ({
assetId: asset._id,
linkedDevices: asset._source.linkedDevices,
}));

// Extra recovery step to relink back assets to their devices in case of rollback
recovery.addRecovery(async () => {
// Link the devices to the new assets
for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
await ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
assetId,
deviceId,
engineId,
measureNames: measureNames,
user,
}
);
}
}
});

// detach from current tenant
for (const device of devices.hits) {
await ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
await ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId: newEngineId, user }
);
}

// recovery function to reattach devices to the old tenant
recovery.addRecovery(async () => {
for (const device of devices.hits) {
await ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
await ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId, user }
);
}
});

// Create the assets in the new tenant
await this.sdk.document.mCreate(
newEngineId,
InternalCollection.ASSETS,
assetsToMigrate
);

recovery.addRecovery(async () => {
await this.sdk.document.mDelete(
newEngineId,
InternalCollection.ASSETS,
assetsList
);
});

// Delete the assets in the old tenant
await this.sdk.document.mDelete(
engineId,
InternalCollection.ASSETS,
assetsList
);

recovery.addRecovery(async () => {
await this.sdk.document.mCreate(
engineId,
InternalCollection.ASSETS,
assetsToMigrate
);
});

// Link the devices to the new assets
for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
await ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
assetId,
deviceId,
engineId: newEngineId,
measureNames: measureNames,
user,
}
);
}
}

recovery.addRecovery(async () => {
for (const asset of assetLinkedDevices) {
for (const device of asset.linkedDevices) {
const deviceId = device._id;
await ask<AskDeviceUnlinkAsset>(
"ask:device-manager:device:unlink-asset",
{
deviceId,
user,
}
);
}
}
});
} catch (error) {
await recovery.rollback();
throw new BadRequestError(
`An error occured while migrating assets: ${error}`
);
}
});
}

/**
* Replace an asset metadata
*/
Expand Down
21 changes: 21 additions & 0 deletions lib/modules/asset/AssetsController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ export class AssetsController {
},
],
},
migrateTenant: {
handler: this.migrateTenant.bind(this),
http: [
{
path: "device-manager/:engineId/assets/_migrateTenant",
verb: "post",
},
],
},
},
};
/* eslint-enable sort-keys */
Expand Down Expand Up @@ -341,4 +350,16 @@ export class AssetsController {

return { link };
}

async migrateTenant(request: KuzzleRequest) {
const assetsList = request.getBodyArray("assetsList");
const engineId = request.getString("engineId");
const newEngineId = request.getBodyString("newEngineId");
await this.assetService.migrateTenant(
request.getUser(),
assetsList,
engineId,
newEngineId
);
}
}
27 changes: 27 additions & 0 deletions lib/modules/device/DeviceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import { DeviceContent } from "./types/DeviceContent";
import { DeviceSerializer } from "./model/DeviceSerializer";
import {
AskDeviceUnlinkAsset,
AskDeviceDetachEngine,
EventDeviceUpdateAfter,
EventDeviceUpdateBefore,
AskDeviceAttachEngine,
AskDeviceLinkAsset,
} from "./types/DeviceEvents";
import { ApiDeviceLinkAssetRequest } from "./types/DeviceApi";
import { AskPayloadReceiveFormated } from "../decoder/types/PayloadEvents";
Expand Down Expand Up @@ -54,13 +57,37 @@ export class DeviceService {
constructor(plugin: Plugin) {
this.config = plugin.config as any;
this.context = plugin.context;
this.registerAskEvents();
}

registerAskEvents() {
onAsk<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
async ({ deviceId, engineId, user, assetId, measureNames }) => {
await this.linkAsset(user, engineId, deviceId, assetId, measureNames);
}
);

onAsk<AskDeviceUnlinkAsset>(
"ask:device-manager:device:unlink-asset",
async ({ deviceId, user }) => {
await this.unlinkAsset(user, deviceId);
}
);

onAsk<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
async ({ deviceId, user }) => {
await this.detachEngine(user, deviceId);
}
);

onAsk<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
async ({ deviceId, engineId, user }) => {
await this.attachEngine(user, engineId, deviceId);
}
);
}

/**
Expand Down
38 changes: 38 additions & 0 deletions lib/modules/device/types/DeviceEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { KDocument } from "kuzzle-sdk";
import { Metadata } from "../../../modules/shared";

import { DeviceContent } from "./DeviceContent";
import { ApiDeviceLinkAssetRequest } from "./DeviceApi";

export type EventDeviceUpdateBefore = {
name: "device-manager:device:update:before";
Expand All @@ -20,6 +21,20 @@ export type EventDeviceUpdateAfter = {
/**
* @internal
*/
export type AskDeviceLinkAsset = {
name: "ask:device-manager:device:link-asset";

payload: {
engineId: string;
assetId: string;
deviceId: string;
user: User;
measureNames: ApiDeviceLinkAssetRequest["body"]["measureNames"];
};

result: void;
};

export type AskDeviceUnlinkAsset = {
name: "ask:device-manager:device:unlink-asset";

Expand All @@ -30,3 +45,26 @@ export type AskDeviceUnlinkAsset = {

result: void;
};

export type AskDeviceDetachEngine = {
name: "ask:device-manager:device:detach-engine";

payload: {
deviceId: string;
user: User;
};

result: void;
};

export type AskDeviceAttachEngine = {
name: "ask:device-manager:device:attach-engine";

payload: {
engineId: string;
deviceId: string;
user: User;
};

result: void;
};
Loading

0 comments on commit 5f05101

Please sign in to comment.