Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize database reindex #4558

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
cf9d361
WIP: Refactor loop over resources
mattwiller May 13, 2024
ce3414b
Optimize database search reindex
mattwiller May 15, 2024
a231d05
Merge branch 'main' of github.com:medplum/medplum into db-reindex-opt…
mattwiller May 15, 2024
ca415a5
Add end time to allow for earlier termination
mattwiller May 16, 2024
94d7d72
Move reindex into reentrant async worker
mattwiller May 17, 2024
e107adb
Add tests for reindex job
mattwiller May 17, 2024
6670ebc
Add comments
mattwiller May 17, 2024
88d4991
Fix build
mattwiller May 17, 2024
5916b93
Log periodically during reindex
mattwiller May 17, 2024
1173998
Add test case for job failure
mattwiller May 17, 2024
110b89d
Add elapsed time to reindex progress log
mattwiller May 17, 2024
131ddb8
Fix comments
mattwiller May 17, 2024
100f3ee
Merge branch 'main' of github.com:medplum/medplum into db-reindex-opt…
mattwiller May 17, 2024
7765444
WIP: Fix
mattwiller May 21, 2024
7804a8f
Hardcode transaction isolation level
mattwiller May 22, 2024
6d2c009
Reindex multiple resource types
mattwiller May 22, 2024
f1b8e23
Revert isolation level change
mattwiller May 22, 2024
30acac5
Merge branch 'main' of github.com:medplum/medplum into db-reindex-opt…
mattwiller May 29, 2024
e5ad91e
Merge branch 'main' of github.com:medplum/medplum into db-reindex-opt…
mattwiller May 31, 2024
29dacc5
Merge branch 'main' of github.com:medplum/medplum into db-reindex-opt…
mattwiller Jun 19, 2024
fe431de
Remove legacy table scan
mattwiller Jun 19, 2024
dfefcb2
Fix extraneous references to removed code
mattwiller Jun 19, 2024
6282599
Fix test
mattwiller Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion packages/server/src/admin/super.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { rebuildR4SearchParameters } from '../seeds/searchparameters';
import { rebuildR4StructureDefinitions } from '../seeds/structuredefinitions';
import { rebuildR4ValueSets } from '../seeds/valuesets';
import { createTestProject, waitForAsyncJob, withTestContext } from '../test.setup';
import { ReindexJobData, getReindexQueue } from '../workers/reindex';

jest.mock('../seeds/valuesets');
jest.mock('../seeds/structuredefinitions');
Expand Down Expand Up @@ -296,6 +297,9 @@ describe('Super Admin routes', () => {
});

