# QiCore AMSG - Async Message Queue System

**Interactive tutorial for message-driven architecture with h2A-inspired patterns**

## What You'll Learn

- ✅ **Message queue fundamentals** - Priority-based async message processing
- ✅ **h2A-inspired patterns** - Async iteration and non-blocking I/O
- ✅ **Message type system** - Comprehensive typed message handling
- ✅ **CLI integration** - Message-driven CLI and agent communication
- ✅ **Production patterns** - Backpressure, error recovery, monitoring

---

## 📚 Prerequisites

This tutorial builds on:
- **[01-qi-base.ipynb](./01-qi-base.ipynb)** - Result<T> patterns
- **[02-qi-core.ipynb](./02-qi-core.ipynb)** - Infrastructure services

Let's start by importing everything we need:

In [None]:
// Import Result<T> fundamentals from @qi/base
import { 
  success, failure, match, map, flatMap,
  type Result, type QiError,
  validationError, systemError, networkError
} from '@qi/base'

// Import async message queue system from @qi/amsg
import { 
  QiAsyncMessageQueue,
  QiMessageFactory,
  MessageType, MessagePriority
} from '@qi/amsg'

// Import types for better TypeScript experience
import type { 
  QiMessage,
  UserInputMessage,
  CommandMessage,
  AgentOutputMessage,
  SystemControlMessage,
  StreamMessage
} from '@qi/amsg'

// Display welcome message
Deno.jupyter.html`
<div style="background: linear-gradient(135deg, #ff6b6b 0%, #ee5a52 100%); color: white; padding: 20px; border-radius: 10px; margin: 15px 0;">
  <h3 style="margin: 0 0 15px 0;">📨 QiCore AMSG Tutorial Started!</h3>
  <p style="margin: 0;">Ready to explore message-driven architecture with priority queues and async iteration!</p>
</div>
`

console.log('✅ @qi/amsg imports successful!')
console.log('✅ Ready to explore async message patterns!')

## 1. Message Type System - Structured Communication

AMSG uses a comprehensive type system for different kinds of messages in CLI and agent workflows:
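As a mental model for the cell below, the message kinds can be pictured as a TypeScript discriminated union. This is only a sketch: the stand-in types (`UserInput`, `Command`, `SystemControl`), their tag values, and the helper `describeMessage` are assumptions for illustration, not the actual `@qi/amsg` definitions.

```typescript
// Hypothetical stand-ins for the @qi/amsg message types; the real
// definitions may use different tag values and carry more fields.
type UserInput = { type: 'user_input'; input: string; source: 'cli' | 'stdin' | 'api' }
type Command = { type: 'command'; command: string; args: string[] }
type SystemControl = { type: 'system_control'; action: 'pause' | 'resume' | 'reset' | 'shutdown' }
type Message = UserInput | Command | SystemControl

// Switching on the `type` tag narrows `msg` in each branch, so no casts are needed.
function describeMessage(msg: Message): string {
  switch (msg.type) {
    case 'user_input':
      return `input from ${msg.source}: ${msg.input}`
    case 'command':
      return `command: ${[msg.command, ...msg.args].join(' ')}`
    case 'system_control':
      return `system: ${msg.action}`
    default: {
      // Compile-time exhaustiveness check: this stops type-checking
      // as soon as a new variant is added but not handled above.
      const unreachable: never = msg
      return unreachable
    }
  }
}

const described = describeMessage({ type: 'command', command: 'build', args: ['--production'] })
const describedInput = describeMessage({ type: 'user_input', input: 'hello', source: 'cli' })
```

Narrowing on the tag is an alternative to the `message as UserInputMessage` casts used later in this tutorial; whether the real `QiMessage` type supports it depends on how the library declares its union.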

In [None]:
console.log('=== Message Type System Demo ===\n')

// Create a message factory for type-safe message creation
const messageFactory = new QiMessageFactory()

console.log('1. Creating different types of messages:\n')

// User input messages (from CLI)
const userMessage = messageFactory.createUserInputMessage(
  'Hello, can you help me build a React component?',
  'cli',    // source: 'cli' | 'stdin' | 'api'
  false     // raw: boolean (whether input needs processing)
)

match(
  (msg) => {
    console.log('✅ User Input Message:')
    console.log(`   ID: ${msg.id.substring(0, 8)}...`)
    console.log(`   Type: ${msg.type}`)
    console.log(`   Priority: ${msg.priority} (${MessagePriority[msg.priority]})`)
    console.log(`   Input: "${msg.input}"`)
    console.log(`   Source: ${msg.source}\n`)
  },
  (error) => console.error('❌ Failed to create user message:', error.message),
  userMessage
)

// Command messages (CLI commands)
const commandMessage = messageFactory.createCommandMessage(
  'build',
  ['--production', '--minify'],
  {
    executionId: 'exec_123',
    userId: 'user_456',
    sessionId: 'sess_789'
  }
)

match(
  (msg) => {
    console.log('✅ Command Message:')
    console.log(`   ID: ${msg.id.substring(0, 8)}...`)
    console.log(`   Type: ${msg.type}`)
    console.log(`   Command: ${msg.command}`)
    console.log(`   Args: [${msg.args.join(', ')}]`)
    console.log(`   Execution ID: ${msg.context.executionId}\n`)
  },
  (error) => console.error('❌ Failed to create command message:', error.message),
  commandMessage
)

// Agent output messages (responses from AI)
const agentMessage = messageFactory.createAgentOutputMessage(
  'I\'ll help you build a React component! Let me create a sample component with TypeScript.',
  'markdown',  // format: 'text' | 'markdown' | 'json'
  false        // streaming: boolean
)

match(
  (msg) => {
    console.log('✅ Agent Output Message:')
    console.log(`   ID: ${msg.id.substring(0, 8)}...`)
    console.log(`   Type: ${msg.type}`)
    console.log(`   Format: ${msg.format}`)
    console.log(`   Content: "${msg.content.substring(0, 50)}..."`)
    console.log(`   Streaming: ${msg.streaming}\n`)
  },
  (error) => console.error('❌ Failed to create agent message:', error.message),
  agentMessage
)

// System control messages (high priority system operations)
const systemMessage = messageFactory.createSystemControlMessage(
  'pause',      // action: 'pause' | 'resume' | 'reset' | 'shutdown'
  true,         // immediate: boolean
  'User requested pause via Ctrl+C'
)

match(
  (msg) => {
    console.log('✅ System Control Message:')
    console.log(`   ID: ${msg.id.substring(0, 8)}...`)
    console.log(`   Type: ${msg.type}`)
    console.log(`   Priority: ${msg.priority} (${MessagePriority[msg.priority]}) - HIGH PRIORITY!`)
    console.log(`   Action: ${msg.action}`)
    console.log(`   Immediate: ${msg.immediate}`)
    console.log(`   Reason: ${msg.reason}\n`)
  },
  (error) => console.error('❌ Failed to create system message:', error.message),
  systemMessage
)

console.log('2. Message Priority Levels:\n')
console.log('   CRITICAL (0) - System abort, emergency shutdown')
console.log('   HIGH (1)     - User interrupts, errors, system control')
console.log('   NORMAL (2)   - Regular commands and responses')
console.log('   LOW (3)      - Background tasks, logging, status updates')

Deno.jupyter.html`
<div style="background: #fff3cd; padding: 15px; border-radius: 8px; margin: 15px 0;">
  <h4 style="color: #856404; margin: 0 0 10px 0;">🎯 Message Priorities</h4>
  <p style="margin: 0; color: #856404;">
    Priority determines processing order: lower numeric values are dequeued first. <strong>CRITICAL</strong> messages are handled ahead of everything else in the queue, while <strong>LOW</strong> priority messages wait until more urgent work has been drained. This keeps the system responsive under load.
  </p>
</div>
`

## 2. Creating and Using Message Queues

The `QiAsyncMessageQueue` is the heart of the message-driven architecture:
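To make the priority ordering concrete before running the cell, here is a minimal sketch of one way such a queue could order messages: one FIFO bucket per priority level. `SketchPriorityQueue` and `Priority` are hypothetical names, and this is not how `QiAsyncMessageQueue` is necessarily implemented.

```typescript
// Priority levels mirror the tutorial's table: lower number = more urgent.
const Priority = { CRITICAL: 0, HIGH: 1, NORMAL: 2, LOW: 3 } as const
type Priority = (typeof Priority)[keyof typeof Priority]

// Hypothetical sketch: one FIFO bucket per priority level.
class SketchPriorityQueue<T> {
  private readonly buckets: T[][] = [[], [], [], []]
  private readonly maxSize: number

  constructor(maxSize: number) {
    this.maxSize = maxSize
  }

  size(): number {
    return this.buckets.reduce((count, bucket) => count + bucket.length, 0)
  }

  // Returns false instead of throwing when the queue is full.
  enqueue(item: T, priority: Priority): boolean {
    if (this.size() >= this.maxSize) return false
    this.buckets[priority].push(item)
    return true
  }

  // Takes the oldest item from the most urgent non-empty bucket.
  dequeue(): T | undefined {
    for (const bucket of this.buckets) {
      if (bucket.length > 0) return bucket.shift()
    }
    return undefined
  }
}

const sketchQueue = new SketchPriorityQueue<string>(3)
sketchQueue.enqueue('first', Priority.NORMAL)
sketchQueue.enqueue('background', Priority.LOW)
sketchQueue.enqueue('pause', Priority.HIGH)
const rejected = !sketchQueue.enqueue('overflow', Priority.NORMAL) // queue is full

const drained: string[] = []
for (let item = sketchQueue.dequeue(); item !== undefined; item = sketchQueue.dequeue()) {
  drained.push(item)
}
// drained is ['pause', 'first', 'background']
```

Separate buckets keep messages of equal priority in arrival order, which a single sorted array only guarantees with a stable sort.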

In [None]:
console.log('\n=== Message Queue Creation and Basic Operations ===\n')

// Create a message queue with configuration
const messageQueue = new QiAsyncMessageQueue<QiMessage>({
  maxSize: 100,           // Maximum queue size
  maxConcurrent: 5,       // Max concurrent processing
  enableStats: true,      // Enable statistics tracking
  priorityQueuing: true   // Enable priority-based processing
})

console.log('✅ Message queue created with configuration:')
console.log('   Max size: 100 messages')
console.log('   Max concurrent processing: 5')
console.log('   Priority queuing: enabled')
console.log('   Statistics tracking: enabled\n')

// Set up queue monitoring with subscription system
let messageCount = 0
const unsubscribe = messageQueue.subscribe((event) => {
  messageCount++
  
  switch (event.type) {
    case 'message_enqueued':
      console.log(`📥 [${messageCount}] Message enqueued: ${event.message?.type} (Priority: ${MessagePriority[event.message?.priority || 0]})`)
      break
      
    case 'message_dequeued':
      console.log(`📤 [${messageCount}] Message dequeued: ${event.message?.type}`)
      break
      
    case 'message_dropped':
      console.log(`🗑️ [${messageCount}] Message dropped: ${event.reason}`)
      break
      
    case 'queue_paused':
      console.log(`⏸️ [${messageCount}] Queue paused`)
      break
      
    case 'queue_resumed':
      console.log(`▶️ [${messageCount}] Queue resumed`)
      break
  }
})

console.log('✅ Queue monitoring subscribed\n')

// Demonstrate enqueueing messages with different priorities
console.log('--- Enqueueing Messages with Different Priorities ---\n')

// Normal priority messages
const messages = [
  messageFactory.createUserInputMessage('First message', 'cli', false),
  messageFactory.createUserInputMessage('Second message', 'cli', false),
  messageFactory.createCommandMessage('help', [], { executionId: 'exec_1' }),
  messageFactory.createAgentOutputMessage('Processing your request...', 'text', false)
]

// Enqueue normal priority messages
for (const messageResult of messages) {
  await match(
    async (message) => {
      const enqueueResult = messageQueue.enqueue(message, MessagePriority.NORMAL)
      match(
        () => {}, // Success logged by subscription
        (error) => console.error(`❌ Failed to enqueue message: ${error.message}`),
        enqueueResult
      )
    },
    async (error) => console.error(`❌ Invalid message: ${error.message}`),
    messageResult
  )
}

// Add a high priority message (should jump to front of processing)
const urgentMessage = messageFactory.createSystemControlMessage(
  'pause',
  true,
  'User interrupt detected'
)

await match(
  async (message) => {
    const enqueueResult = messageQueue.enqueue(message, MessagePriority.HIGH)
    match(
      () => console.log('\n🚨 HIGH PRIORITY message added - should process first!'),
      (error) => console.error(`❌ Failed to enqueue urgent message: ${error.message}`),
      enqueueResult
    )
  },
  async (error) => console.error(`❌ Invalid urgent message: ${error.message}`),
  urgentMessage
)

// Add a low priority background message
const backgroundMessage = messageFactory.createAgentOutputMessage(
  'Background: Statistics updated',
  'text',
  false
)

await match(
  async (message) => {
    const enqueueResult = messageQueue.enqueue(message, MessagePriority.LOW)
    match(
      () => console.log('📊 LOW PRIORITY background message added\n'),
      (error) => console.error(`❌ Failed to enqueue background message: ${error.message}`),
      enqueueResult
    )
  },
  async (error) => console.error(`❌ Invalid background message: ${error.message}`),
  backgroundMessage
)

console.log(`📊 Queue Status: ${messageQueue.size()} messages queued\n`)

## 3. h2A-Inspired Async Iteration

The queue implements `AsyncIterable`, allowing you to process messages with `for await` loops (h2A pattern):
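As a rough idea of what sits behind that interface, the sketch below builds a tiny queue that can be consumed with `for await`. `SketchAsyncQueue` and `collectAll` are hypothetical names; the real queue adds priorities, statistics and events, and its internals may differ.

```typescript
// Hypothetical sketch of a queue that is consumable with `for await`.
class SketchAsyncQueue<T> implements AsyncIterable<T> {
  private items: T[] = []
  private waiting: ((result: IteratorResult<T>) => void)[] = []
  private closed = false

  enqueue(item: T): void {
    if (this.closed) return
    const resolve = this.waiting.shift()
    if (resolve) {
      resolve({ value: item, done: false }) // hand straight to a parked consumer
    } else {
      this.items.push(item)
    }
  }

  // After done(), buffered items are still delivered, then iteration ends.
  done(): void {
    this.closed = true
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true })
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: (): Promise<IteratorResult<T>> => {
        if (this.items.length > 0) {
          return Promise.resolve({ value: this.items.shift() as T, done: false })
        }
        if (this.closed) {
          return Promise.resolve({ value: undefined, done: true })
        }
        // Queue is empty: park until enqueue() or done() resolves this promise.
        return new Promise(resolve => this.waiting.push(resolve))
      }
    }
  }
}

async function collectAll(): Promise<string[]> {
  const queue = new SketchAsyncQueue<string>()
  queue.enqueue('a')
  // The producer runs later; the consumer waits without polling.
  setTimeout(() => {
    queue.enqueue('b')
    queue.done()
  }, 10)
  const seen: string[] = []
  for await (const item of queue) seen.push(item)
  return seen
}
```

The consumer never polls: when the queue is empty, `next()` returns a pending promise that the producer resolves later.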

In [None]:
console.log('=== h2A-Inspired Async Iteration Demo ===\n')

// Create a message processor that demonstrates the h2A pattern
class MessageProcessor {
  private processedCount = 0
  private maxMessages = 0
  
  constructor(maxMessages: number) {
    this.maxMessages = maxMessages
  }
  
  async processMessages(queue: QiAsyncMessageQueue<QiMessage>): Promise<void> {
    console.log('🔄 Starting message processing with h2A async iteration\n')
    
    try {
      // This is the h2A pattern - async iteration over the queue
      for await (const message of queue) {
        this.processedCount++
        
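        // Process the message based on its type; catch per message so one
        // failing handler does not end the whole loop
        try {
          await this.handleMessage(message)
        } catch (error) {
          console.error(`❌ Failed to handle message ${message.id}:`, error)
        }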
        
        // Stop after processing the specified number of messages for demo
        if (this.processedCount >= this.maxMessages) {
          console.log(`\n✅ Processed ${this.processedCount} messages, stopping demo\n`)
          queue.done() // Signal that we're done processing
          break
        }
        
        // Small delay to make the demo more readable
        await new Promise(resolve => setTimeout(resolve, 100))
      }
    } catch (error) {
      console.error('❌ Message processing error:', error)
    }
  }
  
  private async handleMessage(message: QiMessage): Promise<void> {
    const timestamp = new Date().toISOString().substring(11, 23) // HH:mm:ss.sss
    
    console.log(`[${timestamp}] Processing message ${this.processedCount}:`)
    console.log(`   Type: ${message.type}`)
    console.log(`   Priority: ${MessagePriority[message.priority]} (${message.priority})`)
    console.log(`   ID: ${message.id.substring(0, 12)}...`)
    
    // Handle different message types
    switch (message.type) {
      case MessageType.USER_INPUT:
        const userMsg = message as UserInputMessage
        console.log(`   📝 User said: "${userMsg.input.substring(0, 40)}${userMsg.input.length > 40 ? '...' : ''}"`) 
        console.log(`   📍 Source: ${userMsg.source}`)
        // Simulate processing time
        await new Promise(resolve => setTimeout(resolve, 50))
        break
        
      case MessageType.COMMAND:
        const cmdMsg = message as CommandMessage
        console.log(`   ⚙️ Executing: ${cmdMsg.command} ${cmdMsg.args.join(' ')}`)
        console.log(`   🆔 Execution: ${cmdMsg.context.executionId}`)
        // Simulate command execution time
        await new Promise(resolve => setTimeout(resolve, 75))
        break
        
      case MessageType.AGENT_OUTPUT:
        const agentMsg = message as AgentOutputMessage
        console.log(`   🤖 Agent output (${agentMsg.format}): "${agentMsg.content.substring(0, 40)}${agentMsg.content.length > 40 ? '...' : ''}"`) 
        console.log(`   📡 Streaming: ${agentMsg.streaming}`)
        // Simulate response processing time
        await new Promise(resolve => setTimeout(resolve, 30))
        break
        
      case MessageType.SYSTEM_CONTROL:
        const sysMsg = message as SystemControlMessage
        console.log(`   🛠️ System action: ${sysMsg.action.toUpperCase()}`)
        console.log(`   ⚡ Immediate: ${sysMsg.immediate}`)
        if (sysMsg.reason) {
          console.log(`   📋 Reason: ${sysMsg.reason}`)
        }
        // System messages process quickly
        await new Promise(resolve => setTimeout(resolve, 10))
        break
        
      default:
        console.log(`   ❓ Unknown message type: ${message.type}`)
    }
    
    console.log(`   ✅ Message processed (simulated time: ${this.getProcessingTime(message.type)}ms)\n`)
  }
  
  private getProcessingTime(messageType: MessageType): number {
    // Simulate different processing times for different message types
    switch (messageType) {
      case MessageType.SYSTEM_CONTROL: return 10
      case MessageType.AGENT_OUTPUT: return 30
      case MessageType.USER_INPUT: return 50
      case MessageType.COMMAND: return 75
      default: return 25
    }
  }
}

// Create the processor and start processing
const processor = new MessageProcessor(6) // Process 6 messages total

// Start processing; the promise is awaited right away, so this cell
// blocks until all demo messages have been handled
const processingPromise = processor.processMessages(messageQueue)

await processingPromise

console.log('📊 Final Queue Statistics:')
const stats = messageQueue.getStats()
console.log(`   Messages processed: ${stats.messagesProcessed}`)
console.log(`   Current queue size: ${messageQueue.size()}`)
console.log(`   Processing errors: ${stats.processingErrors}`)

// Clean up subscription
unsubscribe()

Deno.jupyter.html`
<div style="background: #e3f2fd; padding: 15px; border-radius: 8px; margin: 15px 0;">
  <h4 style="color: #1565c0; margin: 0 0 10px 0;">🔄 h2A Pattern Benefits</h4>
  <ul style="margin: 10px 0; padding-left: 20px; color: #1565c0;">
    <li><strong>Non-blocking</strong> - Messages processed asynchronously</li>
    <li><strong>Priority ordering</strong> - Higher priority messages processed first</li>
    <li><strong>Backpressure handling</strong> - Queue manages memory and flow control</li>
    <li><strong>Clean iteration</strong> - No manual queue polling needed</li>
    <li><strong>Error isolation</strong> - Individual message errors don't stop processing</li>
  </ul>
</div>
`

## 4. CLI Integration Patterns

The demo below connects a mock CLI and a mock agent through a single shared queue, following the CLI-AMSG integration example:
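One piece of that integration, request-response correlation, can be sketched in isolation. The types `SketchRequest` and `SketchResponse` and the class `CorrelationTracker` are hypothetical; the sketch only assumes that every message has an `id` and that a response carries the `id` of its request as `correlationId`.

```typescript
// Hypothetical message shapes; only `id` and `correlationId` matter here.
type SketchRequest = { id: string; input: string }
type SketchResponse = { id: string; correlationId: string; content: string }

// Tracks requests that are still waiting for a response.
class CorrelationTracker {
  private pending = new Map<string, SketchRequest>()

  track(request: SketchRequest): void {
    this.pending.set(request.id, request)
  }

  // Returns the original request for a response, or undefined if none is pending.
  resolve(response: SketchResponse): SketchRequest | undefined {
    const request = this.pending.get(response.correlationId)
    if (request) this.pending.delete(response.correlationId)
    return request
  }

  pendingCount(): number {
    return this.pending.size
  }
}

const tracker = new CorrelationTracker()
tracker.track({ id: 'req-1', input: 'hello' })

// The first response matches the pending request and clears it.
const matched = tracker.resolve({ id: 'res-1', correlationId: 'req-1', content: 'hi there' })
// A second response for the same request finds nothing pending.
const unmatched = tracker.resolve({ id: 'res-2', correlationId: 'req-1', content: 'duplicate' })
```

Removing the entry on the first match makes duplicate or late responses easy to detect.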

In [None]:
console.log('=== CLI Integration Patterns Demo ===\n')

// Mock CLI that demonstrates message-driven architecture
class MockCLI {
  private messageQueue: QiAsyncMessageQueue<QiMessage>
  private messageFactory: QiMessageFactory
  private sessionId: string
  
  constructor() {
    this.messageQueue = new QiAsyncMessageQueue<QiMessage>({
      maxSize: 50,
      enableStats: true,
      priorityQueuing: true
    })
    this.messageFactory = new QiMessageFactory()
    this.sessionId = `cli_session_${Date.now()}`
  }
  
  async simulateUserInteraction(): Promise<void> {
    console.log('🖥️ Mock CLI Session Started\n')
    console.log(`Session ID: ${this.sessionId}\n`)
    
    // Simulate a series of user interactions
    const interactions = [
      { type: 'input', content: 'hello', delay: 100 },
      { type: 'input', content: 'help me build a function', delay: 200 },
      { type: 'command', content: 'build --watch', delay: 150 },
      { type: 'interrupt', content: 'Ctrl+C pressed', delay: 50 },
      { type: 'input', content: 'can you fix the error?', delay: 300 },
      { type: 'command', content: 'test --coverage', delay: 180 }
    ]
    
    // Send interactions to the queue
    for (const interaction of interactions) {
      await this.sendInteraction(interaction)
      await new Promise(resolve => setTimeout(resolve, interaction.delay))
    }
    
    console.log('\n📝 All user interactions queued\n')
  }
  
  private async sendInteraction(interaction: { type: string; content: string; delay: number }): Promise<void> {
    switch (interaction.type) {
      case 'input':
        const inputMessage = this.messageFactory.createUserInputMessage(
          interaction.content,
          'cli',
          false
        )
        
        await match(
          async (msg) => {
            this.messageQueue.enqueue(msg, MessagePriority.NORMAL)
            console.log(`💬 User input: "${interaction.content}"`)
          },
          async (error) => console.error('❌ Input message error:', error.message),
          inputMessage
        )
        break
        
      case 'command':
        const [command, ...args] = interaction.content.split(' ')
        const commandMessage = this.messageFactory.createCommandMessage(
          command,
          args,
          {
            executionId: `exec_${Date.now()}`,
            sessionId: this.sessionId
          }
        )
        
        await match(
          async (msg) => {
            this.messageQueue.enqueue(msg, MessagePriority.NORMAL)
            console.log(`⚙️ Command: ${interaction.content}`)
          },
          async (error) => console.error('❌ Command message error:', error.message),
          commandMessage
        )
        break
        
      case 'interrupt':
        const interruptMessage = this.messageFactory.createSystemControlMessage(
          'pause',
          true,
          interaction.content
        )
        
        await match(
          async (msg) => {
            this.messageQueue.enqueue(msg, MessagePriority.HIGH)
            console.log(`🚨 INTERRUPT: ${interaction.content} (HIGH PRIORITY)`)
          },
          async (error) => console.error('❌ Interrupt message error:', error.message),
          interruptMessage
        )
        break
    }
  }
  
  getMessageQueue(): QiAsyncMessageQueue<QiMessage> {
    return this.messageQueue
  }
}

// Mock Agent that processes messages and generates responses
class MockAgent {
  private messageQueue: QiAsyncMessageQueue<QiMessage>
  private messageFactory: QiMessageFactory
  
  constructor(messageQueue: QiAsyncMessageQueue<QiMessage>) {
    this.messageQueue = messageQueue
    this.messageFactory = new QiMessageFactory()
  }
  
  async processMessages(maxMessages: number): Promise<void> {
    console.log('🤖 Mock Agent started processing messages\n')
    
    let processedCount = 0
    
    try {
      // Process messages using h2A async iteration
      for await (const message of this.messageQueue) {
        processedCount++
        
        console.log(`[Agent] Processing message ${processedCount}/${maxMessages}:`)
        console.log(`   Type: ${message.type}`)
        console.log(`   Priority: ${MessagePriority[message.priority]}`)
        
        // Generate appropriate response
        await this.handleMessage(message)
        
        if (processedCount >= maxMessages) {
          console.log('\n✅ Agent finished processing demo messages')
          this.messageQueue.done()
          break
        }
        
        // Small delay for demo readability
        await new Promise(resolve => setTimeout(resolve, 150))
      }
    } catch (error) {
      console.error('❌ Agent processing error:', error)
    }
  }
  
  private async handleMessage(message: QiMessage): Promise<void> {
    switch (message.type) {
      case MessageType.USER_INPUT:
        const userMsg = message as UserInputMessage
        const response = this.generateResponse(userMsg.input)
        
        console.log(`   👤 User: "${userMsg.input}"`)
        console.log(`   🤖 Agent: "${response}"\n`)
        
        // Send response back through the queue, correlated to the originating
        // message (fall back to the message ID when no correlation ID is set)
        await this.sendResponse(response, message.correlationId ?? message.id)
        break

      case MessageType.COMMAND:
        const cmdMsg = message as CommandMessage
        console.log(`   ⚙️ Executing: ${cmdMsg.command} ${cmdMsg.args.join(' ')}`)

        // Simulate command execution
        const result = await this.executeCommand(cmdMsg.command, cmdMsg.args)
        console.log(`   ✅ Result: ${result}\n`)

        await this.sendResponse(result, message.correlationId ?? message.id)
        break
        
      case MessageType.SYSTEM_CONTROL:
        const sysMsg = message as SystemControlMessage
        console.log(`   🛠️ System: ${sysMsg.action} (${sysMsg.reason})\n`)
        
        // Handle system control (pause, resume, etc.)
        await this.handleSystemControl(sysMsg)
        break
        
      default:
        console.log(`   ❓ Unknown message type: ${message.type}\n`)
    }
  }
  
  private generateResponse(input: string): string {
    const responses = {
      'hello': '👋 Hello! How can I help you today?',
      'help': '🆘 I can help with commands, coding questions, and general assistance.',
      'function': '🔧 I\'ll help you build a function! What kind of function do you need?',
      'error': '🐛 Let me analyze the error and provide a solution.',
      'default': '🤔 Interesting! Let me think about that and provide a helpful response.'
    }
    
    const key = Object.keys(responses).find(k => 
      input.toLowerCase().includes(k)
    ) || 'default'
    
    return responses[key as keyof typeof responses]
  }
  
  private async executeCommand(command: string, args: string[]): Promise<string> {
    // Simulate command execution
    await new Promise(resolve => setTimeout(resolve, 100))
    
    switch (command) {
      case 'build':
        return `Build completed successfully with args: ${args.join(' ')}`
      case 'test':
        return `Tests passed (${args.includes('--coverage') ? 'with coverage report' : 'basic run'})`
      case 'help':
        return 'Available commands: build, test, help, version'
      default:
        return `Command '${command}' executed with args: ${args.join(' ')}`
    }
  }
  
  private async handleSystemControl(message: SystemControlMessage): Promise<void> {
    // In a real system, this would handle pause/resume/shutdown
    console.log(`   🔧 System control handled: ${message.action}`)
  }
  
  private async sendResponse(content: string, correlationId?: string): Promise<void> {
    const responseMessage = this.messageFactory.createAgentOutputMessage(
      content,
      'text',
      false
    )
    
    await match(
      async (msg) => {
        // Set correlation ID if provided for request-response tracking
        if (correlationId) {
          (msg as any).correlationId = correlationId
        }
        
        this.messageQueue.enqueue(msg, MessagePriority.NORMAL)
      },
      async (error) => console.error('❌ Failed to send response:', error.message),
      responseMessage
    )
  }
}

// Run the CLI-Agent integration demo
console.log('🚀 Starting CLI-Agent Integration Demo\n')

// Create CLI and simulate user interactions
const cli = new MockCLI()
await cli.simulateUserInteraction()

// Create agent and process the messages
const agent = new MockAgent(cli.getMessageQueue())
await agent.processMessages(6) // Process 6 messages for demo

console.log('\n📊 CLI-Agent Demo Statistics:')
const finalStats = cli.getMessageQueue().getStats()
console.log(`   Messages processed: ${finalStats.messagesProcessed}`)
console.log(`   Queue size: ${cli.getMessageQueue().size()}`)
console.log('\n✅ CLI-Agent integration demo completed!')

Deno.jupyter.html`
<div style="background: #f8f9fa; padding: 15px; border-left: 4px solid #28a745; margin: 15px 0;">
  <h4 style="color: #155724; margin: 0 0 10px 0;">🔗 CLI Integration Benefits</h4>
  <ul style="margin: 10px 0; padding-left: 20px; color: #155724;">
    <li><strong>Decoupled architecture</strong> - CLI and Agent communicate only via messages</li>
    <li><strong>Priority handling</strong> - Interrupts (Ctrl+C) processed immediately</li>
    <li><strong>Request correlation</strong> - Responses linked to original requests</li>
    <li><strong>Async processing</strong> - Non-blocking user interaction</li>
    <li><strong>State isolation</strong> - Each component manages its own state</li>
  </ul>
</div>
`

## 5. Advanced Patterns - Streaming and Backpressure

Let's explore more advanced patterns like streaming messages and backpressure handling:
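Backpressure in particular can be illustrated without the library. The sketch below uses a hypothetical `BoundedBuffer` with a high-water mark and a hypothetical `pushWithRetry` helper that waits and retries a bounded number of times. These names and the retry policy are assumptions, not part of `@qi/amsg`.

```typescript
// Hypothetical bounded buffer with a high-water mark, independent of @qi/amsg.
class BoundedBuffer<T> {
  private items: T[] = []
  private readonly highWaterMark: number

  constructor(highWaterMark: number) {
    this.highWaterMark = highWaterMark
  }

  // Returns false (backpressure) instead of growing past the high-water mark.
  tryPush(item: T): boolean {
    if (this.items.length >= this.highWaterMark) return false
    this.items.push(item)
    return true
  }

  shift(): T | undefined {
    return this.items.shift()
  }
}

// Retries with a fixed delay until the buffer accepts the item
// or the attempts run out.
async function pushWithRetry<T>(
  buffer: BoundedBuffer<T>,
  item: T,
  maxAttempts: number,
  delayMs: number
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    if (buffer.tryPush(item)) return true
    await new Promise<void>(resolve => setTimeout(resolve, delayMs))
  }
  return false
}

async function backpressureDemo(): Promise<{ rejectedWhenFull: boolean; acceptedAfterDrain: boolean }> {
  const buffer = new BoundedBuffer<number>(2)
  buffer.tryPush(1)
  buffer.tryPush(2)
  const rejectedWhenFull = !buffer.tryPush(3) // buffer is at the mark
  setTimeout(() => buffer.shift(), 5)         // a consumer frees a slot later
  const acceptedAfterDrain = await pushWithRetry(buffer, 3, 10, 5)
  return { rejectedWhenFull, acceptedAfterDrain }
}
```

Bounding the number of attempts lets the producer report a failure instead of waiting forever when the consumer has stalled.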

In [None]:
console.log('\n=== Advanced Patterns: Streaming and Backpressure ===\n')

// Create a specialized streaming message queue
const streamQueue = new QiAsyncMessageQueue<QiMessage>({
  maxSize: 20,  // Smaller queue to demonstrate backpressure
  enableStats: true,
  priorityQueuing: true
})

// Stream message factory
const streamFactory = new QiMessageFactory()

// Streaming data producer
class StreamProducer {
  private messageQueue: QiAsyncMessageQueue<QiMessage>
  private messageFactory: QiMessageFactory
  
  constructor(queue: QiAsyncMessageQueue<QiMessage>, factory: QiMessageFactory) {
    this.messageQueue = queue
    this.messageFactory = factory
  }
  
  async startStream(streamId: string, totalChunks: number): Promise<void> {
    console.log(`📡 Starting stream: ${streamId} (${totalChunks} chunks)\n`)
    
    // Send stream start message
    const startMessage = this.createStreamMessage(
      MessageType.STREAM_START,
      streamId,
      'Stream beginning'
    )
    
    await this.sendMessage(startMessage, MessagePriority.HIGH)
    
    // Send data chunks
    for (let i = 1; i <= totalChunks; i++) {
      const chunkData = `Chunk ${i}/${totalChunks}: This is streaming data with content...`
      
      const dataMessage = this.createStreamMessage(
        MessageType.STREAM_DATA,
        streamId,
        chunkData,
        { chunkIndex: i, totalChunks }
      )
      
      // Try to send message, handle backpressure
      const sendResult = await this.sendWithBackpressure(dataMessage, MessagePriority.NORMAL)
      
      if (!sendResult) {
        console.log(`⚠️ Backpressure detected at chunk ${i}, pausing...`)
        await new Promise(resolve => setTimeout(resolve, 200)) // Wait and retry
        await this.sendMessage(dataMessage, MessagePriority.NORMAL)
        console.log(`✅ Resumed at chunk ${i} after backpressure`)
      }
      
      // Small delay between chunks
      await new Promise(resolve => setTimeout(resolve, 50))
    }
    
    // Send stream end message
    const endMessage = this.createStreamMessage(
      MessageType.STREAM_END,
      streamId,
      'Stream completed successfully'
    )
    
    // Send the end marker at NORMAL priority so it cannot overtake data chunks still in the queue
    await this.sendMessage(endMessage, MessagePriority.NORMAL)
    console.log(`\n📡 Stream ${streamId} completed\n`)
  }
  
  private createStreamMessage(
    type: MessageType, 
    streamId: string, 
    data: string, 
    metadata?: any
  ): Result<StreamMessage, QiError> {
    // Create a basic message and extend it for streaming
    const baseMessage = this.messageFactory.createAgentOutputMessage(data, 'text', true)
    
    return match(
      (msg) => success({
        ...msg,
        type,
        streamId,
        data,
        metadata
      } as StreamMessage),
      (error) => failure(error),
      baseMessage
    )
  }
  
  private async sendMessage(messageResult: Result<StreamMessage, QiError>, priority: MessagePriority): Promise<boolean> {
    return match(
      (message) => {
        const enqueueResult = this.messageQueue.enqueue(message as QiMessage, priority)
        return match(
          () => true,
          (error) => {
            console.error(`❌ Failed to enqueue message: ${error.message}`)
            return false
          },
          enqueueResult
        )
      },
      (error) => {
        console.error(`❌ Invalid stream message: ${error.message}`)
        return false
      },
      messageResult
    )
  }
  
  private async sendWithBackpressure(messageResult: Result<StreamMessage, QiError>, priority: MessagePriority): Promise<boolean> {
    // Check queue capacity before sending
    if (this.messageQueue.size() >= 15) { // Near capacity
      return false // Signal backpressure
    }
    
    return this.sendMessage(messageResult, priority)
  }
}

// Stream consumer
class StreamConsumer {
  private activeStreams = new Map<string, { chunks: number, startTime: number }>()
  
  async consumeStream(queue: QiAsyncMessageQueue<QiMessage>, maxMessages: number): Promise<void> {
    console.log('📥 Stream consumer started\n')
    
    let processedCount = 0
    
    try {
      for await (const message of queue) {
        processedCount++
        
        if (this.isStreamMessage(message)) {
          await this.handleStreamMessage(message as StreamMessage)
        } else {
          console.log(`📦 Non-stream message: ${message.type}`)
        }
        
        if (processedCount >= maxMessages) {
          console.log('\n✅ Stream consumer finished demo')
          queue.done()
          break
        }
        
        // Small processing delay
        await new Promise(resolve => setTimeout(resolve, 30))
      }
    } catch (error) {
      console.error('❌ Stream consumption error:', error)
    }
  }
  
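  private isStreamMessage(message: QiMessage): boolean {
    // Compare against the enum members rather than a string prefix, so the
    // check does not depend on how the enum values are spelled
    const streamTypes: MessageType[] = [
      MessageType.STREAM_START,
      MessageType.STREAM_DATA,
      MessageType.STREAM_END,
      MessageType.STREAM_ERROR
    ]
    return streamTypes.includes(message.type)
  }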
  
  private async handleStreamMessage(message: StreamMessage): Promise<void> {
    const { type, streamId, data, metadata } = message
    
    switch (type) {
      case MessageType.STREAM_START:
        console.log(`🚀 Stream started: ${streamId}`)
        this.activeStreams.set(streamId, { chunks: 0, startTime: Date.now() })
        break
        
      case MessageType.STREAM_DATA:
        const stream = this.activeStreams.get(streamId)
        if (stream) {
          stream.chunks++
          console.log(`📊 ${streamId} chunk ${stream.chunks}: ${data.substring(0, 30)}...`)
          
          if (metadata?.chunkIndex && metadata?.totalChunks) {
            const progress = (metadata.chunkIndex / metadata.totalChunks * 100).toFixed(1)
            console.log(`   Progress: ${progress}% (${metadata.chunkIndex}/${metadata.totalChunks})`)
          }
        }
        break
        
      case MessageType.STREAM_END:
        const endedStream = this.activeStreams.get(streamId)
        if (endedStream) {
          const duration = Date.now() - endedStream.startTime
          console.log(`🏁 Stream ended: ${streamId}`)
          console.log(`   Total chunks: ${endedStream.chunks}`)
          console.log(`   Duration: ${duration}ms`)
          console.log(`   Throughput: ${(endedStream.chunks * 1000 / duration).toFixed(1)} chunks/sec\n`)
          
          this.activeStreams.delete(streamId)
        }
        break
        
      case MessageType.STREAM_ERROR:
        console.log(`❌ Stream error: ${streamId} - ${data}`)
        this.activeStreams.delete(streamId)
        break
    }
  }
}

// Run streaming demo
console.log('🎬 Starting streaming and backpressure demo\n')

const producer = new StreamProducer(streamQueue, streamFactory)
const consumer = new StreamConsumer()

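// Start consumer in background (not awaited yet, so it runs alongside the producer)
// Expect exactly 10 messages: 1 start + 8 chunks + 1 end. A higher limit would
// leave the consumer waiting for messages that never arrive.
const consumerPromise = consumer.consumeStream(streamQueue, 10)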

// Start producing stream data
await producer.startStream('demo_stream_001', 8)

// Wait for consumer to finish
await consumerPromise

console.log('\n📊 Streaming Demo Statistics:')
const streamStats = streamQueue.getStats()
console.log(`   Messages processed: ${streamStats.messagesProcessed}`)
console.log(`   Final queue size: ${streamQueue.size()}`)

Deno.jupyter.html`
<div style="background: #fff8e1; padding: 15px; border-radius: 8px; margin: 15px 0;">
  <h4 style="color: #f57c00; margin: 0 0 10px 0;">🌊 Streaming Patterns</h4>
  <ul style="margin: 10px 0; padding-left: 20px; color: #f57c00;">
    <li><strong>Backpressure handling</strong> - Producer pauses when queue is full</li>
    <li><strong>Stream lifecycle</strong> - START → DATA chunks → END pattern</li>
    <li><strong>Progress tracking</strong> - Metadata for chunk progress</li>
    <li><strong>Error recovery</strong> - Graceful handling of stream errors</li>
    <li><strong>Performance monitoring</strong> - Throughput and timing metrics</li>
  </ul>
</div>
`

## 🎯 Key Takeaways

After completing this tutorial, you now understand:

### ✅ **Message Type System**
- **Structured messages** for different communication needs
- **Priority levels** (CRITICAL → HIGH → NORMAL → LOW)
- **Type safety** with TypeScript discriminated unions
- **Message factory** for consistent message creation
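
The type-safety point can be illustrated with a minimal sketch of a discriminated union. The `Demo*` types and `describeMessage` below are hypothetical stand-ins written for this example; they are not the real `@qi/amsg` definitions, which carry more fields.

```typescript
// Hypothetical message shapes for illustration; not the @qi/amsg types.
type DemoUserInput = { type: 'USER_INPUT'; input: string }
type DemoCommand = { type: 'COMMAND'; command: string; args: string[] }
type DemoStreamData = { type: 'STREAM_DATA'; streamId: string; data: string }

type DemoMessage = DemoUserInput | DemoCommand | DemoStreamData

// The `type` field is the discriminant: inside each case the compiler
// narrows `msg` to the matching member of the union.
function describeMessage(msg: DemoMessage): string {
  switch (msg.type) {
    case 'USER_INPUT':
      return `input: ${msg.input}`
    case 'COMMAND':
      return `command: ${msg.command} ${msg.args.join(' ')}`.trim()
    case 'STREAM_DATA':
      return `stream ${msg.streamId}: ${msg.data.length} chars`
    default: {
      // Exhaustiveness check: adding a member to DemoMessage without a
      // matching case above turns this assignment into a compile error.
      const unreachable: never = msg
      return unreachable
    }
  }
}

console.log(describeMessage({ type: 'COMMAND', command: 'build', args: ['--production'] }))
// → command: build --production
```

Accessing `msg.args` inside the `'USER_INPUT'` case would fail to compile, which is the practical benefit: a handler cannot read fields that its message type does not have.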

### ✅ **h2A-Inspired Patterns**
- **Async iteration** with `for await` loops
- **Non-blocking processing** for responsive systems
- **Priority queuing** ensures important messages process first
- **Error isolation** - individual message failures don't stop processing
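
How priority ordering and `for await` fit together can be sketched with a small stand-alone queue. `DemoPriorityQueue` and `drain` are hypothetical names for this example, it assumes a single consumer, and it assumes that a lower number means a more urgent message; the internals of `QiAsyncMessageQueue` are not shown in this tutorial and may differ.

```typescript
// Hypothetical minimal queue for illustration; not the @qi/amsg implementation.
class DemoPriorityQueue<T> implements AsyncIterable<T> {
  private items: { value: T; priority: number; seq: number }[] = []
  private waiter: (() => void) | null = null
  private closed = false
  private seq = 0

  // Assumption for this sketch: lower number = more urgent.
  enqueue(value: T, priority: number): void {
    if (this.closed) throw new Error('queue is closed')
    this.items.push({ value, priority, seq: this.seq++ })
    // Sort by priority, then by arrival order so equal priorities stay FIFO.
    this.items.sort((a, b) => a.priority - b.priority || a.seq - b.seq)
    this.wake()
  }

  // Signal that no more items will arrive; iteration ends once drained.
  done(): void {
    this.closed = true
    this.wake()
  }

  private wake(): void {
    const w = this.waiter
    this.waiter = null
    if (w) w()
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) {
      const next = this.items.shift()
      if (next) {
        yield next.value
      } else if (this.closed) {
        return
      } else {
        // Nothing queued: suspend here without blocking the event loop.
        await new Promise<void>(resolve => { this.waiter = resolve })
      }
    }
  }
}

// Collect everything the queue yields until done() has been called.
async function drain<T>(queue: DemoPriorityQueue<T>): Promise<T[]> {
  const seen: T[] = []
  for await (const item of queue) seen.push(item)
  return seen
}

const demoQueue = new DemoPriorityQueue<string>()
demoQueue.enqueue('normal-1', 2)
demoQueue.enqueue('critical', 0)
demoQueue.enqueue('normal-2', 2)
demoQueue.done()
drain(demoQueue).then(order => console.log(order.join(', ')))
// → critical, normal-1, normal-2
```

Re-sorting on every enqueue keeps the sketch short; a heap would be the usual choice once the queue holds more than a handful of items.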

### ✅ **CLI Integration Architecture**
- **Message-driven communication** between CLI and Agent
- **Request-response correlation** for tracking interactions
- **Interrupt handling** with high-priority system messages
- **Decoupled components** communicating only via messages
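
Request-response correlation boils down to tagging each outgoing message with an id and routing the reply by that id. The sketch below is one possible shape, not the tutorial's implementation: `DemoCorrelator`, `track`, and `resolve` are hypothetical names introduced here.

```typescript
// Hypothetical helper for illustration; not part of @qi/amsg.
class DemoCorrelator<TResponse> {
  private pending = new Map<string, (response: TResponse) => void>()
  private nextId = 0

  // Register an outgoing request. Returns its id plus a promise that
  // settles when the matching response is delivered.
  track(): { correlationId: string; response: Promise<TResponse> } {
    const correlationId = `req_${++this.nextId}`
    const response = new Promise<TResponse>(resolve => {
      this.pending.set(correlationId, resolve)
    })
    return { correlationId, response }
  }

  // Deliver a response. Returns false when no request is waiting for
  // that id, for example a duplicate or late reply.
  resolve(correlationId: string, response: TResponse): boolean {
    const settle = this.pending.get(correlationId)
    if (!settle) return false
    this.pending.delete(correlationId)
    settle(response)
    return true
  }

  get pendingCount(): number {
    return this.pending.size
  }
}

const correlator = new DemoCorrelator<string>()
const first = correlator.track()
const second = correlator.track()

// Responses may arrive out of order; the id routes each to its caller.
correlator.resolve(second.correlationId, 'second done')
correlator.resolve(first.correlationId, 'first done')
first.response.then(text => console.log(text))
```

A production version would likely also reject pending requests after a timeout so that a lost reply does not leak an entry in the map.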

### ✅ **Advanced Patterns**
- **Streaming data** with START/DATA/END lifecycle
- **Backpressure handling** prevents memory exhaustion
- **Performance monitoring** with statistics and metrics
- **Graceful degradation** under high load
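
The backpressure idea can be sketched independently of the library: a bounded buffer whose `enqueue` does not resolve until there is room. `DemoBoundedBuffer` is a hypothetical class written for this example, and the way `QiAsyncMessageQueue` enforces `maxSize` may differ.

```typescript
// Hypothetical bounded buffer for illustration; not the @qi/amsg queue.
class DemoBoundedBuffer<T> {
  private items: T[] = []
  private spaceWaiters: Array<() => void> = []
  private readonly maxSize: number

  constructor(maxSize: number) {
    if (maxSize < 1) throw new Error('maxSize must be at least 1')
    this.maxSize = maxSize
  }

  // Non-blocking attempt: returns false instead of growing past maxSize.
  tryEnqueue(item: T): boolean {
    if (this.items.length >= this.maxSize) return false
    this.items.push(item)
    return true
  }

  // Backpressure: while the buffer is full the producer awaits here, so
  // memory use stays bounded by maxSize.
  async enqueue(item: T): Promise<void> {
    while (!this.tryEnqueue(item)) {
      await new Promise<void>(resolve => this.spaceWaiters.push(resolve))
    }
  }

  dequeue(): T | undefined {
    const item = this.items.shift()
    // Wake one waiting producer now that a slot may be free.
    const wake = this.spaceWaiters.shift()
    if (wake) wake()
    return item
  }

  get size(): number {
    return this.items.length
  }
}

// The second write stays pending until dequeue() frees the only slot.
const buffer = new DemoBoundedBuffer<string>(1)
buffer.tryEnqueue('chunk-1')
const pendingWrite = buffer.enqueue('chunk-2')
console.log(buffer.size) // → 1
buffer.dequeue() // consumer takes 'chunk-1'
pendingWrite.then(() => console.log('chunk-2 accepted'))
```

Offering both `tryEnqueue` and `enqueue` lets a caller choose between dropping or deferring work when the buffer is full, which is one way to degrade gracefully under load.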

### ✅ **Production Readiness**
- **Result<T> integration** for functional error handling
- **Event subscription** for monitoring and observability
- **Resource management** with proper cleanup
- **Scalable architecture** for high-throughput systems

## 🚀 Next Steps

Now that you understand message-driven architecture, you're ready to learn about the CLI framework that uses these patterns:

### 📖 **Continue Learning**
- **[04-qi-cli.ipynb](./04-qi-cli.ipynb)** - CLI framework with state management
- **[01-qi-base.ipynb](./01-qi-base.ipynb)** - Review Result<T> fundamentals
- **[02-qi-core.ipynb](./02-qi-core.ipynb)** - Review infrastructure services

### 🛠️ **Try It Yourself**
- Explore the working example in `typescript/app/cli-amsg-example/`
- Run `bun run dev` to see CLI-AMSG integration in action
- Experiment with different message priorities and types

### 📚 **Dive Deeper**
- Read the [AMSG API Documentation](../api/amsg/README.md)
- Study the architectural patterns in `docs/amsg/README.md`
- Learn about the h2A pattern and async iteration best practices

**Remember**: Message-driven architecture enables scalable, responsive systems where components communicate through well-defined message contracts rather than direct coupling!

In [None]:
// Final challenge: Build your own message-driven system!
// Here's a template for a custom message processor

class CustomMessageProcessor {
  private messageQueue: QiAsyncMessageQueue<QiMessage>
  private messageFactory: QiMessageFactory
  
  constructor() {
    this.messageQueue = new QiAsyncMessageQueue<QiMessage>({
      maxSize: 50,
      enableStats: true,
      priorityQueuing: true
    })
    this.messageFactory = new QiMessageFactory()
  }
  
  async addMessage(content: string, type: 'input' | 'command' | 'system'): Promise<void> {
    let messageResult: Result<QiMessage, QiError>
    let priority = MessagePriority.NORMAL
    
    switch (type) {
      case 'input':
        messageResult = this.messageFactory.createUserInputMessage(content, 'cli', false)
        break
      case 'command':
        const [cmd, ...args] = content.split(' ')
        messageResult = this.messageFactory.createCommandMessage(cmd, args, { executionId: Date.now().toString() })
        break
      case 'system':
        messageResult = this.messageFactory.createSystemControlMessage('pause', true, content)
        priority = MessagePriority.HIGH
        break
    }
    
    await match(
      async (message) => {
        this.messageQueue.enqueue(message, priority)
        console.log(`✅ Added ${type} message: "${content.substring(0, 30)}${content.length > 30 ? '...' : ''}"`)
      },
      async (error) => console.error(`❌ Failed to add message: ${error.message}`),
      messageResult
    )
  }
  
  async processMessages(maxMessages: number = 3): Promise<void> {
    console.log(`\n🔄 Processing up to ${maxMessages} messages...\n`)
    
    let count = 0
    try {
      for await (const message of this.messageQueue) {
        count++
        console.log(`[${count}] Processing: ${message.type} (Priority: ${MessagePriority[message.priority]})`)
        
        // Add your custom processing logic here
        await new Promise(resolve => setTimeout(resolve, 100))
        
        if (count >= maxMessages) {
          this.messageQueue.done()
          break
        }
      }
    } catch (error) {
      console.error('Processing error:', error)
    }
    
    console.log(`\n✅ Processed ${count} messages`)
    const stats = this.messageQueue.getStats()
    console.log(`📊 Stats: ${stats.messagesProcessed} total processed, ${this.messageQueue.size()} remaining`)
  }
}

// Test your custom processor
const customProcessor = new CustomMessageProcessor()

console.log('\n🧪 Testing Custom Message Processor\n')

// Add some test messages
await customProcessor.addMessage('Hello world!', 'input')
await customProcessor.addMessage('build --production', 'command')
await customProcessor.addMessage('Emergency stop!', 'system') // High priority
await customProcessor.addMessage('Another message', 'input')

// Process them (high priority should process first)
await customProcessor.processMessages(4)

Deno.jupyter.html`
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 10px; margin: 20px 0; text-align: center;">
  <h3 style="margin: 0 0 15px 0;">🎉 QiCore AMSG Tutorial Complete!</h3>
  <p style="margin: 0; font-size: 16px;">
    You're now ready to build message-driven applications with async queues and priority handling!
  </p>
</div>
`