Skip to content

Commit

Permalink
fix(asset_migration): fixes multiple assets migration and optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
Olive3034 committed Oct 12, 2023
1 parent 58cc2bd commit fd546cf
Showing 1 changed file with 63 additions and 182 deletions.
245 changes: 63 additions & 182 deletions lib/modules/asset/AssetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
AskDeviceDetachEngine,
AskDeviceLinkAsset,
AskDeviceUnlinkAsset,
DeviceContent,

Check warning on line 18 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

'DeviceContent' is defined but never used
} from "../device";
import { AskModelAssetGet, AssetModelContent } from "../model";
import {
Expand Down Expand Up @@ -44,7 +45,6 @@ import {
AssetHistoryContent,
AssetHistoryEventMetadata,
} from "./types/AssetHistoryContent";
import { RecoveryQueue } from "../shared/utils/recoveryQueue";

export class AssetService {
private context: PluginContext;
Expand Down Expand Up @@ -280,8 +280,9 @@ export class AssetService {
engineId: string,
newEngineId: string
): Promise<void> {
return lock(`engine:${engineId}:${newEngineId}`, async () => {
const recovery = new RecoveryQueue();
let migrated = 0;

await lock(`engine:${engineId}:${newEngineId}`, async () => {

Check failure on line 286 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `⏎`
if (!user.profileIds.includes("admin")) {
throw new BadRequestError(
Expand All @@ -304,213 +305,93 @@ export class AssetService {
throw new BadRequestError("No assets to migrate");
}

//Get all assets to migrate
const assets = await this.sdk.document.mGet<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetsList
);

// check if the assets exists in the other engine
const existingAssets = await this.sdk.document.mGet<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList
);

if (existingAssets.successes.length > 0) {
throw new BadRequestError(
`Assets ${existingAssets.successes
.map((asset) => asset._id)
.join(", ")} already exists in engine ${newEngineId}`
);
}
const assetsToMigrate = assets.successes.map((asset) => ({
_id: asset._id,
body: asset._source,
}));
//Iterate over all asset, and migrate each one
for (const asset of assets.successes) {

Check failure on line 317 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Replace `⏎··········//Check·if·an·existing·asset·reference·already·exists·in·the·new·tenant·········` with `··········//Check·if·an·existing·asset·reference·already·exists·in·the·new·tenant`
const devices = await this.sdk.document.search<AssetContent>(
engineId,
InternalCollection.DEVICES,
{
query: {
bool: {
filter: {
terms: {
assetId: assetsList,
},
},
},
},
//Check if an existing asset reference already exists in the new tenant
if (await this.sdk.document.exists(newEngineId, InternalCollection.ASSETS, asset._id)) {

Check failure on line 319 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Replace `await·this.sdk.document.exists(newEngineId,·InternalCollection.ASSETS,·asset._id)` with `⏎············await·this.sdk.document.exists(⏎··············newEngineId,⏎··············InternalCollection.ASSETS,⏎··············asset._id⏎············)⏎··········`
continue;
}
);

// Map linked devices for assets.
const assetLinkedDevices = assets.successes
.filter((asset) => asset._source.linkedDevices.length > 0)
.map((asset) => ({
assetId: asset._id,
linkedDevices: asset._source.linkedDevices,
// Create the assets in the new tenant, with empty linkedDevices and groups

Check failure on line 323 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `··········`
let assetContent = Object.assign({}, asset._source);

Check failure on line 324 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

'assetContent' is never reassigned. Use 'const' instead
assetContent.linkedDevices = [];
assetContent.groups = [];
await this.sdk.document.create<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetContent,
asset._id,

Check failure on line 331 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `,`
);

// get linked devices to this asset, if any
const linkedDevices = asset._source.linkedDevices

Check failure on line 335 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `⏎··········`
.map((d) => ({
id: d._id,
measureNames: d.measureNames,
}));

// Extra recovery step to relink back assets to their devices in case of rollback
recovery.addRecovery(async () => {
// Link the devices to the new assets
const promises: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
promises.push(
ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
assetId,
deviceId,
engineId,
measureNames: measureNames,
user,
}
)
);
}
}
await Promise.all(promises);
});
// ... ant iterate over this list
for (const device of linkedDevices) {

Check failure on line 342 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `⏎`

// detach from current tenant
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceDetachEngine>(
// detach linked devices from current tenant (it also unkinks asset)
await ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
{ deviceId: device.id, user }
);
})
);

// Attach to new tenant
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceAttachEngine>(
// ... and attach to new tenant
await ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId: newEngineId, user }
{ deviceId: device.id, engineId: newEngineId, user }
);
})
);

// recovery function to reattach devices to the old tenant
recovery.addRecovery(async () => {
await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceDetachEngine>(
"ask:device-manager:device:detach-engine",
{ deviceId: device._id, user }
);
})
);

