feat(workflow): Add external workflow triggers (webhook, API, schedule refactor) #322

@larryro

External Workflow Triggers - Implementation Plan

Problem

  1. Workflows support only manual and cron-based triggers; enterprise processes also need webhook and API triggers
  2. Trigger configuration lives in the per-version trigger step; it should be version-agnostic so that URLs and keys stay stable across versions

Design Principles

  • Separation of concerns: Start step defines input schema. Trigger sources are configured separately.
  • Version-agnostic triggers: Webhooks, schedules, API keys attach to rootVersionId and execute the active version
  • Multiple triggers per workflow: A workflow can have schedule + webhook + API + manual triggers simultaneously
  • Clear naming: start = input definition, trigger = how workflow is invoked

Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│  Workflow (rootVersionId)                                    │
│  ├── wfSchedules (cron configs)                              │
│  ├── wfWebhooks (webhook tokens/secrets)                     │
│  └── wfApiKeys (API keys)                                    │
│                                                              │
│  When triggered → executes ACTIVE version                    │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│  Workflow Version (wfDefinitionId)                           │
│  └── Start step config:                                      │
│      └── inputSchema: { customerId: string, ... }           │
└─────────────────────────────────────────────────────────────┘

Phase 1: Rename Trigger → Start Node

Naming Change

  • Old: trigger step type with triggerType: 'manual' | 'scheduled' | ...
  • New: start step type that defines input parameters only

New Start Node Config

// start step config
{
  inputSchema: {
    properties: {
      customerId: { type: 'string', description: '...' },
      orderData: { type: 'object' }
    },
    required: ['customerId']
  }
}
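
A minimal sketch of how an inputSchema like the one above could be checked when a trigger fires. The function name validateStartInput, the set of supported type names, and the error strings are assumptions for illustration; the plan does not specify how validation is performed.

```typescript
// Hypothetical shape of the start step config described above.
type PropertyType = 'string' | 'number' | 'boolean' | 'object' | 'array';

interface InputSchema {
  properties: Record<string, { type: PropertyType; description?: string }>;
  required?: string[];
}

// Returns a list of problems; an empty list means the input is acceptable.
function validateStartInput(schema: InputSchema, input: Record<string, unknown>): string[] {
  const errors: string[] = [];

  for (const name of schema.required ?? []) {
    if (input[name] === undefined) errors.push(`missing required field: ${name}`);
  }

  for (const [name, spec] of Object.entries(schema.properties)) {
    const value = input[name];
    if (value === undefined) continue; // absence is handled by the required check
    // typeof reports arrays and null as 'object', so distinguish them explicitly.
    const actual: string = Array.isArray(value) ? 'array' : value === null ? 'null' : typeof value;
    if (actual !== spec.type) errors.push(`field ${name}: expected ${spec.type}, got ${actual}`);
  }
  return errors;
}

const startSchema: InputSchema = {
  properties: {
    customerId: { type: 'string' },
    orderData: { type: 'object' },
  },
  required: ['customerId'],
};

console.log(validateStartInput(startSchema, { customerId: 'c_1', orderData: {} }));
console.log(validateStartInput(startSchema, { orderData: [] }));
```

The first call reports no problems; the second reports both the missing customerId and the wrongly typed orderData.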

Schema Change

// Before
stepType: v.union(v.literal('trigger'), v.literal('llm'), ...)

// After
stepType: v.union(v.literal('start'), v.literal('llm'), ...)

Files to Modify

File                                                               Change
convex/workflows/schema.ts                                         Rename trigger → start in stepType union
convex/workflow_engine/types/nodes.ts                              Rename trigger config → start config, simplify to inputSchema only
convex/workflow_engine/helpers/nodes/trigger/                      Rename folder to start/, simplify logic
convex/workflow_engine/helpers/validation/constants.ts             Remove VALID_TRIGGER_TYPES
convex/workflow_engine/helpers/engine/dynamic_workflow_handler.ts  Update to handle start step type

Phase 2: New Trigger Tables (Version-Agnostic)

New Tables

Create convex/workflows/triggers/schema.ts:

wfSchedules - Schedule configurations

{
  organizationId: v.string(),
  workflowRootId: v.id('wfDefinitions'),  // rootVersionId
  cronExpression: v.string(),
  timezone: v.string(),
  isActive: v.boolean(),
  lastTriggeredAt: v.optional(v.number()),
  createdAt: v.number(),
  createdBy: v.string(),
}

wfWebhooks - Webhook configurations

{
  organizationId: v.string(),
  workflowRootId: v.id('wfDefinitions'),  // rootVersionId
  token: v.string(),           // URL identifier (unique)
  secretHash: v.string(),      // HMAC secret (hashed)
  isActive: v.boolean(),
  lastTriggeredAt: v.optional(v.number()),
  createdAt: v.number(),
  createdBy: v.string(),
}

wfApiKeys - Workflow-level API keys

{
  organizationId: v.string(),
  workflowRootId: v.id('wfDefinitions'),  // rootVersionId
  name: v.string(),
  keyHash: v.string(),         // one-way hash
  keyPrefix: v.string(),       // first 8 chars for display
  isActive: v.boolean(),
  expiresAt: v.optional(v.number()),
  createdAt: v.number(),
  createdBy: v.string(),
}

wfTriggerLogs - Audit trail

{
  organizationId: v.string(),
  workflowRootId: v.id('wfDefinitions'),
  wfDefinitionId: v.id('wfDefinitions'),  // actual version executed
  wfExecutionId: v.optional(v.id('wfExecutions')),
  triggerType: v.union(v.literal('manual'), v.literal('schedule'), v.literal('webhook'), v.literal('api')),
  status: v.union(v.literal('accepted'), v.literal('rejected'), v.literal('duplicate'), v.literal('rate_limited')),
  idempotencyKey: v.optional(v.string()),
  ipAddress: v.optional(v.string()),
  errorMessage: v.optional(v.string()),
  receivedAt: v.number(),
}

Index Strategy

  • wfSchedules: by_org, by_workflowRoot, by_org_active
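  • wfWebhooks: by_org, by_workflowRoot, by_token (single-record lookup; token uniqueness is enforced when the webhook is created, since the index alone does not guarantee it)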
  • wfApiKeys: by_org, by_workflowRoot, by_keyHash
  • wfTriggerLogs: by_org, by_workflowRoot, by_idempotencyKey

Phase 3: Refactor Scheduler

Move from trigger-step-based scheduling to wfSchedules table.

Key Change

When scanning for scheduled workflows:

  1. Query wfSchedules where isActive = true
  2. For each schedule, find the active version of the workflow
  3. Trigger that version

Files to Modify

File                                                                 Change
convex/workflow_engine/helpers/scheduler/get_scheduled_workflows.ts  Query wfSchedules table
convex/workflow_engine/helpers/scheduler/scan_and_trigger.ts         Resolve active version before triggering
convex/workflow_engine/scheduler.ts                                  Update internal functions

New Helper

Create getActiveWorkflowVersion(workflowRootId):

  • Query wfDefinitions where rootVersionId = workflowRootId AND status = 'active'
  • Return the active version ID
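
The lookup can be sketched against an in-memory array standing in for the wfDefinitions table. The status values other than 'active', the versionNumber field, and the choice to throw when more than one active version exists are assumptions; the real helper would read through a Convex index.

```typescript
// In-memory stand-in for wfDefinitions rows (illustration only).
interface WfDefinitionRow {
  id: string;
  rootVersionId: string;
  versionNumber: number; // hypothetical field, used only to make the example readable
  status: 'draft' | 'active' | 'archived'; // 'draft' and 'archived' are assumed values
}

// Returns the id of the active version for a workflow root, or null if none is active.
function getActiveWorkflowVersion(defs: WfDefinitionRow[], workflowRootId: string): string | null {
  const active = defs.filter(
    (d) => d.rootVersionId === workflowRootId && d.status === 'active',
  );
  if (active.length > 1) {
    // More than one active version would make trigger resolution ambiguous.
    throw new Error(`workflow ${workflowRootId} has ${active.length} active versions`);
  }
  return active[0]?.id ?? null;
}

const definitions: WfDefinitionRow[] = [
  { id: 'def_v1', rootVersionId: 'def_v1', versionNumber: 1, status: 'archived' },
  { id: 'def_v2', rootVersionId: 'def_v1', versionNumber: 2, status: 'active' },
  { id: 'def_v3', rootVersionId: 'def_v1', versionNumber: 3, status: 'draft' },
];

console.log(getActiveWorkflowVersion(definitions, 'def_v1')); // def_v2
```

Because triggers store only workflowRootId, publishing a new active version changes what this helper returns without touching any schedule, webhook, or API key record.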

New Mutations

Create convex/workflows/triggers/schedules.ts:

  • createSchedule(workflowRootId, cronExpression, timezone)
  • updateSchedule(scheduleId, cronExpression, timezone)
  • deleteSchedule(scheduleId)
  • toggleSchedule(scheduleId, isActive)

Phase 4: Webhook Triggers

HTTP Endpoint

Create convex/workflows/triggers/webhook_http.ts:

POST /api/workflows/wh/{token}
Headers: X-Webhook-Signature (required), X-Idempotency-Key (optional)
Body: JSON payload

Flow

  1. Extract token from URL path
  2. Rate limit by IP
  3. Lookup webhook config by token → get workflowRootId
  4. Verify HMAC signature: HMAC-SHA256(body, secret)
  5. Check idempotency key (optional)
  6. Get active version: getActiveWorkflowVersion(workflowRootId)
  7. Call startWorkflow with triggeredBy: 'webhook', triggerData: { payload, headers }
  8. Log to wfTriggerLogs
  9. Return { executionId, status: 'accepted' }
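
Step 4 could look roughly like the sketch below. It assumes X-Webhook-Signature carries the hex-encoded HMAC-SHA256 of the raw request body (the plan does not specify an encoding) and uses Node's crypto module for illustration. The function takes the raw secret as an argument, because recomputing the HMAC requires the secret itself rather than a one-way hash of it. signWebhookBody and verifyWebhookSignature are hypothetical names.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Computes the hex-encoded HMAC-SHA256 of the raw request body.
function signWebhookBody(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('hex');
}

// Recomputes the signature and compares it to the received one in constant time.
function verifyWebhookSignature(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = Buffer.from(signWebhookBody(rawBody, secret), 'hex');
  const received = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on a length mismatch, so reject those up front.
  if (received.length !== expected.length) return false;
  return timingSafeEqual(expected, received);
}

const exampleSecret = 'example-webhook-secret';
const exampleBody = JSON.stringify({ customerId: 'c_1' });
const exampleSignature = signWebhookBody(exampleBody, exampleSecret);

console.log(verifyWebhookSignature(exampleBody, exampleSignature, exampleSecret)); // true
console.log(verifyWebhookSignature(exampleBody + ' ', exampleSignature, exampleSecret)); // false
```

The signature must be computed over the exact bytes received, before any JSON parsing, or a re-serialized body may no longer match.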

Management Mutations

Create convex/workflows/triggers/webhooks.ts:

  • createWebhook(workflowRootId) → returns { token, secret } (secret shown once)
  • regenerateSecret(webhookId) → returns { secret } (new secret)
  • deleteWebhook(webhookId)
  • getWebhookUrl(webhookId) → query for display

Phase 5: API Triggers

HTTP Endpoint

Create convex/workflows/triggers/api_http.ts:

POST /api/workflows/trigger
Authorization: Bearer wfk_xxx...
Body: { "workflowRootId": "...", "input": {...}, "idempotencyKey": "optional" }

Flow

  1. Extract API key from Authorization header
  2. Rate limit by key hash
  3. Lookup API key → verify it belongs to the specified workflowRootId
  4. Check idempotency key (optional)
  5. Get active version: getActiveWorkflowVersion(workflowRootId)
  6. Call startWorkflow with triggeredBy: 'api'
  7. Log to wfTriggerLogs
  8. Return { executionId, workflowRootId, versionId, status: 'accepted' }
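
Steps 1 and 3 can be sketched as a pure function with the key lookup injected. SHA-256 as the key hash, the 403 response for a key that belongs to a different workflow, and the names extractBearerKey and authorizeApiTrigger are assumptions; the plan only specifies 401 for invalid or expired keys.

```typescript
import { createHash } from 'node:crypto';

interface ApiKeyRecord {
  keyHash: string;
  workflowRootId: string;
  isActive: boolean;
  expiresAt?: number; // epoch milliseconds
}

type AuthResult =
  | { ok: true; workflowRootId: string }
  | { ok: false; status: 401 | 403 };

const sha256Hex = (value: string): string => createHash('sha256').update(value).digest('hex');

// Accepts "Bearer wfk_..." (scheme is case-insensitive) and returns the key, or null.
function extractBearerKey(authorization: string | null): string | null {
  if (!authorization) return null;
  const parts = authorization.trim().split(/\s+/);
  if (parts.length !== 2) return null;
  const scheme = parts[0] ?? '';
  const key = parts[1] ?? '';
  if (scheme.toLowerCase() !== 'bearer' || !key.startsWith('wfk_')) return null;
  return key;
}

function authorizeApiTrigger(
  authorization: string | null,
  requestedRootId: string,
  lookupByHash: (keyHash: string) => ApiKeyRecord | undefined,
  now: number,
): AuthResult {
  const key = extractBearerKey(authorization);
  if (!key) return { ok: false, status: 401 };

  const record = lookupByHash(sha256Hex(key));
  if (!record || !record.isActive) return { ok: false, status: 401 };
  if (record.expiresAt !== undefined && record.expiresAt <= now) return { ok: false, status: 401 };

  // The key is valid but scoped to another workflow (status code is an assumption).
  if (record.workflowRootId !== requestedRootId) return { ok: false, status: 403 };
  return { ok: true, workflowRootId: record.workflowRootId };
}

const demoRecord: ApiKeyRecord = {
  keyHash: sha256Hex('wfk_demo'),
  workflowRootId: 'root_1',
  isActive: true,
  expiresAt: 2_000,
};
const demoLookup = (hash: string) => (hash === demoRecord.keyHash ? demoRecord : undefined);

console.log(authorizeApiTrigger('Bearer wfk_demo', 'root_1', demoLookup, 1_000));
console.log(authorizeApiTrigger('Bearer wfk_demo', 'root_2', demoLookup, 1_000));
```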

Management Mutations

Create convex/workflows/triggers/api_keys.ts:

  • createApiKey(workflowRootId, name) → returns { key } (shown once, prefix: wfk_)
  • listApiKeys(workflowRootId) → returns keys with masked values
  • revokeApiKey(keyId)

Phase 6: Security Helpers

Create convex/workflows/triggers/helpers/:

crypto.ts

  • generateToken() - Secure random token for webhook URLs
  • generateApiKey() - Generate key with wfk_ prefix
  • hashSecret(secret) - One-way hash for storage
  • verifyHmac(payload, signature, secret) - HMAC-SHA256 verification
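
A possible shape for the generation and hashing helpers, using Node's crypto module for illustration. The byte lengths, hex encoding, and SHA-256 as the one-way hash are assumptions, and displayPrefix is a hypothetical helper for the keyPrefix column.

```typescript
import { createHash, randomBytes } from 'node:crypto';

// Random, URL-safe token for webhook URLs (24 random bytes, hex-encoded).
function generateToken(): string {
  return randomBytes(24).toString('hex');
}

// API key with the wfk_ prefix from the plan (32 random bytes, hex-encoded).
function generateApiKey(): string {
  return `wfk_${randomBytes(32).toString('hex')}`;
}

// One-way hash for storage; the plaintext is shown to the user once and never stored.
function hashSecret(secret: string): string {
  return createHash('sha256').update(secret, 'utf8').digest('hex');
}

// First 8 characters of the key, kept so the UI can identify a key without revealing it.
function displayPrefix(key: string): string {
  return key.slice(0, 8);
}

const newKey = generateApiKey();
console.log({
  keyPrefix: displayPrefix(newKey),
  keyHash: hashSecret(newKey),
  webhookToken: generateToken(),
});
```

High-entropy random keys are what make a fast hash such as SHA-256 reasonable here; user-chosen passwords would need a slow, salted hash instead.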

validate.ts

  • checkIdempotency(orgId, key) - Check if key exists in recent logs
  • validateSignature(req, secret) - Extract and verify X-Webhook-Signature
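
The idempotency check can be sketched against an in-memory array standing in for wfTriggerLogs. The 24-hour window for "recent" and the return shape are assumptions; the real helper would query the by_idempotencyKey index.

```typescript
// Subset of the wfTriggerLogs fields relevant to idempotency (illustration only).
interface TriggerLogEntry {
  organizationId: string;
  idempotencyKey?: string;
  wfExecutionId?: string;
  receivedAt: number; // epoch milliseconds
}

// Assumption: "recent" means the last 24 hours.
const IDEMPOTENCY_WINDOW_MS = 24 * 60 * 60 * 1000;

type IdempotencyResult =
  | { duplicate: false }
  | { duplicate: true; wfExecutionId: string | undefined };

// Reports whether the same key was already seen for this organization within the window.
function checkIdempotency(
  logs: TriggerLogEntry[],
  orgId: string,
  key: string,
  now: number,
): IdempotencyResult {
  const hit = logs.find(
    (entry) =>
      entry.organizationId === orgId &&
      entry.idempotencyKey === key &&
      now - entry.receivedAt <= IDEMPOTENCY_WINDOW_MS,
  );
  return hit ? { duplicate: true, wfExecutionId: hit.wfExecutionId } : { duplicate: false };
}

const HOUR_MS = 60 * 60 * 1000;
const recentLogs: TriggerLogEntry[] = [
  { organizationId: 'org_1', idempotencyKey: 'order-42', wfExecutionId: 'exec_1', receivedAt: 0 },
];

console.log(checkIdempotency(recentLogs, 'org_1', 'order-42', HOUR_MS)); // duplicate, exec_1
console.log(checkIdempotency(recentLogs, 'org_2', 'order-42', HOUR_MS)); // not a duplicate
```

Returning the earlier wfExecutionId lets the HTTP handler answer a retry with the existing execution instead of starting a second one.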

Rate Limits

Add to convex/lib/rate_limiter/index.ts:

'workflow:webhook': { kind: 'token bucket', rate: 60, period: MINUTE, capacity: 100 }
'workflow:api': { kind: 'token bucket', rate: 100, period: MINUTE, capacity: 150 }
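
To make the three parameters concrete, here is a standalone token-bucket sketch: rate tokens are added per period, up to capacity, and each request spends one token. This only illustrates the semantics under those assumptions; it is not the rate limiter library's implementation, and the TokenBucket class is hypothetical.

```typescript
const MINUTE = 60_000; // milliseconds

interface TokenBucketConfig {
  rate: number; // tokens added per period
  period: number; // period length in milliseconds
  capacity: number; // maximum tokens the bucket can hold (burst size)
}

class TokenBucket {
  private readonly config: TokenBucketConfig;
  private tokens: number;
  private lastRefill: number;

  constructor(config: TokenBucketConfig, now: number) {
    this.config = config;
    this.tokens = config.capacity; // start full, so an initial burst is allowed
    this.lastRefill = now;
  }

  // Refills based on elapsed time, then spends one token if available.
  tryConsume(now: number): boolean {
    const elapsed = now - this.lastRefill;
    const refill = (elapsed * this.config.rate) / this.config.period;
    this.tokens = Math.min(this.config.capacity, this.tokens + refill);
    this.lastRefill = now;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}

// Same numbers as the 'workflow:webhook' rule above.
const webhookBucket = new TokenBucket({ rate: 60, period: MINUTE, capacity: 100 }, 0);

let acceptedInBurst = 0;
for (let i = 0; i < 120; i++) {
  if (webhookBucket.tryConsume(0)) acceptedInBurst++;
}
console.log(acceptedInBurst); // 100: the burst is capped by capacity
console.log(webhookBucket.tryConsume(1_000)); // true: one token refilled after one second
```

With these settings a caller can burst up to 100 webhook calls, then sustain about one per second.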

File Structure

convex/workflows/triggers/
├── schema.ts           # wfSchedules, wfWebhooks, wfApiKeys, wfTriggerLogs
├── schedules.ts        # Schedule CRUD mutations
├── webhooks.ts         # Webhook CRUD mutations
├── api_keys.ts         # API key CRUD mutations
├── webhook_http.ts     # Webhook HTTP handler
├── api_http.ts         # API trigger HTTP handler
├── queries.ts          # Shared queries (getActiveWorkflowVersion)
└── helpers/
    ├── crypto.ts       # Token/key generation, HMAC
    └── validate.ts     # Signature + idempotency

Critical Files Summary

File                                           Action
convex/workflows/triggers/schema.ts            Create - new tables
convex/schema.ts                               Add new tables
convex/http.ts                                 Add webhook + API routes
convex/workflow_engine/types/nodes.ts          Rename trigger config → start config, keep only inputSchema
convex/workflow_engine/helpers/scheduler/*.ts  Refactor to use wfSchedules
convex/lib/rate_limiter/index.ts               Add rate limit rules

Migration

Data Migration

  1. For each existing workflow with scheduled trigger type:
    • Extract cron/timezone from trigger step config
    • Create wfSchedules record with workflowRootId
  2. Convert each trigger step to a start step whose config contains only inputSchema
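
The per-workflow transformation might look like the sketch below. The legacy field names (schedule, timezone), the 'UTC' fallback, and the empty default inputSchema are assumptions about the existing trigger step config; organizationId, createdAt, and createdBy are omitted for brevity.

```typescript
// Assumed shape of the existing trigger step (legacy field names are hypothetical).
interface LegacyTriggerStep {
  stepType: 'trigger';
  config: {
    triggerType: 'manual' | 'scheduled';
    schedule?: string; // cron expression
    timezone?: string;
    inputSchema?: Record<string, unknown>;
  };
}

interface StartStep {
  stepType: 'start';
  config: { inputSchema: Record<string, unknown> };
}

interface ScheduleRecord {
  workflowRootId: string;
  cronExpression: string;
  timezone: string;
  isActive: boolean;
}

// Splits one trigger step into a start step plus, for scheduled workflows, a wfSchedules record.
function migrateTriggerStep(
  workflowRootId: string,
  step: LegacyTriggerStep,
): { start: StartStep; schedule: ScheduleRecord | null } {
  const { triggerType, schedule, timezone, inputSchema } = step.config;

  const scheduleRecord: ScheduleRecord | null =
    triggerType === 'scheduled' && schedule
      ? { workflowRootId, cronExpression: schedule, timezone: timezone ?? 'UTC', isActive: true }
      : null;

  return {
    start: { stepType: 'start', config: { inputSchema: inputSchema ?? { properties: {} } } },
    schedule: scheduleRecord,
  };
}

const migrated = migrateTriggerStep('def_v1', {
  stepType: 'trigger',
  config: { triggerType: 'scheduled', schedule: '0 9 * * 1', timezone: 'Europe/Berlin' },
});
console.log(migrated.schedule?.cronExpression, migrated.start.stepType);
```

Since schedules attach to workflowRootId, a workflow with several versions should produce one wfSchedules record, not one per version.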

Backward Compatibility

  • Either: the scheduler reads both the old source (trigger step) and the new one (wfSchedules) until the migration is complete
  • Or: run the migration script before deploying the new scheduler

Verification

  1. Schedule test:
    • Create schedule via createSchedule mutation
    • Wait for cron time → verify workflow triggers active version
  2. Webhook test:
    • Create webhook → get URL + secret
    • POST with valid HMAC signature → verify execution starts
    • POST with invalid signature → verify 401
  3. API test:
    • Create API key → get key
    • POST to trigger endpoint with Bearer token → verify execution starts
    • POST with invalid key → verify 401
  4. Version test:
    • Create v1 of workflow, set active
    • Create webhook trigger
    • Publish v2, set as active
    • Trigger webhook → verify v2 executes (not v1)
  5. Security tests:
    • Invalid signature → 401
    • Invalid/expired key → 401
    • Rate limit exceeded → 429
    • Duplicate idempotency key → 200 with existing executionId
