From 96cc9c19af4588bf090fdf71e046e6a30dc72e45 Mon Sep 17 00:00:00 2001
From: billsedison
Date: Wed, 27 Aug 2025 07:07:00 +0800
Subject: [PATCH] Kafka Schema Registry Removal

---
 .env.example                              |   8 --
 README.md                                 |  46 +++++--
 docker-compose.dev.yml                    |  16 +++
 docker-compose.yml                        |  26 +---
 package.json                              |   2 -
 pnpm-lock.yaml                            | 114 -----------------
 src/common/constants/config.constants.ts  |  10 --
 src/common/exceptions/kafka.exception.ts  |  12 --
 src/common/schemas/kafka.schemas.ts       |  93 --------------
 src/common/types/kafka.types.ts           |  29 -----
 src/common/utils/schema.utils.ts          | 116 -----------------
 src/config/sections/kafka.config.ts       |   7 --
 src/config/validation.ts                  |   5 -
 src/kafka/kafka.service.ts                | 154 ++++------------------
 14 files changed, 77 insertions(+), 561 deletions(-)
 create mode 100644 docker-compose.dev.yml
 delete mode 100644 src/common/schemas/kafka.schemas.ts
 delete mode 100644 src/common/utils/schema.utils.ts

diff --git a/.env.example b/.env.example
index 63f0eec..7fa46ad 100644
--- a/.env.example
+++ b/.env.example
@@ -32,14 +32,6 @@ KAFKA_INITIAL_RETRY_TIME=300
 KAFKA_RETRIES=5
 
 # -------------------------------------
-# Schema Registry Configuration
-# -------------------------------------
-# The URL for the Confluent Schema Registry
-SCHEMA_REGISTRY_URL=http://localhost:8081
-# Optional username for Schema Registry basic authentication
-SCHEMA_REGISTRY_USER=
-# Optional password for Schema Registry basic authentication
-SCHEMA_REGISTRY_PASSWORD=
 
 # -------------------------------------
 # Challenge API Configuration
diff --git a/README.md b/README.md
index 753447a..b93535d 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@ autopilot operations with Kafka integration.
 ## Features
 
 - **Event-Driven Scheduling**: Dynamically schedules phase transitions using `@nestjs/schedule`.
-- **Kafka Integration**: Utilizes Kafka and Confluent Schema Registry for robust, schema-validated messaging.
+- **Kafka Integration**: Utilizes Kafka with JSON serialization for robust, lightweight messaging.
 - **Challenge API Integration**: Fetches live challenge data, with resilient API calls featuring exponential backoff and rate-limiting handling.
 - **Recovery and Synchronization**: Includes a recovery service on startup and a periodic sync service to ensure data consistency.
 - **Health Checks**: Provides endpoints to monitor application and Kafka connection health.
@@ -51,11 +51,6 @@ Open the `.env` file and configure the variables for your environment. It is cru
    KAFKA_BROKERS=localhost:9092
    KAFKA_CLIENT_ID=autopilot-service
 
-   # -------------------------------------
-   # Schema Registry Configuration
-   # -------------------------------------
-   SCHEMA_REGISTRY_URL=http://localhost:8081
-
    # -------------------------------------
    # Challenge API Configuration
    # -------------------------------------
@@ -96,7 +91,6 @@ docker compose up -d
 This will start:
 - Zookeeper (port 2181)
 - Kafka (ports 9092, 29092)
-- Schema Registry (port 8081)
 - Kafka UI (port 8080)
 
 2. Verify Docker containers are healthy:
@@ -127,7 +121,6 @@ npm run start:dev
 - **API Documentation (Swagger)**: `http://localhost:3000/api-docs`
 - **Health Check**: `http://localhost:3000/health`
 - **Kafka UI**: `http://localhost:8080`
-- **Schema Registry**: `http://localhost:8081`
 
 # Test coverage
 
@@ -167,7 +160,7 @@ The service is composed of several key modules that communicate over specific Ka
 | SchedulerService | Low-level job management using setTimeout. Triggers Kafka events when jobs execute. | - | autopilot.phase.transition |
 | RecoveryService | Runs on startup to sync all active challenges from the API, scheduling them and processing overdue phases. | - | autopilot.phase.transition |
 | SyncService | Runs a periodic cron job to reconcile the scheduler's state with the Challenge API. | - | - |
-| KafkaService | Manages all Kafka producer/consumer connections and schema registry interactions. | All | All |
+| KafkaService | Manages all Kafka producer/consumer connections with JSON serialization. | All | All |
 
 ## Project Structure
 
