# 06-04: Webhook 处理与消息路由

Webhook 接收、验证、消息路由和富媒体处理。

In [None]:
// ========== 1. Webhook 服务器基础 ==========
import { createServer, IncomingMessage, ServerResponse } from 'http';
import { createHmac, timingSafeEqual } from 'crypto';
/** Settings consumed by the webhook HTTP server. */
interface WebhookConfig {
  /** Shared secret used as the HMAC-SHA256 key when checking request signatures. */
  secret: string;
  /** Request path the server accepts; requests for other paths are answered with 404. */
  path: string;
  /** TCP port the HTTP server listens on. */
  port: number;
}
/** A single webhook delivery after it has been received and parsed. */
interface WebhookEvent {
  /** Value of the `x-webhook-id` header, or a locally generated id when the header is absent. */
  id: string;
  /** Value of the `x-webhook-type` header, or `'unknown'`; used to look up the handler. */
  type: string;
  /** Time the server built the event (not a timestamp supplied by the sender). */
  timestamp: Date;
  /** Result of `JSON.parse` on the request body; shape depends on the sender and is not validated. */
  payload: any;
  /** Value of the `x-webhook-signature` header, if one was sent. */
  signature?: string;
  /** Request body decoded as UTF-8 text, exactly as received. */
  rawBody: string;
}
/**
 * Minimal HTTP webhook receiver.
 *
 * Accepts `POST` requests on the configured path, authenticates them with an
 * HMAC-SHA256 signature (`x-webhook-signature: sha256=<hex>`), parses the JSON
 * body, acknowledges with `200 OK`, and then dispatches the event to the
 * handler registered for its `x-webhook-type`.
 *
 * Responses: 404 wrong path, 405 wrong method, 401 missing/invalid signature,
 * 400 unreadable or non-JSON body, 200 accepted.
 */
class WebhookServer {
  private server: ReturnType<typeof createServer>;
  private handlers = new Map<string, (event: WebhookEvent) => Promise<void>>();

  /**
   * @param config Shared secret, request path and listening port. An empty
   *   `secret` disables the "signature required" rule (unsigned requests are
   *   then accepted); a signature that *is* sent is always verified.
   */
  constructor(private config: WebhookConfig) {
    this.server = createServer(this.handleRequest.bind(this));
  }

  /**
   * Handles one HTTP request end to end. Never rejects: every failure is
   * turned into an HTTP status code.
   */
  private async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
    // Only serve the configured path. The query string is ignored so that
    // `/hook?source=x` is not rejected; an exact match is still accepted in
    // case the configured path itself contains a `?`.
    const requestUrl = req.url ?? '';
    const pathname = requestUrl.split('?')[0];
    if (requestUrl !== this.config.path && pathname !== this.config.path) {
      res.writeHead(404);
      res.end('Not Found');
      return;
    }

    // Only POST is allowed.
    if (req.method !== 'POST') {
      res.writeHead(405);
      res.end('Method Not Allowed');
      return;
    }