await Promise.all(
devices.hits.map((device) => {
return ask<AskDeviceAttachEngine>(
"ask:device-manager:device:attach-engine",
{ deviceId: device._id, engineId, user }
);
})
);
});

// Create the assets in the new tenant
await this.sdk.document.mCreate(
newEngineId,
InternalCollection.ASSETS,
assetsToMigrate
);

recovery.addRecovery(async () => {
await this.sdk.document.mDelete(
newEngineId,
InternalCollection.ASSETS,
assetsList
);
});

// Delete the assets in the old tenant
await this.sdk.document.mDelete(
engineId,
InternalCollection.ASSETS,
assetsList
);

recovery.addRecovery(async () => {
await this.sdk.document.mCreate(
engineId,
InternalCollection.ASSETS,
assetsToMigrate
);
});

// Link the devices to the new assets
const promises: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
const assetId = asset.assetId;
for (const device of asset.linkedDevices) {
const deviceId = device._id;
const measureNames = device.measureNames;
promises.push(
ask<AskDeviceLinkAsset>("ask:device-manager:device:link-asset", {
assetId,
deviceId,

Check failure on line 355 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Delete `··········`
// ... and link this device to the asset in the new tenant
await ask<AskDeviceLinkAsset>(
"ask:device-manager:device:link-asset",
{
engineId: newEngineId,
measureNames: measureNames,
assetId: asset._id,

Check failure on line 361 in lib/modules/asset/AssetService.ts

View workflow job for this annotation

GitHub Actions / Lint

Expected object keys to be in ascending order. 'assetId' should be before 'engineId'
deviceId: device.id,
measureNames: device.measureNames,
user,
})
}
);
}
}

await Promise.all(promises);

recovery.addRecovery(async () => {
const promiseRecoveries: Promise<void>[] = [];

for (const asset of assetLinkedDevices) {
for (const device of asset.linkedDevices) {
const deviceId = device._id;
promiseRecoveries.push(
ask<AskDeviceUnlinkAsset>(
"ask:device-manager:device:unlink-asset",
{
deviceId,
user,
}
)
);
}
}

await Promise.all(promiseRecoveries);
});
// Finally here, we can delete the asset in the source engine !
await this.sdk.document.delete(
engineId,
InternalCollection.ASSETS,
asset._id,
);

// clear the groups
await this.sdk.document.mUpdate<AssetContent>(
newEngineId,
InternalCollection.ASSETS,
assetsList.map((assetId) => ({
_id: assetId,
body: {
groups: [],
},
}))
);
migrated ++;
}
} catch (error) {
await recovery.rollback();
throw new BadRequestError(
`An error occured while migrating assets: ${error}`
);
}

if(migrated == 0) {
throw new BadRequestError(
`Error occured while migrating all the assets !`
);
} else {
await this.sdk.collection.refresh(engineId, InternalCollection.ASSETS);
await this.sdk.collection.refresh(engineId, InternalCollection.DEVICES);
await this.sdk.collection.refresh(newEngineId, InternalCollection.ASSETS);
await this.sdk.collection.refresh(newEngineId, InternalCollection.DEVICES);
await this.sdk.collection.refresh(this.config.adminIndex, InternalCollection.DEVICES);
}
});
}

Expand Down

0 comments on commit fd546cf

Please sign in to comment.