Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ KAFKA_INITIAL_RETRY_TIME=300
KAFKA_RETRIES=5

# -------------------------------------
# Schema Registry Configuration
# -------------------------------------
# The URL for the Confluent Schema Registry
SCHEMA_REGISTRY_URL=http://localhost:8081
# Optional username for Schema Registry basic authentication
SCHEMA_REGISTRY_USER=
# Optional password for Schema Registry basic authentication
SCHEMA_REGISTRY_PASSWORD=

# -------------------------------------
# Challenge API Configuration
Expand Down
47 changes: 38 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ autopilot operations with Kafka integration.
## Features

- **Event-Driven Scheduling**: Dynamically schedules phase transitions using `@nestjs/schedule`.
- **Kafka Integration**: Utilizes Kafka and Confluent Schema Registry for robust, schema-validated messaging.
- **Kafka Integration**: Utilizes Kafka with JSON serialization for robust, lightweight messaging.
- **Challenge API Integration**: Fetches live challenge data, with resilient API calls featuring exponential backoff and rate-limiting handling.
- **Recovery and Synchronization**: Includes a recovery service on startup and a periodic sync service to ensure data consistency.
- **Health Checks**: Provides endpoints to monitor application and Kafka connection health.
Expand Down Expand Up @@ -51,11 +51,6 @@ Open the `.env` file and configure the variables for your environment. It is cru
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=autopilot-service

# -------------------------------------
# Schema Registry Configuration
# -------------------------------------
SCHEMA_REGISTRY_URL=http://localhost:8081

# -------------------------------------
# Challenge API Configuration
# -------------------------------------
Expand Down Expand Up @@ -96,7 +91,6 @@ docker compose up -d
This will start:
- Zookeeper (port 2181)
- Kafka (ports 9092, 29092)
- Schema Registry (port 8081)
- Kafka UI (port 8080)

2. Verify Docker containers are healthy:
Expand Down Expand Up @@ -127,7 +121,6 @@ npm run start:dev
- **API Documentation (Swagger)**: `http://localhost:3000/api-docs`
- **Health Check**: `http://localhost:3000/health`
- **Kafka UI**: `http://localhost:8080`
- **Schema Registry**: `http://localhost:8081`

# Test coverage

Expand Down Expand Up @@ -167,7 +160,7 @@ The service is composed of several key modules that communicate over specific Ka
| SchedulerService | Low-level job management using setTimeout. Triggers Kafka events when jobs execute. | - | autopilot.phase.transition |
| RecoveryService | Runs on startup to sync all active challenges from the API, scheduling them and processing overdue phases. | - | autopilot.phase.transition |
| SyncService | Runs a periodic cron job to reconcile the scheduler's state with the Challenge API. | - | - |
| KafkaService | Manages all Kafka producer/consumer connections and schema registry interactions. | All | All |
| KafkaService | Manages all Kafka producer/consumer connections with JSON serialization. | All | All |


## Project Structure
Expand Down Expand Up @@ -204,6 +197,42 @@ test/ # Test files

.env # Environment variables
.env.example # Example env template
```

## JSON Messaging Architecture

The service uses a simplified JSON-based messaging approach that aligns with organizational standards and provides several benefits:

### Benefits of JSON Messaging

- **Simplified Infrastructure**: No need for Schema Registry, reducing deployment complexity
- **AWS Compatibility**: Works seamlessly with AWS-native Kafka solutions like MSK
- **Standard Format**: Uses widely-adopted JSON format for better interoperability
- **Reduced Dependencies**: Eliminates Confluent-specific dependencies
- **Enhanced Performance**: Lower overhead compared to Avro serialization with Schema Registry lookups

### Message Structure

All Kafka messages follow a consistent JSON structure:

```json
{
"topic": "autopilot.phase.transition",
"originator": "auto_pilot",
"timestamp": "2023-12-01T10:00:00.000Z",
"mimeType": "application/json",
"payload": {
// Topic-specific payload data
}
}
```

### Message Validation

- **TypeScript Interfaces**: Strong typing maintained through TypeScript interfaces
- **Class Validators**: Runtime validation using class-validator decorators
- **JSON Schema**: Implicit schema validation through TypeScript compilation
- **Error Handling**: Robust error handling for malformed JSON messages with DLQ support
package.json # Dependencies and scripts
tsconfig.json # TypeScript config
README.md # Documentation
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
services:
autopilot-app:
build:
context: .
dockerfile: Dockerfile.dev
ports:
- "3000:3000"
env_file:
- .env
environment:
- KAFKA_BROKERS=host.docker.internal:9092
volumes:
- .:/app
- /app/node_modules
extra_hosts:
- "host.docker.internal:host-gateway"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to ensure that the file ends with a newline character to avoid potential issues with certain tools or systems that might expect it.

26 changes: 1 addition & 25 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,20 @@ services:
volumes:
- kafka_data:/var/lib/kafka/data

schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_DEBUG: "true"
depends_on:
kafka:
condition: service_healthy
healthcheck:
test: curl -f http://localhost:8081 || exit 1
interval: 30s
timeout: 10s
retries: 5
volumes:
- schema_registry_data:/etc/schema-registry

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
DYNAMIC_CONFIG_ENABLED: "true"
depends_on:
kafka:
condition: service_healthy
schema-registry:
condition: service_healthy

volumes:
zookeeper_data:
zookeeper_log:
kafka_data:
schema_registry_data:
kafka_data:
2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@kafkajs/confluent-schema-registry": "^3.8.0",
"@nestjs/axios": "^4.0.1",
"@nestjs/cli": "^11.0.0",
"@nestjs/common": "^11.1.1",
Expand All @@ -34,7 +33,6 @@
"@nestjs/swagger": "^7.4.0",
"@nestjs/terminus": "^11.0.0",
"@types/express": "^5.0.0",
"avsc": "^5.7.7",
"axios": "^1.9.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.2",
Expand Down
Loading