@@ -204,7 +197,42 @@
 test/                  # Test files
 .env                   # Environment variables
 .env.example           # Example env template
 package.json           # Dependencies and scripts
 tsconfig.json          # TypeScript config
 README.md              # Documentation
 ```
+
+## JSON Messaging Architecture
+
+The service uses a simplified JSON-based messaging approach that aligns with organizational standards and provides several benefits:
+
+### Benefits of JSON Messaging
+
+- **Simplified Infrastructure**: No need for a Schema Registry, reducing deployment complexity
+- **AWS Compatibility**: Works seamlessly with AWS-native Kafka solutions like MSK
+- **Standard Format**: Uses the widely adopted JSON format for better interoperability
+- **Reduced Dependencies**: Eliminates Confluent-specific dependencies
+- **Enhanced Performance**: Lower overhead compared to Avro serialization with Schema Registry lookups
+
+### Message Structure
+
+All Kafka messages follow a consistent JSON structure:
+
+```json
+{
+  "topic": "autopilot.phase.transition",
+  "originator": "auto_pilot",
+  "timestamp": "2023-12-01T10:00:00.000Z",
+  "mimeType": "application/json",
+  "payload": {
+    // Topic-specific payload data
+  }
+}
+```
+
+### Message Validation
+
+- **TypeScript Interfaces**: Strong typing maintained through TypeScript interfaces
+- **Class Validators**: Runtime validation using class-validator decorators
+- **JSON Schema**: Implicit schema enforcement through TypeScript compilation
+- **Error Handling**: Robust error handling for malformed JSON messages with DLQ support
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
new file mode 100644
index 0000000..0f2c43b
--- /dev/null
+++ b/docker-compose.dev.yml
@@ -0,0 +1,16 @@
+services:
+  autopilot-app:
+    build:
+      context: .
+      dockerfile: Dockerfile.dev
+    ports:
+      - "3000:3000"
+    env_file:
+      - .env
+    environment:
+      - KAFKA_BROKERS=host.docker.internal:9092
+    volumes:
+      - .:/app
+      - /app/node_modules
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 04d042d..6db0565 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -46,26 +46,6 @@ services:
     volumes:
       - kafka_data:/var/lib/kafka/data
 
-  schema-registry:
-    image: confluentinc/cp-schema-registry:7.4.0
-    ports:
-      - "8081:8081"
-    environment:
-      SCHEMA_REGISTRY_HOST_NAME: schema-registry
-      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
-      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
-      SCHEMA_REGISTRY_DEBUG: "true"
-    depends_on:
-      kafka:
-        condition: service_healthy
-    healthcheck:
-      test: curl -f http://localhost:8081 || exit 1
-      interval: 30s
-      timeout: 10s
-      retries: 5
-    volumes:
-      - schema_registry_data:/etc/schema-registry
-
   kafka-ui:
     image: provectuslabs/kafka-ui:latest
     ports:
@@ -73,17 +53,13 @@ services:
     environment:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
-      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
       KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: "true"
    depends_on:
      kafka:
        condition: service_healthy
-      schema-registry:
-        condition: service_healthy
 
 volumes:
   zookeeper_data:
   zookeeper_log:
-  kafka_data:
-  schema_registry_data:
\ No newline at end of file
+  kafka_data:
\ No newline at end of file
diff --git a/package.json b/package.json
index 7b2b991..59674a2 100644
--- a/package.json
+++ b/package.json
@@ -20,7 +20,6 @@
     "test:e2e": "jest --config ./test/jest-e2e.json"
   },
   "dependencies": {
-    "@kafkajs/confluent-schema-registry": "^3.8.0",
     "@nestjs/axios": "^4.0.1",
     "@nestjs/cli": "^11.0.0",
     "@nestjs/common": "^11.1.1",
@@ -34,7 +33,6 @@
     "@nestjs/swagger": "^7.4.0",
     "@nestjs/terminus": "^11.0.0",
     "@types/express": "^5.0.0",
-    "avsc": "^5.7.7",
     "axios": "^1.9.0",
     "class-transformer": "^0.5.1",
     "class-validator": "^0.14.2",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index a8cf963..3164288 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -8,9 +8,6 @@ importers:
 
   .:
     dependencies:
-      '@kafkajs/confluent-schema-registry':
-        specifier: ^3.8.0
-        version: 3.9.0
       '@nestjs/axios':
         specifier: ^4.0.1
         version: 4.0.1(@nestjs/common@11.1.4(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(axios@1.10.0)(rxjs@7.8.2)
@@ -50,9 +47,6 @@ importers:
       '@types/express':
        specifier: ^5.0.0
        version: 5.0.3
-      avsc:
-        specifier: ^5.7.7
-        version: 5.7.9
       axios:
         specifier: ^1.9.0
         version: 1.10.0
@@ -688,9 +682,6 @@ packages:
   '@jridgewell/trace-mapping@0.3.9':
     resolution: {integrity: sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==}
 
-  '@kafkajs/confluent-schema-registry@3.9.0':
-    resolution: {integrity: sha512-QiwinzNVlHPvylFCSiKBcTDxNDp9/xDj7ZWTxlsel5h4hwLxMLjnuJUcW/LhzNPPi/V4Vfa4Yq0uv30CYGtRnA==}
-
   '@lukeed/csprng@1.1.0':
     resolution: {integrity: sha512-Z7C/xXCiGWsg0KuKsHTKJxbWhpI3Vs5GwLfOean7MGyVFGqdRgBbAjOCh6u4bbjPc/8MJ2pZmK/0DLdCbivLDA==}
     engines: {node: '>=8'}
@@ -1010,36 +1001,6 @@ packages:
     resolution: {integrity: sha512-YLT9Zo3oNPJoBjBc4q8G2mjU4tqIbf5CEOORbUUr48dCD9q3umJ3IPlVqOqDakPfd2HuwccBaqlGhN4Gmr5OWg==}
     engines: {node: ^12.20.0 || ^14.18.0 || >=16.0.0}
 
-  '@protobufjs/aspromise@1.1.2':
-    resolution: {integrity: sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==}
-
-  '@protobufjs/base64@1.1.2':
-    resolution: {integrity: sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==}
-
-  '@protobufjs/codegen@2.0.4':
-    resolution: {integrity: sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==}
-
-  '@protobufjs/eventemitter@1.1.0':
-    resolution: {integrity: sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==}
-
-  '@protobufjs/fetch@1.1.0':
-    resolution: {integrity: sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==}
-
-  '@protobufjs/float@1.0.2':
-    resolution: {integrity: sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==}
-
-  '@protobufjs/inquire@1.1.0':
-    resolution: {integrity: sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==}
-
-  '@protobufjs/path@1.1.2':
-    resolution: {integrity: sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==}
-
-  '@protobufjs/pool@1.1.0':
-    resolution: {integrity: sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==}
-
-  '@protobufjs/utf8@1.1.0':
-    resolution: {integrity: sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==}
-
   '@sec-ant/readable-stream@0.4.1':
     resolution: {integrity: sha512-831qok9r2t8AlxLko40y2ebgSDhenenCatLVeW/uBtnHPyhHOvG0C7TvfgecV+wHzIm5KUICgzmVpWS+IMEAeg==}
 
@@ -1503,9 +1464,6 @@ packages:
   ajv@6.12.6:
     resolution: {integrity: sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==}
 
-  ajv@7.2.4:
-    resolution: {integrity: sha512-nBeQgg/ZZA3u3SYxyaDvpvDtgZ/EZPF547ARgZBrG9Bhu1vKDwAIjtIf+sDtJUKa2zOcEbmRLBRSyMraS/Oy1A==}
-
   ajv@8.17.1:
     resolution: {integrity: sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==}
 
@@ -1575,10 +1533,6 @@ packages:
   asynckit@0.4.0:
     resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
 
-  avsc@5.7.9:
-    resolution: {integrity: sha512-yOA4wFeI7ET3v32Di/sUybQ+ttP20JHSW3mxLuNGeO0uD6PPcvLrIQXSvy/rhJOWU5JrYh7U4OHplWMmtAtjMg==}
-    engines: {node: '>=0.11'}
-
   axios@1.10.0:
     resolution: {integrity: sha512-/1xYAC4MP/HEG+3duIhFr4ZQXR4sQXOIe+o6sdqzeykGLx6Upp/1p8MHqhINOvGeP7xyNHe7tsiJByc4SSVUxw==}
 
@@ -2850,9 +2804,6 @@ packages:
     resolution: {integrity: sha512-TFYA4jnP7PVbmlBIfhlSe+WKxs9dklXMTEGcBCIvLhE/Tn3H6Gk1norupVW7m5Cnd4bLcr08AytbyV/xj7f/kQ==}
     engines: {node: '>= 12.0.0'}
 
-  long@5.3.2:
-    resolution: {integrity: sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==}
-
   lowercase-keys@3.0.0:
     resolution: {integrity: sha512-ozCC6gdQ+glXOQsveKD0YsDy8DSQFjDTz4zyzEHNV5+JP5D62LmfDZ6o1cycFx9ouG940M5dE8C8CTewdj2YWQ==}
     engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
 
@@ -2881,9 +2832,6 @@ packages:
   makeerror@1.0.12:
     resolution: {integrity: sha512-JmqCvUhmt43madlpFzG4BQzG2Z3m6tvQDNKdClZnO3VbIudJYmxsT0FNJMeiB2+JTSlTQTSbU8QdesVmwJcmLg==}
 
-  mappersmith@2.45.0:
-    resolution: {integrity: sha512-N/Kkx9RqJenkvMHPMY0VS1geAara0VQTwup5Abv2GB19QBT7w+epjhRQMLW5jtz2DXUdkh7KD3F3prqJKG1A8w==}
-
   math-intrinsics@1.1.0:
     resolution: {integrity: sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==}
     engines: {node: '>= 0.4'}
 
@@ -3210,10 +3158,6 @@ packages:
     resolution: {integrity: sha512-NxNv/kLguCA7p3jE8oL2aEBsrJWgAakBpgmgK6lpPWV+WuOmY6r2/zbAVnP+T8bQlA0nzHXSJSJW0Hq7ylaD2Q==}
     engines: {node: '>= 6'}
 
-  protobufjs@7.5.3:
-    resolution: {integrity: sha512-sildjKwVqOI2kmFDiXQ6aEB0fjYTafpEvIBs8tOR8qI4spuL9OPROLVu2qZqi/xgCfsHIwVqlaF8JBjWFHnKbw==}
-    engines: {node: '>=12.0.0'}
-
   proxy-addr@2.0.7:
     resolution: {integrity: sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==}
     engines: {node: '>= 0.10'}
 
@@ -4598,13 +4542,6 @@ snapshots:
       '@jridgewell/resolve-uri': 3.1.2
       '@jridgewell/sourcemap-codec': 1.5.4
 
-  '@kafkajs/confluent-schema-registry@3.9.0':
-    dependencies:
-      ajv: 7.2.4
-      avsc: 5.7.9
-      mappersmith: 2.45.0
-      protobufjs: 7.5.3
-
   '@lukeed/csprng@1.1.0': {}
 
   '@microsoft/tsdoc@0.15.1': {}
 
@@ -4863,29 +4800,6 @@ snapshots:
   '@pkgr/core@0.2.7': {}
 
-  '@protobufjs/aspromise@1.1.2': {}
-
-  '@protobufjs/base64@1.1.2': {}
-
-  '@protobufjs/codegen@2.0.4': {}
-
-  '@protobufjs/eventemitter@1.1.0': {}
-
-  '@protobufjs/fetch@1.1.0':
-    dependencies:
-      '@protobufjs/aspromise': 1.1.2
-      '@protobufjs/inquire': 1.1.0
-
-  '@protobufjs/float@1.0.2': {}
-
-  '@protobufjs/inquire@1.1.0': {}
-
-  '@protobufjs/path@1.1.2': {}
-
-  '@protobufjs/pool@1.1.0': {}
-
-  '@protobufjs/utf8@1.1.0': {}
-
   '@sec-ant/readable-stream@0.4.1': {}
 
   '@sideway/address@4.1.5':
     dependencies:
@@ -5439,13 +5353,6 @@ snapshots:
       json-schema-traverse: 0.4.1
       uri-js: 4.4.1
 
-  ajv@7.2.4:
-    dependencies:
-      fast-deep-equal: 3.1.3
-      json-schema-traverse: 1.0.0
-      require-from-string: 2.0.2
-      uri-js: 4.4.1
-
   ajv@8.17.1:
     dependencies:
       fast-deep-equal: 3.1.3
@@ -5502,8 +5409,6 @@ snapshots:
 
   asynckit@0.4.0: {}
 
-  avsc@5.7.9: {}
-
   axios@1.10.0:
     dependencies:
       follow-redirects: 1.15.9
@@ -7023,8 +6928,6 @@ snapshots:
       safe-stable-stringify: 2.5.0
       triple-beam: 1.4.1
 
-  long@5.3.2: {}
-
   lowercase-keys@3.0.0: {}
 
   lru-cache@11.1.0: {}
@@ -7049,8 +6952,6 @@ snapshots:
   makeerror@1.0.12:
     dependencies:
       tmpl: 1.0.5
 
-  mappersmith@2.45.0: {}
-
   math-intrinsics@1.1.0: {}
 
@@ -7330,21 +7231,6 @@ snapshots:
       kleur: 3.0.3
       sisteransi: 1.0.5
 
-  protobufjs@7.5.3:
-    dependencies:
-      '@protobufjs/aspromise': 1.1.2
-      '@protobufjs/base64': 1.1.2
-      '@protobufjs/codegen': 2.0.4
-      '@protobufjs/eventemitter': 1.1.0
-      '@protobufjs/fetch': 1.1.0
-      '@protobufjs/float': 1.0.2
-      '@protobufjs/inquire': 1.1.0
-      '@protobufjs/path': 1.1.2
-      '@protobufjs/pool': 1.1.0
-      '@protobufjs/utf8': 1.1.0
-      '@types/node': 22.16.4
-      long: 5.3.2
-
   proxy-addr@2.0.7:
     dependencies:
       forwarded: 0.2.0
diff --git a/src/common/constants/config.constants.ts b/src/common/constants/config.constants.ts
index 1453868..e63f6b5 100644
--- a/src/common/constants/config.constants.ts
+++ b/src/common/constants/config.constants.ts
@@ -22,11 +22,6 @@ export interface KafkaConfig {
   DEFAULT_MAX_IN_FLIGHT_REQUESTS: number;
 }
 
-export interface SchemaConfig {
-  DEFAULT_CACHE_TTL: number;
-  DEFAULT_SUBJECT_SUFFIX: string;
-}
-
 export interface CircuitBreakerConfig {
   DEFAULT_FAILURE_THRESHOLD: number;
   DEFAULT_RESET_TIMEOUT: number;
@@ -41,7 +36,6 @@ export interface HealthConfig {
 export interface Config {
   APP: AppConfig;
   KAFKA: KafkaConfig;
-  SCHEMA: SchemaConfig;
   CIRCUIT_BREAKER: CircuitBreakerConfig;
   HEALTH: HealthConfig;
 }
@@ -69,10 +63,6 @@ export const CONFIG: Config = {
     DEFAULT_TRANSACTION_TIMEOUT: 30000,
     DEFAULT_MAX_IN_FLIGHT_REQUESTS: 5,
   },
-  SCHEMA: {
-    DEFAULT_CACHE_TTL: 3600000, // 1 hour in milliseconds
-    DEFAULT_SUBJECT_SUFFIX: '-value',
-  },
   CIRCUIT_BREAKER: {
     DEFAULT_FAILURE_THRESHOLD: 5,
     DEFAULT_RESET_TIMEOUT: 60000,
diff --git a/src/common/exceptions/kafka.exception.ts b/src/common/exceptions/kafka.exception.ts
index 2942cd7..323f536 100644
--- a/src/common/exceptions/kafka.exception.ts
+++ b/src/common/exceptions/kafka.exception.ts
@@ -23,15 +23,3 @@ export class KafkaConsumerException extends KafkaException {
     super(`Failed to start consumer ${groupId}`, details);
   }
 }
-
-export class SchemaRegistryException extends KafkaException {
-  constructor(message: string, details?: Record<string, unknown>) {
-    super(`Schema Registry error: ${message}`, details);
-  }
-}
-
-export class SchemaValidationException extends KafkaException {
-  constructor(message: string, details?: Record<string, unknown>) {
-    super(`Schema validation error: ${message}`, details);
-  }
-}
diff --git a/src/common/schemas/kafka.schemas.ts b/src/common/schemas/kafka.schemas.ts
deleted file mode 100644
index 055f98f..0000000
--- a/src/common/schemas/kafka.schemas.ts
+++ /dev/null
@@ -1,93 +0,0 @@
-import { KAFKA_TOPICS } from '../../kafka/constants/topics';
-
-export const KAFKA_SCHEMAS = {
-  [KAFKA_TOPICS.PHASE_TRANSITION]: {
-    type: 'record',
-    name: 'PhaseTransition',
-    namespace: 'com.autopilot.events',
-    fields: [
-      { name: 'topic', type: 'string', default: 'autopilot.phase.transition' },
-      { name: 'originator', type: 'string', default: 'auto_pilot' },
-      { name: 'timestamp', type: 'string' },
-      { name: 'mimeType', type: 'string', default: 'application/json' },
-      {
-        name: 'payload',
-        type: {
-          type: 'record',
-          name: 'PhaseTransitionPayload',
-          fields: [
-            { name: 'projectId', type: 'long' },
-            { name: 'phaseId', type: 'string' }, // UUID string
-            { name: 'phaseTypeName', type: 'string' },
-            {
-              name: 'state',
-              type: {
-                type: 'enum',
-                name: 'PhaseState',
-                symbols: ['START', 'END'],
-              },
-            },
-            { name: 'operator', type: 'string' },
-            { name: 'projectStatus', type: 'string' },
-            { name: 'date', type: 'string' },
-            { name: 'challengeId', type: 'string' }, // UUID string
-          ],
-        },
-      },
-    ],
-  },
-  [KAFKA_TOPICS.CHALLENGE_UPDATE]: {
-    type: 'record',
-    name: 'ChallengeUpdate',
-    namespace: 'com.autopilot.events',
-    fields: [
-      { name: 'topic', type: 'string', default: 'autopilot.challenge.update' },
-      { name: 'originator', type: 'string', default: 'auto_pilot' },
-      { name: 'timestamp', type: 'string' },
-      { name: 'mimeType', type: 'string', default: 'application/json' },
-      {
-        name: 'payload',
-        type: {
-          type: 'record',
-          name: 'ChallengeUpdatePayload',
-          fields: [
-            { name: 'projectId', type: 'long' },
-            { name: 'challengeId', type: 'string' }, // Changed from long to string for UUID
-            { name: 'status', type: 'string' },
-            { name: 'operator', type: 'string' },
-            { name: 'date', type: 'string' },
-            { name: 'phaseId', type: ['null', 'string'], default: null }, // Changed from long to string
-            { name: 'phaseTypeName', type: ['null', 'string'], default: null },
-          ],
-        },
-      },
-    ],
-  },
-  [KAFKA_TOPICS.COMMAND]: {
-    type: 'record',
-    name: 'Command',
-    namespace: 'com.autopilot.events',
-    fields: [
-      { name: 'topic', type: 'string', default: 'autopilot.command' },
-      { name: 'originator', type: 'string', default: 'auto_pilot' },
-      { name: 'timestamp', type: 'string' },
-      { name: 'mimeType', type: 'string', default: 'application/json' },
-      {
-        name: 'payload',
-        type: {
-          type: 'record',
-          name: 'CommandPayload',
-          fields: [
-            { name: 'command', type: 'string' },
-            { name: 'operator', type: 'string' },
-            { name: 'projectId', type: ['null', 'long'], default: null },
-            { name: 'challengeId', type: ['null', 'string'], default: null },
-            { name: 'date', type: ['null', 'string'], default: null },
-            { name: 'phaseId', type: ['null', 'string'], default: null },
-            { name: 'phaseTypeName', type: ['null', 'string'], default: null },
-          ],
-        },
-      },
-    ],
-  },
-};
diff --git a/src/common/types/kafka.types.ts b/src/common/types/kafka.types.ts
index d3a42d5..d1f820c 100644
--- a/src/common/types/kafka.types.ts
+++ b/src/common/types/kafka.types.ts
@@ -1,32 +1,3 @@
-import { Schema } from '@kafkajs/confluent-schema-registry/dist/@types';
-
-export interface ISchemaCacheEntry {
-  schema: Schema;
-  timestamp: number;
-}
-
-// export interface IKafkaMessageHeaders {
-//   'correlation-id': string;
-//   timestamp: string;
-//   [key: string]: string;
-// }
-
-// export interface IKafkaProducerOptions {
-//   topic: string;
-//   messages: Array<{
-//     value: Buffer;
-//     headers: IKafkaMessageHeaders;
-//   }>;
-//   acks: number;
-//   timeout: number;
-// }
-
-// export interface IKafkaConsumerOptions {
-//   groupId: string;
-//   topics: string[];
-//   fromBeginning: boolean;
-// }
-
 export interface IKafkaConfig {
   clientId: string;
   brokers: string[];
diff --git a/src/common/utils/schema.utils.ts b/src/common/utils/schema.utils.ts
deleted file mode 100644
index 937f9f5..0000000
--- a/src/common/utils/schema.utils.ts
+++ /dev/null
@@ -1,116 +0,0 @@
-import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
-import { Injectable } from '@nestjs/common';
-import { SchemaRegistryException } from '../exceptions/kafka.exception';
-import { LoggerService } from '../services/logger.service';
-import { KAFKA_SCHEMAS } from '../schemas/kafka.schemas';
-import { Schema } from '@kafkajs/confluent-schema-registry/dist/@types';
-
-@Injectable()
-export class SchemaUtils {
-  private readonly registry: SchemaRegistry;
-  private readonly logger: LoggerService;
-
-  constructor(private readonly schemaRegistryUrl: string) {
-    this.registry = new SchemaRegistry({ host: this.schemaRegistryUrl });
-    this.logger = new LoggerService(SchemaUtils.name);
-  }
-
-  async encode(message: any, schemaId: number): Promise<Buffer> {
-    try {
-      return await this.registry.encode(schemaId, message);
-    } catch (error) {
-      const err = error as Error;
-      this.logger.error(`Failed to encode message with schema ID ${schemaId}`, {
-        error: err.stack,
-      });
-      throw new SchemaRegistryException(
-        `Failed to encode message with schema ID ${schemaId}`,
-        {
-          error: err.stack || err.message,
-        },
-      );
-    }
-  }
-
-  async decode(message: Buffer): Promise<unknown> {
-    try {
-      return await this.registry.decode(message);
-    } catch (error) {
-      const err = error as Error;
-      this.logger.error('Failed to decode message', { error: err.stack });
-      throw new SchemaRegistryException('Failed to decode message', {
-        error: err.stack || err.message,
-      });
-    }
-  }
-
-  async registerSchema(topic: string, schema: any): Promise<number> {
-    try {
-      const subject = `${topic}-value`;
-      const { id } = await this.registry.register(
-        {
-          type: SchemaType.AVRO,
-          schema: JSON.stringify(schema),
-        },
-        { subject },
-      );
-      return id;
-    } catch (error) {
-      const err = error as Error;
-      this.logger.error(`Failed to register schema for topic ${topic}`, {
-        error: err.stack,
-      });
-      throw new SchemaRegistryException(
-        `Failed to register schema for topic ${topic}`,
-        {
-          error: err.stack || err.message,
-        },
-      );
-    }
-  }
-
-  async getLatestSchemaId(subject: string): Promise<number> {
-    try {
-      const id = await this.registry.getLatestSchemaId(subject);
-      return id;
-    } catch (error) {
-      const topic = subject.replace('-value', '');
-      if (KAFKA_SCHEMAS[topic]) {
-        this.logger.warn(
-          `Schema not found for ${subject}, creating new schema for first time use`,
-        );
-        try {
-          return await this.registerSchema(topic, KAFKA_SCHEMAS[topic]);
-        } catch (registerError) {
-          const err = registerError as Error;
-          this.logger.error(`Failed to create schema for ${subject}`, {
-            error: err.stack,
-          });
-          throw new SchemaRegistryException(
-            `Failed to create schema for ${subject}`,
-            {
-              error: err.stack || err.message,
-            },
-          );
-        }
-      }
-      const err = error as Error;
-      this.logger.error(
-        `Schema not found and no definition available for ${subject}`,
-        { error: err.stack },
-      );
-      throw new SchemaRegistryException(
-        `Schema not found and no definition available for ${subject}`,
-        {
-          error: err.stack || err.message,
-        },
-      );
-    }
-  }
-
-  async getSchema(topic: string): Promise<Schema> {
-    const subject = `${topic}-value`;
-    const id = await this.getLatestSchemaId(subject);
-    return await this.registry.getSchema(id);
-  }
-}
diff --git a/src/config/sections/kafka.config.ts b/src/config/sections/kafka.config.ts
index fd823e6..7de583d 100644
--- a/src/config/sections/kafka.config.ts
+++ b/src/config/sections/kafka.config.ts
@@ -3,13 +3,6 @@ import { registerAs } from '@nestjs/config';
 export default registerAs('kafka', () => ({
   brokers: process.env.KAFKA_BROKERS || 'localhost:29092',
   clientId: process.env.KAFKA_CLIENT_ID || 'autopilot-service',
-  schemaRegistry: {
-    url: process.env.SCHEMA_REGISTRY_URL || 'http://localhost:8081',
-    auth: {
-      username: process.env.SCHEMA_REGISTRY_USER,
-      password: process.env.SCHEMA_REGISTRY_PASSWORD,
-    },
-  },
   retry: {
     maxRetryTime: parseInt(process.env.KAFKA_MAX_RETRY_TIME ?? '30000', 10),
     initialRetryTime: parseInt(
diff --git a/src/config/validation.ts b/src/config/validation.ts
index 291ad2b..873853c 100644
--- a/src/config/validation.ts
+++ b/src/config/validation.ts
@@ -20,11 +20,6 @@ export const validationSchema = Joi.object({
   KAFKA_INITIAL_RETRY_TIME: Joi.number().default(300),
   KAFKA_RETRIES: Joi.number().default(5),
 
-  // Schema Registry Configuration
-  SCHEMA_REGISTRY_URL: Joi.string().required(),
-  SCHEMA_REGISTRY_USER: Joi.string().optional().allow(''),
-  SCHEMA_REGISTRY_PASSWORD: Joi.string().optional().allow(''),
-
   // Challenge API Configuration
   CHALLENGE_API_URL: Joi.string().uri().required(),
   // Removed static M2M token validation - now using Auth0
diff --git a/src/kafka/kafka.service.ts b/src/kafka/kafka.service.ts
index 23e1c43..c046fe9 100644
--- a/src/kafka/kafka.service.ts
+++ b/src/kafka/kafka.service.ts
@@ -11,34 +11,27 @@ import {
   ProducerRecord,
   Partitioners,
 } from 'kafkajs';
-import { SchemaUtils } from '../common/utils/schema.utils';
 import {
   KafkaConnectionException,
   KafkaProducerException,
   KafkaConsumerException,
-  SchemaRegistryException,
 } from '../common/exceptions/kafka.exception';
 import { LoggerService } from '../common/services/logger.service';
 import { CircuitBreaker } from '../common/utils/circuit-breaker';
 import { v4 as uuidv4 } from 'uuid';
-import { KAFKA_SCHEMAS } from '../common/schemas/kafka.schemas';
 import { CONFIG } from '../common/constants/config.constants';
-import { ISchemaCacheEntry, IKafkaConfig } from '../common/types/kafka.types';
+import { IKafkaConfig } from '../common/types/kafka.types';
 
 @Injectable()
 export class KafkaService implements OnApplicationShutdown, OnModuleInit {
   private readonly kafka: Kafka;
   private readonly producer: Producer;
   private readonly consumers: Map<string, Consumer>;
-  private readonly schemaUtils: SchemaUtils;
   private readonly logger: LoggerService;
   private readonly circuitBreaker: CircuitBreaker;
-  private schemaIds: Map<string, number>;
-  private readonly schemaCache: Map<string, ISchemaCacheEntry>;
 
   constructor(private readonly configService: ConfigService) {
     this.logger = new LoggerService(KafkaService.name);
-    this.schemaCache = new Map();
 
     try {
       const brokers = this.configService.get<string>(
@@ -81,19 +74,6 @@
         failureThreshold: CONFIG.CIRCUIT_BREAKER.DEFAULT_FAILURE_THRESHOLD,
         resetTimeout: CONFIG.CIRCUIT_BREAKER.DEFAULT_RESET_TIMEOUT,
       });
-
-      const schemaRegistryUrl = this.configService.get<string>(
-        'kafka.schemaRegistry.url',
-      );
-      if (!schemaRegistryUrl) {
-        this.logger.error('Schema registry URL is not configured');
-        throw new SchemaRegistryException(
-          'Schema registry URL is not configured',
-        );
-      }
-
-      this.schemaUtils = new SchemaUtils(schemaRegistryUrl);
-      this.schemaIds = new Map();
     } catch (error) {
       const err = error as Error;
       this.logger.error('Failed to initialize Kafka service', {
@@ -107,7 +87,6 @@
 
   async onModuleInit(): Promise<void> {
     try {
-      await this.initializeSchemas();
       await this.producer.connect();
       this.logger.info('Kafka service initialized successfully');
     } catch (error) {
@@ -121,82 +100,35 @@
     }
   }
 
-  private async initializeSchemas(): Promise<void> {
+  private encodeMessage(message: unknown): Buffer {
     try {
-      this.logger.info('Initializing Kafka schemas...');
-
-      for (const [topic, schema] of Object.entries(KAFKA_SCHEMAS)) {
-        try {
-          const schemaId = await this.schemaUtils.registerSchema(topic, schema);
-          this.schemaIds.set(topic, schemaId);
-          this.logger.info(
-            `Schema initialized for topic ${topic} with ID: ${schemaId}`,
-          );
-        } catch (error) {
-          if (
-            error instanceof Error &&
-            (error.message.includes('already exists') ||
-              error.message.includes('incompatible'))
-          ) {
-            this.logger.warn(
-              `Schema for topic ${topic} may already exist. Fetching latest version.`,
-            );
-            const schemaId = await this.schemaUtils.getLatestSchemaId(
-              `${topic}-value`,
-            );
-            this.schemaIds.set(topic, schemaId);
-            this.logger.info(
-              `Using existing schema for topic ${topic} with ID: ${schemaId}`,
-            );
-          } else {
-            throw error;
-          }
-        }
-      }
-
-      this.logger.info('All Kafka schemas initialized successfully');
+      const jsonString = JSON.stringify(message);
+      return Buffer.from(jsonString, 'utf8');
     } catch (error) {
       const err = error as Error;
-      this.logger.error('Failed to initialize Kafka schemas', {
+      this.logger.error('Failed to encode message as JSON', {
         error: err.stack || err.message,
       });
-      throw new SchemaRegistryException(
-        `Failed to initialize Kafka schemas: ${err.message}`,
-        {
-          error: err.stack || err.message,
-        },
-      );
+      throw new Error(`Failed to encode message as JSON: ${err.message}`);
     }
   }
 
-  private async refreshSchemaId(topic: string): Promise<number> {
+  private decodeMessage(buffer: Buffer): unknown {
     try {
-      const subject = `${topic}-value`;
-      this.logger.info(`Refreshing schema for ${topic}`);
-
-      const schemaId = await this.schemaUtils.getLatestSchemaId(subject);
-      this.schemaIds.set(topic, schemaId);
-      this.logger.info(`Schema refreshed for ${topic} with ID: ${schemaId}`);
-      return schemaId;
+      const jsonString = buffer.toString('utf8');
+      return JSON.parse(jsonString);
     } catch (error) {
       const err = error as Error;
-      this.logger.error(`Failed to refresh schema for ${topic}`, {
+      this.logger.error('Failed to decode JSON message', {
         error: err.stack || err.message,
       });
-      throw new SchemaRegistryException(
-        `Failed to refresh schema for ${topic}: ${err.message}`,
-      );
+      throw new Error(`Failed to decode JSON message: ${err.message}`);
     }
   }
 
   async sendMessage(topic: string, message: unknown): Promise<void> {
     try {
-      const schemaId = this.schemaIds.get(topic);
-      if (!schemaId) {
-        throw new Error(`No schema ID found for topic ${topic}`);
-      }
-
-      const encodedMessage = await this.schemaUtils.encode(message, schemaId);
+      const encodedMessage = this.encodeMessage(message);
       await this.producer.send({
         topic,
         messages: [{ value: encodedMessage }],
@@ -232,26 +164,7 @@
         await this.producer.connect();
       }
 
-      let schemaId = this.schemaIds.get(topic);
-      if (!schemaId) {
-        this.logger.warn(
-          `Schema ID not found for topic ${topic}, refreshing...`,
-        );
-        try {
-          schemaId = await this.refreshSchemaId(topic);
-        } catch (error) {
-          const err = error as Error;
-          this.logger.error(
-            `Failed to refresh schema ID for topic ${topic}`,
-            { error: err.stack || err.message },
-          );
-          throw new SchemaRegistryException(
-            `Failed to get schema ID for topic ${topic}: ${err.message}`,
-          );
-        }
-      }
-
-      const encodedValue = await this.schemaUtils.encode(message, schemaId);
+      const encodedValue = this.encodeMessage(message);
       const record: ProducerRecord = {
         topic,
         messages: [
@@ -258,8 +190,9 @@
           {
             value: encodedValue,
             headers: {
               'correlation-id': correlationId,
               timestamp: Date.now().toString(),
+              'content-type': 'application/json',
             },
           },
         ],
@@ -293,39 +207,19 @@
 
     try {
       await this.circuitBreaker.execute(async () => {
-        let schemaId = this.schemaIds.get(topic);
-        if (!schemaId) {
-          this.logger.warn(
-            `Schema ID not found for topic ${topic}, refreshing...`,
-          );
-          try {
-            schemaId = await this.refreshSchemaId(topic);
-          } catch (error) {
-            const err = error as Error;
-            this.logger.error(
-              `Failed to refresh schema ID for topic ${topic}`,
-              { error: err.stack || err.message },
-            );
-            throw new SchemaRegistryException(
-              `Failed to get schema ID for topic ${topic}: ${err.message}`,
-            );
-          }
-        }
-
         this.logger.info(`Producing batch to ${topic}`, {
           correlationId,
           count: messages.length,
         });
 
-        const encodedMessages = await Promise.all(
-          messages.map(async (message) => ({
-            value: await this.schemaUtils.encode(message, schemaId),
-            headers: {
-              'correlation-id': correlationId,
-              timestamp: Date.now().toString(),
-            },
-          })),
-        );
+        const encodedMessages = messages.map((message) => ({
+          value: this.encodeMessage(message),
+          headers: {
+            'correlation-id': correlationId,
+            timestamp: Date.now().toString(),
+            'content-type': 'application/json',
+          },
+        }));
 
         const record: ProducerRecord = {
           topic,
@@ -398,9 +292,7 @@
         throw new Error('Message value is null or undefined');
       }
 
-      const decodedMessage = (await this.schemaUtils.decode(
-        message.value,
-      )) as Record<string, unknown>;
+      const decodedMessage = this.decodeMessage(message.value);
 
       if (!decodedMessage) {
         throw new Error('Decoded message is null or undefined');