Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable unit testing subscription criteria #4581

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions packages/core/src/subscriptions/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Bundle, Communication, Parameters, SubscriptionStatus } from '@medplum/fhirtypes';
import { Bundle, Communication, Parameters, Subscription, SubscriptionStatus } from '@medplum/fhirtypes';
import WS from 'jest-websocket-mock';
import { RobustWebSocket, SubscriptionEmitter, SubscriptionEventMap, SubscriptionManager } from '.';
import {
RobustWebSocket,
SubscriptionEmitter,
SubscriptionEventMap,
SubscriptionManager,
resourceMatchesSubscriptionCriteria,
} from '.';
import { MockMedplumClient } from '../client-test-utils';
import { generateId } from '../crypto';
import { OperationOutcomeError } from '../outcomes';
Expand Down Expand Up @@ -749,3 +755,53 @@ describe('SubscriptionManager', () => {
});
});
});

describe('resourceMatchesSubscriptionCriteria', () => {
it('should return true for a resource that matches the criteria', async () => {
const subscription: Subscription = {
resourceType: 'Subscription',
status: 'active',
reason: 'test subscription',
criteria: 'Communication',
channel: {
type: 'rest-hook',
endpoint: 'Bot/123',
},
extension: [
{
url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression',
valueString: '%previous.status = "in-progress" and %current.status = "completed"',
},
{
url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction',
valueCode: 'update',
},
],
};

const result1 = await resourceMatchesSubscriptionCriteria({
resource: {
resourceType: 'Communication',
status: 'in-progress',
},
subscription,
context: { interaction: 'create' },
getPreviousResource: async () => undefined,
});
expect(result1).toBe(false);

const result2 = await resourceMatchesSubscriptionCriteria({
resource: {
resourceType: 'Communication',
status: 'completed',
},
subscription,
context: { interaction: 'update' },
getPreviousResource: async () => ({
resourceType: 'Communication',
status: 'in-progress',
}),
});
expect(result2).toBe(true);
});
});
122 changes: 120 additions & 2 deletions packages/core/src/subscriptions/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { Bundle, Parameters, Subscription, SubscriptionStatus } from '@medplum/fhirtypes';
import { Bundle, Parameters, Subscription, SubscriptionStatus, Resource } from '@medplum/fhirtypes';
import { MedplumClient } from '../client';
import { TypedEventTarget } from '../eventtarget';
import { OperationOutcomeError, serverError, validationError } from '../outcomes';
import { ProfileResource, getReferenceString, resolveId } from '../utils';
import { ProfileResource, getExtension, getReferenceString, resolveId } from '../utils';
import { Logger } from '../logger';
import { matchesSearchRequest } from '../search/match';
import { toTypedValue } from '../fhirpath/utils';
import { evalFhirPathTyped } from '../fhirpath/parse';
import { parseSearchRequest } from '../search/search';

export type SubscriptionEventMap = {
connect: { type: 'connect'; payload: { subscriptionId: string } };
Expand Down Expand Up @@ -355,3 +360,116 @@ export class SubscriptionManager {
return this.masterSubEmitter;
}
}

export type BackgroundJobInteraction = 'create' | 'update' | 'delete';

export interface BackgroundJobContext {
interaction: BackgroundJobInteraction;
}

type ResourceMatchesSubscriptionCriteria = {
resource: Resource;
subscription: Subscription;
context: BackgroundJobContext;
logger?: Logger;
getPreviousResource: (currentResource: Resource) => Promise<Resource | undefined>;
};

export async function resourceMatchesSubscriptionCriteria({
resource,
subscription,
context,
getPreviousResource,
logger,
}: ResourceMatchesSubscriptionCriteria): Promise<boolean> {
if (subscription.meta?.account && resource.meta?.account?.reference !== subscription.meta.account.reference) {
logger?.debug('Ignore resource in different account compartment');
return false;
}

if (!matchesChannelType(subscription, logger)) {
logger?.debug(`Ignore subscription without recognized channel type`);
return false;
}

const subscriptionCriteria = subscription.criteria;
if (!subscriptionCriteria) {
logger?.debug(`Ignore rest hook missing criteria`);
return false;
}

const searchRequest = parseSearchRequest(subscriptionCriteria);
if (resource.resourceType !== searchRequest.resourceType) {
logger?.debug(
`Ignore rest hook for different resourceType (wanted "${searchRequest.resourceType}", received "${resource.resourceType}")`
);
return false;
}

const fhirPathCriteria = await isFhirCriteriaMet(subscription, resource, getPreviousResource);
if (!fhirPathCriteria) {
logger?.debug(`Ignore rest hook for criteria returning false`);
return false;
}

const supportedInteractionExtension = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction'
);
if (supportedInteractionExtension && supportedInteractionExtension.valueCode !== context.interaction) {
logger?.debug(
`Ignore rest hook for different interaction (wanted "${supportedInteractionExtension.valueCode}", received "${context.interaction}")`
);
return false;
}

return matchesSearchRequest(resource, searchRequest);
}

/**
* Returns true if the subscription channel type is ok to execute.
* @param subscription - The subscription resource.
* @param logger - The logger.
* @returns True if the subscription channel type is ok to execute.
*/
function matchesChannelType(subscription: Subscription, logger?: Logger): boolean {
const channelType = subscription.channel?.type;

if (channelType === 'rest-hook') {
const url = subscription.channel?.endpoint;
if (!url) {
logger?.debug(`Ignore rest-hook missing URL`);
return false;
}

return true;
}

if (channelType === 'websocket') {
return true;
}

return false;
}

