# In [None]:
import json
import hashlib
import uuid
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List, Tuple
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
import redis
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import logging
from kafka import KafkaProducer
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
from langchain.agents import AgentExecutor, Tool
from langchain.chains import LLMChain
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import SystemMessage, HumanMessage, AIMessage
from langchain.memory import ConversationBufferMemory, RedisChatMessageHistory
from langchain.agents import AgentType
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_community.chat_models import ChatOpenAI
from langchain.tools import StructuredTool
from langchain.agents.openai_functions_agent.base import OpenAIFunctionsAgent
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import AgentTokenBufferMemory

# ----------------------------
# Configuration & Initialization
# ----------------------------
# Module-level wiring. Everything below runs at import time, so importing this
# module constructs the tracing pipeline and the Redis/Kafka client objects.
app = FastAPI()
# Bearer-token security scheme object; not referenced by the visible code in
# this file.
security = HTTPBearer()
# Tracing: spans are batched and handed to an OTLP exporter built with no
# arguments (so its endpoint comes from the exporter's own defaults).
# The provider is registered globally before `tracer` is obtained and before
# the FastAPI app is instrumented — keep this order.
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
FastAPIInstrumentor.instrument_app(app)

# Redis for conversation memory
# NOTE(review): this client is not referenced by the visible code; the agent's
# chat history opens its own connection from a URL — confirm whether both are
# needed.
redis_client = redis.Redis(host='redis-memory', port=6379, db=0)

# Kafka producer for events
# Values are JSON-encoded and sent as UTF-8 bytes. Not referenced by the
# visible code in this file.
kafka_producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# OpenAI client configuration (consumed by create_refund_agent)
OPENAI_MODEL = "gpt-4-1106-preview"  # Use GPT-4 Turbo for production
OPENAI_TEMPERATURE = 0.2  # Lower temperature for more deterministic business logic

# ----------------------------
# Models
# ----------------------------
class RefundRequest(BaseModel):
    # Request body accepted by the POST /refunds endpoint.
    # `order_id` and `evidence_url` are declared with camelCase aliases, and no
    # populate-by-name config is set on this model, so payloads (and keyword
    # construction) use `orderId` / `evidenceUrl`.
    order_id: str = Field(..., alias="orderId")
    # Free-text reason supplied by the caller.
    reason: str
    # Optional link to supporting evidence; defaults to None.
    evidence_url: Optional[str] = Field(None, alias="evidenceUrl")
    # Excluded from serialisation; the /refunds handler assigns it from the
    # authenticated context after the body has been parsed.
    tenant_id: Optional[str] = Field(None, exclude=True)  # Set from JWT

class OrderDetails(BaseModel):
    # Snapshot of an order. The tool functions in this file rebuild it from a
    # plain dict via `OrderDetails(**order)`, so dict keys must match these
    # field names exactly.
    order_id: str
    user_id: str
    status: str
    amount: float
    currency: str
    # Line items as free-form dicts; their schema is not defined in this file.
    items: List[Dict[str, Any]]
    created_at: datetime
    # Optional; presumably unset until the order is delivered — TODO confirm.
    delivery_date: Optional[datetime]

class RiskAssessment(BaseModel):
    # Result of a refund risk evaluation. request_approval_tool rebuilds it
    # from a plain dict via `RiskAssessment(**risk)`.
    # The scale/range of the score is not defined in this file.
    risk_score: float
    is_high_risk: bool
    # Human-readable reasons accompanying the score.
    reasons: List[str]

class RefundApproval(BaseModel):
    # Outcome of a manual approval decision. Not instantiated anywhere in the
    # visible part of this file.
    approved: bool
    # Identifier of the approver, when there is one.
    approver_id: Optional[str]
    # Optional free-text comments attached to the decision.
    comments: Optional[str]

class RefundResult(BaseModel):
    # Record of a refund. notify_customer_tool rebuilds it from a plain dict
    # via `RefundResult(**refund)`, so dict keys must match these field names.
    refund_id: str
    amount: float
    currency: str
    # Status string; the set of allowed values is not defined in this file.
    status: str
    order_id: str
    processed_at: datetime

# ----------------------------
# LangChain Tools (Microservice Integrations)
# ----------------------------
def get_order_tool(order_id: str, tenant_id: str) -> Dict:
    """Look up one order for a tenant and return it as a plain dict.

    The lookup is delegated to ``OrderService.get_order`` and runs inside a
    ``get_order_tool`` tracing span; the returned object is converted with
    ``.dict()``.
    """
    with tracer.start_as_current_span("get_order_tool"):
        return OrderService.get_order(order_id, tenant_id).dict()

