Skip to content

Commit

Permalink
refactor: resource CRDT method from sharedb to native db update (#191)
Browse files Browse the repository at this point in the history
* feat: new api get ids from ranges

* feat: new api delete record or records

* feat: delete record from ui

* refactor: optimize link test cases

* test: e2e for delete record with link field

* fix: create when add record after delete a record

* fix: uniq index for row order field

* fix: do not allow set duplicated link record in one-many link field

* fix: miss transaction for batch save

* refactor: extract derivate binding into a middleware file

* fix: computed field did not update when creating records

* chore: default pg

* perf: dedupe the records when calculate multi fields

* chore: remove chat window

* refactor: change field create method from sharedb to native db update

* fix: misunderstand sharedb version strategy

* feat: batch method for record and field

* fix: client did not receive changes after create or del docs

* chore: linter fix

* chore: migration db

* fix: sqlite test case with data boundary error
  • Loading branch information
tea-artist committed Oct 12, 2023
1 parent b4f9603 commit 2397cff
Show file tree
Hide file tree
Showing 138 changed files with 2,377 additions and 1,523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe('AttachmentsService', () => {
findMany: jest.fn(),
create: jest.fn(),
updateMany: jest.fn(),
deleteMany: jest.fn(),
},
},
},
Expand Down Expand Up @@ -62,7 +63,7 @@ describe('AttachmentsService', () => {
(prismaService.attachmentsTable.findMany as jest.Mock).mockResolvedValueOnce([]);
await service.updateByRecord(tableId, recordId, attachments);
expect(prismaService.attachmentsTable.create).toHaveBeenCalledTimes(attachments.length);
expect(prismaService.attachmentsTable.updateMany).not.toBeCalled();
expect(prismaService.attachmentsTable.deleteMany).not.toBeCalled();
});