    try {
      const chunks: Buffer[] = [];
      for await (const chunk of req) {
        chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
      }

      // Keep the raw bytes: the signature must be computed over exactly what
      // the sender transmitted, not over a re-encoded string.
      const bodyBytes = Buffer.concat(chunks);
      const rawBody = bodyBytes.toString('utf8');

      // Verify the signature. A request without a signature header must NOT
      // bypass verification when a secret is configured.
      const signature = this.headerValue(req, 'x-webhook-signature');
      const authentic = signature
        ? this.verifySignature(bodyBytes, signature)
        : !this.config.secret;
      if (!authentic) {
        res.writeHead(401);
        res.end('Invalid signature');
        return;
      }

      // Build the event (JSON.parse failures fall through to the 400 below).
      const event: WebhookEvent = {
        id: this.headerValue(req, 'x-webhook-id') || this.generateId(),
        type: this.headerValue(req, 'x-webhook-type') || 'unknown',
        timestamp: new Date(),
        payload: JSON.parse(rawBody),
        signature,
        rawBody
      };

      // Acknowledge immediately so the sender does not retry.
      res.writeHead(200);
      res.end('OK');

      // Process the event in the background; handler failures are only logged.
      this.processEvent(event).catch(console.error);

    } catch (error) {
      console.error('Webhook error:', error);
      // Writing headers twice throws, so only answer if nothing was sent yet.
      if (!res.headersSent) {
        res.writeHead(400);
        res.end('Bad Request');
      }
    }
  }

  /** Returns the first value of a request header (Node may deliver repeated headers as an array). */
  private headerValue(req: IncomingMessage, name: string): string | undefined {
    const value = req.headers[name];
    return Array.isArray(value) ? value[0] : value;
  }

  /**
   * Checks `signature` against `sha256=<hex HMAC-SHA256 of body>`.
   * The comparison is constant-time to avoid leaking the expected value.
   */
  private verifySignature(body: string | Buffer, signature: string): boolean {
    const digest = createHmac('sha256', this.config.secret)
      .update(body)
      .digest('hex');
    const expected = Buffer.from(`sha256=${digest}`, 'utf8');
    const received = Buffer.from(signature, 'utf8');
    // timingSafeEqual throws on length mismatch, so compare lengths first.
    return received.length === expected.length && timingSafeEqual(received, expected);
  }

  /** Builds a fallback event id: `<epoch ms>-<up to 9 random base-36 characters>`. */
  private generateId(): string {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /** Invokes the handler registered for `event.type`, or logs when there is none. */
  private async processEvent(event: WebhookEvent): Promise<void> {
    const handler = this.handlers.get(event.type);
    if (handler) {
      await handler(event);
    } else {
      console.log('No handler for event type:', event.type);
    }
  }

  /**
   * Registers the handler for an event type. Registering again for the same
   * type replaces the previous handler.
   */
  on(eventType: string, handler: (event: WebhookEvent) => Promise<void>): void {
    this.handlers.set(eventType, handler);
  }

  /** Starts listening on the configured port. */
  start(): void {
    this.server.listen(this.config.port, () => {
      console.log(`Webhook server listening on port ${this.config.port}`);
    });
  }

  /** Stops accepting new connections. */
  stop(): void {
    this.server.close();
  }
}

In [None]:
// ========== 2. 消息路由器 ==========
/** Platform-independent representation of one incoming chat message. */
interface Message {
  /** Message identifier. */
  id: string;
  /** Name of the source platform. */
  platform: string; // telegram, discord, slack, etc.
  /** Kind of content the message carries. */
  type: 'text' | 'image' | 'file' | 'audio' | 'video' | 'location' | 'sticker';
  /**
   * Message content. The router's string, command and regex matching treat
   * this as text; NOTE(review): the shape for non-text types is not defined here.
   */
  content: any;
  /** Sender of the message. */
  from: {
    id: string;
    username?: string;
    name: string;
  };
  /** Conversation the message was posted in. */
  chat: {
    id: string;
    type: 'private' | 'group' | 'channel';
  };
  /** Message time. */
  timestamp: Date;
  /** Presumably the original platform-specific payload — TODO confirm with the adapters. */
  raw: any;
}
/** Terminal handler for a matched message; may be synchronous or asynchronous. */
type MessageHandler = (message: Message) => Promise<void> | void;
/**
 * Step that runs before the handler. Call `next()` to continue down the
 * chain; not calling it stops processing of the message.
 */
type Middleware = (message: Message, next: () => Promise<void>) => Promise<void>;
/** One routing table entry. */
interface Route {
  /**
   * How the route is selected: a string is compared for equality with the
   * message content, a RegExp is tested against it, and a function is called
   * with the whole message.
   */
  pattern: RegExp | string | ((msg: Message) => boolean);
  /** Handler invoked once the route-specific middleware has run. */
  handler: MessageHandler;
  /** Middleware that runs only for this route, in order, before the handler. */
  middleware: Middleware[];
}
/**
 * Routes incoming messages to handlers.
 *
 * Each message first passes through the global middleware (in registration
 * order). The first route whose pattern matches then runs its own middleware
 * followed by its handler. When no route matches, the fallback handler (if
 * any) is invoked. A middleware that does not call `next()` ends processing.
 */
class MessageRouter {
  private routes: Route[] = [];
  private globalMiddleware: Middleware[] = [];
  private fallbackHandler?: MessageHandler;

  /** Appends a middleware that runs for every message. */
  use(middleware: Middleware): this {
    this.globalMiddleware.push(middleware);
    return this;
  }

  /**
   * Registers a route. Routes are tried in registration order and only the
   * first match runs.
   *
   * @param pattern Exact content string, RegExp, or predicate over the message.
   * @param handler Invoked after `middleware` when the route matches.
   * @param middleware Route-specific middleware, run in order.
   */
  on(pattern: Route['pattern'], handler: MessageHandler, ...middleware: Middleware[]): this {
    this.routes.push({ pattern, handler, middleware });
    return this;
  }