def assess_risk_tool(order: Dict, tenant_id: str) -> Dict:
    """Score the refund risk of an order and return the result as a dict.

    ``order`` is validated into an ``OrderDetails`` model first, then handed
    to ``RiskService.assess_refund_risk`` inside an ``assess_risk_tool``
    tracing span.
    """
    with tracer.start_as_current_span("assess_risk_tool"):
        parsed_order = OrderDetails(**order)
        assessment = RiskService.assess_refund_risk(parsed_order, tenant_id)
        return assessment.dict()

def create_refund_tool(order_id: str, amount: float, currency: str, tenant_id: str) -> Dict:
    """Submit a refund to ``BillingService`` and return the result as a dict.

    An idempotency key is sent with the request: the SHA-256 hex digest of
    ``"{order_id}-{amount}-refund"``. The call runs inside a
    ``create_refund_tool`` tracing span.
    """
    with tracer.start_as_current_span("create_refund_tool"):
        # NOTE(review): the key is derived from order id and amount only —
        # tenant and currency are not part of it. Confirm that is intended.
        key_material = f"{order_id}-{amount}-refund".encode()
        refund = BillingService.create_refund(
            order_id=order_id,
            amount=amount,
            currency=currency,
            idempotency_key=hashlib.sha256(key_material).hexdigest(),
            tenant_id=tenant_id,
        )
        return refund.dict()

def request_approval_tool(order: Dict, risk: Dict, reason: str, tenant_id: str) -> Dict:
    """Ask ``CommsService`` to open a human approval request for a refund.

    Args:
        order: Order data; validated into an ``OrderDetails`` model.
        risk: Risk data; validated into a ``RiskAssessment`` model.
        reason: Refund reason to attach to the approval request.
        tenant_id: Tenant the request belongs to.

    Returns:
        The object returned by ``CommsService.request_approval``, converted
        with ``.dict()``.

    Raises:
        pydantic.ValidationError: If ``order`` or ``risk`` do not match their
            models.
    """
    with tracer.start_as_current_span("request_approval_tool"):
        order_details = OrderDetails(**order)
        risk_assessment = RiskAssessment(**risk)
        # RefundRequest declares `order_id` with the alias "orderId" and sets no
        # populate-by-name config, so the model only accepts the alias on
        # construction. Passing `order_id=` fails validation with a missing
        # "orderId" field.
        refund_request = RefundRequest(
            orderId=order_details.order_id,
            reason=reason,
            tenant_id=tenant_id,
        )
        approval = CommsService.request_approval(
            refund_request,
            order_details,
            risk_assessment,
            tenant_id,
        )
        return approval.dict()

def update_order_status_tool(order_id: str, status: str, tenant_id: str) -> bool:
    """Set an order's status through ``OrderService.update_status``.

    Runs inside an ``update_order_status_tool`` tracing span. Returns ``True``
    whenever the service call returns without raising; the service's own
    return value is ignored.
    """
    succeeded = True
    with tracer.start_as_current_span("update_order_status_tool"):
        OrderService.update_status(order_id, status, tenant_id)
        return succeeded

def notify_customer_tool(user_id: str, refund: Dict, tenant_id: str) -> bool:
    """Send a refund notification to a customer via ``CommsService``.

    ``refund`` is validated into a ``RefundResult`` model before being passed
    on. Runs inside a ``notify_customer_tool`` tracing span and returns
    ``True`` whenever the service call returns without raising.
    """
    with tracer.start_as_current_span("notify_customer_tool"):
        CommsService.send_refund_notification(
            user_id,
            RefundResult(**refund),
            tenant_id,
        )
        return True

def get_refund_policy_tool(tenant_id: str) -> Dict:
    """Return the refund policy for a tenant.

    The policy values are hard-coded; only ``tenant_id`` varies between
    calls. Runs inside a ``get_refund_policy_tool`` tracing span.
    """
    with tracer.start_as_current_span("get_refund_policy_tool"):
        # In production, this would query a knowledge base or RAG system
        policy = {
            "auto_approval_limit": 100.00,
            "window_days": 30,
            "requires_evidence": ["damaged", "defective"],
        }
        policy["tenant_id"] = tenant_id
        return policy

