Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@db';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { BrowserbaseService } from '../../browserbase/browserbase.service';
import { triggerEmail } from '../../email/trigger-email';
import { TaskStatusChangedEmail } from '../../email/templates/task-status-changed';
Expand Down Expand Up @@ -187,6 +187,8 @@ export const runBrowserAutomation = task({
}) => {
const { automationId, automationName, organizationId, taskId } = payload;

await tags.add([`org:${organizationId}`]);

logger.info(`Running browser automation "${automationName}"`, {
automationId,
organizationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@db';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';

/**
* Trigger task that runs a cloud security scan for a single connection.
Expand All @@ -20,6 +20,8 @@ export const runCloudSecurityScan = task({
const { connectionId, organizationId, providerSlug, connectionName } =
payload;

await tags.add([`org:${organizationId}`]);

logger.info(
`Starting cloud security scan for connection: ${connectionName}`,
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getManifest, runAllChecks } from '@trycompai/integration-platform';
import { db } from '@db';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';

/**
* Trigger task that runs all checks for a connection.
Expand All @@ -22,6 +22,8 @@ export const runConnectionChecks = task({
}) => {
const { connectionId, organizationId, providerSlug } = payload;

await tags.add([`org:${organizationId}`]);

logger.info(`Auto-running checks for connection ${connectionId}`, {
provider: providerSlug,
organizationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getManifest, runAllChecks } from '@trycompai/integration-platform';
import { db } from '@db';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { triggerEmail } from '../../email/trigger-email';
import { TaskStatusChangedEmail } from '../../email/templates/task-status-changed';
import { isUserUnsubscribed } from '@trycompai/email';
Expand Down Expand Up @@ -185,6 +185,8 @@ export const runTaskIntegrationChecks = task({
checkIds,
} = payload;

await tags.add([`org:${organizationId}`]);

logger.info(`Running integration checks for task "${taskTitle}"`, {
taskId,
connectionId,
Expand Down
4 changes: 3 additions & 1 deletion apps/api/src/trigger/policies/update-policy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, metadata, queue, schemaTask } from '@trigger.dev/sdk';
import { logger, metadata, queue, schemaTask, tags } from '@trigger.dev/sdk';
import { z } from 'zod';
import { processPolicyUpdate } from './update-policy-helpers';

Expand Down Expand Up @@ -29,6 +29,8 @@ export const updatePolicy = schemaTask({
memberId: z.string().optional(),
}),
run: async (params) => {
await tags.add([`org:${params.organizationId}`]);

try {
logger.info(`Starting policy update for policy ${params.policyId}`);

Expand Down
4 changes: 3 additions & 1 deletion apps/api/src/trigger/questionnaire/answer-question.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { syncOrganizationEmbeddings } from '@/vector-store/lib';
import { logger, metadata, task } from '@trigger.dev/sdk';
import { logger, metadata, tags, task } from '@trigger.dev/sdk';
import { generateAnswerWithRAG } from './answer-question-helpers';

export interface AnswerQuestionPayload {
Expand Down Expand Up @@ -177,6 +177,8 @@ export const answerQuestionTask = task({
questionIndex: number;
totalQuestions: number;
}) => {
await tags.add([`org:${payload.organizationId}`]);

return await answerQuestion(payload);
},
});
4 changes: 3 additions & 1 deletion apps/api/src/trigger/questionnaire/parse-questionnaire.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { extractS3KeyFromUrl } from '@/app/s3';
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { db } from '@db';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';

// Import shared utilities
import {
Expand Down Expand Up @@ -253,6 +253,8 @@ export const parseQuestionnaireTask = task({
}) => {
const taskStartTime = Date.now();

await tags.add([`org:${payload.organizationId}`]);

logger.info('Starting parse questionnaire task', {
inputType: payload.inputType,
organizationId: payload.organizationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db, PolicyStatus, type Prisma } from '@db';
import { logger, schemaTask } from '@trigger.dev/sdk';
import { logger, schemaTask, tags } from '@trigger.dev/sdk';
import { z } from 'zod';

const POLICY_BATCH_SIZE = 50;
Expand All @@ -10,6 +10,8 @@ export const migratePoliciesForOrg = schemaTask({
organizationId: z.string(),
}),
run: async ({ organizationId }) => {
await tags.add([`org:${organizationId}`]);

// Find policies without any versions
const policiesWithoutVersions = await db.policy.findMany({
where: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, metadata, task } from '@trigger.dev/sdk';
import { logger, metadata, tags, task } from '@trigger.dev/sdk';
import { db } from '@db';
import { deleteManualAnswerTask } from './delete-manual-answer';

Expand All @@ -17,6 +17,8 @@ export const deleteAllManualAnswersOrchestratorTask = task({
organizationId: string;
manualAnswerIds?: string[]; // Optional: IDs passed directly to avoid race condition
}) => {
await tags.add([`org:${payload.organizationId}`]);

logger.info('Starting delete all manual answers from vector DB', {
organizationId: payload.organizationId,
manualAnswerIdsProvided: !!payload.manualAnswerIds,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { findEmbeddingsForSource } from '@/vector-store/lib/core/find-existing-embeddings';
import { vectorIndex } from '@/vector-store/lib/core/client';
import { db } from '@db';
Expand All @@ -12,6 +12,8 @@ export const deleteKnowledgeBaseDocumentTask = task({
maxAttempts: 3,
},
run: async (payload: { documentId: string; organizationId: string }) => {
await tags.add([`org:${payload.organizationId}`]);

logger.info('Deleting Knowledge Base document from vector DB', {
documentId: payload.documentId,
organizationId: payload.organizationId,
Expand Down
4 changes: 3 additions & 1 deletion apps/api/src/trigger/vector-store/delete-manual-answer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { deleteManualAnswerFromVector } from '@/vector-store/lib/sync/sync-manual-answer';

/**
Expand All @@ -11,6 +11,8 @@ export const deleteManualAnswerTask = task({
maxAttempts: 3,
},
run: async (payload: { manualAnswerId: string; organizationId: string }) => {
await tags.add([`org:${payload.organizationId}`]);

logger.info('Deleting manual answer from vector DB', {
manualAnswerId: payload.manualAnswerId,
organizationId: payload.organizationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { db } from '@db';
import { batchUpsertEmbeddings } from '@/vector-store/lib/core/upsert-embedding';
Expand Down Expand Up @@ -86,6 +86,8 @@ export const processKnowledgeBaseDocumentTask = task({
},
maxDuration: 1000 * 60 * 30, // 30 minutes for large files
run: async (payload: { documentId: string; organizationId: string }) => {
await tags.add([`org:${payload.organizationId}`]);

logger.info('Processing Knowledge Base document', {
documentId: payload.documentId,
organizationId: payload.organizationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger, metadata, task } from '@trigger.dev/sdk';
import { logger, metadata, tags, task } from '@trigger.dev/sdk';
import { processKnowledgeBaseDocumentTask } from './process-knowledge-base-document';

const BATCH_SIZE = 10; // Process 10 documents at a time
Expand All @@ -14,6 +14,8 @@ export const processKnowledgeBaseDocumentsOrchestratorTask = task({
},
maxDuration: 60 * 60, // 1 hour (in seconds) for large document batches
run: async (payload: { documentIds: string[]; organizationId: string }) => {
await tags.add([`org:${payload.organizationId}`]);

logger.info('Starting Knowledge Base documents processing orchestrator', {
organizationId: payload.organizationId,
documentCount: payload.documentIds.length,
Expand Down
4 changes: 3 additions & 1 deletion apps/api/src/trigger/vendor/vendor-risk-assessment-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { openai } from '@ai-sdk/openai';
import type { Prisma } from '@db';
import type { Task } from '@trigger.dev/sdk';
import { logger, metadata, queue, schemaTask } from '@trigger.dev/sdk';
import { logger, metadata, queue, schemaTask, tags } from '@trigger.dev/sdk';
import { generateObject } from 'ai';
import { z } from 'zod';

Expand Down Expand Up @@ -478,6 +478,8 @@ export const vendorRiskAssessmentTask: Task<
},
maxDuration: 1000 * 60 * 10,
run: async (payload) => {
await tags.add([`org:${payload.organizationId}`]);

const vendor = await db.vendor.findFirst({
where: {
id: payload.vendorId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getOrganizationContext } from '@/trigger/tasks/onboarding/onboard-organization-helpers';
import { openai } from '@ai-sdk/openai';
import { db } from '@db/server';
import { logger, metadata, schemaTask } from '@trigger.dev/sdk';
import { logger, metadata, schemaTask, tags } from '@trigger.dev/sdk';
import { generateText } from 'ai';
import { z } from 'zod';

Expand Down Expand Up @@ -299,6 +299,7 @@ export const generateAuditorContentTask = schemaTask({
},
run: async (payload) => {
const { organizationId } = payload;
await tags.add([`org:${organizationId}`]);

logger.info(`Starting auditor content generation for org ${organizationId}`);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getFleetInstance } from '@/lib/fleet';
import { db } from '@db/server';

import { logger, queue, task } from '@trigger.dev/sdk';
import { logger, queue, tags, task } from '@trigger.dev/sdk';
import { AxiosError } from 'axios';
// Optional: define a queue if we want to control concurrency in v4
const fleetQueue = queue({ name: 'create-fleet-label-for-org', concurrencyLimit: 10 });
Expand All @@ -13,6 +13,8 @@ export const createFleetLabelForOrg = task({
maxAttempts: 3,
},
run: async ({ organizationId }: { organizationId: string }) => {
await tags.add([`org:${organizationId}`]);

const organization = await db.organization.findUnique({
where: {
id: organizationId,
Expand Down
3 changes: 2 additions & 1 deletion apps/app/src/trigger/tasks/email/new-policy-email.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { db } from '@db/server';
import { PolicyNotificationEmail } from '@trycompai/email';
import { isUserUnsubscribed } from '@trycompai/email/lib/check-unsubscribe';
import { logger, queue, task } from '@trigger.dev/sdk';
import { logger, queue, tags, task } from '@trigger.dev/sdk';
import { sendEmailViaApi } from '../../lib/send-email-via-api';

const policyEmailQueue = queue({
Expand All @@ -26,6 +26,7 @@ export const sendNewPolicyEmail = task({
email: payload.email,
policyName: payload.policyName,
});
await tags.add([`org:${payload.organizationId}`]);

try {
const unsubscribed = await isUserUnsubscribed(db, payload.email, 'policyNotifications', payload.organizationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { db } from '@db/server';
import { AllPolicyNotificationEmail } from '@trycompai/email';
import { isUserUnsubscribed } from '@trycompai/email/lib/check-unsubscribe';
import { logger, queue, task } from '@trigger.dev/sdk';
import { logger, queue, tags, task } from '@trigger.dev/sdk';
import { sendEmailViaApi } from '../../lib/send-email-via-api';

const allPolicyEmailQueue = queue({
Expand All @@ -24,6 +24,7 @@ export const sendPublishAllPoliciesEmail = task({
email: payload.email,
organizationName: payload.organizationName,
});
await tags.add([`org:${payload.organizationId}`]);

try {
const unsubscribed = await isUserUnsubscribed(db, payload.email, 'policyNotifications', payload.organizationId);
Expand Down
3 changes: 2 additions & 1 deletion apps/app/src/trigger/tasks/email/weekly-task-digest-email.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@db/server';
import { logger, queue, task } from '@trigger.dev/sdk';
import { logger, queue, tags, task } from '@trigger.dev/sdk';
import WeeklyTaskDigestEmail from '@trycompai/email/emails/reminders/weekly-task-digest';
import { isUserUnsubscribed } from '@trycompai/email/lib/check-unsubscribe';
import { sendEmailViaApi } from '../../lib/send-email-via-api';
Expand Down Expand Up @@ -34,6 +34,7 @@ export const sendWeeklyTaskDigestEmailTask = task({
organizationName: payload.organizationName,
taskCount: payload.tasks.length,
});
await tags.add([`org:${payload.organizationId}`]);

try {
const unsubscribed = await isUserUnsubscribed(db, payload.email, 'weeklyTaskDigest', payload.organizationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { db } from '@db/server';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';
import { sendIntegrationResults } from './integration-results';

export const runIntegrationTests = task({
id: 'run-integration-tests',
run: async (payload: { organizationId: string; integrationId?: string }) => {
const { organizationId, integrationId } = payload;
await tags.add([`org:${organizationId}`]);

logger.info(
integrationId
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { trainingVideos } from '@/lib/data/training-videos';
import { db } from '@db/server';
import { logger, task } from '@trigger.dev/sdk';
import { logger, tags, task } from '@trigger.dev/sdk';

export const backfillTrainingVideosForOrg = task({
id: 'backfill-training-videos-for-org',
Expand All @@ -9,6 +9,7 @@ export const backfillTrainingVideosForOrg = task({
},
run: async (payload: { organizationId: string }) => {
logger.info(`Starting training video backfill for organization ${payload.organizationId}`);
await tags.add([`org:${payload.organizationId}`]);

try {
// Get all members for this organization
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@db/server';
import { logger, queue, task } from '@trigger.dev/sdk';
import { logger, queue, tags, task } from '@trigger.dev/sdk';
import { getOrganizationContext, triggerPolicyUpdates } from './onboard-organization-helpers';

// v4 queues must be declared in advance
Expand All @@ -16,6 +16,7 @@ export const generateFullPolicies = task({
},
run: async (payload: { organizationId: string }) => {
logger.info(`Starting full policy generation for organization ${payload.organizationId}`);
await tags.add([`org:${payload.organizationId}`]);

try {
// Get organization context
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RiskStatus, db } from '@db/server';
import { logger, metadata, queue, task } from '@trigger.dev/sdk';
import { logger, metadata, queue, tags, task } from '@trigger.dev/sdk';
import axios from 'axios';
import {
createRiskMitigationComment,
Expand All @@ -24,6 +24,7 @@ export const generateRiskMitigation = task({
policies: PolicyContext[];
}) => {
const { organizationId, riskId, authorId, policies } = payload;
await tags.add([`org:${organizationId}`]);
logger.info(`Generating risk mitigation for risk ${riskId} in org ${organizationId}`);

const risk = await db.risk.findFirst({ where: { id: riskId, organizationId } });
Expand Down Expand Up @@ -85,6 +86,7 @@ export const generateRiskMitigationsForOrg = task({
queue: riskMitigationFanoutQueue,
run: async (payload: { organizationId: string }) => {
const { organizationId } = payload;
await tags.add([`org:${organizationId}`]);
logger.info(`Fan-out risk mitigations for org ${organizationId}`);

const [risks, policyRows, author] = await Promise.all([
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { VendorStatus, db } from '@db/server';
import { logger, metadata, queue, task } from '@trigger.dev/sdk';
import { logger, metadata, queue, tags, task } from '@trigger.dev/sdk';
import axios from 'axios';
import {
createVendorRiskComment,
Expand Down Expand Up @@ -27,6 +27,7 @@ export const generateVendorMitigation = task({
policies: PolicyContext[];
}) => {
const { organizationId, vendorId, authorId, policies } = payload;
await tags.add([`org:${organizationId}`]);
logger.info(`Generating vendor mitigation for vendor ${vendorId} in org ${organizationId}`);

const vendor = await db.vendor.findFirst({ where: { id: vendorId, organizationId } });
Expand Down Expand Up @@ -85,6 +86,7 @@ export const generateVendorMitigationsForOrg = task({
queue: vendorMitigationFanoutQueue,
run: async (payload: { organizationId: string }) => {
const { organizationId } = payload;
await tags.add([`org:${organizationId}`]);
logger.info(`Fan-out vendor mitigations for org ${organizationId}`);

const [vendors, policyRows, author] = await Promise.all([
Expand Down
Loading
Loading