  /**
   * Registers a route for the slash command `/<cmd>`.
   *
   * The command must be a whole token: `/start`, `/start arg` and
   * `/start@bot` match `start`, while `/started` does not. Messages whose
   * content is not a string never match (and never throw).
   */
  command(cmd: string, handler: MessageHandler, ...middleware: Middleware[]): this {
    const prefix = `/${cmd}`;
    return this.on(
      (msg) => {
        const text: unknown = msg.content;
        if (typeof text !== 'string' || !text.startsWith(prefix)) {
          return false;
        }
        // The character right after the command decides whether it is a full
        // token or merely a prefix of a longer command.
        const following = text.charAt(prefix.length);
        return following === '' || following === '@' || /\s/.test(following);
      },
      handler,
      ...middleware
    );
  }

  /** Registers a route that matches when `pattern` tests true against the content. */
  regex(pattern: RegExp, handler: MessageHandler, ...middleware: Middleware[]): this {
    return this.on(
      (msg) => MessageRouter.testRegExp(pattern, msg.content),
      handler,
      ...middleware
    );
  }

  /** Sets the handler used when no route matches (replaces any previous one). */
  fallback(handler: MessageHandler): this {
    this.fallbackHandler = handler;
    return this;
  }

  /** Runs `message` through the global middleware chain and then the route table. */
  async route(message: Message): Promise<void> {
    // Run the global middleware chain; the end of the chain performs routing.
    const executeMiddleware = async (index: number): Promise<void> => {
      if (index >= this.globalMiddleware.length) {
        return this.matchRoute(message);
      }

      const middleware = this.globalMiddleware[index];
      await middleware(message, () => executeMiddleware(index + 1));
    };

    await executeMiddleware(0);
  }

  /** Finds the first matching route and runs it, or falls back. */
  private async matchRoute(message: Message): Promise<void> {
    for (const route of this.routes) {
      if (this.matches(route.pattern, message)) {
        // Run the route-specific middleware, then the handler.
        const executeRoute = async (index: number): Promise<void> => {
          if (index >= route.middleware.length) {
            return route.handler(message);
          }

          const middleware = route.middleware[index];
          await middleware(message, () => executeRoute(index + 1));
        };

        await executeRoute(0);
        return;
      }
    }

    // No route matched.
    if (this.fallbackHandler) {
      await this.fallbackHandler(message);
    }
  }

  /** Evaluates a route pattern against a message. */
  private matches(pattern: Route['pattern'], message: Message): boolean {
    if (typeof pattern === 'string') {
      return message.content === pattern;
    }
    if (pattern instanceof RegExp) {
      return MessageRouter.testRegExp(pattern, message.content);
    }
    return pattern(message);
  }

  /**
   * Tests `pattern` against the content (empty string when content is falsy).
   * `lastIndex` is reset first: a RegExp with the `g` or `y` flag is stateful
   * and would otherwise resume from the previous message's match position.
   */
  private static testRegExp(pattern: RegExp, content: unknown): boolean {
    pattern.lastIndex = 0;
    return pattern.test((content || '') as string);
  }
}
// Usage example: one router configured as a single fluent chain.
const router = new MessageRouter();

router
  // Global middleware: log every message, then continue down the chain.
  .use(async (incoming, proceed) => {
    const line = `[${incoming.platform}] ${incoming.from.name}: ${incoming.content}`;
    console.log(line);
    await proceed();
  })
  // Command routes.
  .command('start', () => {
    console.log('Start command received');
  })
  .command('help', () => {
    console.log('Help command received');
  })
  // Regex route: extract the queried symbol from "price: <symbol>".
  .regex(/^price:\s*(\w+)$/i, (incoming) => {
    const found = incoming.content.match(/^price:\s*(\w+)$/i);
    console.log(`Price query for: ${found?.[1]}`);
  })
  // Predicate route: any image message.
  .on(
    (incoming) => incoming.type === 'image',
    () => {
      console.log('Image received');
    }
  )
  // Fallback for everything no route matched.
  .fallback((incoming) => {
    console.log('Unknown message:', incoming.content);
  });