# ----------------------------
# LangChain Agent Setup
# ----------------------------
def create_refund_agent(tenant_id: str, conversation_id: str) -> AgentExecutor:
    """Build the refund-processing agent executor for one tenant conversation.

    Args:
        tenant_id: Tenant every tool call is bound to. The model never sees or
            chooses this value; it is closed over by the tool adapters below.
        conversation_id: Identifier used to key the Redis-backed chat history
            (session ``refund_session:<conversation_id>``).

    Returns:
        An ``AgentExecutor`` wired with the refund tools, the system prompt,
        Redis-backed token-buffer memory, and a cap of 8 iterations.
    """

    # Tenant-bound adapters. Each one exposes only the arguments the model is
    # allowed to pick and forwards the caller's tenant_id itself, so a model
    # output cannot redirect a call to a different tenant.
    def get_order_details(order_id: str) -> Dict:
        return get_order_tool(order_id, tenant_id)

    def assess_refund_risk(order: Dict) -> Dict:
        return assess_risk_tool(order, tenant_id)

    def process_refund(order_id: str, amount: float, currency: str) -> Dict:
        return create_refund_tool(order_id, amount, currency, tenant_id)

    def request_human_approval(order: Dict, risk: Dict, reason: str) -> Dict:
        return request_approval_tool(order, risk, reason, tenant_id)

    def update_order_status(order_id: str, status: str) -> bool:
        return update_order_status_tool(order_id, status, tenant_id)

    def notify_customer(user_id: str, refund: Dict) -> bool:
        return notify_customer_tool(user_id, refund, tenant_id)

    def get_refund_policy() -> Dict:
        return get_refund_policy_tool(tenant_id)

    tool_specs = [
        (
            get_order_details,
            "Retrieve order information including status, amount, and delivery date",
        ),
        (
            assess_refund_risk,
            "Evaluate the risk level of processing a refund for this order",
        ),
        (
            process_refund,
            "Execute the refund transaction with the payment processor",
        ),
        (
            request_human_approval,
            "Create an approval task for operations team when risk is high",
        ),
        (
            update_order_status,
            "Update the order status in the system",
        ),
        (
            notify_customer,
            "Send notification to customer about refund status",
        ),
        (
            get_refund_policy,
            "Retrieve the refund policy rules for this tenant",
        ),
    ]

    # No args_schema is passed: args_schema describes a tool's INPUT, and
    # from_function infers it from each adapter's signature. That keeps the
    # arguments advertised to the model identical to what the function accepts.
    tools = [
        StructuredTool.from_function(
            func=func,
            name=func.__name__,
            description=description,
        )
        for func, description in tool_specs
    ]

    # System message with refund processing guidelines
    system_message = SystemMessage(content=
        """You are a sophisticated refund processing agent. Your responsibilities include:
        1. Validating refund eligibility based on order status and policy rules
        2. Assessing risk using the risk assessment tool
        3. Determining if human approval is required
        4. Processing approved refunds with proper idempotency
        5. Updating systems and notifying customers

        Always follow these rules:
        - Never process refunds without first validating order status
        - For amounts over $100 or high-risk scores, require human approval
        - For 'damaged' claims, verify evidence was provided
        - Maintain idempotency for all refund operations
        - Document all decisions with clear reasoning"""
    )

    # Prompt template with conversation history. The placeholder name must
    # match the memory's memory_key below.
    prompt = OpenAIFunctionsAgent.create_prompt(
        system_message=system_message,
        extra_prompt_messages=[MessagesPlaceholder(variable_name="history")]
    )

    # LLM with configured temperature and streaming (tokens are echoed to
    # stdout by the callback handler).
    llm = ChatOpenAI(
        model=OPENAI_MODEL,
        temperature=OPENAI_TEMPERATURE,
        streaming=True,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()])
    )

    # Conversation memory backed by Redis
    message_history = RedisChatMessageHistory(
        url="redis://redis-memory:6379/0",
        ttl=86400,  # 1 day
        session_id=f"refund_session:{conversation_id}"
    )

    # NOTE(review): AgentTokenBufferMemory appears to read the executor's
    # intermediate steps when saving context. Confirm whether this executor
    # must be built with return_intermediate_steps=True; that would give it
    # two output keys, which affects callers that rely on a single output.
    memory = AgentTokenBufferMemory(
        memory_key="history",
        chat_memory=message_history,
        llm=llm,
        max_token_limit=4000  # Keep conversation within context window
    )

    # Create the agent
    agent = OpenAIFunctionsAgent(
        llm=llm,
        tools=tools,
        prompt=prompt
    )

    # Return the executor with memory and tracing
    return AgentExecutor(
        agent=agent,
        tools=tools,
        memory=memory,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=8  # Prevent infinite loops
    )

