Skip to content

Commit

Permalink
Nc feat/cleanup (#8508)
Browse files Browse the repository at this point in the history
* feat: clean-up job

* feat: source cleanup

* fix: remove unused method

* feat: move jobs-redis to a static class

* feat: release sources from in-memory db on update & delete

* fix: skip calls if job redis not available

* fix: error handling on connection delete

---------

Co-authored-by: mertmit <mertmit99@gmail.com>
  • Loading branch information
dstala and mertmit committed May 23, 2024
1 parent 6a31b37 commit 6a334f7
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 160 deletions.
1 change: 1 addition & 0 deletions packages/nocodb/src/interface/Jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export enum JobTypes {
UpdateSrcStat = 'update-source-stat',
HealthCheck = 'health-check',
HandleWebhook = 'handle-webhook',
CleanUp = 'clean-up',
}

export enum JobStatus {
Expand Down
18 changes: 17 additions & 1 deletion packages/nocodb/src/models/Source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import {
prepareForResponse,
stringifyMetaProp,
} from '~/utils/modelUtils';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
import { InstanceCommands } from '~/interface/Jobs';

// todo: hide credentials
export default class Source implements SourceType {
Expand Down Expand Up @@ -182,6 +184,11 @@ export default class Source implements SourceType {
prepareForResponse(updateObj),
);

if (JobsRedis.available) {
await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, sourceId);
await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, sourceId);
}

// call before reorder to update cache
const returnBase = await this.get(oldBase.id, false, ncMeta);

Expand Down Expand Up @@ -351,6 +358,15 @@ export default class Source implements SourceType {
return Base.get(this.base_id, ncMeta);
}

async sourceCleanup(_ncMeta = Noco.ncMeta) {
await NcConnectionMgrv2.deleteAwait(this);

if (JobsRedis.available) {
await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, this.id);
await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, this.id);
}
}

async delete(ncMeta = Noco.ncMeta, { force }: { force?: boolean } = {}) {
const sources = await Source.list({ baseId: this.base_id }, ncMeta);

Expand Down Expand Up @@ -422,7 +438,7 @@ export default class Source implements SourceType {
await SyncSource.delete(syncSource.id, ncMeta);
}

await NcConnectionMgrv2.deleteAwait(this);
await this.sourceCleanup(ncMeta);

const res = await ncMeta.metaDelete(null, null, MetaTable.BASES, this.id);

Expand Down
5 changes: 4 additions & 1 deletion packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { Injectable } from '@nestjs/common';
import type { OnModuleInit } from '@nestjs/common';
import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service';
import { JobStatus } from '~/interface/Jobs';

@Injectable()
export class JobsService {
export class JobsService implements OnModuleInit {
constructor(private readonly fallbackQueueService: QueueService) {}

async onModuleInit() {}

async add(name: string, data: any) {
return this.fallbackQueueService.add(name, data);
}
Expand Down
52 changes: 24 additions & 28 deletions packages/nocodb/src/modules/jobs/jobs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,26 @@ import {
import { Request } from 'express';
import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid';
import { ModuleRef } from '@nestjs/core';
import { JobsRedisService } from './redis/jobs-redis.service';
import type { Response } from 'express';
import type { OnModuleInit } from '@nestjs/common';
import { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache';
import { CacheGetType, CacheScope } from '~/utils/globals';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';

const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 30000;

@Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class JobsController implements OnModuleInit {
jobsRedisService: JobsRedisService;

export class JobsController {
constructor(
@Inject('JobsService') private readonly jobsService: IJobsService,
private moduleRef: ModuleRef,
) {}

onModuleInit() {
if (process.env.NC_REDIS_JOB_URL) {
this.jobsRedisService = this.moduleRef.get(JobsRedisService);
}
}

private jobRooms = {};
private localJobs = {};
private closedJobs = [];
Expand Down Expand Up @@ -102,8 +91,8 @@ export class JobsController implements OnModuleInit {
listeners: [res],
};
// subscribe to job events
if (this.jobsRedisService) {
this.jobsRedisService.subscribe(jobId, (data) => {
if (JobsRedis.available) {
await JobsRedis.subscribe(jobId, async (data) => {
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
Expand All @@ -121,7 +110,7 @@ export class JobsController implements OnModuleInit {
if (
[JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)
) {
this.jobsRedisService.unsubscribe(jobId);
await JobsRedis.unsubscribe(jobId);
delete this.jobRooms[jobId];
this.closedJobs.push(jobId);
setTimeout(() => {
Expand Down Expand Up @@ -178,7 +167,11 @@ export class JobsController implements OnModuleInit {
}

@OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
async sendJobStatus(data: {
id: string;
status: JobStatus;
data?: any;
}): Promise<void> {
let response;

const jobId = data.id;
Expand All @@ -196,7 +189,7 @@ export class JobsController implements OnModuleInit {
this.localJobs[jobId].messages.shift();
}

NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
Expand All @@ -211,7 +204,7 @@ export class JobsController implements OnModuleInit {
_mid: 1,
};

NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
Expand All @@ -224,8 +217,8 @@ export class JobsController implements OnModuleInit {
});
}

if (this.jobsRedisService) {
this.jobsRedisService.publish(jobId, {
if (JobsRedis.available) {
await JobsRedis.publish(jobId, {
cmd: JobEvents.STATUS,
...data,
});
Expand All @@ -237,16 +230,19 @@ export class JobsController implements OnModuleInit {
this.closedJobs = this.closedJobs.filter((j) => j !== jobId);
}, POLLING_INTERVAL * 2);

setTimeout(() => {
setTimeout(async () => {
delete this.jobRooms[jobId];
delete this.localJobs[jobId];
NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`);
await NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`);
}, POLLING_INTERVAL * 2);
}
}

@OnEvent(JobEvents.LOG)
sendJobLog(data: { id: string; data: { message: string } }): void {
async sendJobLog(data: {
id: string;
data: { message: string };
}): Promise<void> {
let response;

const jobId = data.id;
Expand All @@ -265,7 +261,7 @@ export class JobsController implements OnModuleInit {
this.localJobs[jobId].messages.shift();
}

NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
Expand All @@ -280,7 +276,7 @@ export class JobsController implements OnModuleInit {
_mid: 1,
};

NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
Expand All @@ -293,8 +289,8 @@ export class JobsController implements OnModuleInit {
});
}

if (this.jobsRedisService) {
this.jobsRedisService.publish(jobId, {
if (JobsRedis.available) {
await JobsRedis.publish(jobId, {
cmd: JobEvents.LOG,
...data,
});
Expand Down
3 changes: 1 addition & 2 deletions packages/nocodb/src/modules/jobs/jobs.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service';
// import { JobsGateway } from '~/modules/jobs/jobs.gateway';
import { JobsController } from '~/modules/jobs/jobs.controller';
import { JobsService } from '~/modules/jobs/redis/jobs.service';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
import { JobsEventService } from '~/modules/jobs/redis/jobs-event.service';

// Fallback
Expand Down Expand Up @@ -60,7 +59,7 @@ export const JobsModuleMetadata = {
providers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []),
...(process.env.NC_REDIS_JOB_URL
? [JobsRedisService, JobsEventService]
? [JobsEventService]
: [FallbackQueueService, FallbackJobsEventService]),
{
provide: 'JobsService',
Expand Down
91 changes: 0 additions & 91 deletions packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

This file was deleted.

Loading

0 comments on commit 6a334f7

Please sign in to comment.