it('should create new and delete old records if there are existing records', async () => {
Expand All @@ -77,7 +78,7 @@ describe('AttachmentsService', () => {
(prismaService.attachmentsTable.findMany as jest.Mock).mockResolvedValueOnce(exists);
await service.updateByRecord(tableId, recordId, attachments);
expect(prismaService.attachmentsTable.create).toHaveBeenCalledTimes(attachments.length);
expect(prismaService.attachmentsTable.updateMany).toBeCalledTimes(exists.length);
expect(prismaService.attachmentsTable.deleteMany).toBeCalledTimes(exists.length);
});

it('should throw error if findMany fails', async () => {
Expand All @@ -88,7 +89,7 @@ describe('AttachmentsService', () => {
'findMany error'
);
expect(prismaService.attachmentsTable.create).not.toBeCalled();
expect(prismaService.attachmentsTable.updateMany).not.toBeCalled();
expect(prismaService.attachmentsTable.deleteMany).not.toBeCalled();
});

it('should throw error if create fails', async () => {
Expand All @@ -100,7 +101,7 @@ describe('AttachmentsService', () => {
'create error'
);
expect(prismaService.attachmentsTable.create).toBeCalled();
expect(prismaService.attachmentsTable.updateMany).not.toBeCalled();
expect(prismaService.attachmentsTable.deleteMany).not.toBeCalled();
});

it('should throw error if updateMany fails', async () => {
Expand All @@ -113,14 +114,14 @@ describe('AttachmentsService', () => {
},
];
(prismaService.attachmentsTable.findMany as jest.Mock).mockResolvedValueOnce(exists);
(prismaService.attachmentsTable.updateMany as jest.Mock).mockRejectedValueOnce(
(prismaService.attachmentsTable.deleteMany as jest.Mock).mockRejectedValueOnce(
new Error(updateManyError)
);
await expect(service.updateByRecord(tableId, recordId, attachments)).rejects.toThrow(
updateManyError
);
expect(prismaService.attachmentsTable.create).toBeCalled();
expect(prismaService.attachmentsTable.updateMany).toBeCalled();
expect(prismaService.attachmentsTable.deleteMany).toBeCalled();
});
});

Expand All @@ -136,15 +137,15 @@ describe('AttachmentsService', () => {

it('should delete records', async () => {
await service.delete(queries);
expect(prismaService.attachmentsTable.updateMany).toBeCalledTimes(queries.length);
expect(prismaService.attachmentsTable.deleteMany).toBeCalledTimes(queries.length);
});

it('should throw error if updateMany fails', async () => {
(prismaService.attachmentsTable.updateMany as jest.Mock).mockRejectedValueOnce(
(prismaService.attachmentsTable.deleteMany as jest.Mock).mockRejectedValueOnce(
new Error(updateManyError)
);
await expect(service.delete(queries)).rejects.toThrow(updateManyError);
expect(prismaService.attachmentsTable.updateMany).toBeCalled();
expect(prismaService.attachmentsTable.deleteMany).toBeCalled();
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ export class AttachmentsTableService {
{} as {
[key: string]: {
tableId: string;
recordId?: string;
fieldId?: string;
attachmentId?: string;
recordId: string;
fieldId: string;
attachmentId: string;
};
}
);
Expand All @@ -94,27 +94,36 @@ export class AttachmentsTableService {
});
}

await this.delete(needDeleteKey.map((key) => existsMap[key]));
const toDeletes = needDeleteKey.map((key) => existsMap[key]);
toDeletes.length && (await this.delete(toDeletes));
}

async delete(
query: {
tableId: string;
recordId?: string;
fieldId?: string;
recordId: string;
fieldId: string;
attachmentId?: string;
}[]
) {
const userId = this.cls.get('user.id');

for (let i = 0; i < query.length; i++) {
await this.prismaService.txClient().attachmentsTable.updateMany({
where: query[i],
data: {
deletedTime: new Date(),
lastModifiedBy: userId,
},
});
if (!query.length) {
return;
}

await this.prismaService.txClient().attachmentsTable.deleteMany({
where: { OR: query },
});
}

async deleteFields(tableId: string, fieldIds: string[]) {
await this.prismaService.txClient().attachmentsTable.deleteMany({
where: { tableId, fieldId: { in: fieldIds } },
});
}

async deleteTable(tableId: string) {
await this.prismaService.txClient().attachmentsTable.deleteMany({
where: { tableId },
});
}
}
112 changes: 80 additions & 32 deletions apps/nestjs-backend/src/features/calculation/batch.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { Inject, Injectable } from '@nestjs/common';
import type { IOtOperation } from '@teable-group/core';
import { RecordOpBuilder } from '@teable-group/core';
import { IdPrefix, RecordOpBuilder } from '@teable-group/core';
import { PrismaService } from '@teable-group/db-main-prisma';
import { Knex } from 'knex';
import { groupBy, keyBy } from 'lodash';
import { groupBy, keyBy, merge } from 'lodash';
import { InjectModel } from 'nest-knexjs';
import { ClsService } from 'nestjs-cls';
import { IDbProvider } from '../../db-provider/interface/db.provider.interface';
import type { IRawOp, IRawOpMap } from '../../share-db/interface';
import { RawOpType } from '../../share-db/interface';
import type { IClsStore } from '../../types/cls';
import { Timing } from '../../utils/timing';
import type { IFieldInstance } from '../field/model/factory';
Expand All @@ -21,7 +22,6 @@ export interface IOpsData {
[dbFieldName: string]: unknown;
};
version: number;
rawOp: IRawOp;
}

@Injectable()
Expand Down Expand Up @@ -86,13 +86,11 @@ export class BatchService {
}

@Timing()
async save(
src: string,
async updateRecords(
opsMap: IOpsMap,
fieldMap: { [fieldId: string]: IFieldInstance },
tableId2DbTableName: { [tableId: string]: string }
) {
const rawOpMap: IRawOpMap = {};
const result = await this.completeMissingCtx(opsMap, fieldMap, tableId2DbTableName);
fieldMap = result.fieldMap;
tableId2DbTableName = result.tableId2DbTableName;
Expand All @@ -103,15 +101,16 @@ export class BatchService {
const raw = await this.fetchRawData(dbTableName, recordOpsMap);
const versionGroup = keyBy(raw, '__id');

const opsData = this.buildOpsData(src, recordOpsMap, versionGroup);
rawOpMap[tableId] = opsData.reduce<{ [recordId: string]: IRawOp }>((pre, d) => {
pre[d.recordId] = d.rawOp;
return pre;
}, {});
const opsData = this.buildRecordOpsData(recordOpsMap, versionGroup);
if (!opsData.length) continue;

await this.executeUpdateRecords(dbTableName, fieldMap, opsData);
await this.executeInsertOps(tableId, opsData);

const opDataList = Object.entries(recordOpsMap).map(([recordId, ops]) => {
return { docId: recordId, version: versionGroup[recordId].__version, data: ops };
});
await this.saveRawOps(tableId, RawOpType.Edit, IdPrefix.Record, opDataList);
}
return rawOpMap;
}

@Timing()
Expand All @@ -132,8 +131,7 @@ export class BatchService {
}

@Timing()
private buildOpsData(
src: string,
private buildRecordOpsData(
recordOpsMap: { [recordId: string]: IOtOperation[] },
versionGroup: { [recordId: string]: { __version: number; __id: string } }
) {
Expand All @@ -153,20 +151,10 @@ export class BatchService {
);

const version = versionGroup[recordId].__version;
const rawOp: IRawOp = {
src,
seq: 1,
op: recordOpsMap[recordId],
v: version,
m: {
ts: Date.now(),
},
};

opsData.push({
recordId,
version,
rawOp,
updateParam,
});
}
Expand All @@ -180,6 +168,8 @@ export class BatchService {
fieldMap: { [fieldId: string]: IFieldInstance },
opsData: IOpsData[]
) {
if (!opsData.length) return;

const opsDataGroup = groupBy(opsData, (d) => {
return Object.keys(d.updateParam).join();
});
Expand Down Expand Up @@ -245,16 +235,74 @@ export class BatchService {
await prisma.$executeRawUnsafe(dropTempTableSql);
}

@Timing()
private async executeInsertOps(tableId: string, opsData: IOpsData[]) {
async saveRawOps(
collectionId: string,
opType: RawOpType,
docType: IdPrefix,
dataList: { docId: string; version: number; data?: unknown }[]
) {
const collection = `${docType}_${collectionId}`;
const rawOpMap: IRawOpMap = { [collection]: {} };

const baseRaw = {
src: this.cls.get('tx.id') || 'unknown',
seq: 1,
m: {
ts: Date.now(),
},
};

const rawOps = dataList.map(({ docId: docId, version, data }) => {
let rawOp: IRawOp;
if (opType === RawOpType.Create) {
rawOp = {
...baseRaw,
create: {
type: 'http://sharejs.org/types/JSONv0',
data,
},
v: version,
};
} else if (opType === RawOpType.Del) {
rawOp = {
...baseRaw,
del: true,
v: version,
};
} else if (opType === RawOpType.Edit) {
rawOp = {
...baseRaw,
op: data as IOtOperation[],
v: version,
};
} else {
throw new Error('unknown raw op type');
}
rawOpMap[collection][docId] = rawOp;
return { rawOp, docId };
});

await this.executeInsertOps(collectionId, docType, rawOps);
const prevMap = this.cls.get('tx.rawOpMap') || {};
this.cls.set('tx.rawOpMap', merge({}, prevMap, rawOpMap));
return rawOpMap;
}

private async executeInsertOps(
collectionId: string,
docType: IdPrefix,
rawOps: { rawOp: IRawOp; docId: string }[]
) {
const userId = this.cls.get('user.id');
const insertRowsData = opsData.map((data) => {
const insertRowsData = rawOps.map(({ rawOp, docId }) => {
return {
collection: tableId,
doc_id: data.recordId,
version: data.version + 1,
operation: JSON.stringify(data.rawOp),
collection: collectionId,
doc_type: docType,
doc_id: docId,
version: rawOp.v,
operation: JSON.stringify(rawOp),
created_by: userId,
created_time: new Date().toISOString(),
};
});

Expand Down
Loading

0 comments on commit 2397cff

Please sign in to comment.