# ----------------------------
# Agentic Orchestrator
# ----------------------------
class AgenticRefundOrchestrator:
    """Run a single refund request through a tenant-bound agent.

    Each instance generates its own conversation id and builds its own agent
    executor, so it is intended to be created once per request.
    """

    def __init__(self, tenant_id: str):
        """Create the conversation id and agent executor for ``tenant_id``."""
        self.tenant_id = tenant_id
        # The conversation id keys the Redis chat history, so it must not
        # collide between requests. A full uuid is used rather than a short
        # prefix to keep two conversations from sharing one session.
        self.conversation_id = f"conv_{uuid.uuid4().hex}"
        self.agent = create_refund_agent(tenant_id, self.conversation_id)

    def _build_agent_input(self, refund_request: RefundRequest) -> Dict[str, Any]:
        """Build the executor input for a refund request.

        Only the ``input`` key is sent: the agent prompt has no placeholders
        for other request fields, so anything the model should see has to be
        part of this text. The evidence URL (or its absence) is therefore
        spelled out explicitly.
        """
        # NOTE(review): `reason` and `evidence_url` are caller-supplied text
        # that ends up in the model prompt; treat the agent's decisions
        # accordingly.
        lines = [
            f"Process refund for order {refund_request.order_id}. "
            f"Reason: {refund_request.reason}"
        ]
        if refund_request.evidence_url:
            lines.append(f"Evidence URL: {refund_request.evidence_url}")
        else:
            lines.append("Evidence URL: none provided")
        return {"input": "\n".join(lines)}

    async def execute_refund(self, refund_request: RefundRequest) -> Dict[str, Any]:
        """Execute refund using agentic framework.

        Returns the formatted agent response, or the fallback error payload
        when the agent raises. Exceptions from the agent are logged and never
        propagate to the caller.
        """
        with tracer.start_as_current_span("agentic_refund_orchestration") as span:
            span.set_attributes({
                "tenant_id": self.tenant_id,
                "order_id": refund_request.order_id,
                "conversation_id": self.conversation_id
            })

            try:
                # ainvoke returns the full result mapping; reading "output"
                # works no matter how many output keys the executor exposes.
                result = await self.agent.ainvoke(
                    self._build_agent_input(refund_request)
                )
                return self._format_response(result["output"])
            except Exception as e:
                # Boundary handler: keep the traceback in the log and hand the
                # caller a structured failure instead of a 500.
                logging.exception("Agent execution failed: %s", e)
                return self._handle_agent_failure(refund_request)

    def _format_response(self, agent_output: str) -> Dict[str, Any]:
        """Format the agent's output into a structured response.

        Classification is a case-insensitive substring match on the agent's
        text: "refund processed" -> ``success``, "approval required" ->
        ``pending_approval``, anything else -> ``processed`` with the raw
        output under ``details``.
        """
        # NOTE(review): any unrecognised output — including a refusal — is
        # reported with status "processed". Kept as-is for API compatibility.
        fallback = {
            "status": "processed",
            "details": agent_output,
            "conversation_id": self.conversation_id
        }
        try:
            # In production, you'd want more sophisticated parsing
            text = agent_output.lower()
            if "refund processed" in text:
                return {
                    "status": "success",
                    "message": "Refund processed successfully",
                    "conversation_id": self.conversation_id
                }
            if "approval required" in text:
                return {
                    "status": "pending_approval",
                    "message": "Refund requires manual approval",
                    "conversation_id": self.conversation_id
                }
            return fallback
        except (AttributeError, TypeError) as e:
            # Non-string output (e.g. None): pass it through unclassified.
            logging.warning(f"Failed to parse agent output: {e}")
            return fallback

    def _handle_agent_failure(self, refund_request: RefundRequest) -> Dict[str, Any]:
        """Fallback when agent fails.

        Generates a ticket id, logs it with the order/tenant/conversation so
        the failure can be traced, and returns the error payload.
        """
        ticket_id = f"ticket_{uuid.uuid4().hex[:8]}"
        # NOTE(review): the ticket id is only logged here; nothing in this
        # class creates a ticket in an external system.
        logging.error(
            "Refund agent failed: ticket_id=%s order_id=%s tenant_id=%s conversation_id=%s",
            ticket_id,
            getattr(refund_request, "order_id", None),
            self.tenant_id,
            self.conversation_id,
        )
        return {
            "status": "error",
            "message": "Failed to process refund automatically",
            "ticket_id": ticket_id,
            "next_steps": "Our team will contact you shortly",
            "conversation_id": self.conversation_id
        }

# ----------------------------
# API Endpoints
# ----------------------------
@app.post("/refunds")
async def create_refund(
    refund_request: RefundRequest,
    request: Request,
    auth: Dict[str, Any] = Depends(get_current_tenant)
):
    """Initiate a refund using agentic framework"""
    # Flow: stamp the tenant onto the request, enforce the scope (403), apply
    # rate limiting (429), then run the orchestrator and return its result.
    tenant_id = auth["tenant_id"]

    # The tenant always comes from the auth context, never from the body.
    refund_request.tenant_id = tenant_id

    scope_granted = "billing.create_refund" in auth["scopes"]
    if not scope_granted:
        raise HTTPException(
            status_code=403,
            detail="Missing required scope: billing.create_refund",
        )

    throttled = await is_rate_limited(request, tenant_id)
    if throttled:
        raise HTTPException(status_code=429, detail="Too many requests")

    # One orchestrator (and therefore one conversation) per request.
    return await AgenticRefundOrchestrator(tenant_id).execute_refund(refund_request)

# ----------------------------
# Main
# ----------------------------
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app on all interfaces, port 8000.
    # uvicorn is imported here so it is only needed when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)