Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 42 additions & 33 deletions sim/app/api/webhooks/trigger/[path]/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ describe('Webhook Trigger API Route', () => {
})

afterEach(() => {
vi.clearAllMocks()
})
vi.clearAllMocks();
});

/**
* Test WhatsApp webhook verification challenge
Expand All @@ -190,13 +190,13 @@ describe('Webhook Trigger API Route', () => {
'hub.mode': 'subscribe',
'hub.verify_token': 'test-token',
'hub.challenge': 'challenge-123',
})
});

// Create a mock URL with search params
const mockUrl = `http://localhost:3000/api/webhooks/trigger/whatsapp?${verificationParams.toString()}`
const mockUrl = `http://localhost:3000/api/webhooks/trigger/whatsapp?${verificationParams.toString()}`;

// Create a mock request with the URL using NextRequest
const req = new NextRequest(new URL(mockUrl))
const req = new NextRequest(new URL(mockUrl));

// Mock database to return a WhatsApp webhook with matching token
const { db } = await import('@/db')
Expand All @@ -219,21 +219,21 @@ describe('Webhook Trigger API Route', () => {
})

// Mock the path param
const params = Promise.resolve({ path: 'whatsapp' })
const params = Promise.resolve({ path: 'whatsapp' });

// Import the handler after mocks are set up
const { GET } = await import('./route')
const { GET } = await import('./route');

// Call the handler
const response = await GET(req, { params })
const response = await GET(req, { params });

// Check response
expect(response.status).toBe(200)
expect(response.status).toBe(200);

// Should return exactly the challenge string
const text = await response.text()
expect(text).toBe('challenge-123')
})
const text = await response.text();
expect(text).toBe('challenge-123');
});

/**
* Test POST webhook with workflow execution
Expand Down Expand Up @@ -276,16 +276,15 @@ describe('Webhook Trigger API Route', () => {
db.select.mockReturnValue({ from: fromMock })

// Create a mock request with JSON body
const req = createMockRequest('POST', webhookPayload)
const req = createMockRequest('POST', webhookPayload);

// Mock the path param
const params = Promise.resolve({ path: 'test-path' })

// Import the handler after mocks are set up
const { POST } = await import('./route')
const { POST } = await import('./route');

// Call the handler
const response = await POST(req, { params })
const response = await POST(req, { params });

// For the standard path with timeout, we expect 200
expect(response.status).toBe(200)
Expand All @@ -310,16 +309,16 @@ describe('Webhook Trigger API Route', () => {
db.select.mockReturnValue({ from: fromMock })

// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
const req = createMockRequest('POST', { event: 'test' });

// Mock the path param
const params = Promise.resolve({ path: 'non-existent-path' })
const params = Promise.resolve({ path: 'non-existent-path' });

// Import the handler after mocks are set up
const { POST } = await import('./route')
const { POST } = await import('./route');

// Call the handler
const response = await POST(req, { params })
const response = await POST(req, { params });

// Check response - expect 404 since our implementation returns 404 when webhook is not found
expect(response.status).toBe(404)
Expand Down Expand Up @@ -368,16 +367,16 @@ describe('Webhook Trigger API Route', () => {
db.select.mockReturnValue({ from: fromMock })

// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
const req = createMockRequest('POST', { event: 'test' });

// Mock the path param
const params = Promise.resolve({ path: 'test-path' })
const params = Promise.resolve({ path: 'test-path' });

// Import the handler after mocks are set up
const { POST } = await import('./route')
const { POST } = await import('./route');

// Call the handler
const response = await POST(req, { params })
const response = await POST(req, { params });

// Expect 200 response for duplicate
expect(response.status).toBe(200)
Expand Down Expand Up @@ -425,23 +424,23 @@ describe('Webhook Trigger API Route', () => {
const slackHeaders = {
'x-slack-signature': 'v0=1234567890abcdef',
'x-slack-request-timestamp': Math.floor(Date.now() / 1000).toString(),
}
};

// Create a mock request
const req = createMockRequest(
'POST',
{ event_id: 'evt123', type: 'event_callback' },
slackHeaders
)
);

// Mock the path param
const params = Promise.resolve({ path: 'slack-path' })
const params = Promise.resolve({ path: 'slack-path' });

// Import the handler after mocks are set up
const { POST } = await import('./route')
const { POST } = await import('./route');

// Call the handler
const response = await POST(req, { params })
const response = await POST(req, { params });

// Verify response exists
expect(response).toBeDefined()
Expand Down Expand Up @@ -495,16 +494,26 @@ describe('Webhook Trigger API Route', () => {
db.select.mockReturnValue({ from: fromMock })

// Create a mock request
const req = createMockRequest('POST', { event: 'test' })
const req = createMockRequest('POST', { event: 'test' });

// Mock the path param
const params = Promise.resolve({ path: 'test-path' })
const params = Promise.resolve({ path: 'test-path' });

// Import the handler after mocks are set up
const { POST } = await import('./route')
const { POST } = await import('./route');

// Call the handler
const response = await POST(req, { params })
const response = await POST(req, { params });

// Verify response exists and check status code
// For non-Airtable webhooks, we expect 200 from the timeout response
expect(response).toBeDefined();
expect(response.status).toBe(200);

// Verify response text
const text = await response.text();
expect(text).toMatch(/received|processing/i);
});

// Verify response exists and check status code
// For non-Airtable webhooks, we expect 200 from the timeout response
Expand Down
100 changes: 98 additions & 2 deletions sim/app/api/webhooks/trigger/[path]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import {

const logger = createLogger('WebhookTriggerAPI')


// Storage for active processing tasks to prevent garbage collection
// This keeps track of background promises that must continue running even after HTTP response
const activeProcessingTasks = new Map<string, Promise<any>>();

// Define Next.js config for webhook processing
export const dynamic = 'force-dynamic' // Ensure dynamic rendering
export const maxDuration = 300 // 5 minutes max execution time
Expand All @@ -40,6 +45,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
const token = url.searchParams.get('hub.verify_token')
const challenge = url.searchParams.get('hub.challenge')

// Handle WhatsApp verification if applicable
const whatsAppResponse = await handleWhatsAppVerification(requestId, path, mode, token, challenge)
if (whatsAppResponse) {
return whatsAppResponse
Expand All @@ -59,6 +65,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
return new NextResponse('Webhook not found', { status: 404 })
}

// For all other providers, confirm the webhook endpoint exists
logger.info(`[${requestId}] Webhook verification successful for path: ${path}`)
return new NextResponse('OK', { status: 200 })
} catch (error: any) {
Expand All @@ -84,15 +91,86 @@ export async function POST(
let foundWebhook: any = null

// --- PHASE 1: Request validation and parsing ---

// Extract and validate the raw request body
let rawBody: string | null = null
try {
const path = (await params).path

// Check content type to handle different formats properly
const contentType = request.headers.get('content-type') || ''

// Clone the request to get the raw body for signature verification and content parsing
const requestClone = request.clone()
rawBody = await requestClone.text()

if (!rawBody || rawBody.length === 0) {
logger.warn(`[${requestId}] Rejecting request with empty body`)
return new NextResponse('Empty request body', { status: 400 })
}
// Parse the request body based on content type
let body: any

if (contentType.includes('application/json')) {
try {
// Parse as JSON if content type is JSON
body = JSON.parse(rawBody || '{}')
} catch (error) {
logger.warn(`[${requestId}] Failed to parse request body as JSON, trying other formats`, error)
body = {}
}
} else if (contentType.includes('application/x-www-form-urlencoded') || contentType.includes('multipart/form-data')) {
// Handle form data (what Twilio sends)
try {
const formData = await request.formData()
body = Object.fromEntries(formData.entries())
logger.debug(`[${requestId}] Parsed form data: ${Object.keys(body).length} fields`)
} catch (error) {
logger.warn(`[${requestId}] Failed to parse form data, falling back to manual parsing`, error)

// Fall back to manual parsing of form-urlencoded data
try {
if (rawBody) {
body = Object.fromEntries(
rawBody
.split('&')
.map(pair => {
const [key, value] = pair.split('=').map(part => decodeURIComponent(part.replace(/\+/g, ' ')))
return [key, value]
})
)
} else {
body = {}
}
} catch (innerError) {
logger.error(`[${requestId}] Failed manual form parsing`, innerError)
body = {}
}
}
} else {
// For other content types, try to parse as JSON first, then fall back
try {
body = JSON.parse(rawBody || '{}')
} catch (error) {
logger.warn(`[${requestId}] Unknown content type or parsing error, using raw body`, {
contentType,
bodyPreview: rawBody?.substring(0, 100)
})
body = { rawContent: rawBody }
}
}

logger.info(`[${requestId}] Webhook POST request received for path: ${path}`)

// Generate a unique request ID based on the request content
const requestHash = await generateRequestHash(path, body)

// Check if this exact request has been processed before
if (await hasProcessedMessage(requestHash)) {
logger.info(`[${requestId}] Duplicate webhook request detected with hash: ${requestHash}`)
// Return early for duplicate requests to prevent workflow execution
return new NextResponse('Duplicate request', { status: 200 })
}
} catch (bodyError) {
logger.error(`[${requestId}] Failed to read request body`, {
error: bodyError instanceof Error ? bodyError.message : String(bodyError),
Expand Down Expand Up @@ -125,6 +203,7 @@ export async function POST(
`slack:${body?.team_id || ''}:${body?.event?.ts || body?.event?.event_ts || Date.now()}`

try {
// Check if this message was already processed
const isDuplicate = await hasProcessedMessage(dedupeKey)
if (isDuplicate) {
logger.info(`[${requestId}] Duplicate Slack message detected: ${dedupeKey}`)
Expand Down Expand Up @@ -152,7 +231,11 @@ export async function POST(
executionLockKey = `execution:lock:${requestId}:${crypto.randomUUID()}`
}

// We can't detect Airtable webhooks reliably from the body alone
// We'll handle provider-specific logic after loading the webhook from the database

try {
// Attempt to acquire a distributed processing lock
hasExecutionLock = await acquireLock(executionLockKey, requestId, 30) // 30 second TTL
} catch (lockError) {
logger.error(`[${requestId}] Error acquiring execution lock`, lockError)
Expand Down Expand Up @@ -222,7 +305,6 @@ export async function POST(
)
)
.limit(1);

if (alreadyProcessed.length > 0) {
logger.info(`[${requestId}] Duplicate Airtable notification detected: ${notificationId}`);
return new NextResponse('Notification already processed', { status: 200 });
Expand Down Expand Up @@ -314,6 +396,20 @@ export async function POST(
}
})();

// Race processing against timeout to ensure fast response
// Race the processing against the timeout to ensure fast response (for non-Airtable)
return Promise.race([timeoutPromise, processingPromise]);
}

/**
* Generate a TwiML response
*/
function generateTwiML(message?: string): string {
if (!message) {
return '<?xml version="1.0" encoding="UTF-8"?>\n<Response></Response>'
}

return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Message>${message}</Message>
</Response>`
}
Loading
Loading