Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5507 modify the partial sync cron to work with the new statuses #5512

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
16257f0
add the feature flag key
bosiraphael May 21, 2024
4fcbc72
add feature flag
bosiraphael May 21, 2024
5e35e42
create gmail-partial-sync-v2.service
bosiraphael May 21, 2024
6027ff3
handling errors
bosiraphael May 21, 2024
abef702
Merge branch '5498-create-a-feature-flag-is_gmail_sync_v2_enabled' in…
bosiraphael May 21, 2024
9e809ea
fix module imports
bosiraphael May 21, 2024
a71da95
update substatus
bosiraphael May 21, 2024
141700e
fix updateSyncSubStatus query
bosiraphael May 21, 2024
41609d2
update updateAuthFailedAt
bosiraphael May 21, 2024
fcb0164
fix updates in statuses
bosiraphael May 21, 2024
9ab3828
added logs
bosiraphael May 21, 2024
2eaed61
update log
bosiraphael May 21, 2024
29dd094
create gmail-fetch-message-content-from-cache-v2
bosiraphael May 21, 2024
6408bf6
update gmail-fetch-messages-from-cache.cron.job
bosiraphael May 21, 2024
cf6e7eb
refactor v2
bosiraphael May 21, 2024
c90f176
fix wrong status being set at the end of partial sync
bosiraphael May 22, 2024
f8387bc
fix wrong status being set at the end of partial sync
bosiraphael May 22, 2024
78daae8
remove change in status from transaction
bosiraphael May 22, 2024
bee5176
remove unnecessary transaction
bosiraphael May 22, 2024
b59fb95
partial sync is working
bosiraphael May 22, 2024
113cc85
remove unused code
bosiraphael May 22, 2024
82ba799
remove unused code
bosiraphael May 22, 2024
40be8f2
fix try catch
bosiraphael May 22, 2024
b807517
rename gmail-messages-import
bosiraphael May 22, 2024
868306f
update command
bosiraphael May 22, 2024
c007f1f
renaming
bosiraphael May 22, 2024
fe05076
renaming
bosiraphael May 22, 2024
cc432eb
renaming
bosiraphael May 22, 2024
a6618a3
renaming
bosiraphael May 22, 2024
40803c0
throw errors in service and catch them in job
bosiraphael May 22, 2024
08922fd
renaming
bosiraphael May 22, 2024
64adcde
refactoring
bosiraphael May 22, 2024
0f3f971
refactoring
bosiraphael May 22, 2024
3b9723d
add FAILED syncSubStatus
bosiraphael May 22, 2024
5be151d
add error handling
bosiraphael May 22, 2024
f1c7c0c
create gmail-get-history.service
bosiraphael May 22, 2024
a48b24d
Merge branch 'main' into 5507-modify-the-partial-sync-cron-to-work-wi…
bosiraphael May 22, 2024
6ec2312
fix module import
bosiraphael May 22, 2024
c10e0f4
split into services
bosiraphael May 23, 2024
81bb1da
use new set-message-channel-sync-status service
bosiraphael May 23, 2024
ab86f7f
refactoring
bosiraphael May 23, 2024
db0f8db
fix module imports
bosiraphael May 23, 2024
4b241ee
renaming
bosiraphael May 23, 2024
85ee3c4
refactoring
bosiraphael May 23, 2024
5a76312
ad try catch logic
bosiraphael May 23, 2024
5d73bd9
handle failed state
bosiraphael May 23, 2024
1e6aed6
updating logs
bosiraphael May 23, 2024
111737e
refactoring
bosiraphael May 23, 2024
908c81f
refactoring
bosiraphael May 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import {
MessageChannelVisibility,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
GmailFullMessageListFetchJobData,
GmailFullMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';

@Injectable()
export class GoogleAPIsService {
Expand Down Expand Up @@ -156,8 +156,8 @@ export class GoogleAPIsService {
isCalendarEnabled: boolean,
) {
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
GmailFullMessageListFetchJob.name,
{
workspaceId,
connectedAccountId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job';
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module';
import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module';
Expand All @@ -41,9 +41,9 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module
BillingModule,
UserWorkspaceModule,
WorkspaceModule,
GmailFullSyncModule,
GmailFetchMessageContentFromCacheModule,
GmailPartialSyncModule,
GmailFullMessageListFetchModule,
GmailMessagesImportModule,
GmailPartialMessageListFetchModule,
CalendarEventParticipantModule,
TimelineActivityModule,
StripeModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,30 @@ export class ConnectedAccountRepository {
transactionManager,
);
}

public async getConnectedAccountOrThrow(
workspaceId: string,
connectedAccountId: string,
): Promise<ObjectRecord<ConnectedAccountWorkspaceEntity>> {
const connectedAccount = await this.getById(
connectedAccountId,
workspaceId,
);

if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

return connectedAccount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import { Command, CommandRunner } from 'nest-commander';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';

const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';

@Command({
name: 'cron:messaging:gmail-partial-sync',
name: 'cron:messaging:gmail-message-list-fetch',
description:
'Starts a cron job to sync existing connected account messages and store them in the cache',
})
export class GmailPartialSyncCronCommand extends CommandRunner {
export class GmailMessageListFetchCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
Expand All @@ -23,7 +23,7 @@ export class GmailPartialSyncCronCommand extends CommandRunner {

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailPartialSyncCronJob.name,
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';

@Command({
name: 'cron:messaging:gmail-fetch-messages-from-cache',
name: 'cron:messaging:gmail-messages-import',
description: 'Starts a cron job to fetch all messages from cache',
})
export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
export class GmailMessagesImportCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
Expand All @@ -20,7 +20,7 @@ export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailFetchMessagesFromCacheCronJob.name,
GmailMessagesImportCronJob.name,
undefined,
{
repeat: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ import { Module } from '@nestjs/common';

import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command';
import { GmailPartialSyncCronCommand } from 'src/modules/messaging/crons/commands/gmail-partial-sync.cron.command';
import { GmailMessagesImportCronCommand } from 'src/modules/messaging/crons/commands/gmail-messages-import.cron.command';
import { GmailMessageListFetchCronCommand } from 'src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
],
providers: [
GmailPartialSyncCronCommand,
GmailFetchMessagesFromCacheCronCommand,
],
providers: [GmailMessageListFetchCronCommand, GmailMessagesImportCronCommand],
})
export class MessagingCronCommandsModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import {
GmailPartialSyncJobData,
GmailPartialSyncJob,
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
GmailPartialMessageListFetchJobData,
GmailPartialMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import {
GmailMessageListFetchJobData,
GmailMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';

@Injectable()
export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailPartialSyncCronJob.name);
export class GmailMessageListFetchCronJob
implements MessageQueueJob<undefined>
{
private readonly logger = new Logger(GmailMessageListFetchCronJob.name);

constructor(
@InjectRepository(Workspace, 'core')
Expand All @@ -32,6 +42,8 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly environmentService: EnvironmentService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}

async handle(): Promise<void> {
Expand All @@ -57,11 +69,20 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
);

for (const workspaceId of workspaceIdsWithDataSources) {
await this.enqueuePartialSyncs(workspaceId);
await this.enqueueSyncs(workspaceId);
}
}

private async enqueuePartialSyncs(workspaceId: string): Promise<void> {
private async enqueueSyncs(workspaceId: string): Promise<void> {
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});

const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;

try {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
Expand All @@ -71,16 +92,29 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
continue;
}

await this.messageQueueService.add<GmailPartialSyncJobData>(
GmailPartialSyncJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
if (isGmailSyncV2Enabled) {
await this.messageQueueService.add<GmailMessageListFetchJobData>(
GmailMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
} else {
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
GmailPartialMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}
} catch (error) {
this.logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';
Expand All @@ -10,22 +10,34 @@ import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-s
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';

@Injectable()
export class GmailFetchMessagesFromCacheCronJob
implements MessageQueueJob<undefined>
{
export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailMessagesImportCronJob.name);

constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService,
private readonly gmailFetchMessageContentFromCacheService: GmailMessagesImportService,
private readonly gmailFetchMessageContentFromCacheV2Service: GmailMessagesImportV2Service,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
) {}

async handle(): Promise<void> {
Expand Down Expand Up @@ -59,11 +71,42 @@ export class GmailFetchMessagesFromCacheCronJob
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);

const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});

const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;

for (const messageChannel of messageChannels) {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
if (!messageChannel?.isSyncEnabled) {
continue;
}

if (isGmailSyncV2Enabled) {
try {
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);

await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
} catch (error) {
this.logger.log(error.message);
}
} else {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';

@Module({
imports: [
TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
GmailFetchMessageContentFromCacheModule,
GmailMessagesImportModule,
],
providers: [
{
provide: GmailFetchMessagesFromCacheCronJob.name,
useClass: GmailFetchMessagesFromCacheCronJob,
provide: GmailMessagesImportCronJob.name,
useClass: GmailMessagesImportCronJob,
},
{
provide: GmailPartialSyncCronJob.name,
useClass: GmailPartialSyncCronJob,
provide: GmailMessageListFetchCronJob.name,
useClass: GmailMessageListFetchCronJob,
},
],
})
Expand Down
Loading
Loading