test('Reindex with respond-async', async () => {
const queue = getReindexQueue() as any;
queue.add.mockClear();

const res = await request(app)
.post('/admin/super/reindex')
.set('Authorization', 'Bearer ' + adminAccessToken)
Expand All @@ -307,7 +311,35 @@ describe('Super Admin routes', () => {

expect(res.status).toEqual(202);
expect(res.headers['content-location']).toBeDefined();
await waitForAsyncJob(res.headers['content-location'], app, adminAccessToken);
expect(queue.add).toHaveBeenCalledWith(
'ReindexJobData',
expect.objectContaining<Partial<ReindexJobData>>({
resourceTypes: ['PaymentNotice'],
})
);
});

// Verifies that a comma-separated `resourceType` body value is accepted with
// `Prefer: respond-async` and is enqueued as a single reindex job covering all listed types.
test('Reindex with multiple resource types', async () => {
// The reindex queue is expected to be a mock here; clear prior calls so the
// assertion below only sees the call made by this request.
const queue = getReindexQueue() as any;
queue.add.mockClear();

const res = await request(app)
.post('/admin/super/reindex')
.set('Authorization', 'Bearer ' + adminAccessToken)
.set('Prefer', 'respond-async')
.type('json')
.send({
// Deliberately mixes ", " and "," separators to exercise whitespace trimming of each entry.
resourceType: 'PaymentNotice, MedicinalProductManufactured,BiologicallyDerivedProduct',
});

// 202 Accepted plus a Content-Location header pointing at the async job status.
expect(res.status).toEqual(202);
expect(res.headers['content-location']).toBeDefined();
// The job payload must carry the trimmed resource types in request order.
expect(queue.add).toHaveBeenCalledWith(
'ReindexJobData',
expect.objectContaining<Partial<ReindexJobData>>({
resourceTypes: ['PaymentNotice', 'MedicinalProductManufactured', 'BiologicallyDerivedProduct'],
})
);
});

test('Rebuild compartments access denied', async () => {
Expand Down
23 changes: 17 additions & 6 deletions packages/server/src/admin/super.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
accepted,
allOk,
badRequest,
forbidden,
Expand All @@ -12,7 +13,7 @@ import { asyncWrap } from '../async';
import { setPassword } from '../auth/setpassword';
import { AuthenticatedRequestContext, getAuthenticatedContext } from '../context';
import { getDatabasePool } from '../database';
import { sendAsyncResponse } from '../fhir/operations/utils/asyncjobexecutor';
import { AsyncJobExecutor, sendAsyncResponse } from '../fhir/operations/utils/asyncjobexecutor';
import { invalidRequest, sendOutcome } from '../fhir/outcomes';
import { getSystemRepo } from '../fhir/repo';
import * as dataMigrations from '../migrations/data';
Expand All @@ -22,6 +23,9 @@ import { rebuildR4SearchParameters } from '../seeds/searchparameters';
import { rebuildR4StructureDefinitions } from '../seeds/structuredefinitions';
import { rebuildR4ValueSets } from '../seeds/valuesets';
import { removeBullMQJobByKey } from '../workers/cron';
import { ResourceType } from '@medplum/fhirtypes';
import { addReindexJob } from '../workers/reindex';
import { getConfig } from '../config';

export const superAdminRouter = Router();
superAdminRouter.use(authenticateRequest);
Expand Down Expand Up @@ -74,13 +78,20 @@ superAdminRouter.post(
requireSuperAdmin();
requireAsync(req);

const resourceType = req.body.resourceType;
validateResourceType(resourceType);
const resourceTypes = (req.body.resourceType as string).split(',').map((t) => t.trim());
for (const resourceType of resourceTypes) {
validateResourceType(resourceType);
}
const systemRepo = getSystemRepo();

await sendAsyncResponse(req, res, async () => {
const systemRepo = getSystemRepo();
await systemRepo.reindexResourceType(resourceType);
const exec = new AsyncJobExecutor(systemRepo);
await exec.init(`${req.protocol}://${req.get('host') + req.originalUrl}`);
await exec.run(async (asyncJob) => {
await addReindexJob(resourceTypes as ResourceType[], asyncJob);
});

const { baseUrl } = getConfig();
sendOutcome(res, accepted(exec.getContentLocation(baseUrl)));
})
);

Expand Down
17 changes: 4 additions & 13 deletions packages/server/src/fhir/job.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { AsyncJob } from '@medplum/fhirtypes';
import { randomUUID } from 'crypto';
import express from 'express';
import request from 'supertest';
import { initApp, shutdownApp } from '../app';
import { loadTestConfig } from '../config';
import { createTestProject, withTestContext } from '../test.setup';
import { createTestProject, waitForAsyncJob, withTestContext } from '../test.setup';
import { AsyncJobExecutor } from './operations/utils/asyncjobexecutor';
import { Repository } from './repo';

Expand Down Expand Up @@ -47,24 +46,16 @@ describe('Job status', () => {

test('completed', () =>
withTestContext(async () => {
const job = await asyncJobManager.init('http://example.com');
await asyncJobManager.init('http://example.com');
const callback = jest.fn();

await asyncJobManager.run(async () => {
await asyncJobManager.start(async () => {
callback();
});

expect(callback).toHaveBeenCalled();

const resCompleted = await request(app)
.get(`/fhir/R4/job/${job.id}/status`)
.set('Authorization', 'Bearer ' + accessToken);

expect(resCompleted.status).toBe(200);
expect(resCompleted.body).toMatchObject<Partial<AsyncJob>>({
resourceType: 'AsyncJob',
status: 'completed',
});
await waitForAsyncJob(asyncJobManager.getContentLocation('http://example.com/'), app, accessToken);
}));

test('cancel', () =>
Expand Down
115 changes: 78 additions & 37 deletions packages/server/src/fhir/operations/utils/asyncjobexecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,103 @@ import { Repository, getSystemRepo } from '../../repo';
export class AsyncJobExecutor {
readonly repo: Repository;
private resource: AsyncJob | undefined;
constructor(repo: Repository) {
constructor(repo: Repository, resource?: AsyncJob) {
this.repo = repo.clone();
this.resource = resource;
}

async init(url: string): Promise<AsyncJob> {
this.resource = await this.repo.createResource<AsyncJob>({
resourceType: 'AsyncJob',
status: 'accepted',
request: url,
requestTime: new Date().toISOString(),
});

if (!this.resource) {
this.resource = await this.repo.createResource<AsyncJob>({
resourceType: 'AsyncJob',
status: 'accepted',
request: url,
requestTime: new Date().toISOString(),
});
}
return this.resource;
}

start(callback: () => Promise<any>): void {
/**
* Begins execution of the async job and coordinates resource updates and logging throughout the job lifecycle.
* @param callback - The callback to execute.
*/
start(callback: (job: AsyncJob) => Promise<any>): void {
const ctx = getRequestContext();
if (!this.resource) {
throw new Error('AsyncJob missing');
}
if (this.resource.status !== 'accepted') {
throw new Error('Job already completed');
}

const startTime = Date.now();
const systemRepo = getSystemRepo();
ctx.logger.info('Async job starting', { name: callback.name, asyncJobId: this.resource?.id });

this.run(callback)
.then(() => ctx.logger.info('Async job completed', { name: callback.name, asyncJobId: this.resource?.id }))
.catch((err) =>
ctx.logger.error('Async job failed', { name: callback.name, asyncJobId: this.resource?.id, error: err })
);
.then(async (output) => {
ctx.logger.info('Async job completed', {
name: callback.name,
asyncJobId: this.resource?.id,
duration: `${Date.now() - startTime} ms`,
});
await this.completeJob(systemRepo, output);
})
.catch(async (err) => {
ctx.logger.error('Async job failed', { name: callback.name, asyncJobId: this.resource?.id, error: err });
await this.failJob(systemRepo, err);
})
.finally(() => {
this.repo.close();
});
}

async run(callback: (() => Promise<Parameters>) | (() => Promise<void>)): Promise<void> {
/**
* Conditionally runs the job callback if the AsyncJob resource is in the correct state.
* @param callback - The callback to execute.
* @returns (optional) Output encoded as a Parameters resource.
*/
async run(
callback: ((job: AsyncJob) => Promise<Parameters>) | ((job: AsyncJob) => Promise<void>)
): Promise<Parameters | undefined> {
callback = AsyncLocalStorage.bind(callback);
if (!this.resource) {
throw new Error('AsyncJob missing');
}
const systemRepo = getSystemRepo();
try {
const output = await callback();
await systemRepo.updateResource<AsyncJob>({
...this.resource,
status: 'completed',
transactionTime: new Date().toISOString(),
output: output ?? undefined,
});
} catch (err) {
await systemRepo.updateResource<AsyncJob>({
...this.resource,
status: 'error',
transactionTime: new Date().toISOString(),
output:
err instanceof OperationOutcomeError
? { resourceType: 'Parameters', parameter: [{ name: 'outcome', resource: err.outcome }] }
: undefined,
});
throw err;
} finally {
this.repo.close();
if (this.resource.status !== 'accepted') {
throw new Error('Job already completed');
}

const output = await callback(this.resource);
return output ?? undefined;
}

/**
 * Marks the tracked AsyncJob as completed.
 * Writes the current resource with status `completed`, the current time as `transactionTime`,
 * and the given output, using the supplied repository.
 * @param repo - The repository used to write the updated AsyncJob.
 * @param output - Optional job output encoded as a Parameters resource.
 * @returns The updated AsyncJob, or undefined if no AsyncJob resource has been set on this executor.
 */
async completeJob(repo: Repository, output?: Parameters): Promise<AsyncJob | undefined> {
if (!this.resource) {
// Nothing to update: no AsyncJob was passed to the constructor or created by init().
return undefined;
}
return repo.updateResource<AsyncJob>({
...this.resource,
status: 'completed',
transactionTime: new Date().toISOString(),
output,
});
}

/**
 * Marks the tracked AsyncJob as failed.
 * Writes the current resource with status `error` and the current time as `transactionTime`,
 * using the supplied repository.
 * @param repo - The repository used to write the updated AsyncJob.
 * @param err - The error that caused the job to fail.
 * @returns The updated AsyncJob, or undefined if no AsyncJob resource has been set on this executor.
 */
async failJob(repo: Repository, err: Error): Promise<AsyncJob | undefined> {
if (!this.resource) {
// Nothing to update: no AsyncJob was passed to the constructor or created by init().
return undefined;
}
return repo.updateResource<AsyncJob>({
...this.resource,
status: 'error',
transactionTime: new Date().toISOString(),
// Only OperationOutcomeError carries a structured outcome to persist; for any other
// error type the job is stored with no output (the error message is not recorded here).
output:
err instanceof OperationOutcomeError
? { resourceType: 'Parameters', parameter: [{ name: 'outcome', resource: err.outcome }] }
: undefined,
});
}

getContentLocation(baseUrl: string): string {
Expand All @@ -80,7 +121,7 @@ export class AsyncJobExecutor {
export async function sendAsyncResponse(
req: Request,
res: Response,
callback: (() => Promise<Parameters>) | (() => Promise<void>)
callback: ((job: AsyncJob) => Promise<Parameters>) | ((job: AsyncJob) => Promise<void>)
): Promise<void> {
const ctx = getAuthenticatedContext();
const { baseUrl } = getConfig();
Expand Down
7 changes: 4 additions & 3 deletions packages/server/src/fhir/repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,10 @@ describe('FHIR Repo', () => {
}
});

test('Reindex success', async () => {
await systemRepo.reindexResourceType('Practitioner');
});
// Smoke test: reindexing a resource type with the system repository resolves without throwing.
// NOTE(review): wrapped in withTestContext presumably because reindexing now needs a request
// context to be present — confirm against reindexResourceType.
test('Reindex success', async () =>
withTestContext(async () => {
await systemRepo.reindexResourceType('Practitioner');
}));

test('Rebuild compartments as non-admin', async () => {
const repo = new Repository({
Expand Down