# Streaming de informação no MCP

Quando usamos o MCP, pode ser que a tarefa que estamos executando seja longa e queiramos que o cliente possa ver o progresso da tarefa. Embora no post sobre [MCP](https://www.maximofn.com/pt-br/mcp) tenhamos visto uma maneira de fazer isso usando `Context`, como o protocolo MCP evoluiu, agora podemos usá-lo de uma maneira melhor.

## Servidor

No post do [MCP](https://www.maximofn.com/pt-br/mcp), vimos que podíamos criar um servidor MCP usando

Criar um objeto mcp da classe FastMCP

``` python
from fastmcp import FastMCP

# Create FastMCP server
mcp = FastMCP(
    name="MCP server name",
    instructions="MCP server instructions",
)
```

Criar tools adicionando decoradores às funções

``` python
@mcp.tool
def tool_name(param1: str, param2: int) -> str:
    return "result"
```

E executar o servidor usando o método `run`. Além disso, poderíamos definir http como camada de transporte.

``` python
mcp.run(
    transport="http",
    host="0.0.0.0",
    port=8000
)
```

Agora importamos a função `create_streamable_http_app` do pacote `fastmcp.server.http` e a usamos para criar um aplicativo HTTP que suporta streaming.

```python
from fastmcp.server.http import create_streamable_http_app

app = create_streamable_http_app(
    server=mcp,
    streamable_http_path="/mcp/",
    stateless_http=False,  # Keep session state
    debug=True
)
```

Criamos um servidor com `uvicorn`

``` python
import uvicorn

# Configure uvicorn
config = uvicorn.Config(
    app=app,
    host=host,
    port=port,
    log_level="info",
    access_log=False
)

# Run server
server = uvicorn.Server(config)
await server.serve()
```

E o executamos de forma assíncrona.

``` python
import asyncio

asyncio.run(run_streaming_server())
```

### Implementação do servidor

Agora que explicamos como criar o servidor, vamos criar um.

#### Criar ambiente virtual para o servidor

Primeiro criamos a pasta onde vamos desenvolvê-lo.

In [1]:
!mkdir MCP_streamable_server

Criamos o ambiente com `uv`

In [3]:
!cd MCP_streamable_server && uv init .

Initialized project `[36mmcp-streamable-server[39m` at `[36m/Users/macm1/Documents/web/portafolio/posts/MCP_streamable_server[39m`


Iniciamos o ambiente

In [4]:
!cd MCP_streamable_server && uv venv

Using CPython [36m3.12.8[39m
Creating virtual environment at: [36m.venv[39m
Activate with: [32msource .venv/bin/activate[39m


Instalamos as bibliotecas necessárias

In [5]:
!cd MCP_streamable_server && uv add fastmcp uvicorn

[2K[2mResolved [1m64 packages[0m [2min 673ms[0m[0m                                        [0m
[2K[37m⠙[0m [2mPreparing packages...[0m (0/4)                                                   [37m⠋[0m [2mPreparing packages...[0m (0/0)                                                   
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/4)--------------[0m[0m     0 B/87.93 KiB           [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/4)--------------[0m[0m     0 B/87.93 KiB           [1A
[2mrequests            [0m [32m[2m------------------------------[0m[0m     0 B/63.22 KiB
[2K[2A[37m⠙[0m [2mPreparing packages...[0m (0/4)--------------[0m[0m     0 B/87.93 KiB           [2A
[2mrequests            [0m [32m[2m------------------------------[0m[0m     0 B/63.22 KiB
[2K[2A[37m⠙[0m [2mPreparing packages...[0m (0/4)--------------[0m[0m 16.00 KiB/87.93 KiB         [2A
[2mrequests            [0m [32m--------[2m----------------------

#### Código do servidor

Agora vamos criar o código do servidor. Vamos criar um servidor com tudo o que falamos anteriormente e com quatro tools que simulam tarefas muito longas.

In [6]:
%%writefile MCP_streamable_server/server.py

#!/usr/bin/env python3
"""
MCP server for streaming and partial results.
Shows how to send real-time progress updates to the client.
"""

import asyncio
import uvicorn
from typing import Dict, List, Any
from fastmcp import FastMCP, Context
from fastmcp.server.http import create_streamable_http_app


# Create MCP server instance
mcp = FastMCP(
    name="Streaming Server",
    instructions="Streaming Server with real-time progress updates"
)


@mcp.tool
async def long_running_task(
    name: str = "Task", 
    steps: int = 10,
    context: Context = None
) -> Dict[str, Any]:
    """
    Long running task with real-time progress updates.
    
    Args:
        name: Task name
        steps: Number of steps to execute
    """
    if context:
        await context.info(f"🚀 Initializing {name} with {steps} steps...")
    
    results = []
    
    for i in range(steps):
        # Simulate work
        await asyncio.sleep(1)
        
        # Create partial result
        partial_result = f"Step {i + 1}: Processed {name}"
        results.append(partial_result)
        
        # Report progress
        if context:
            await context.report_progress(
                progress=i + 1,
                total=steps,
                message=f"Step {i + 1}/{steps} - {partial_result}"
            )
            
            await context.debug(f"✅ {partial_result}")
    
    if context:
        await context.info(f"🎉 {name} completed successfully!")
    
    return {
        "task_name": name,
        "steps_completed": steps,
        "results": results,
        "status": "completed"
    }


@mcp.tool
async def streaming_data_processor(
    data_size: int = 100,
    context: Context = None
) -> Dict[str, Any]:
    """
    Processes data sending real-time progress updates.
    
    Args:
        data_size: Number of data items to process
    """
    if context:
        await context.info(f"📊 Procesando {data_size} elementos de datos...")
    
    processed = []
    batch_size = max(1, data_size // 10)  # Process in batches
    
    for i in range(0, data_size, batch_size):
        batch_end = min(i + batch_size, data_size)
        
        # Simulate batch processing
        await asyncio.sleep(0.5)
        
        # Process batch
        batch_results = [f"item_{j}" for j in range(i, batch_end)]
        processed.extend(batch_results)
        
        # Report progress
        if context:
            progress = len(processed)
            await context.report_progress(
                progress=progress,
                total=data_size,
                message=f"Processed {progress}/{data_size} items"
            )
            
            await context.debug(f"Batch processed: {i}-{batch_end-1}")
    
    if context:
        await context.info(f"✅ Processing completed: {len(processed)} items")
    
    return {
        "total_processed": len(processed),
        "processed_items": processed[:10],  # Show first 10 items
        "status": "completed"
    }


@mcp.tool
async def file_upload_simulation(
    file_count: int = 5,
    context: Context = None
) -> Dict[str, Any]:
    """
    Simulates file upload with progress updates.
    
    Args:
        file_count: Number of files to upload
    """
    if context:
        await context.info(f"📤 Starting upload of {file_count} files...")
    
    uploaded_files = []
    
    for i in range(file_count):
        file_name = f"file_{i+1}.dat"
        
        if context:
            await context.info(f"Uploading {file_name}...")
        
        # Simulate upload by chunks
        chunks = 10
        for chunk in range(chunks):
            await asyncio.sleep(0.2)  # Simulate upload time
            
            if context:
                await context.report_progress(
                    progress=(i * chunks) + chunk + 1,
                    total=file_count * chunks,
                    message=f"Uploading {file_name} - chunk {chunk+1}/{chunks}"
                )
        
        uploaded_files.append({
            "name": file_name,
            "size": f"{(i+1) * 1024} KB",
            "status": "uploaded"
        })
        
        if context:
            await context.debug(f"✅ {file_name} uploaded successfully")
    
    if context:
        await context.info(f"🎉 Upload completed: {len(uploaded_files)} files")
    
    return {
        "uploaded_count": len(uploaded_files),
        "files": uploaded_files,
        "total_size": sum(int(f["size"].split()[0]) for f in uploaded_files),
        "status": "completed"
    }


@mcp.tool
async def realtime_monitoring(
    duration_seconds: int = 30,
    context: Context = None
) -> Dict[str, Any]:
    """
    Real-time monitoring with periodic updates.
    
    Args:
        duration_seconds: Monitoring duration in seconds
    """
    if context:
        await context.info(f"📡 Starting monitoring for {duration_seconds} seconds...")
    
    metrics = []
    interval = 2  # Update every 2 seconds
    total_intervals = duration_seconds // interval
    
    for i in range(total_intervals):
        # Simulate metrics
        import random
        cpu_usage = random.randint(20, 80)
        memory_usage = random.randint(40, 90)
        network_io = random.randint(100, 1000)
        
        metric = {
            "timestamp": i * interval,
            "cpu": cpu_usage,
            "memory": memory_usage,
            "network_io": network_io
        }
        metrics.append(metric)
        
        if context:
            await context.report_progress(
                progress=i + 1,
                total=total_intervals,
                message=f"Monitoring active - CPU: {cpu_usage}%, MEM: {memory_usage}%, NET: {network_io}KB/s"
            )
            
            await context.debug(f"Metrics collected: interval {i+1}")
        
        await asyncio.sleep(interval)
    
    if context:
        await context.info(f"📊 Monitoring completed: {len(metrics)} data points")
    
    avg_cpu = sum(m["cpu"] for m in metrics) / len(metrics)
    avg_memory = sum(m["memory"] for m in metrics) / len(metrics)
    
    return {
        "duration": duration_seconds,
        "data_points": len(metrics),
        "avg_cpu": round(avg_cpu, 2),
        "avg_memory": round(avg_memory, 2),
        "metrics": metrics,
        "status": "completed"
    }


async def run_streaming_server(host: str = "127.0.0.1", port: int = 8000):
    """Run the streaming server."""
    print(f"🚀 Starting MCP streaming server on {host}:{port}")
    
    # Create Starlette application with streaming support
    app = create_streamable_http_app(
        server=mcp,
        streamable_http_path="/mcp/",
        stateless_http=False,  # Keep session state
        debug=True
    )
    
    # Configure uvicorn
    config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",
        access_log=False
    )
    
    # Run server
    server = uvicorn.Server(config)
    print(f"✅ Server ready at http://{host}:{port}/mcp/")
    print("📡 Available tools:")
    print("  - long_running_task: Long running task with progress")
    print("  - streaming_data_processor: Data processing")
    print("  - file_upload_simulation: File upload simulation")
    print("  - realtime_monitoring: Real-time monitoring")
    
    await server.serve()


if __name__ == "__main__":
    try:
        asyncio.run(run_streaming_server())
    except KeyboardInterrupt:
        print("\n⏹️  Server stopped by user")
    except Exception as e:
        print(f"❌ Error running server: {e}")

Writing MCP_streamable_server/server.py


## Cliente

Antes criávamos um cliente com a classe `Client` do `fastmcp`.

``` python
from fastmcp import Client

client = Client(
    server_url="http://localhost:8000/mcp/",
    name="MCP client name",
    instructions="MCP client instructions",
)
```

E com o cliente, chamávamos as tools do servidor.

Agora usamos a classe `StreamableHttpTransport` de `fastmcp.client.transports` para criar uma camada de transporte que suporte streaming e criamos o cliente da mesma forma que antes, só que indicamos a camada de transporte.

``` python
from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

transport = StreamableHttpTransport(
    url="http://localhost:8000/mcp/",
    sse_read_timeout=60.0  # Timeout for streaming
)

client = Client(transport=transport)
```

O restante permanece igual.

### Implementação do cliente

Agora que explicamos como criar o cliente que suporta o streaming, vamos implementá-lo.

#### Criar o ambiente virtual para o cliente

Primeiro criamos a pasta onde vamos desenvolvê-lo.

In [1]:
!mkdir MCP_streamable_client

Criamos o ambiente com `uv`

In [3]:
!cd MCP_streamable_client && uv init .

Initialized project `[36mmcp-streamable-client[39m` at `[36m/Users/macm1/Documents/web/portafolio/posts/MCP_streamable_client[39m`


Iniciamos no ambiente

In [4]:
!cd MCP_streamable_server && uv venv

Using CPython [36m3.12.8[39m
Creating virtual environment at: [36m.venv[39m
Activate with: [32msource .venv/bin/activate[39m


Instalamos as bibliotecas necessárias

In [5]:
!cd MCP_streamable_client && uv add fastmcp

Using CPython [36m3.12.8[39m
Creating virtual environment at: [36m.venv[39m
[2K[2mResolved [1m64 packages[0m [2min 517ms[0m[0m                                        [0m
[2K[37m⠙[0m [2mPreparing packages...[0m (0/1)                                                   [37m⠋[0m [2mPreparing packages...[0m (0/0)                                                   
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------[0m[0m     0 B/233.99 KiB          [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------[0m[0m 16.00 KiB/233.99 KiB        [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------[0m[0m 32.00 KiB/233.99 KiB        [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------[0m[0m 48.00 KiB/233.99 KiB        [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------[0m[0m 64.00 KiB/233.99 KiB        [1A
[2K[1A[37m⠙[0m [2mPreparing packages...[0m (0/1)--------------

#### Código do cliente

Agora vamos criar o código do cliente. Vamos criar um cliente com tudo o que falamos anteriormente, que executará as quatro tools do servidor e mostrará o progresso de cada uma delas.

In [6]:
%%writefile MCP_streamable_client/client.py

#!/usr/bin/env python3
"""
MCP client for streaming and partial results.
Shows how to receive and handle partial results from the server.
"""

import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

@dataclass
class ProgressUpdate:
    """Represents a progress update."""
    progress: float
    total: float
    message: str
    percentage: float
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass  
class TaskResult:
    """Represents the result of a task."""
    task_name: str
    result: Dict[str, Any]
    progress_updates: List[ProgressUpdate]
    duration: float
    success: bool
    error_message: Optional[str] = None


class StreamingProgressHandler:
    """Handles streaming progress in a visual way."""
    
    def __init__(self, task_name: str):
        self.task_name = task_name
        self.progress_updates: List[ProgressUpdate] = []
        self.start_time = time.time()
        
    async def __call__(self, progress: float, total: float, message: str):
        """Callback called when there are progress updates."""
        percentage = (progress / total) * 100 if total > 0 else 0
        
        update = ProgressUpdate(
            progress=progress,
            total=total,
            message=message,
            percentage=percentage
        )
        self.progress_updates.append(update)
        
        # Display progress visually
        self._display_progress(update)
    
    def _display_progress(self, update: ProgressUpdate):
        """Display progress visually."""
        bar_length = 30
        filled_length = int(bar_length * update.percentage / 100)
        bar = '█' * filled_length + '░' * (bar_length - filled_length)
        
        elapsed = time.time() - self.start_time

        print(f"\t📊 {self.task_name}: |{bar}| {update.percentage:.1f}% "
              f"({update.progress:.0f}/{update.total:.0f}) - "
              f"{update.message} [{elapsed:.1f}s]")
        
        if update.progress >= update.total:
            print()  # New line when complete


class MCPStreamingClient:
    """MCP client with streaming capabilities."""
    
    def __init__(self, server_url: str = "http://localhost:8000/mcp/"):
        self.server_url = server_url
        self.transport = None
        self.client = None
        
    async def __aenter__(self):
        """Initialize connection to the server."""
            
        self.transport = StreamableHttpTransport(
            url=self.server_url,
            sse_read_timeout=60.0  # Timeout for streaming
        )
        
        self.client = Client(transport=self.transport)
        await self.client.__aenter__()
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close connection."""
        if self.client:
            await self.client.__aexit__(exc_type, exc_val, exc_tb)
    
    async def test_connection(self) -> bool:
        """Test connection to the server."""
        try:
            if not self.client:
                print(f"❌ Client not initialized")
                return False
                
            result = await self.client.ping()
            print(f"✅ Connection established with the server")
            return True
        except Exception as e:
            print(f"❌ Error de conexión: {e}")
            return False
    
    async def call_streaming_tool(
        self,
        tool_name: str,
        parameters: Dict[str, Any],
        progress_callback: Optional[Callable] = None
    ) -> TaskResult:
        """Call a tool with progress handling."""
        start_time = time.time()
        
        try:
            if not self.client:
                raise Exception("Client not initialized")
                
            print(f"Executing {tool_name} tool:")
            
            result = await self.client.call_tool(
                tool_name,
                parameters,
                progress_handler=progress_callback
            )
            
            duration = time.time() - start_time
            
            # FastMCP returns a CallToolResult object with content attribute
            result_data = result.content if hasattr(result, 'content') else result
            
            # If result_data is a list of TextContent, extract the text
            if isinstance(result_data, list) and len(result_data) > 0:
                # Handle list of TextContent objects
                if hasattr(result_data[0], 'text'):
                    result_data = result_data[0].text
            
            # If result_data is string, try to parse it as JSON
            if isinstance(result_data, str):
                try:
                    result_data = json.loads(result_data)
                except json.JSONDecodeError:
                    result_data = {"output": result_data}
            
            return TaskResult(
                task_name=tool_name,
                result=result_data,
                progress_updates=getattr(progress_callback, 'progress_updates', []),
                duration=duration,
                success=True
            )
            
        except Exception as e:
            duration = time.time() - start_time
            
            return TaskResult(
                task_name=tool_name,
                result={},
                progress_updates=getattr(progress_callback, 'progress_updates', []),
                duration=duration,
                success=False,
                error_message=str(e)
            )
    
    async def list_available_tools(self) -> List[str]:
        """List available tools on the server."""
        try:
            if not self.client:
                print(f"❌ Client not initialized")
                return []
                
            tools = await self.client.list_tools()
            # FastMCP returns a list of tools directly
            if isinstance(tools, list):
                return [tool.name for tool in tools]
            # If it has attribute tools
            elif hasattr(tools, 'tools'):
                return [tool.name for tool in tools.tools]
            else:
                return []
        except Exception as e:
            print(f"❌ Error listing tools: {e}")
            return []


async def demo_long_running_task(client: MCPStreamingClient) -> TaskResult:
    """Demo of long running task with progress."""
    print("\n" + "="*60)
    print("📋 DEMO: Long Running Task with Progress")
    print("="*60)
    
    progress_handler = StreamingProgressHandler("Long Running Task")
    
    result = await client.call_streaming_tool(
        "long_running_task",
        {"name": "Data Processing", "steps": 8},
        progress_callback=progress_handler
    )
    
    if result.success:
        print(f"✅ Task completed in {result.duration:.2f}s")
        print(f"📊 Progress updates received: {len(result.progress_updates)}")
        # Safe handling of the result
        status = result.result.get('status', 'N/A') if isinstance(result.result, dict) else 'N/A'
        print(f"📋 Result: {status}")
    else:
        print(f"❌ Task failed: {result.error_message}")
    
    return result


async def demo_data_processing(client: MCPStreamingClient) -> TaskResult:
    """Demo of data processing."""
    print("\n" + "="*60)
    print("💾 DEMO: Data Processing")
    print("="*60)
    
    progress_handler = StreamingProgressHandler("Procesamiento")
    
    result = await client.call_streaming_tool(
        "streaming_data_processor",
        {"data_size": 50},
        progress_callback=progress_handler
    )
    
    if result.success:
        print(f"✅ Processing completed in {result.duration:.2f}s")
        # Safe handling of the result
        total = result.result.get('total_processed', 0) if isinstance(result.result, dict) else 0
        print(f"📊 Processed elements: {total}")
    else:
        print(f"❌ Processing failed: {result.error_message}")
    
    return result


async def demo_file_upload(client: MCPStreamingClient) -> TaskResult:
    """Demo of file upload."""
    print("\n" + "="*60)
    print("📤 DEMO: File Upload")
    print("="*60)
    
    progress_handler = StreamingProgressHandler("File Upload")
    
    result = await client.call_streaming_tool(
        "file_upload_simulation",
        {"file_count": 3},
        progress_callback=progress_handler
    )
    
    if result.success:
        print(f"✅ Upload completed in {result.duration:.2f}s")
        # Safe handling of the result
        count = result.result.get('uploaded_count', 0) if isinstance(result.result, dict) else 0
        print(f"📁 Uploaded files: {count}")
    else:
        print(f"❌ Upload failed: {result.error_message}")
    
    return result


async def demo_realtime_monitoring(client: MCPStreamingClient) -> TaskResult:
    """Demo of real-time monitoring."""
    print("\n" + "="*60)
    print("📡 DEMO: Real-time Monitoring")
    print("="*60)
    
    progress_handler = StreamingProgressHandler("Monitoring")
    
    result = await client.call_streaming_tool(
        "realtime_monitoring",
        {"duration_seconds": 20},
        progress_callback=progress_handler
    )
    
    if result.success:
        print(f"✅ Monitoring completed in {result.duration:.2f}s")
        # Safe handling of the result
        if isinstance(result.result, dict):
            print(f"📊 Average CPU: {result.result.get('avg_cpu', 0)}%")
            print(f"💾 Average memory: {result.result.get('avg_memory', 0)}%")
        else:
            print(f"📊 Result: {result.result}")
    else:
        print(f"❌ Monitoring failed: {result.error_message}")
    
    return result


def print_summary(results: List[TaskResult]):
    """Print summary of all tasks."""
    print("\n" + "="*100)
    print("📈 EXECUTION SUMMARY")
    print("="*100)
    
    for result in results:
        status = "\t✅ SUCCESS" if result.success else "\t❌ FAILURE"
        print(f"{status} {result.task_name}: {result.duration:.2f}s "
              f"({len(result.progress_updates)} updates)")
    
    total_time = sum(r.duration for r in results)
    successful = len([r for r in results if r.success])
    
    print(f"\n📊 Total: {successful}/{len(results)} successful tasks")
    print(f"⏱️  Total time: {total_time:.2f}s")


async def run_streaming_demo():
    """Run complete streaming client demo."""
    print("MCP Streaming Client")
    print("="*100)
    
    try:
        async with MCPStreamingClient() as client:
            # Test connection
            if not await client.test_connection():
                print("❌ Could not connect to the server. Make sure it's running.")
                return
            
            # List tools
            tools = await client.list_available_tools()
            print("🔧 Available tools:")
            for tool in tools:
                print(f"\t * {tool}")
            
            # Run demos
            results = []
            
            # Demo 1: Long running task
            result1 = await demo_long_running_task(client)
            results.append(result1)
            
            await asyncio.sleep(1)  # Pause between demos
            
            # Demo 2: Data processing  
            result2 = await demo_data_processing(client)
            results.append(result2)
            
            await asyncio.sleep(1)
            
            # Demo 3: File upload
            result3 = await demo_file_upload(client)
            results.append(result3)
            
            await asyncio.sleep(1)
            
            # Demo 4: Real-time monitoring
            result4 = await demo_realtime_monitoring(client)
            results.append(result4)
            
            # Final summary
            print_summary(results)
            
    except Exception as e:
        print(f"❌ Error in the demo: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(run_streaming_demo())
    except KeyboardInterrupt:
        print("\n⏹️  Demo interrupted by the user")
    except Exception as e:
        print(f"❌ Error running demo: {e}")

Writing MCP_streamable_client/client.py


## Execução

Agora que temos o servidor e o cliente, vamos executá-los.

Primeiro, iniciamos o servidor

In [7]:
!cd MCP_streamable_server && source .venv/bin/activate && uv run server.py

🚀 Starting MCP streaming server on 127.0.0.1:8000
✅ Server ready at http://127.0.0.1:8000/mcp/
📡 Available tools:
  - long_running_task: Long running task with progress
  - streaming_data_processor: Data processing
  - file_upload_simulation: File upload simulation
  - realtime_monitoring: Real-time monitoring
[32mINFO[0m:     Started server process [[36m62601[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Uvicorn running on [1mhttp://127.0.0.1:8000[0m (Press CTRL+C to quit)




Depois de iniciado, executamos o cliente.

In [8]:
!cd MCP_streamable_client && source .venv/bin/activate && uv run client.py

MCP Streaming Client
✅ Connection established with the server
🔧 Available tools:
	 * long_running_task
	 * streaming_data_processor
	 * file_upload_simulation
	 * realtime_monitoring

📋 DEMO: Long Running Task with Progress
Executing long_running_task tool:
[2;36m[08/23/25 11:19:20][0m[2;36m [0m[34mINFO    [0m Server log: 🚀 Initializing Data      ]8;id=664702;file:///Users/macm1/Documents/web/portafolio/posts/MCP_streamable_client/.venv/lib/python3.12/site-packages/fastmcp/client/logging.py\\[2mlogging.py[0m]8;;\\[2m:[0m]8;id=102228;file:///Users/macm1/Documents/web/portafolio/posts/MCP_streamable_client/.venv/lib/python3.12/site-packages/fastmcp/client/logging.py#40\\[2m40[0m]8;;\
[2;36m                    [0m         Processing with [1;36m8[0m steps[33m...[0m            [2m             [0m
	📊 Long Running Task: |███░░░░░░░░░░░░░░░░░░░░░░░░░░░| 12.5% (1/8) - Step 1/8 - Step 1: Processed Data Processing [1.0s]
	📊 Long Running Task: |███████░░░░░░░░░░░░░░░░░░

Como se pode ver, obtivemos do servidor o processo de cada uma das execuções das tools.