export async function isFhirCriteriaMet(
subscription: Subscription,
currentResource: Resource,
getPreviousResource: (currentResource: Resource) => Promise<Resource | undefined>
): Promise<boolean> {
const criteria = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression'
);
if (!criteria?.valueString) {
return true;
}
const previous = await getPreviousResource(currentResource);
const evalInput = {
'%current': toTypedValue(currentResource),
'%previous': toTypedValue(previous ?? {}),
};
const evalValue = evalFhirPathTyped(criteria.valueString, [toTypedValue(currentResource)], evalInput);
console.log(evalValue);
return evalValue?.[0]?.value === true;
}
5 changes: 0 additions & 5 deletions packages/server/src/workers/context.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/server/src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Resource } from '@medplum/fhirtypes';
import { BackgroundJobContext } from '@medplum/core';
import { MedplumServerConfig } from '../config';
import { globalLogger } from '../logger';
import { BackgroundJobContext } from './context';
import { addCronJobs, closeCronWorker, initCronWorker } from './cron';
import { addDownloadJobs, closeDownloadWorker, initDownloadWorker } from './download';
import { addSubscriptionJobs, closeSubscriptionWorker, initSubscriptionWorker } from './subscription';
Expand Down
80 changes: 10 additions & 70 deletions packages/server/src/workers/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
AccessPolicyInteraction,
BackgroundJobContext,
BackgroundJobInteraction,
ContentType,
OperationOutcomeError,
Operator,
Expand All @@ -9,9 +11,8 @@ import {
getReferenceString,
isGone,
isNotFound,
matchesSearchRequest,
normalizeOperationOutcome,
parseSearchRequest,
resourceMatchesSubscriptionCriteria,
satisfiedAccessPolicy,
serverError,
stringify,
Expand All @@ -30,8 +31,7 @@ import { getRedis } from '../redis';
import { createSubEventNotification } from '../subscriptions/websockets';
import { parseTraceparent } from '../traceparent';
import { AuditEventOutcome } from '../util/auditevent';
import { BackgroundJobContext, BackgroundJobInteraction } from './context';
import { createAuditEvent, findProjectMembership, isFhirCriteriaMet, isJobSuccessful } from './utils';
import { createAuditEvent, findProjectMembership, getPreviousResource, isJobSuccessful } from './utils';

/**
* The upper limit on the number of times a job can be retried.
Expand Down Expand Up @@ -270,73 +270,13 @@ async function matchesCriteria(
context: BackgroundJobContext
): Promise<boolean> {
const ctx = getRequestContext();
if (subscription.meta?.account && resource.meta?.account?.reference !== subscription.meta.account.reference) {
ctx.logger.debug('Ignore resource in different account compartment');
return false;
}

if (!matchesChannelType(subscription)) {
ctx.logger.debug(`Ignore subscription without recognized channel type`);
return false;
}

const subscriptionCriteria = subscription.criteria;
if (!subscriptionCriteria) {
ctx.logger.debug(`Ignore rest hook missing criteria`);
return false;
}

const searchRequest = parseSearchRequest(subscriptionCriteria);
if (resource.resourceType !== searchRequest.resourceType) {
ctx.logger.debug(
`Ignore rest hook for different resourceType (wanted "${searchRequest.resourceType}", received "${resource.resourceType}")`
);
return false;
}

const fhirPathCriteria = await isFhirCriteriaMet(subscription, resource);
if (!fhirPathCriteria) {
ctx.logger.debug(`Ignore rest hook for criteria returning false`);
return false;
}

const supportedInteractionExtension = getExtension(
return resourceMatchesSubscriptionCriteria({
resource,
subscription,
'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction'
);
if (supportedInteractionExtension && supportedInteractionExtension.valueCode !== context.interaction) {
ctx.logger.debug(
`Ignore rest hook for different interaction (wanted "${supportedInteractionExtension.valueCode}", received "${context.interaction}")`
);
return false;
}

return matchesSearchRequest(resource, searchRequest);
}

/**
* Returns true if the subscription channel type is ok to execute.
* @param subscription - The subscription resource.
* @returns True if the subscription channel type is ok to execute.
*/
function matchesChannelType(subscription: Subscription): boolean {
const channelType = subscription.channel?.type;

if (channelType === 'rest-hook') {
const url = subscription.channel?.endpoint;
if (!url) {
getLogger().debug(`Ignore rest-hook missing URL`);
return false;
}

return true;
}

if (channelType === 'websocket') {
return true;
}

return false;
context,
logger: ctx.logger,
getPreviousResource: getPreviousResource,
});
}

/**
Expand Down
21 changes: 2 additions & 19 deletions packages/server/src/workers/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createReference, evalFhirPathTyped, getExtension, isResource, Operator, toTypedValue } from '@medplum/core';
import { createReference, getExtension, isResource, Operator } from '@medplum/core';
import {
AuditEvent,
AuditEventEntity,
Expand Down Expand Up @@ -109,24 +109,7 @@ export function getAuditEventEntityRole(resource: Resource): Coding {
}
}

export async function isFhirCriteriaMet(subscription: Subscription, currentResource: Resource): Promise<boolean> {
const criteria = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression'
);
if (!criteria?.valueString) {
return true;
}
const previous = await getPreviousResource(currentResource);
const evalInput = {
'%current': toTypedValue(currentResource),
'%previous': toTypedValue(previous ?? {}),
};
const evalValue = evalFhirPathTyped(criteria.valueString, [toTypedValue(currentResource)], evalInput);
return evalValue?.[0]?.value === true;
}

async function getPreviousResource(currentResource: Resource): Promise<Resource | undefined> {
export async function getPreviousResource(currentResource: Resource): Promise<Resource | undefined> {
const systemRepo = getSystemRepo();
const history = await systemRepo.readHistory(currentResource.resourceType, currentResource?.id as string);

Expand Down
Loading