Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rework bucket notification DX #181

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"repository": "https://github.com/nitrictech/node-sdk",
"main": "lib/index.js",
"types": "lib/index.d.ts",
"version": "0.0.1",
HomelessDinosaur marked this conversation as resolved.
Show resolved Hide resolved
"scripts": {
"bump": "standard-version",
"build": "tsup src/index.ts --dts --outDir lib",
Expand Down
98 changes: 93 additions & 5 deletions src/api/storage/v0/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ import {
File,
} from '@nitric/api/proto/storage/v1/storage_pb';
import { UnimplementedError } from '../../errors';
import { BucketNotificationWorkerOptions, bucket } from '@nitric/sdk/resources';
import {
BucketNotificationWorkerOptions,
FileNotificationWorkerOptions,
bucket,
} from '@nitric/sdk/resources';
import { faas } from '@nitric/sdk';
import { ResourceServiceClient } from '@nitric/sdk/gen/proto/resource/v1/resource_grpc_pb';
import { ResourceDeclareResponse } from '@nitric/sdk/gen/proto/resource/v1/resource_pb';

describe('Storage Client Tests', () => {
describe('Given nitric.api.storage.StorageClient.Write throws an error', () => {
Expand Down Expand Up @@ -443,7 +449,7 @@ describe('bucket notification', () => {
});

beforeAll(async () => {
await bucket('test-bucket').on('created:test.png', mockFn);
await bucket('test-bucket').on('write', 'test.png', mockFn);
});

it('should create a new FaasClient', () => {
Expand All @@ -453,7 +459,7 @@ describe('bucket notification', () => {
it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'created',
'write',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
Expand All @@ -470,7 +476,7 @@ describe('bucket notification', () => {
});

beforeAll(async () => {
await bucket('test-bucket').on('deleted:test.png', mockFn);
await bucket('test-bucket').on('delete', 'test.png', mockFn);
});

it('should create a new FaasClient', () => {
Expand All @@ -480,7 +486,89 @@ describe('bucket notification', () => {
it('should provide Faas with BucketNotificationWorkerOptions', () => {
const expectedOpts = new BucketNotificationWorkerOptions(
'test-bucket',
'deleted',
'delete',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});
});

describe('file notification', () => {
const startSpy = jest
.spyOn(faas.Faas.prototype, 'start')
.mockReturnValue(Promise.resolve());

const existsSpy = jest
.spyOn(ResourceServiceClient.prototype, 'declare')
.mockImplementation((_, callback: any) => {
const response = new ResourceDeclareResponse();
callback(null, response);
return null as any;
});

const mockFn = jest.fn();

describe('When registering a file notification for creating', () => {
let bucketResource;
beforeAll(async () => {
bucketResource = bucket('test-bucket-create').for('reading');
await bucketResource.on('write', 'test.png', mockFn);
});

afterAll(() => {
jest.resetAllMocks();
});

it('should declare the new resource', () => {
expect(existsSpy).toBeCalledTimes(1);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with FileNotificationWorkerOptions', () => {
const expectedOpts = new FileNotificationWorkerOptions(
bucketResource,
'write',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
});

it('should call FaasClient::start()', () => {
expect(startSpy).toBeCalledTimes(1);
});
});

describe('When registering a file notification for deleting', () => {
let bucketResource;
beforeAll(async () => {
bucketResource = bucket('test-bucket-delete').for('reading');
await bucketResource.on('delete', 'test.png', mockFn);
});

afterAll(() => {
jest.resetAllMocks();
});

it('should declare the new resource', () => {
expect(existsSpy).toBeCalledTimes(1);
});

it('should create a new FaasClient', () => {
expect(faas.Faas).toBeCalledTimes(1);
});

it('should provide Faas with FileNotificationWorkerOptions', () => {
const expectedOpts = new FileNotificationWorkerOptions(
bucketResource,
'delete',
'test.png'
);
expect(faas.Faas).toBeCalledWith(expectedOpts);
Expand Down
27 changes: 27 additions & 0 deletions src/api/storage/v0/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import {
} from '@nitric/api/proto/storage/v1/storage_pb';
import * as grpc from '@grpc/grpc-js';
import { fromGrpcError, InvalidArgumentError } from '../../errors';
import {
BucketNotificationMiddleware,
FileNotificationMiddleware,
} from '@nitric/sdk/faas';
import { BucketNotification, FileNotification } from '@nitric/sdk/resources';

/**
* Nitric storage client, facilitates writing and reading from blob storage (buckets).
Expand Down Expand Up @@ -97,6 +102,28 @@ export class Bucket {
}
return new File(this.storage, this, name);
}

/**
* Register and start a bucket notification handler that will be called for all matching notification events on this bucket
*
* @param notificationType the notification type that should trigger the middleware, either 'write' or 'delete'
HomelessDinosaur marked this conversation as resolved.
Show resolved Hide resolved
* @param notificationPrefixFilter the file name prefix that files must match to trigger a notification
* @param middleware handler middleware which will be run for every incoming event
* @returns Promise which resolves when the handler server terminates
*/
on(
notificationType: string,
notificationPrefixFilter: string,
...middleware: FileNotificationMiddleware[]
): Promise<void> {
const notification = new FileNotification(
this,
notificationType,
notificationPrefixFilter,
...middleware
);
return notification['start']();
}
}

export enum FileMode {
Expand Down
100 changes: 91 additions & 9 deletions src/faas/v0/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ import {
import * as api from '@opentelemetry/api';
import * as jspb from 'google-protobuf';
import { jsonResponse } from './json';
import { Bucket, File } from '@nitric/sdk/api';
import {
ApiWorkerOptions,
BucketNotificationWorkerOptions,
FileNotificationWorkerOptions,
SubscriptionWorkerOptions,
bucket,
} from '@nitric/sdk';
import { FaasWorkerOptions } from './start';

export abstract class TriggerContext<
Req extends AbstractRequest = AbstractRequest,
Expand Down Expand Up @@ -79,15 +88,25 @@ export abstract class TriggerContext<

// Instantiate a concrete TriggerContext from the gRPC trigger model
static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: FaasWorkerOptions
): TriggerContext<any, any> {
// create context
if (trigger.hasHttp()) {
return HttpContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasTopic()) {
return EventContext.fromGrpcTriggerRequest(trigger);
} else if (trigger.hasNotification()) {
return BucketNotificationContext.fromGrpcTriggerRequest(trigger);
if (options instanceof FileNotificationWorkerOptions) {
return FileNotificationContext.fromGrpcTriggerRequest(
trigger,
options as FileNotificationWorkerOptions
);
}
return BucketNotificationContext.fromGrpcTriggerRequest(
trigger,
options as BucketNotificationWorkerOptions
);
}
throw new Error('Unsupported trigger request type');
}
Expand Down Expand Up @@ -253,7 +272,10 @@ export class HttpContext extends TriggerContext<HttpRequest, HttpResponse> {
return this;
}

static fromGrpcTriggerRequest(trigger: TriggerRequest): HttpContext {
static fromGrpcTriggerRequest(
trigger: TriggerRequest,
options?: ApiWorkerOptions
): HttpContext {
const http = trigger.getHttp();
const ctx = new HttpContext();

Expand Down Expand Up @@ -403,7 +425,8 @@ export class EventContext<T> extends TriggerContext<
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: SubscriptionWorkerOptions
): EventContext<unknown> {
const topic = trigger.getTopic();
const ctx = new EventContext();
Expand Down Expand Up @@ -441,7 +464,8 @@ export class BucketNotificationContext extends TriggerContext<
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest
trigger: TriggerRequest,
options?: BucketNotificationWorkerOptions
): BucketNotificationContext {
const ctx = new BucketNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();
Expand Down Expand Up @@ -478,19 +502,20 @@ export enum BucketNotificationType {
}

export class BucketNotificationRequest extends AbstractRequest {
key: string;
type: BucketNotificationType;
public readonly key: string;
public readonly notificationType: BucketNotificationType;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
type: number
notificationType: number
) {
super(data, traceContext);

// Get reference to the bucket
this.key = key;
this.type = this.eventTypeToNotificationType(type);
this.notificationType = this.eventTypeToNotificationType(notificationType);
}

private eventTypeToNotificationType = (
Expand All @@ -507,6 +532,63 @@ export class BucketNotificationRequest extends AbstractRequest {
};
}

export class FileNotificationContext extends TriggerContext<
FileNotificationRequest,
BucketNotificationResponse
> {
public get bucketNotification(): FileNotificationContext {
return this;
}

static fromGrpcTriggerRequest(
trigger: TriggerRequest,
options: FileNotificationWorkerOptions
): BucketNotificationContext {
const ctx = new FileNotificationContext();
const bucketConfig = trigger.getNotification().getBucket();

ctx.request = new FileNotificationRequest(
trigger.getData_asU8(),
getTraceContext(trigger.getTraceContext()),
bucketConfig.getKey(),
bucketConfig.getType(),
options.bucketRef
);

ctx.response = {
success: true,
};

return ctx;
}

static toGrpcTriggerResponse(
ctx: TriggerContext<AbstractRequest, any>
): TriggerResponse {
const notifyCtx = ctx.bucketNotification;
const triggerResponse = new TriggerResponse();
const notificationResponse = new NotificationResponseContext();
notificationResponse.setSuccess(notifyCtx.res.success);
triggerResponse.setNotification(notificationResponse);
return triggerResponse;
}
}
export class FileNotificationRequest extends BucketNotificationRequest {
public readonly file: File;

constructor(
data: string | Uint8Array,
traceContext: api.Context,
key: string,
notificationType: number,
bucket: Bucket
) {
super(data, traceContext, key, notificationType);

this.file = bucket.file(key);
}
}

export interface BucketNotificationResponse {
success: boolean;
}
3 changes: 3 additions & 0 deletions src/faas/v0/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
HttpContext,
EventContext,
BucketNotificationContext,
FileNotificationContext,
} from '.';

export type GenericHandler<Ctx> = (ctx: Ctx) => Promise<Ctx> | Ctx;
Expand All @@ -40,6 +41,8 @@ export type EventMiddleware<
export type ScheduleMiddleware = GenericMiddleware<EventContext<undefined>>;
export type BucketNotificationMiddleware =
GenericMiddleware<BucketNotificationContext>;
export type FileNotificationMiddleware =
GenericMiddleware<FileNotificationContext>;

/**
* createHandler
Expand Down
Loading