Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 176 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,191 @@ Then you can:

- Export the messages published on one or more topics to a HTTP endpoint.
- Publish messages to a topic from a HTTP endpoint with a POST to the `/topics/:topic` endpoint.
- Build request/response patterns over Kafka topics with HTTP-style semantics.

## Features

- Consume messages from Kafka topics and forward to HTTP endpoints.
- Send messages to Kafka topics via HTTP API.
- **Request/Response pattern over Kafka topics.**
- Direct binary message passing.
- Configurable retries and concurrency.
- Dead Letter Queue (DLQ) for failed messages.

## Request/Response Pattern

The kafka-hooks library supports HTTP request/response patterns routed through Kafka topics. This enables building responsive microservices that communicate asynchronously via Kafka while maintaining HTTP-style request/response semantics.

### How It Works

1. **HTTP Request**: Client makes a POST request to a configured endpoint
2. **Kafka Request**: The request is published to a Kafka request topic with a unique correlation ID
3. **Service Processing**: External service consumes from the request topic, processes the message
4. **Kafka Response**: Service publishes response to a response topic with the same correlation ID
5. **HTTP Response**: The original HTTP request completes with the response data

### Configuration

Add request/response mappings to your `platformatic.json`:

```json
{
"kafka": {
"brokers": ["localhost:9092"],
"topics": [
{
"topic": "response-topic",
"url": "http://localhost:3043/webhook"
}
],
"requestResponse": [
{
"path": "/api/process",
"requestTopic": "request-topic",
"responseTopic": "response-topic",
"timeout": 5000
}
],
"consumer": {
"groupId": "my-group"
}
}
}
```

### Request/Response Options

| Option | Description | Default |
| --------------- | ----------------------------------------------------- | ---------- |
| `path` | HTTP endpoint path to expose (supports path parameters) | Required |
| `requestTopic` | Kafka topic to publish requests to | Required |
| `responseTopic` | Kafka topic to consume responses from | Required |
| `timeout` | Request timeout in milliseconds | `30000` |

### Path Parameters and Query Strings

The request/response pattern supports both path parameters and query strings, which are automatically passed to Kafka consumers via headers.

**Path Parameters:**
```json
{
"path": "/api/users/:userId/orders/:orderId"
}
```

**Request with path parameters:**
```bash
curl -X POST http://localhost:3042/api/users/123/orders/456 \
-H "Content-Type: application/json" \
-d '{"action": "cancel"}'
```

**Kafka message headers include:**
```json
{
"x-plt-kafka-hooks-path-params": "{\"userId\":\"123\",\"orderId\":\"456\"}"
}
```

**Query String Parameters:**
```bash
curl -X POST http://localhost:3042/api/search?q=coffee&limit=10&sort=price \
-H "Content-Type: application/json" \
-d '{"filters": {...}}'
```

**Kafka message headers include:**
```json
{
"x-plt-kafka-hooks-query-string": "{\"q\":\"coffee\",\"limit\":\"10\",\"sort\":\"price\"}"
}
```

### Usage Example

**Make a request:**
```bash
curl -X POST http://localhost:3042/api/process \
-H "Content-Type: application/json" \
-d '{"userId": 123, "action": "process"}'
```

**Request message published to Kafka:**
```json
{
"headers": {
"content-type": "application/json",
"x-plt-kafka-hooks-correlation-id": "550e8400-e29b-41d4-a716-446655440000"
},
"value": "{\"userId\": 123, \"action\": \"process\"}"
}
```

**Service processes and responds:**
```bash
# External service publishes response
curl -X POST http://localhost:3042/topics/response-topic \
-H "Content-Type: application/json" \
-H "x-plt-kafka-hooks-correlation-id: 550e8400-e29b-41d4-a716-446655440000" \
-H "x-status-code: 200" \
-d '{"result": "success", "data": {...}}'
```

**HTTP response returned to client:**
```json
{
"result": "success",
"data": {...}
}
```

### Request Headers

Request messages automatically include these headers when published to Kafka:

| Header | Description | When Added |
| ------------------------------------ | ---------------------------------------------- | ---------- |
| `x-plt-kafka-hooks-correlation-id` | Unique correlation ID for request matching | Always |
| `x-plt-kafka-hooks-path-params` | JSON string of path parameters | When path parameters present |
| `x-plt-kafka-hooks-query-string` | JSON string of query string parameters | When query parameters present |
| `content-type` | Content type of the request | Always |

### Response Headers

Response messages support these special headers:

| Header | Description | Default |
| ------------------------------------ | ---------------------------------------------- | ------- |
| `x-plt-kafka-hooks-correlation-id` | Must match the original request correlation ID | Required|
| `x-status-code` | HTTP status code for the response | `200` |
| `content-type` | Content type of the response | Preserved |

### Error Handling

**Timeout Response:**
If no response is received within the configured timeout:
```json
{
"code": "HTTP_ERROR_GATEWAY_TIMEOUT",
"error": "Gateway Timeout",
"message": "Request timeout",
"statusCode": 504
}
```

**Missing Correlation ID:**
Responses without correlation IDs are logged as warnings and ignored.

**No Pending Request:**
Responses for non-existent correlation IDs are logged as warnings and ignored.

### Use Cases

- **Microservice Communication**: Route requests through Kafka for reliable delivery
- **Async Processing**: Handle long-running tasks with HTTP-like interface
- **Event-Driven APIs**: Build responsive APIs on event-driven architecture
- **Service Decoupling**: Maintain HTTP contracts while decoupling services via Kafka

## Standalone Install & Setup

You can generate a standalone application with:
Expand Down
15 changes: 15 additions & 0 deletions config.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export interface PlatformaticKafkaHooksConfiguration {
[k: string]: unknown;
};
};
formatters?: {
path: string;
};
timestamp?: "epochTime" | "unixTime" | "nullTime" | "isoTime";
redact?: {
paths: string[];
censor?: string;
};
[k: string]: unknown;
};
loggerInstance?: {
Expand Down Expand Up @@ -330,6 +338,13 @@ export interface PlatformaticKafkaHooksConfiguration {
};
concurrency?: number;
serialization?: string;
requestResponse?: {
path: string;
requestTopic: string;
responseTopic: string;
timeout?: number;
[k: string]: unknown;
}[];
[k: string]: unknown;
};
}
Expand Down
4 changes: 4 additions & 0 deletions lib/definitions.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
export const keyHeader = 'x-plt-kafka-hooks-key'
export const attemptHeader = 'x-plt-kafka-hooks-attempt'
export const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id'
export const pathParamsHeader = 'x-plt-kafka-hooks-path-params'
export const queryStringHeader = 'x-plt-kafka-hooks-query-string'
export const minimumRetryDelay = 250

export const defaultDlqTopic = 'plt-kafka-hooks-dlq'
Expand All @@ -8,3 +11,4 @@ export const defaultRetries = 3
export const defaultMethod = 'POST'
export const defaultIncludeAttemptInRequests = true
export const defaultConcurrency = 10
export const defaultRequestResponseTimeout = 30000
Loading