In [None]:
// ========== 3. 富媒体消息处理 ==========
/** Descriptor of a media attachment before it has been downloaded. */
interface MediaFile {
  /** File identifier; handed to the platform downloader when no `url` is set. */
  id: string;
  /** Media category; selects the type-specific processing step. */
  type: 'image' | 'video' | 'audio' | 'document' | 'sticker';
  /** MIME type, e.g. `image/png` or `application/pdf`. */
  mimeType: string;
  fileName?: string;
  /** File size — presumably in bytes; TODO confirm with the platform adapters. */
  fileSize: number;
  /** Direct download URL; when present it is fetched instead of using a platform downloader. */
  url?: string;
  duration?: number; // audio/video duration (unit not specified here)
  width?: number;
  height?: number;
  thumbnail?: string;
}
/**
 * Downloads media attachments and annotates them with type-specific metadata.
 *
 * Files with a `url` are fetched directly; otherwise the downloader
 * registered for the message's platform is asked for the bytes by file id.
 */
class MediaProcessor {
  private downloaders = new Map<string, (fileId: string) => Promise<Buffer>>();

  /** Registers (or replaces) the function that downloads a file by id for `platform`. */
  registerDownloader(platform: string, downloader: (fileId: string) => Promise<Buffer>): void {
    this.downloaders.set(platform, downloader);
  }

  /**
   * Downloads `media` and returns its bytes together with enriched metadata.
   *
   * @param media Descriptor of the file to process.
   * @param platform Platform key used to pick a downloader when `media.url` is absent.
   * @returns The file contents plus metadata. `metadata.processed` is `true`
   *   for image/video/audio/document and `false` for types without a
   *   dedicated step (e.g. stickers), which pass through unchanged.
   * @throws Error when the download fails or no downloader is registered.
   */
  async processMedia(media: MediaFile, platform: string): Promise<ProcessedMedia> {
    // Download the file.
    const buffer = await this.downloadMedia(media, platform);

    // Type-specific processing.
    switch (media.type) {
      case 'image':
        return this.processImage(buffer, media);
      case 'video':
        return this.processVideo(buffer, media);
      case 'audio':
        return this.processAudio(buffer, media);
      case 'document':
        return this.processDocument(buffer, media);
      default:
        // No dedicated step: still return a well-formed ProcessedMedia so
        // callers can rely on `metadata.processed` being a boolean.
        return { buffer, metadata: { ...media, processed: false } };
    }
  }

  /** Fetches the file bytes from `media.url`, or via the platform downloader. */
  private async downloadMedia(media: MediaFile, platform: string): Promise<Buffer> {
    if (media.url) {
      const response = await fetch(media.url);
      // Without this check an HTTP error page would be returned as media bytes.
      if (!response.ok) {
        throw new Error(`Failed to download media ${media.id}: HTTP ${response.status}`);
      }
      return Buffer.from(await response.arrayBuffer());
    }

    const downloader = this.downloaders.get(platform);
    if (!downloader) {
      throw new Error(`No downloader for platform: ${platform}`);
    }

    return downloader(media.id);
  }

  /** Adds `format`: the MIME type with the `image/` prefix removed. */
  private processImage(buffer: Buffer, metadata: MediaFile): ProcessedMedia {
    return {
      buffer,
      metadata: {
        ...metadata,
        processed: true,
        format: metadata.mimeType.replace('image/', '')
      }
    };
  }

  /** Adds `hasAudio`, currently a hard-coded placeholder. */
  private processVideo(buffer: Buffer, metadata: MediaFile): ProcessedMedia {
    return {
      buffer,
      metadata: {
        ...metadata,
        processed: true,
        hasAudio: true // placeholder: needs real detection
      }
    };
  }

  /** Adds `targetFormat`; no transcoding is performed here. */
  private processAudio(buffer: Buffer, metadata: MediaFile): ProcessedMedia {
    return {
      buffer,
      metadata: {
        ...metadata,
        processed: true,
        // Could be transcoded to a single common format.
        targetFormat: 'mp3'
      }
    };
  }

  /** Adds `isText` / `isPDF` flags derived from the MIME type. */
  private processDocument(buffer: Buffer, metadata: MediaFile): ProcessedMedia {
    return {
      buffer,
      metadata: {
        ...metadata,
        processed: true,
        isText: metadata.mimeType.startsWith('text/'),
        isPDF: metadata.mimeType === 'application/pdf'
      }
    };
  }
}
/** Result of processing a media file: its bytes plus enriched metadata. */
interface ProcessedMedia {
  /** Downloaded file contents. */
  buffer: Buffer;
  /**
   * The original descriptor extended with a `processed` flag and any
   * type-specific fields added by the processing step.
   */
  metadata: MediaFile & { processed: boolean; [key: string]: any };
}

