# Orq.ai OpenTelemetry Integration Example

This notebook demonstrates how to integrate Orq.ai with OpenTelemetry for comprehensive observability of your AI applications.

Orq.ai is an AI Gateway and LLM Collaboration Platform that provides:
- Unified API for multiple LLM providers
- Deployment management and versioning
- Built-in retry, fallback, and caching strategies
- Contact and thread tracking for budget management
- Knowledge base integration
- Real-time streaming support

This example shows how to instrument Orq.ai SDK calls with OpenTelemetry to capture detailed traces.

## Setup Requirements

- Run the trace API service locally on port 5300 (the examples export to `http://localhost:5300/v2/otel`)
- Install required packages:

```bash
pip install orq-ai-sdk \
            openai \
            opentelemetry-sdk \
            opentelemetry-exporter-otlp \
            opentelemetry-api \
            nest-asyncio
```

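Note: Unlike OpenInference, which provides auto-instrumentation, Orq.ai requires manual span creation for observability. The `openai` package is only needed for Examples #4 and #5, which call the Orq.ai AI Gateway through the OpenAI SDK.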

In [None]:
import os

# Environment Variables
os.environ["ORQ_API_KEY"] = "<replace-me>"

# OpenTelemetry Configuration
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:5300/v2/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "Authorization=Bearer <replace-me>"

print("✅ Environment variables configured")
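
Both credentials above ship as `<replace-me>` placeholders, so a quick check before exporting can save a confusing authentication failure later. The following is a minimal sketch under that assumption; `find_unset_config` is a hypothetical helper written for this notebook, not part of the Orq.ai or OpenTelemetry SDKs.

```python
import os

PLACEHOLDER = "<replace-me>"
REQUIRED_VARS = (
    "ORQ_API_KEY",
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
)


def find_unset_config(env=None, required=REQUIRED_VARS):
    """Return the required variable names that are missing, empty, or still a placeholder."""
    env = os.environ if env is None else env
    problems = []
    for name in required:
        value = env.get(name, "")
        if not value.strip() or PLACEHOLDER in value:
            problems.append(name)
    return problems


unset = find_unset_config()
if unset:
    print("Still unset or placeholder:", ", ".join(unset))
```

Passing a plain dict as `env` makes the check easy to try without touching the real environment.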

In [None]:
# Allow nested asyncio event loops, which is needed when async code runs inside Jupyter
import nest_asyncio
nest_asyncio.apply()

# Initialize OpenTelemetry
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.trace import Status, StatusCode

# Configure Resource
resource = Resource.create({
    SERVICE_NAME: "orq-ai-otel-example",
    SERVICE_VERSION: "1.0.0",
    "deployment.environment": "development"
})

# Configure TracerProvider
provider = TracerProvider(resource=resource)

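# Configure OTLP Exporter. With no arguments it reads OTEL_EXPORTER_OTLP_ENDPOINT and
# OTEL_EXPORTER_OTLP_HEADERS; the HTTP exporter appends /v1/traces to the endpoint.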
otlp_exporter = OTLPSpanExporter()

# Add span processors
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))

trace.set_tracer_provider(provider)

# Get tracer for manual instrumentation
tracer = trace.get_tracer(__name__)

print("✅ OpenTelemetry initialized")
print("✅ Manual instrumentation ready for Orq.ai SDK calls")
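
Since every example below repeats the same "open a span, set a few fixed attributes, run the call" pattern, that boilerplate can be factored into a decorator. This is a sketch rather than an Orq.ai or OpenTelemetry API: `traced` is a hypothetical name, and it only assumes that `tracer` exposes an OpenTelemetry-style `start_as_current_span` context manager. With the OpenTelemetry SDK, `start_as_current_span` records an escaping exception and sets an error status by default, so the wrapper does not repeat that work.

```python
import functools


def traced(tracer, span_name, attributes=None):
    """Decorator factory: run the wrapped function inside a span from `tracer`.

    `attributes` is an optional dict of fixed span attributes, for example
    {"orq.operation": "deployment.invoke"}.
    """
    fixed_attributes = dict(attributes or {})

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(span_name) as span:
                for key, value in fixed_attributes.items():
                    span.set_attribute(key, value)
                return func(*args, **kwargs)

        return wrapper

    return decorator
```

Applied as `@traced(tracer, "orq_deployment_invoke", {"orq.operation": "deployment.invoke"})`, the decorated function body only needs the attributes that depend on the request or response.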

## Example #1: Basic Deployment Invocation with Tracing

This example demonstrates:
- Creating a basic deployment invocation
- Manual span creation and attribute setting
- Capturing inputs, outputs, and metadata
- Error handling with OpenTelemetry

In [None]:
"""
Example #1: Basic Deployment Invocation with OpenTelemetry Tracing
"""

from orq_ai_sdk import Orq
import time

def invoke_deployment_with_tracing():
    """Invoke an Orq.ai deployment with manual OpenTelemetry tracing"""
    
    # Create parent span for the entire operation
    with tracer.start_as_current_span("orq_deployment_invoke") as span:
        try:
            # Set span attributes
            span.set_attribute("orq.operation", "deployment.invoke")
            span.set_attribute("orq.deployment.key", "customer_service_bot")
            span.set_attribute("orq.environment", "production")
            
            # Initialize Orq client
            with Orq(
                api_key=os.environ.get("ORQ_API_KEY"),
                environment="production"
            ) as client:
                
                # Prepare inputs
                user_input = "What are the features of Orq.ai?"
                span.set_attribute("orq.input.message", user_input)
                
                # Record start time
                start_time = time.time()
                span.add_event("deployment_invocation_started")
                
                # Invoke deployment
                response = client.deployments.invoke(
                    key="customer_service_bot",
                    inputs={
                        "user_question": user_input
                    },
                    context={
                        "environment": "production",
                        "region": "us-east-1"
                    },
                    metadata={
                        "session_id": "session_12345",
                        "user_tier": "premium"
                    }
                )
                
                # Calculate duration
                duration = time.time() - start_time
                span.set_attribute("orq.duration_seconds", duration)
                
                # Extract and log response (content can be None, so fall back to an empty string)
                output = response.choices[0].message.content or ""
                span.set_attribute("orq.output.message", output[:500])  # Truncate for span
                span.set_attribute("orq.output.length", len(output))

                # Log model information if available
                model_name = getattr(response, "model", None)
                if model_name:
                    span.set_attribute("orq.model", model_name)

                # Log usage information if available (the field may be absent or None)
                usage = getattr(response, "usage", None)
                if usage is not None:
                    span.set_attribute("orq.usage.prompt_tokens", usage.prompt_tokens)
                    span.set_attribute("orq.usage.completion_tokens", usage.completion_tokens)
                    span.set_attribute("orq.usage.total_tokens", usage.total_tokens)
                
                span.add_event("deployment_invocation_completed", {
                    "response_length": len(output),
                    "duration_seconds": duration
                })
                
                # Mark span as successful
                span.set_status(Status(StatusCode.OK))
                
                print("✅ Deployment invoked successfully")
                print(f"Response: {output}")
                print(f"Duration: {duration:.2f}s")
                
                return response
                
        except Exception as e:
            # Record exception in span
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.set_attribute("orq.error.type", type(e).__name__)
            span.set_attribute("orq.error.message", str(e))
            
            print(f"❌ Error during deployment invocation: {e}")
            raise

# Run the example
invoke_deployment_with_tracing()
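
Token usage is logged in several examples, and the `usage` field can be absent or `None` depending on the response. One way to handle that in a single place is a small extractor that returns a flat attribute dict. This is a sketch: `usage_attributes` is a hypothetical helper, and the demo object is a stand-in built with `SimpleNamespace`, not a real Orq.ai response.

```python
from types import SimpleNamespace  # only used for the demo object below

USAGE_FIELDS = ("prompt_tokens", "completion_tokens", "total_tokens")


def usage_attributes(response, prefix="orq.usage"):
    """Collect token counts from `response.usage` into a flat attribute dict.

    Returns an empty dict when `usage` is absent or None, and skips any
    individual count that is missing.
    """
    usage = getattr(response, "usage", None)
    if usage is None:
        return {}
    attributes = {}
    for field in USAGE_FIELDS:
        value = getattr(usage, field, None)
        if value is not None:
            attributes[f"{prefix}.{field}"] = value
    return attributes


# Demo with a stand-in object shaped like the response used in Example #1
fake_response = SimpleNamespace(
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=30, total_tokens=42)
)
print(usage_attributes(fake_response))
```

Inside a span, the result can be applied with a loop over `.items()` and `span.set_attribute(key, value)`.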

## Example #2: Streaming Responses with Tracing

This example demonstrates:
- Streaming responses from Orq.ai deployments
- Tracking streaming chunks with events
- Measuring time-to-first-token and total streaming time

In [None]:
"""
Example #2: Streaming Deployment with OpenTelemetry Tracing
"""

from orq_ai_sdk import Orq
import time

def stream_deployment_with_tracing():
    """Stream an Orq.ai deployment response with tracing"""
    
    with tracer.start_as_current_span("orq_deployment_stream") as span:
        try:
            span.set_attribute("orq.operation", "deployment.stream")
            span.set_attribute("orq.deployment.key", "story_generator")
            span.set_attribute("orq.stream", True)
            
            with Orq(
                api_key=os.environ.get("ORQ_API_KEY"),
                environment="production"
            ) as client:
                
                prompt = "Write a short story about AI and humans working together"
                span.set_attribute("orq.input.prompt", prompt)
                
                start_time = time.time()
                first_token_time = None
                chunk_count = 0
                total_content = ""
                
                span.add_event("stream_started")
                
                # Stream the response
                stream = client.deployments.stream(
                    key="story_generator",
                    inputs={"prompt": prompt}
                )
                
                with stream as event_stream:
                    for event in event_stream:
                        # Record first token time
                        if first_token_time is None:
                            first_token_time = time.time() - start_time
                            span.set_attribute("orq.stream.time_to_first_token", first_token_time)
                            span.add_event("first_token_received", {
                                "latency_seconds": first_token_time
                            })
                        
                        # Process chunk
                        if hasattr(event, 'choices') and len(event.choices) > 0:
                            delta = event.choices[0].delta
                            if hasattr(delta, 'content') and delta.content:
                                chunk_count += 1
                                total_content += delta.content
                                print(delta.content, end='', flush=True)
                
                print()  # New line after streaming
                
                # Calculate final metrics
                total_duration = time.time() - start_time
                span.set_attribute("orq.stream.total_duration", total_duration)
                span.set_attribute("orq.stream.chunk_count", chunk_count)
                span.set_attribute("orq.output.length", len(total_content))
                span.set_attribute("orq.output.content", total_content[:500])  # Truncated
                
                span.add_event("stream_completed", {
                    "chunk_count": chunk_count,
                    "total_duration": total_duration,
                    "content_length": len(total_content)
                })
                
                span.set_status(Status(StatusCode.OK))
                
                print("\n✅ Streaming completed")
                print(f"Chunks received: {chunk_count}")
                # first_token_time stays None if the stream produced no events
                if first_token_time is not None:
                    print(f"Time to first token: {first_token_time:.3f}s")
                print(f"Total duration: {total_duration:.2f}s")
                
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Error during streaming: {e}")
            raise

# Run the example
stream_deployment_with_tracing()
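
Example #2 stamps the time to first token on the first event of any kind, even one that carries no text. If the metric should reflect the first visible content instead, the bookkeeping can live in a small tracker. The sketch below is an illustration only: `StreamTimer` is a hypothetical class, and it uses `time.perf_counter`, a monotonic clock, rather than `time.time`.

```python
import time


class StreamTimer:
    """Track time to first content chunk, chunk count, and total duration."""

    def __init__(self, clock=time.perf_counter):
        self._clock = clock
        self._start = clock()
        self.time_to_first_token = None  # stays None if no content arrives
        self.chunk_count = 0

    def record_chunk(self, text):
        """Call once per streamed event; empty or None text is ignored."""
        if not text:
            return
        if self.time_to_first_token is None:
            self.time_to_first_token = self._clock() - self._start
        self.chunk_count += 1

    def elapsed(self):
        """Seconds since the timer was created."""
        return self._clock() - self._start

    def as_attributes(self, prefix="orq.stream"):
        """Return the metrics as a dict of span attributes."""
        attributes = {
            f"{prefix}.total_duration": self.elapsed(),
            f"{prefix}.chunk_count": self.chunk_count,
        }
        if self.time_to_first_token is not None:
            attributes[f"{prefix}.time_to_first_token"] = self.time_to_first_token
        return attributes
```

The injectable `clock` argument exists so the timing logic can be exercised with fixed values instead of real delays.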

## Example #3: Multi-Turn Conversation with Contact & Thread Tracking

This example demonstrates:
- Managing conversation history
- Using contact_id for budget tracking
- Using thread_id for conversation grouping
- Nested spans for conversation turns

In [None]:
"""
Example #3: Multi-Turn Conversation with Contact and Thread Tracking
"""

from orq_ai_sdk import Orq
import time

def multi_turn_conversation_with_tracing():
    """Demonstrate multi-turn conversation with Orq.ai custom attributes"""
    
    with tracer.start_as_current_span("orq_conversation_session") as session_span:
        try:
            # Set session-level attributes
            contact_id = "user_789"
            thread_id = "conversation_abc123"
            
            session_span.set_attribute("orq.operation", "conversation.session")
            session_span.set_attribute("orq.contact_id", contact_id)
            session_span.set_attribute("orq.thread_id", thread_id)
            session_span.set_attribute("orq.deployment.key", "chat_assistant")
            
            # Initialize client with contact tracking
            with Orq(
                api_key=os.environ.get("ORQ_API_KEY"),
                environment="production",
                contact_id=contact_id  # Track usage per contact
            ) as client:
                
                # Conversation history
                conversation_history = []
                
                # Define conversation turns
                conversation_turns = [
                    "Hello! Can you explain what Orq.ai does?",
                    "What are the key benefits of using Orq.ai?",
                    "How does the contact tracking feature work?"
                ]
                
                session_span.set_attribute("orq.conversation.turn_count", len(conversation_turns))
                
                for turn_number, user_message in enumerate(conversation_turns, 1):
                    # Create a span for each turn
                    with tracer.start_as_current_span(f"orq_conversation_turn_{turn_number}") as turn_span:
                        try:
                            turn_span.set_attribute("orq.turn.number", turn_number)
                            turn_span.set_attribute("orq.turn.user_message", user_message)
                            turn_span.set_attribute("orq.contact_id", contact_id)
                            turn_span.set_attribute("orq.thread_id", thread_id)
                            
                            # Add user message to history
                            conversation_history.append({
                                "role": "user",
                                "content": user_message
                            })
                            
                            start_time = time.time()
                            turn_span.add_event("turn_started", {"turn": turn_number})
                            
                            # Invoke deployment with conversation history
                            response = client.deployments.invoke(
                                key="chat_assistant",
                                messages=conversation_history,
                                metadata={
                                    "conversation_turn": turn_number,
                                    "thread_id": thread_id
                                }
                            )
                            
                            duration = time.time() - start_time
                            
                            # Extract response (content can be None, so fall back to an empty string)
                            assistant_message = response.choices[0].message.content or ""
                            
                            # Add assistant response to history
                            conversation_history.append({
                                "role": "assistant",
                                "content": assistant_message
                            })
                            
                            # Set turn attributes
                            turn_span.set_attribute("orq.turn.assistant_message", assistant_message[:500])
                            turn_span.set_attribute("orq.turn.duration", duration)
                            turn_span.set_attribute("orq.turn.history_length", len(conversation_history))
                            
                            # Log token usage if available (the field may be absent or None)
                            usage = getattr(response, "usage", None)
                            if usage is not None:
                                turn_span.set_attribute("orq.turn.tokens.prompt", usage.prompt_tokens)
                                turn_span.set_attribute("orq.turn.tokens.completion", usage.completion_tokens)
                                turn_span.set_attribute("orq.turn.tokens.total", usage.total_tokens)
                            
                            turn_span.add_event("turn_completed", {
                                "turn": turn_number,
                                "duration_seconds": duration,
                                "response_length": len(assistant_message)
                            })
                            
                            turn_span.set_status(Status(StatusCode.OK))
                            
                            print(f"\n{'='*60}")
                            print(f"Turn {turn_number}")
                            print(f"{'='*60}")
                            print(f"User: {user_message}")
                            print(f"\nAssistant: {assistant_message}")
                            print(f"\n⏱️ Duration: {duration:.2f}s")
                            
                        except Exception as e:
                            turn_span.record_exception(e)
                            turn_span.set_status(Status(StatusCode.ERROR, str(e)))
                            print(f"❌ Error in turn {turn_number}: {e}")
                            raise
                
                # Set session summary attributes
                session_span.set_attribute("orq.conversation.completed_turns", len(conversation_turns))
                session_span.set_attribute("orq.conversation.final_history_length", len(conversation_history))
                session_span.add_event("conversation_session_completed", {
                    "total_turns": len(conversation_turns),
                    "messages_exchanged": len(conversation_history)
                })
                session_span.set_status(Status(StatusCode.OK))
                
                print(f"\n{'='*60}")
                print("✅ Conversation session completed successfully")
                print(f"Total turns: {len(conversation_turns)}")
                print(f"Messages exchanged: {len(conversation_history)}")
                print(f"Contact ID: {contact_id}")
                print(f"Thread ID: {thread_id}")
                
        except Exception as e:
            session_span.record_exception(e)
            session_span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Error in conversation session: {e}")
            raise

# Run the example
multi_turn_conversation_with_tracing()
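
The conversation example carries `contact_id`, `thread_id`, and the message history as separate local variables and repeats the ID attributes on every turn span. A small container can keep them together. This is a sketch under the assumption that the attribute names from Example #3 are reused; `ConversationLog` is a hypothetical class, not part of the Orq.ai SDK.

```python
from dataclasses import dataclass, field


@dataclass
class ConversationLog:
    """Hold chat history plus the IDs that every turn span should carry."""

    contact_id: str
    thread_id: str
    messages: list = field(default_factory=list)

    def add(self, role, content):
        """Append one message in the role/content shape used by the examples."""
        self.messages.append({"role": role, "content": content})

    def turn_attributes(self, turn_number):
        """Attributes shared by every turn span in this conversation."""
        return {
            "orq.contact_id": self.contact_id,
            "orq.thread_id": self.thread_id,
            "orq.turn.number": turn_number,
            "orq.turn.history_length": len(self.messages),
        }


log = ConversationLog(contact_id="user_789", thread_id="conversation_abc123")
log.add("user", "Hello! Can you explain what Orq.ai does?")
print(log.turn_attributes(1))
```

Inside the loop, the returned dict can be passed to `turn_span.set_attributes(...)`, which sets several attributes in one call.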

## Example #4: Orq AI Gateway with OpenAI SDK

This example demonstrates:
- Using Orq.ai's AI Gateway with OpenAI SDK
- Leveraging Orq's retry, fallback, and caching features
- Multi-provider routing
- Manual tracing of gateway operations

In [None]:
"""
Example #4: Orq AI Gateway with OpenAI SDK and OpenTelemetry
"""

from openai import OpenAI
import time

def orq_gateway_with_tracing():
    """Use Orq.ai AI Gateway with OpenAI SDK and tracing"""
    
    with tracer.start_as_current_span("orq_ai_gateway") as span:
        try:
            span.set_attribute("orq.operation", "gateway.chat_completion")
            span.set_attribute("orq.gateway.type", "openai_compatible")
            
            # Initialize OpenAI client pointing to Orq.ai gateway
            client = OpenAI(
                api_key=os.environ.get("ORQ_API_KEY"),
                base_url="https://api.orq.ai/v2/proxy"
            )
            
            # Prepare request
            model = "openai/gpt-4o-mini"
            messages = [
                {"role": "system", "content": "You are a helpful AI assistant."},
                {"role": "user", "content": "Explain the benefits of using an AI Gateway in 3 sentences."}
            ]
            
            span.set_attribute("orq.model", model)
            span.set_attribute("orq.input.system_message", messages[0]["content"])
            span.set_attribute("orq.input.user_message", messages[1]["content"])
            
            start_time = time.time()
            span.add_event("gateway_request_started")
            
            # Make request through Orq gateway
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0.7,
                max_tokens=200
            )
            
            duration = time.time() - start_time
            
            # Extract response details (content can be None, so fall back to an empty string)
            content = response.choices[0].message.content or ""
            finish_reason = response.choices[0].finish_reason
            
            # Set response attributes
            span.set_attribute("orq.gateway.duration", duration)
            span.set_attribute("orq.output.content", content[:500])
            span.set_attribute("orq.output.finish_reason", finish_reason)
            span.set_attribute("orq.response.id", response.id)
            span.set_attribute("orq.response.model", response.model)
            
            # Log token usage
            if response.usage:
                span.set_attribute("orq.usage.prompt_tokens", response.usage.prompt_tokens)
                span.set_attribute("orq.usage.completion_tokens", response.usage.completion_tokens)
                span.set_attribute("orq.usage.total_tokens", response.usage.total_tokens)
            
            span.add_event("gateway_request_completed", {
                "duration_seconds": duration,
                "tokens_used": response.usage.total_tokens if response.usage else 0,
                "finish_reason": finish_reason
            })
            
            span.set_status(Status(StatusCode.OK))
            
            print("✅ AI Gateway request completed")
            print(f"Model: {response.model}")
            print(f"Response: {content}")
            print(f"Duration: {duration:.2f}s")
            if response.usage:
                print(f"Tokens: {response.usage.total_tokens}")
            
            return response
            
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Error in AI Gateway request: {e}")
            raise

# Run the example
orq_gateway_with_tracing()
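
Example #4 records the system and user messages by indexing `messages[0]` and `messages[1]`, which only works for that exact two-message shape. For longer or differently ordered message lists, the messages could be flattened generically. The sketch below is one possible approach; `message_attributes` and the indexed attribute names it produces are illustrative choices, not a documented Orq.ai schema.

```python
def message_attributes(messages, prefix="orq.input.messages", max_chars=500):
    """Flatten chat messages into indexed span attributes.

    Attribute names such as `orq.input.messages.0.role` are a convention
    chosen for this sketch only.
    """
    attributes = {f"{prefix}.count": len(messages)}
    for index, message in enumerate(messages):
        content = message.get("content") or ""
        attributes[f"{prefix}.{index}.role"] = message.get("role", "unknown")
        # Truncate so a long prompt does not bloat the span
        attributes[f"{prefix}.{index}.content"] = str(content)[:max_chars]
    return attributes


demo_messages = [
    {"role": "system", "content": "You are a helpful AI assistant."},
    {"role": "user", "content": "Explain the benefits of using an AI Gateway in 3 sentences."},
]
for key, value in message_attributes(demo_messages).items():
    print(f"{key} = {value}")
```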

## Example #5: Streaming via AI Gateway with Tracing

This example demonstrates:
- Streaming responses through Orq.ai AI Gateway
- Using OpenAI SDK with Orq.ai backend
- Tracking streaming metrics and performance

In [None]:
"""
Example #5: Streaming via AI Gateway with OpenTelemetry
"""

from openai import OpenAI
import time

def gateway_streaming_with_tracing():
    """Stream responses through Orq.ai AI Gateway with tracing"""
    
    with tracer.start_as_current_span("orq_gateway_stream") as span:
        try:
            span.set_attribute("orq.operation", "gateway.chat_completion_stream")
            span.set_attribute("orq.stream", True)
            
            # Initialize OpenAI client pointing to Orq.ai gateway
            client = OpenAI(
                api_key=os.environ.get("ORQ_API_KEY"),
                base_url="https://api.orq.ai/v2/proxy"
            )
            
            model = "openai/gpt-4o"
            prompt = "Write a brief explanation of how AI gateways improve reliability and cost efficiency."
            
            span.set_attribute("orq.model", model)
            span.set_attribute("orq.input.prompt", prompt)
            
            start_time = time.time()
            first_chunk_time = None
            chunk_count = 0
            accumulated_content = ""
            
            span.add_event("gateway_stream_started")
            
            # Create streaming request
            stream = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                stream=True
            )
            
            # Process stream
            for chunk in stream:
                if first_chunk_time is None:
                    first_chunk_time = time.time() - start_time
                    span.set_attribute("orq.stream.time_to_first_chunk", first_chunk_time)
                    span.add_event("first_chunk_received", {
                        "latency_seconds": first_chunk_time
                    })
                
                # Some chunks arrive with an empty choices list, so guard before indexing
                if chunk.choices and chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    chunk_count += 1
                    accumulated_content += content
                    print(content, end='', flush=True)
            
            print()  # New line
            
            # Calculate final metrics
            total_duration = time.time() - start_time
            # Rough proxy: counts whitespace-separated words, not model tokens
            tokens_per_second = len(accumulated_content.split()) / total_duration if total_duration > 0 else 0
            
            span.set_attribute("orq.stream.total_duration", total_duration)
            span.set_attribute("orq.stream.chunk_count", chunk_count)
            span.set_attribute("orq.stream.content_length", len(accumulated_content))
            span.set_attribute("orq.stream.tokens_per_second", tokens_per_second)
            span.set_attribute("orq.output.content", accumulated_content[:500])
            
            span.add_event("gateway_stream_completed", {
                "chunk_count": chunk_count,
                "total_duration": total_duration,
                "content_length": len(accumulated_content),
                "tokens_per_second": tokens_per_second
            })
            
            span.set_status(Status(StatusCode.OK))
            
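            print("\n✅ Streaming completed via AI Gateway")
            print(f"Chunks: {chunk_count}")
            # first_chunk_time stays None if the stream produced no chunks
            if first_chunk_time is not None:
                print(f"Time to first chunk: {first_chunk_time:.3f}s")
            print(f"Total duration: {total_duration:.2f}s")
            print(f"~Words/sec (rough proxy for tokens/sec): {tokens_per_second:.2f}")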
            
        except Exception as e:
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Error during gateway streaming: {e}")
            raise

# Run the example
gateway_streaming_with_tracing()
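
When streaming through an OpenAI-compatible endpoint, not every chunk is guaranteed to carry text: a chunk may have an empty `choices` list or a delta whose `content` is `None`. One way to keep the processing loop simple is to filter those out in a generator. This is a sketch: `iter_text` and `make_chunk` are hypothetical helpers, and the demo chunks are stand-ins built with `SimpleNamespace`, not real SDK objects.

```python
from types import SimpleNamespace  # only used to build demo chunks


def iter_text(stream):
    """Yield the text of each chunk, skipping chunks without choices or content."""
    for chunk in stream:
        choices = getattr(chunk, "choices", None)
        if not choices:
            continue
        delta = getattr(choices[0], "delta", None)
        text = getattr(delta, "content", None)
        if text:
            yield text


def make_chunk(text):
    """Build a stand-in object shaped like a chat-completion stream chunk."""
    return SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content=text))])


demo_stream = [
    make_chunk("Hello"),
    SimpleNamespace(choices=[]),  # a chunk that carries no choices
    make_chunk(None),             # a delta without text
    make_chunk(", world"),
]
print("".join(iter_text(demo_stream)))
```

With this in place, the loop in Example #5 could iterate over `iter_text(stream)` and treat every yielded value as printable content.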

## Example #6: Complete RAG Pipeline with Orq.ai

This example demonstrates:
- Using Orq.ai knowledge bases for RAG
- Wrapping the generation call in its own nested span inside a pipeline span
- Passing retrieval settings (knowledge base IDs, top-k, relevance threshold) through `context`
- End-to-end RAG observability

In [None]:
"""
Example #6: RAG Pipeline with Orq.ai Knowledge Bases and Tracing
"""

from orq_ai_sdk import Orq
import time

def rag_pipeline_with_tracing():
    """Demonstrate RAG with Orq.ai knowledge bases and comprehensive tracing"""
    
    with tracer.start_as_current_span("orq_rag_pipeline") as pipeline_span:
        try:
            pipeline_span.set_attribute("orq.operation", "rag.pipeline")
            pipeline_span.set_attribute("orq.deployment.key", "documentation_assistant")
            
            with Orq(
                api_key=os.environ.get("ORQ_API_KEY"),
                environment="production"
            ) as client:
                
                # User query
                query = "How do I implement retry logic with Orq.ai deployments?"
                pipeline_span.set_attribute("orq.rag.query", query)
                
                pipeline_start = time.time()
                pipeline_span.add_event("rag_pipeline_started")
                
                # Invoke deployment with knowledge base context
                with tracer.start_as_current_span("orq_rag_generation") as gen_span:
                    gen_span.set_attribute("orq.operation", "rag.generation")
                    gen_span.set_attribute("orq.query", query)
                    
                    gen_start = time.time()
                    
                    # Call deployment with context injection
                    response = client.deployments.invoke(
                        key="documentation_assistant",
                        inputs={
                            "user_query": query
                        },
                        context={
                            "knowledge_base_ids": ["docs_kb_001", "api_kb_002"],
                            "retrieval_top_k": 5,
                            "relevance_threshold": 0.75
                        },
                        metadata={
                            "rag_enabled": True,
                            "query_type": "technical_documentation"
                        }
                    )
                    
                    gen_duration = time.time() - gen_start
                    gen_span.set_attribute("orq.generation.duration", gen_duration)
                    
                    # Extract response (content can be None, so fall back to an empty string)
                    answer = response.choices[0].message.content or ""
                    gen_span.set_attribute("orq.generation.answer", answer[:500])
                    gen_span.set_attribute("orq.generation.answer_length", len(answer))

                    # Log usage if available (the field may be absent or None)
                    usage = getattr(response, "usage", None)
                    if usage is not None:
                        gen_span.set_attribute("orq.generation.tokens.total", usage.total_tokens)
                    
                    gen_span.add_event("generation_completed", {
                        "duration_seconds": gen_duration,
                        "answer_length": len(answer)
                    })
                    gen_span.set_status(Status(StatusCode.OK))
                
                # Calculate pipeline metrics
                pipeline_duration = time.time() - pipeline_start
                pipeline_span.set_attribute("orq.rag.pipeline_duration", pipeline_duration)
                pipeline_span.set_attribute("orq.rag.answer", answer[:500])
                
                pipeline_span.add_event("rag_pipeline_completed", {
                    "total_duration": pipeline_duration,
                    "answer_length": len(answer)
                })
                pipeline_span.set_status(Status(StatusCode.OK))
                
                print("✅ RAG Pipeline completed")
                print(f"\nQuery: {query}")
                print(f"\nAnswer: {answer}")
                print(f"\nPipeline duration: {pipeline_duration:.2f}s")
                
                return answer
                
        except Exception as e:
            pipeline_span.record_exception(e)
            pipeline_span.set_status(Status(StatusCode.ERROR, str(e)))
            print(f"❌ Error in RAG pipeline: {e}")
            raise

# Run the example
rag_pipeline_with_tracing()
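
Example #6 wraps only the deployment call, so the trace shows a single generation step. If retrieval or other preprocessing runs in your own code, each phase could be timed separately and attached to the pipeline span. The sketch below assumes such phases exist; `timed_phase` is a hypothetical helper, and the phase bodies are placeholders.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_phase(durations, name, clock=time.perf_counter):
    """Record how long the with-block took under `durations[name]`.

    The duration is stored even when the block raises, so failed phases
    still show up in the timings.
    """
    start = clock()
    try:
        yield
    finally:
        durations[name] = clock() - start


durations = {}
with timed_phase(durations, "retrieval"):
    documents = ["doc-1", "doc-2"]  # placeholder for your own retrieval step
with timed_phase(durations, "generation"):
    answer = "placeholder answer"  # placeholder for the deployment call

for phase, seconds in durations.items():
    print(f"{phase}: {seconds:.6f}s")
```

Each entry could then be written to the pipeline span under a name of your choosing, for example `orq.rag.retrieval_duration`, which is an illustrative attribute name rather than one defined by Orq.ai.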

## Summary: Key Tracing Patterns for Orq.ai

### Essential Span Attributes for Orq.ai:

```python
# Operation identification (set one value per span, not a "|"-joined string)
span.set_attribute("orq.operation", "deployment.invoke")  # or "deployment.stream", "conversation.session"
span.set_attribute("orq.deployment.key", "deployment_name")
span.set_attribute("orq.environment", "production")  # or "staging", "development"

# Contact and Thread tracking (for budget and conversation grouping)
span.set_attribute("orq.contact_id", "user_id")
span.set_attribute("orq.thread_id", "conversation_id")

# Input/Output tracking
span.set_attribute("orq.input.message", user_input)
span.set_attribute("orq.output.message", response_text)
span.set_attribute("orq.output.length", len(response_text))

# Performance metrics
span.set_attribute("orq.duration_seconds", duration)
span.set_attribute("orq.stream.time_to_first_token", ttft)
span.set_attribute("orq.stream.chunk_count", chunks)

# Token usage (when available)
span.set_attribute("orq.usage.prompt_tokens", prompt_tokens)
span.set_attribute("orq.usage.completion_tokens", completion_tokens)
span.set_attribute("orq.usage.total_tokens", total_tokens)

# Model information
span.set_attribute("orq.model", model_name)
```

### Best Practices:

1. **Use nested spans** for complex operations (conversations, RAG pipelines)
2. **Set `contact_id` and `thread_id`** whenever they are available, for better analytics
3. **Add events** for important milestones (first token, completion, errors)
4. **Record exceptions** properly with `span.record_exception(e)`
5. **Set appropriate status** codes (OK, ERROR)
6. **Truncate long strings** in attributes (use `[:500]` for content)
7. **Track streaming metrics** separately (TTFT, chunk count, tokens/sec)
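
Practices 6 and 7 can be combined in one sanitizing step. OpenTelemetry attribute values are limited to strings, booleans, numbers, and sequences of those, so `None` is not a valid value and long strings are best shortened before they reach a span. The sketch below is a hypothetical helper, `clean_attributes`, that applies both rules. For simplicity it converts any other type, including lists, to a string.

```python
def clean_attributes(attributes, max_chars=500):
    """Drop None values and truncate long strings before they reach a span.

    Booleans, ints, and floats pass through unchanged; any other type is
    converted with str() and then truncated to `max_chars`.
    """
    cleaned = {}
    for key, value in attributes.items():
        if value is None:
            continue
        if isinstance(value, (bool, int, float)):
            cleaned[key] = value
        else:
            cleaned[key] = str(value)[:max_chars]
    return cleaned


raw_attributes = {
    "orq.model": None,
    "orq.output.message": "a" * 600,
    "orq.stream": True,
    "orq.usage.total_tokens": 42,
}
print(sorted(clean_attributes(raw_attributes)))
```

The cleaned dict can then be applied in one call with `span.set_attributes(...)`.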

### Key Differences from OpenInference:

- Orq.ai requires **manual span creation** (no auto-instrumentation)
- Use the **`orq.`** prefix for custom attributes
- Leverage Orq's **contact_id** and **thread_id** for built-in analytics
- Track **deployment keys** in addition to model names
- Monitor **gateway operations** when using OpenAI SDK compatibility