# 📓 The GenAI Revolution Cookbook

**Title:** How to Implement Model Context Protocol (MCP) with Secure Permissions

**Description:** Build a production-ready Model Context Protocol (MCP) server and client with secure permissions, authenticated transports, observability, and reproducible deployments.

---

*This Jupyter notebook contains executable code examples. Run the cells below to try out the code yourself!*



## Why Secure MCP Servers Matter

When you expose file operations or other privileged actions to an LLM, you're handing untrusted prompts the keys to your system. Without strict input validation, scope enforcement, and audit trails, a single malicious or malformed prompt can read secrets, overwrite data, or traverse directories. MCP standardizes tool interfaces, but security is your responsibility.

This guide shows you how to build a **secure file server over stdio** that exposes a read tool and a scope-gated write tool, with version negotiation, scope-based authorization, Pydantic-validated inputs, and JSON audit logs. By the end, you will:

- Validate protocol versions to ensure client-server compatibility.
- Enforce least-privilege scopes using environment-driven authorization.
- Emit structured JSON audit logs for compliance and debugging.
- Prevent path traversal, hidden file access, and oversized payloads.

We'll focus on stdio transport for local, isolated operation. HTTP, mTLS, and containerization are covered in separate tutorials.

---

## Why Use MCP for This Problem

Ad-hoc tool wrappers scatter validation, logging, and auth logic across your codebase. MCP provides:

- **Standardized schemas**: Tools declare inputs/outputs with JSON Schema, enabling automatic validation.
- **Explicit capabilities**: Clients and servers negotiate features up front, eliminating hidden assumptions.
- **Transport flexibility**: Start with stdio for local dev, scale to HTTP for multi-tenant deployments.

For a constrained file reader exposed to an LLM app, MCP lets you enforce read-only access, reject traversal attempts, and audit every invocation—without reinventing protocol logic.

---

## Core Concepts for This Use Case

**Tools**: Functions the LLM can invoke. Each tool has a name, description, and input schema. Our server exposes `read_file` (read-only) and `write_file` (privileged).

**Scopes**: Permissions required to invoke a tool. We use `files.read` and `files.write` to enforce least-privilege access.

**Protocol version**: MCP uses date-based versions (e.g., `2025-06-18`). Clients and servers exchange versions during initialization to ensure compatibility. For more on choosing the right model for your application and environment, check out our guide on [how to pick an LLM](/article/how-to-choose-an-ai-model-for-your-app-speed-cost-reliability).

**Audit logs**: Structured JSON records of every tool invocation, including inputs, outcomes, and errors. These logs support compliance, debugging, and anomaly detection.
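
To make the Tools concept concrete, here is roughly what a `read_file` tool description could look like when a client lists tools: a name, a description, and a JSON Schema under `inputSchema`. The schema below is hand-written and simplified for illustration. The SDK generates the real one from type hints, and its exact layout may differ.

```python
import json

# Hand-written, simplified descriptor (an assumption for illustration only;
# the SDK derives the real schema from the tool's type hints).
read_file_tool = {
    "name": "read_file",
    "description": "Read a UTF-8 text file from the data root.",
    "inputSchema": {
        "type": "object",
        "properties": {
            "filename": {
                "type": "string",
                "minLength": 1,
                "maxLength": 128,
                "pattern": "^[A-Za-z0-9._-]+$",
            }
        },
        "required": ["filename"],
        "additionalProperties": False,
    },
}

print(json.dumps(read_file_tool, indent=2))
```

Declaring the constraints in the schema lets a client reject malformed arguments before the call ever reaches the server, while the server still validates everything again on its side.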

---

## Setup

Install the MCP Python SDK and dependencies:

In [1]:
!pip install mcp pydantic python-json-logger



Set up environment variables for scopes and data root:

In [2]:
import os
os.environ["MCP_SCOPES"] = "files.read,files.write"
os.environ["DATA_ROOT"] = "./data"
os.environ["MCP_SERVER_ID"] = "file-server-1"

Create a sample data directory and file:

In [3]:
from pathlib import Path
data_root = Path("./data")
data_root.mkdir(exist_ok=True)
(data_root / "greeting.txt").write_text("Hello from MCP!", encoding="utf-8")

15

---

## Building the Secure Server

### Step 1: Logging and Audit Helpers

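Set up structured JSON logging to emit audit events with timestamps, server ID, and event details. The `sanitize` helper replaces control characters with spaces and truncates long strings so they are safe to log. Because `audit_log` passes an already-serialized JSON string to `JsonFormatter`, each record is emitted as a JSON string nested under the `message` key, as the server output later in this notebook shows.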

In [4]:
import json
import logging
import re
from datetime import datetime, timezone
from pythonjsonlogger import jsonlogger

logger = logging.getLogger("mcp_server")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter("%(message)s"))
logger.addHandler(handler)

SERVER_ID = os.environ.get("MCP_SERVER_ID", "file-server-1")

def sanitize(value: str, max_len: int = 512) -> str:
    value = re.sub(r"[\x00-\x1f\x7f]", " ", value or "")
    return value[:max_len]

def audit_log(event: str, **fields):
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "server_id": SERVER_ID,
        "event": event,
        **fields,
    }
    logger.info(json.dumps(record))
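
A quick way to see why `sanitize` matters is to feed it a value that tries to forge a second log line. The sketch below repeats the helper so it runs on its own; the hostile filename is a made-up example.

```python
import re

def sanitize(value: str, max_len: int = 512) -> str:
    # Same logic as the helper above, repeated so this cell is self-contained.
    value = re.sub(r"[\x00-\x1f\x7f]", " ", value or "")
    return value[:max_len]

# Hypothetical input: a filename with an embedded newline and a fake record.
hostile = 'notes.txt\n{"event": "forged"}'
cleaned = sanitize(hostile)
print(cleaned)  # notes.txt {"event": "forged"}

# Long values are cut to max_len characters.
truncated = sanitize("a" * 1000)
print(len(truncated))  # 512
```

The newline becomes a space, so the forged text stays inside the original field instead of starting a new line in the log stream.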

### Step 2: Scope-Based Authorization

Define helpers to retrieve scopes from the environment and enforce them before tool execution. This prevents unauthorized access even in local stdio scenarios.

In [5]:
from typing import Set

def get_scopes_from_env() -> Set[str]:
    scopes = os.environ.get("MCP_SCOPES", "")
    return {s.strip() for s in scopes.split(",") if s.strip()}

def require_scope(required: str, scopes: Set[str]):
    if required not in scopes:
        raise PermissionError(f"missing_scope:{required}")
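
The following sketch exercises the same scope logic without touching the environment. `parse_scopes` is a hypothetical helper that mirrors `get_scopes_from_env` but takes the raw string as an argument, which makes the behavior easy to check.

```python
from typing import Set

def parse_scopes(raw: str) -> Set[str]:
    # Hypothetical helper: same parsing as get_scopes_from_env, minus the env lookup.
    return {s.strip() for s in raw.split(",") if s.strip()}

def require_scope(required: str, scopes: Set[str]) -> None:
    if required not in scopes:
        raise PermissionError(f"missing_scope:{required}")

read_only = parse_scopes(" files.read , ,")
require_scope("files.read", read_only)  # granted, returns silently

try:
    require_scope("files.write", read_only)
    denied = None
except PermissionError as exc:
    denied = str(exc)

print(read_only)  # {'files.read'}
print(denied)     # missing_scope:files.write
```

Note that an empty or unset `MCP_SCOPES` parses to an empty set, so every tool call is denied by default.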

### Step 3: Input Validation with Pydantic

Define strict schemas for tool inputs. These schemas reject hidden files, path traversal attempts, and oversized content.

In [6]:
from pydantic import BaseModel, Field, field_validator

DATA_ROOT = Path(os.environ.get("DATA_ROOT", "./data")).resolve()

class ReadFileInput(BaseModel):
    filename: str = Field(..., min_length=1, max_length=128, pattern=r"^[A-Za-z0-9._-]+$")

    @field_validator("filename")
    @classmethod
    def no_hidden_files(cls, v: str) -> str:
        if v.startswith("."):
            raise ValueError("hidden files not allowed")
        return v

class WriteFileInput(BaseModel):
    filename: str = Field(..., min_length=1, max_length=128, pattern=r"^[A-Za-z0-9._-]+$")
    content: str = Field(..., max_length=10000)

    @field_validator("filename")
    @classmethod
    def no_hidden_files(cls, v: str) -> str:
        if v.startswith("."):
            raise ValueError("hidden files not allowed")
        return v
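
To see which filenames these rules accept, the sketch below approximates them with the standard library only. It is not the Pydantic model itself, just the same length, character-set, and leading-dot checks written out by hand; `filename_allowed` is a hypothetical name.

```python
import re

FILENAME_RE = re.compile(r"[A-Za-z0-9._-]+")

def filename_allowed(name: str) -> bool:
    # Stdlib approximation of the Pydantic constraints above.
    if not 1 <= len(name) <= 128:
        return False
    if name.startswith("."):
        return False  # hidden files, and also "." and ".."
    return FILENAME_RE.fullmatch(name) is not None

candidates = ["greeting.txt", "../etc/passwd", ".env", "sub/dir.txt", "a" * 129, ""]
results = {c: filename_allowed(c) for c in candidates}
for name, ok in results.items():
    print(repr(name[:20]), ok)
```

Only `greeting.txt` passes. Because `/` is not in the allowed character set, any input that names a directory is rejected before the path is ever resolved.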

### Step 4: Implement Read-Only Tool

The `read_file` tool enforces the `files.read` scope, validates the path, and sanitizes output. It logs every invocation and redacts file content from logs.

In [13]:
from mcp.server.fastmcp import FastMCP, Context

# Reuses DATA_ROOT, sanitize, audit_log, get_scopes_from_env, require_scope,
# and ReadFileInput from the previous cells.
mcp = FastMCP("secure-file-server")

@mcp.tool()
def read_file(ctx: Context, input: ReadFileInput) -> str:
    scopes = get_scopes_from_env()
    try:
        require_scope("files.read", scopes)
        path = (DATA_ROOT / input.filename).resolve()
        # Compare path components rather than string prefixes, so a sibling
        # directory such as "data-old" is not mistaken for DATA_ROOT (Python 3.9+).
        if not path.is_relative_to(DATA_ROOT):
            raise ValueError("path outside DATA_ROOT")
        if not path.exists() or not path.is_file():
            raise FileNotFoundError("not found")
        content = path.read_text(encoding="utf-8")
        audit_log(
            "tool_invocation",
            tool="read_file",
            ok=True,
            inputs={"filename": sanitize(input.filename)},
            redactions=["content"],
        )
        # Replace control characters and cap the size before returning to the model.
        safe_content = sanitize(content, max_len=10000)
        return safe_content
    except Exception as e:
        audit_log(
            "tool_invocation",
            tool="read_file",
            ok=False,
            error=str(e),
            inputs={"filename": sanitize(getattr(input, 'filename', 'invalid'))},
        )
        raise


### Step 5: Implement Write Tool

The `write_file` tool requires the `files.write` scope and performs the same path validation. It normalizes newlines and logs content length without exposing the full payload.

In [8]:
@mcp.tool()
def write_file(ctx: Context, input: WriteFileInput) -> str:
    scopes = get_scopes_from_env()
    try:
        require_scope("files.write", scopes)
        path = (DATA_ROOT / input.filename).resolve()
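        # Compare path components rather than string prefixes, so a sibling
        # directory such as "data-old" is not mistaken for DATA_ROOT (Python 3.9+).
        if not path.is_relative_to(DATA_ROOT):
            raise ValueError("path outside DATA_ROOT")
        path.parent.mkdir(parents=True, exist_ok=True)
        # Normalize line endings first, then replace the remaining control characters
        # while keeping "\n" and "\t". Calling sanitize() here would replace newlines too.
        normalized = input.content.replace("\r\n", "\n")
        normalized = re.sub(r"[\x00-\x08\x0b-\x1f\x7f]", " ", normalized)[:10000]
        path.write_text(normalized, encoding="utf-8")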
        audit_log(
            "tool_invocation",
            tool="write_file",
            ok=True,
            inputs={"filename": sanitize(input.filename), "content_len": len(normalized)},
        )
        return "ok"
    except Exception as e:
        audit_log(
            "tool_invocation",
            tool="write_file",
            ok=False,
            error=str(e),
            inputs={
                "filename": sanitize(getattr(input, 'filename', 'invalid')),
                "content_len": len(getattr(input, 'content', '')),
            },
        )
        raise
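
The containment check on the resolved path is sensitive to how the comparison is done. A plain string prefix test treats a sibling directory whose name merely starts with the data root's name as being inside it, whereas `is_relative_to` (Python 3.9+) compares whole path components. The paths below are hypothetical and use `PurePosixPath` so the result does not depend on the local filesystem.

```python
from pathlib import PurePosixPath

root = PurePosixPath("/srv/data")
inside = PurePosixPath("/srv/data/greeting.txt")
sibling = PurePosixPath("/srv/data-private/secret.txt")

# String prefix comparison wrongly accepts the sibling directory.
prefix_ok = str(sibling).startswith(str(root))
# Component-wise comparison rejects it.
component_ok = sibling.is_relative_to(root)

print(prefix_ok, component_ok, inside.is_relative_to(root))  # True False True
```

In this server the filename pattern already forbids `/`, so the check acts as a second line of defense, which is a reason to make it strict rather than a reason to skip it.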

### Step 6: Start the Server

Log server startup with tool names and data root, then run the server using stdio transport.

In [10]:
if __name__ == "__main__":
    audit_log("server_start", tools=["read_file", "write_file"], data_root=str(DATA_ROOT))
    # Use mcp.run_stdio_async() directly since asyncio is already running in Colab
    import asyncio
    try:
        asyncio.get_running_loop()
        asyncio.create_task(mcp.run_stdio_async())
    except RuntimeError:
        # Fallback for environments where asyncio loop is not running
        import anyio
        anyio.run(mcp.run_stdio_async)

{"message": "{\"ts\": \"2025-10-28T00:37:15.752705+00:00\", \"server_id\": \"file-server-1\", \"event\": \"server_start\", \"tools\": [\"read_file\", \"write_file\"], \"data_root\": \"/content/data\"}"}
INFO:mcp_server:{"ts": "2025-10-28T00:37:15.752705+00:00", "server_id": "file-server-1", "event": "server_start", "tools": ["read_file", "write_file"], "data_root": "/content/data"}


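Save the complete server code (the imports, helpers, input models, both tools, and the startup block from Steps 1 through 6) to `server.py` so the client can launch it as a subprocess. When the file runs as a script, no event loop is running yet, so `asyncio.get_running_loop()` raises `RuntimeError` and the `anyio.run` fallback starts the server.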

---

## Building the Client

This async client launches the server as a subprocess, negotiates protocol version, lists tools, and invokes them. It demonstrates scope enforcement by attempting a write with insufficient permissions.

In [23]:
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

PROTOCOL_VERSION = "2025-06-18"

async def main():
    # server.py must already contain the server code saved in the previous section.
    env = os.environ.copy()
    env["MCP_SCOPES"] = "files.read,files.write"
    params = StdioServerParameters(command="python", args=["server.py"], env=env)

    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            # initialize() performs the handshake, including version negotiation.
            init = await session.initialize()
            print("Negotiated protocol version:", init.protocolVersion)
            if init.protocolVersion != PROTOCOL_VERSION:
                print("Warning: expected protocol version", PROTOCOL_VERSION)

            tools = await session.list_tools()
            print("Tools:", [tool.name for tool in tools.tools])

            result = await session.call_tool("read_file", {"input": {"filename": "greeting.txt"}})
            print("read_file result:", result)

# Jupyter and Colab support top-level await, which waits for the result.
# In a plain script, use asyncio.run(main()) instead.
await main()


To test scope enforcement, launch a new server subprocess with reduced scopes:

In [24]:
import os

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_scope_enforcement():
    env = os.environ.copy()
    env["MCP_SCOPES"] = "files.read"  # files.write is deliberately missing
    params = StdioServerParameters(command="python", args=["server.py"], env=env)

    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            try:
                result = await session.call_tool(
                    "write_file", {"input": {"filename": "new.txt", "content": "x"}}
                )
                # Tool failures are normally reported in the result, not raised.
                if result.isError:
                    print("write_file rejected:", result.content)
                else:
                    print("write_file unexpectedly succeeded:", result.content)
            except Exception as e:
                print("write_file rejected:", e)

# Top-level await works in Jupyter and Colab; use asyncio.run(...) in a script.
await test_scope_enforcement()

---

## Run and Evaluate

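The client launches the server as a subprocess, so running the client cells is enough. The first cell lists tools and reads `greeting.txt`; the second demonstrates scope enforcement.

The server writes audit events to stderr. If you redirect that stream to a file, print the most recent records to verify them (`/path/to/logs` is a placeholder):

In [25]:
import subprocess
# -n 20 prints the last 20 lines and returns; -f would block the cell indefinitely.
subprocess.run(["tail", "-n", "20", "/path/to/logs"], check=False)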

Or capture logs programmatically and parse JSON records to confirm required fields (`ts`, `server_id`, `event`, `tool`, `ok`, `inputs`).
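
A minimal sketch of that programmatic check follows. It assumes each log line has the shape shown in the server output earlier, where the audit record is a JSON string nested under `message`. The sample line is constructed here for illustration and is not real server output; `parse_audit_line` is a hypothetical helper.

```python
import json

REQUIRED = {"ts", "server_id", "event", "tool", "ok", "inputs"}

def parse_audit_line(line: str) -> dict:
    outer = json.loads(line)
    message = outer.get("message", outer)
    # The record may be a JSON string nested under "message"; decode it if so.
    return json.loads(message) if isinstance(message, str) else message

# Constructed sample shaped like a tool_invocation record (not real output).
inner = {
    "ts": "2025-10-28T00:38:08.581917+00:00",
    "server_id": "file-server-1",
    "event": "tool_invocation",
    "tool": "read_file",
    "ok": True,
    "inputs": {"filename": "greeting.txt"},
}
sample_line = json.dumps({"message": json.dumps(inner)})

record = parse_audit_line(sample_line)
missing = REQUIRED - set(record)
print("missing fields:", sorted(missing))  # missing fields: []
```

Records such as `server_start` carry no `tool`, `ok`, or `inputs` fields, so apply this check only to `tool_invocation` events.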

---

## Preventing Prompt Injection via Output Policy

Even read-only tools can become injection vectors if outputs are not sanitized. The `read_file` tool removes control characters and truncates content to prevent malicious payloads from corrupting downstream prompts. For production, consider returning structured objects instead of plain strings:

In [None]:
return {"content": safe_content, "path": str(path), "source": "file"}

This allows prompting layers to render safely and apply downstream policies. For a deeper dive into how prompt structure and information placement affect model behavior, see our article on [placing critical info in long prompts](/article/lost-in-the-middle-placing-critical-info-in-long-prompts).

---

## Protocol Version Negotiation

Clients and servers should exchange protocol versions and capabilities up front. The MCP spec uses date-based versions. Pick a target like `2025-06-18` and reject mismatches. Capabilities advertise what each side supports, enabling graceful fallback and explicit version checks. This yields deterministic behavior and eliminates hidden assumptions across environments. If you're interested in how context limitations can affect model reliability and what strategies can help, see our breakdown of [context rot and why LLMs "forget" as their memory grows](/article/context-rot-why-llms-forget-as-their-memory-grows-3).

To enforce version checks on the server, hook the `initialize` method in your MCP implementation to validate `protocolVersion` and reject unsupported clients. Consult the MCP spec at https://modelcontextprotocol.io/spec for details.
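
The decision logic behind such a check can be sketched independently of any SDK. The function below assumes a handshake in which the server echoes the requested version when it supports it and otherwise either answers with its newest supported version or, in strict mode, rejects the client. The supported-version list and the function name are assumptions for this example.

```python
from typing import Sequence

# Assumed set of versions this hypothetical server supports.
SUPPORTED_VERSIONS = ("2025-03-26", "2025-06-18")

def negotiate_version(
    requested: str,
    supported: Sequence[str] = SUPPORTED_VERSIONS,
    strict: bool = False,
) -> str:
    """Return the protocol version the server should answer with."""
    if requested in supported:
        return requested
    if strict:
        raise ValueError(f"unsupported protocol version: {requested}")
    # ISO dates sort lexicographically, so max() picks the newest version.
    return max(supported)

print(negotiate_version("2025-06-18"))  # 2025-06-18
print(negotiate_version("2024-11-05"))  # 2025-06-18

try:
    negotiate_version("2024-11-05", strict=True)
    rejected = None
except ValueError as exc:
    rejected = str(exc)
print(rejected)  # unsupported protocol version: 2024-11-05
```

In the non-strict case the client still has the final say: if it does not support the version the server answered with, it should disconnect rather than continue.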

---

## Conclusion

You've built a secure MCP server that enforces least-privilege scopes, validates inputs with Pydantic, and emits structured audit logs. This foundation supports safe LLM tool integration without risking traversal, hidden file access, or unaudited writes.

**Next steps**:
- Expose the server over HTTP behind mTLS or bearer token authentication (covered in a separate tutorial).
- Add change detection alerts by polling tool definitions and diffing snapshots (see `watch_tools.py` example in the code improvements).
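
The change-detection idea in the last bullet can be sketched with the standard library: fingerprint each tool definition, store the snapshot, and diff it against the next poll. This is an illustrative sketch, not the referenced `watch_tools.py`; the tool dictionaries are made-up stand-ins for what a client would receive when listing tools.

```python
import hashlib
import json
from typing import Dict, List

def fingerprint(tool: dict) -> str:
    # Canonical JSON (sorted keys) so key order does not change the hash.
    canonical = json.dumps(tool, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def snapshot(tools: List[dict]) -> Dict[str, str]:
    return {tool["name"]: fingerprint(tool) for tool in tools}

def diff_snapshots(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, List[str]]:
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "changed": sorted(n for n in old.keys() & new.keys() if old[n] != new[n]),
    }

# Hypothetical tool listings from two consecutive polls.
before = snapshot([
    {"name": "read_file", "description": "Read a file"},
    {"name": "write_file", "description": "Write a file"},
])
after = snapshot([
    {"name": "read_file", "description": "Read a file and email it"},
    {"name": "delete_file", "description": "Delete a file"},
])

changes = diff_snapshots(before, after)
print(changes)
# {'added': ['delete_file'], 'removed': ['write_file'], 'changed': ['read_file']}
```

A changed description is worth alerting on even when the schema is identical, because the description is what the model reads when deciding how to use a tool.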