In [None]:
// ========== 4. 长轮询 (Long Polling) ==========
/** Settings for a polling loop. */
interface PollingConfig {
  /** Endpoint to poll; an `offset` query parameter is added once an item id is known. */
  url: string;
  /** Delay in milliseconds between polls (doubled after a failed poll). */
  interval: number;
  /** Request timeout in milliseconds — NOTE(review): confirm how the poller applies this value. */
  timeout: number;
  /** Extra HTTP headers sent with every poll request. */
  headers?: Record<string, string>;
}
/**
 * Repeatedly fetches a JSON array of items from an HTTP endpoint and hands
 * each non-empty batch to a processor.
 *
 * After a batch is processed, the id of its last item is remembered and sent
 * as the `offset` query parameter on subsequent requests. A batch whose
 * processor throws is not acknowledged, so its offset is not advanced.
 *
 * @typeParam T Item type returned by the endpoint.
 */
class LongPoller<T> {
  private isRunning = false;
  private lastId: string | null = null;
  private abortController: AbortController | null = null;
  /** Bumped on every start() so a loop left over from an earlier run can tell it is stale. */
  private generation = 0;

  /**
   * @param config Endpoint, poll interval, per-request timeout and headers.
   *   A `timeout` of 0 (or less) disables the request timeout.
   * @param processor Called with every non-empty batch.
   * @param getId Extracts the id used as the next `offset` from an item.
   */
  constructor(
    private config: PollingConfig,
    private processor: (items: T[]) => Promise<void>,
    private getId: (item: T) => string
  ) {}

  /** Starts the polling loop; does nothing when already running. */
  start(): void {
    if (this.isRunning) return;

    this.isRunning = true;
    this.generation += 1;
    // Fire-and-forget: poll() handles its own errors and never rejects.
    void this.poll(this.generation);
  }

  /** Stops polling and cancels the request currently in flight, if any. */
  stop(): void {
    this.isRunning = false;
    this.abortController?.abort();
  }

  /**
   * Polling loop for one start() call. Exits when stop() is called or when a
   * newer loop has been started (stop() followed by start() while this loop
   * was sleeping must not leave two loops running).
   */
  private async poll(generation: number): Promise<void> {
    const isCurrent = (): boolean => this.isRunning && generation === this.generation;

    while (isCurrent()) {
      const controller = new AbortController();
      this.abortController = controller;

      // Enforce config.timeout: without it a hung connection would stall
      // polling forever. `timedOut` distinguishes this abort from stop().
      let timedOut = false;
      const timer =
        this.config.timeout > 0
          ? setTimeout(() => {
              timedOut = true;
              controller.abort();
            }, this.config.timeout)
          : undefined;

      try {
        let items: T[];
        try {
          const url = new URL(this.config.url);
          if (this.lastId) {
            url.searchParams.set('offset', this.lastId);
          }

          const response = await fetch(url.toString(), {
            headers: this.config.headers,
            signal: controller.signal
          });

          if (!response.ok) {
            throw new Error(`HTTP ${response.status}`);
          }

          items = await response.json();
        } finally {
          // The timeout covers the HTTP exchange only, not the processor.
          clearTimeout(timer);
        }

        if (Array.isArray(items) && items.length > 0) {
          await this.processor(items);
          this.lastId = this.getId(items[items.length - 1]);
        }

        // Wait before the next poll.
        await this.sleep(this.config.interval);

      } catch (error) {
        const aborted = error instanceof Error && error.name === 'AbortError';
        if (aborted && !timedOut) {
          // Cancelled by stop(): end this loop quietly.
          return;
        }

        console.error(
          'Polling error:',
          timedOut ? new Error(`Request timed out after ${this.config.timeout}ms`) : error
        );

        // Back off for longer after an error.
        await this.sleep(this.config.interval * 2);
      }
    }
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
// 使用示例 (类似 Telegram Bot API 的 getUpdates)
// const poller = new LongPoller<Message>(
//   {
//     url: 'https://api.telegram.org/bot<token>/getUpdates',
//     interval: 1000,
//     timeout: 30000
//   },
//   async (messages) => {
//     for (const msg of messages) {
//       await router.route(msg);
//     }
//   },
//   (msg) => msg.id
// );
// poller.start();