Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 9 additions & 103 deletions packages/runtime/src/DatabaseSchemaUpgrader.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { IDatabaseCollection } from "@js-soft/docdb-access-abstractions";
import { ILogger, ILoggerFactory } from "@js-soft/logging-abstractions";
import { Serializable, serialize, type, validate } from "@js-soft/ts-serval";
import { ConsumptionController, LocalAttribute, LocalRequest } from "@nmshd/consumption";
import { AccountController, DatawalletModification, DatawalletModificationCategory, DatawalletModificationType, TransportIds } from "@nmshd/transport";
import _ from "lodash";
import { ConsumptionController } from "@nmshd/consumption";
import { AccountController } from "@nmshd/transport";

@type("RuntimeDatabaseSchemaMetadata")
class RuntimeDatabaseSchemaMetadata extends Serializable {
Expand All @@ -28,7 +26,7 @@ class RuntimeDatabaseSchemaMetadata extends Serializable {
}

export class DatabaseSchemaUpgrader {
private readonly CURRENT_DATABASE_SCHEMA_VERSION = 2;
private readonly CURRENT_DATABASE_SCHEMA_VERSION = 1;
private readonly DATABASE_SCHEMA_QUERY = { id: RuntimeDatabaseSchemaMetadata.DATABASE_SCHEMA_ID };

public constructor(
Expand All @@ -40,6 +38,12 @@ export class DatabaseSchemaUpgrader {
public async upgradeSchemaVersion(): Promise<void> {
let version = await this.getVersionFromDB();

if (version > this.CURRENT_DATABASE_SCHEMA_VERSION) {
throw new Error(
`Database schema version ${version} is higher than the current version ${this.CURRENT_DATABASE_SCHEMA_VERSION}. This can happen if you started the application with unsupported data. Please reset the database.`
);
}

while (version < this.CURRENT_DATABASE_SCHEMA_VERSION) {
version++;

Expand Down Expand Up @@ -81,103 +85,5 @@ export const UPGRADE_LOGIC = Object.freeze<Record<number, UpgradeLogicFunction |
// eslint-disable-next-line @typescript-eslint/naming-convention
1: (accountController, _, logger) => {
logger.info(`Upgrading database schema to version 1 for account '${accountController.identity.address.toString()}'`);
},
// eslint-disable-next-line @typescript-eslint/naming-convention
2: async (accountController, _, logger) => {
logger.info(`Upgrading database schema to version 2 for account '${accountController.identity.address.toString()}'`);

if (!accountController.config.datawalletEnabled) return;

const datawalletModifications = accountController["unpushedDatawalletModifications"] as IDatabaseCollection;
const datawalletVersion = accountController.config.supportedDatawalletVersion;

const requestsCollection = await accountController.getSynchronizedCollection("Requests");
const requestDocs = await requestsCollection.find({});

for (const requestDoc of requestDocs) {
logger.info(`Processing Request '${requestDoc.id}'`);

let request: LocalRequest;

try {
request = LocalRequest.from(requestDoc);
} catch (e) {
logger.error(`Failed to parse Request '${requestDoc.id}'`, e);
continue;
}

const objectIdentifier = request.id;

await datawalletModifications.create(
DatawalletModification.from({
localId: await TransportIds.datawalletModification.generate(),
type: DatawalletModificationType.Create,
collection: "Requests",
objectIdentifier,
payloadCategory: DatawalletModificationCategory.TechnicalData,
payload: extractPayloadFromObject(request, request.technicalProperties),
datawalletVersion
})
);

await datawalletModifications.create(
DatawalletModification.from({
localId: await TransportIds.datawalletModification.generate(),
type: DatawalletModificationType.Create,
collection: "Requests",
objectIdentifier,
payloadCategory: DatawalletModificationCategory.Userdata,
payload: extractPayloadFromObject(request, request.userdataProperties),
datawalletVersion
})
);

logger.info(`Successfully created datawallet modifications for Request '${requestDoc.id}'.`);
}

const attributesCollection = await accountController.getSynchronizedCollection("Attributes");
const attributeDocs = await attributesCollection.find({});

for (const attributeDoc of attributeDocs) {
logger.info(`Processing Attribute '${attributeDoc.id}'`);

let attribute: LocalAttribute;

try {
attribute = LocalAttribute.from(attributeDoc);
} catch (e) {
logger.error(`Failed to parse Attribute '${attributeDoc.id}'`, e);
continue;
}

const technicalModificationPayload = extractPayloadFromObject(attribute, attribute.technicalProperties);
if (!("succeededBy" in technicalModificationPayload) && !("shareInfo" in technicalModificationPayload) && !("parentId" in technicalModificationPayload)) {
logger.info(`Attribute '${attributeDoc.id}' does not contain any new technical properties. Skipping.`);
continue;
}

await datawalletModifications.create(
DatawalletModification.from({
localId: await TransportIds.datawalletModification.generate(),
type: DatawalletModificationType.Update,
collection: "Attributes",
objectIdentifier: attribute.id,
payloadCategory: DatawalletModificationCategory.TechnicalData,
payload: technicalModificationPayload,
datawalletVersion
})
);

logger.info(`Successfully created a datawallet modification for Attribute '${attributeDoc.id}'.`);
}

await accountController.syncDatawallet();
}
});

function extractPayloadFromObject(serializableObject: Serializable, properties: string[]): any {
const object = serializableObject.toJSON();
const predicate = (value: any, key: string) => value !== undefined && properties.includes(key);

return _.pickBy<any>(object, predicate);
}
205 changes: 3 additions & 202 deletions packages/runtime/test/misc/DatabaseSchemaUpgrader.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { IDatabaseCollection } from "@js-soft/docdb-access-abstractions";
import { LokiJsConnection } from "@js-soft/docdb-access-loki";
import { MongoDbConnection } from "@js-soft/docdb-access-mongo";
import { NodeLoggerFactory } from "@js-soft/node-logger";
import { EventEmitter2EventBus } from "@js-soft/ts-utils";
import { ConsumptionController, LocalAttribute, LocalRequest } from "@nmshd/consumption";
import { DisplayName, IdentityAttribute } from "@nmshd/content";
import { CoreAddress, CoreDate, CoreId } from "@nmshd/core-types";
import { ConsumptionController } from "@nmshd/consumption";
import { AccountController, Transport } from "@nmshd/transport";
import { LocalRequestStatus } from "../../src";
import { DatabaseSchemaUpgrader, UPGRADE_LOGIC } from "../../src/DatabaseSchemaUpgrader";
import { RuntimeServiceProvider, TestRequestItem } from "../lib";
import { DatabaseSchemaUpgrader } from "../../src/DatabaseSchemaUpgrader";
import { RuntimeServiceProvider } from "../lib";

const loggerFactory = new NodeLoggerFactory({
appenders: {
Expand Down Expand Up @@ -76,199 +72,4 @@ describe("DatabaseSchemaUpgrader", () => {
expect(doc).toBeDefined();
expect(doc.version).toBeGreaterThan(0);
});

describe("migration from version 1 to 2", () => {
let unpushedDatawalletModifications: IDatabaseCollection;
const v2UpgradeFunction = UPGRADE_LOGIC[2]!;

beforeAll(() => {
unpushedDatawalletModifications = accountController["unpushedDatawalletModifications"] as IDatabaseCollection;
});

beforeEach(async () => {
jest.clearAllMocks();

const deleteAllDocuments = async (collection: IDatabaseCollection, identifier = "id") => {
const documents = await collection.find();
for (const document of documents) await collection.delete({ [identifier]: document[identifier] });
};

await deleteAllDocuments(unpushedDatawalletModifications, "localId");
await deleteAllDocuments(await accountController.db.getCollection("Requests"));
await deleteAllDocuments(await accountController.db.getCollection("Attributes"));

jest.spyOn(accountController, "syncDatawallet").mockImplementation();
});

test("should not create datawallet modifications or sync when no requests or attributes exist", async () => {
await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(0);
});

test("should create datawallet modifications and sync when a request exist", async () => {
const requestsCollection = await accountController.db.getCollection("Requests");
await requestsCollection.create(
LocalRequest.from({
id: CoreId.from("REQ123"),
content: { items: [TestRequestItem.from({ mustBeAccepted: false })] },
isOwn: true,
createdAt: CoreDate.utc(),
peer: CoreAddress.from(""),
status: LocalRequestStatus.Draft,
statusLog: []
})
);

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(2);

const modifications = await unpushedDatawalletModifications.find({});
expect(modifications).toHaveLength(2);

const technicalDataModification = modifications[0];
expect(technicalDataModification.payloadCategory).toBe("TechnicalData");
expect(technicalDataModification.type).toBe("Create");
expect(Object.keys(technicalDataModification.payload)).toStrictEqual(["@type", "createdAt", "isOwn", "peer", "status", "statusLog"]);

const userDataModification = modifications[1];
expect(userDataModification.payloadCategory).toBe("Userdata");
expect(userDataModification.type).toBe("Create");
expect(Object.keys(userDataModification.payload)).toStrictEqual(["content"]);
});

test("should create datawallet modifications and sync when two requests exist", async () => {
const requestsCollection = await accountController.db.getCollection("Requests");
await requestsCollection.create(
LocalRequest.from({
id: CoreId.from("REQ123"),
content: { items: [TestRequestItem.from({ mustBeAccepted: false })] },
isOwn: true,
createdAt: CoreDate.utc(),
peer: CoreAddress.from(""),
status: LocalRequestStatus.Draft,
statusLog: []
})
);

await requestsCollection.create(
LocalRequest.from({
id: CoreId.from("REQ456"),
content: { items: [TestRequestItem.from({ mustBeAccepted: false })] },
isOwn: true,
createdAt: CoreDate.utc(),
peer: CoreAddress.from(""),
status: LocalRequestStatus.Draft,
statusLog: []
})
);

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(4);
});

test("should create a datawallet modification and sync when an attribute exists", async () => {
const attributesCollection = await accountController.db.getCollection("Attributes");
await attributesCollection.create(
LocalAttribute.from({
id: CoreId.from("ATT123"),
content: IdentityAttribute.from({
owner: CoreAddress.from(""),
value: DisplayName.from("Test")
}),
createdAt: CoreDate.utc(),
shareInfo: {
peer: CoreAddress.from(""),
requestReference: CoreId.from("REQ123")
}
})
);

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();

const modifications = await unpushedDatawalletModifications.find({});
expect(modifications).toHaveLength(1);

const modification = modifications[0];

expect(modification.payloadCategory).toBe("TechnicalData");
expect(modification.type).toBe("Update");
expect(Object.keys(modification.payload)).toStrictEqual(["@type", "createdAt", "shareInfo"]);
});

test("should create no datawallet modification when an attribute exists but has no new technical properties", async () => {
const attributesCollection = await accountController.db.getCollection("Attributes");
await attributesCollection.create(
LocalAttribute.from({
id: CoreId.from("ATT123"),
content: IdentityAttribute.from({
owner: CoreAddress.from(""),
value: DisplayName.from("Test")
}),
createdAt: CoreDate.utc()
})
);

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(0);
});

test("should create a datawallet modification and sync when two attributes exists", async () => {
const attributesCollection = await accountController.db.getCollection("Attributes");
await attributesCollection.create(
LocalAttribute.from({
id: CoreId.from("ATT123"),
content: IdentityAttribute.from({
owner: CoreAddress.from(""),
value: DisplayName.from("Test")
}),
createdAt: CoreDate.utc(),
shareInfo: {
peer: CoreAddress.from(""),
requestReference: CoreId.from("REQ123")
}
})
);

await attributesCollection.create(
LocalAttribute.from({
id: CoreId.from("ATT456"),
content: IdentityAttribute.from({
owner: CoreAddress.from(""),
value: DisplayName.from("Test")
}),
createdAt: CoreDate.utc(),
shareInfo: {
peer: CoreAddress.from(""),
requestReference: CoreId.from("REQ123")
}
})
);

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(2);
});

test("should not create datawallet modifications or sync when the datawallet is disabled", async () => {
accountController.config.datawalletEnabled = false;

await v2UpgradeFunction(accountController, undefined as any as ConsumptionController, testLogger);

expect(accountController.syncDatawallet).not.toHaveBeenCalled();
expect(await unpushedDatawalletModifications.count()).toBe(0);

accountController.config.datawalletEnabled = true;
});
});
});
22 changes: 3 additions & 19 deletions packages/transport/src/modules/sync/migrations/DeviceMigrations.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,10 @@
import { CoreSynchronizable } from "../../../core/CoreSynchronizable";
import { AccountController } from "../../accounts/AccountController";

export class DeviceMigrations {
public constructor(private readonly accountController: AccountController) {}

public async v1(): Promise<void> {
const query = { cache: { $exists: false } };
const synchronizableToId = (c: CoreSynchronizable) => c.id.toString();

const files = await this.accountController.files.getFiles(query);
await this.accountController.files.updateCache(files.map(synchronizableToId));

const messages = await this.accountController.messages.getMessages(query);
await this.accountController.messages.updateCache(messages.map(synchronizableToId));

const relationships = await this.accountController.relationships.getRelationships(query);
await this.accountController.relationships.updateCache(relationships.map(synchronizableToId));

const templates = await this.accountController.relationshipTemplates.getRelationshipTemplates(query);
await this.accountController.relationshipTemplates.updateCache(templates.map(synchronizableToId));

const tokens = await this.accountController.tokens.getTokens(query);
await this.accountController.tokens.updateCache(tokens.map(synchronizableToId));
public v1(): Promise<void> {
// no upgrade steps necessary for v1
return Promise.resolve();
}
}