In [5]:
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
from sqlalchemy import create_engine, Column, Integer, String, Text
# from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base

from sqlalchemy.orm import sessionmaker, Session
import requests
from urllib.parse import quote
import json
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

# Database setup
DATABASE_URL = 'mysql+mysqlconnector://sachin:%s@152.67.16.165:3306/finance' % quote('sachinpassword')
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class Message(Base):
    __tablename__ = "messages"
    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(String(50))
    role = Column(String(10))
    content = Column(Text)


Base.metadata.create_all(bind=engine)

app = FastAPI()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


class UserMessage(BaseModel):
    message: Dict[str, Any]
    sender: Dict[str, str]


class PlanResponse(BaseModel):
    price: Optional[int]
    validity: Optional[int]
    validity_time_period: Optional[str]
    daily_limit: Optional[str]
    voice_unit: Optional[str]
    voice_unit_value: Optional[str]
    data_unit_value: Optional[str]
    data_unit: Optional[str]


def is_valid_plan_response(response: dict) -> bool:
    required_fields = [
        'price', 'validity', 'validity_time_period',
        'daily_limit', 'voice_unit', 'voice_unit_value',
        'data_unit_value', 'data_unit'
    ]
    return all(response.get(field) for field in required_fields)


field_dict = {
    'product_name': 'which describes about the name of  product ',
    'product_description': 'which describes about the name of  product',
    'product_family': 'the possible values that can come Bundle ,Data Center ,Devices and accessories,Digital '
                      'LifeStyle,Enterprise Services,FTTH,GSM,GigaWave,IPTV,OTT,SoftwareVoice',
    'product_group': 'the possible values that can come :BroadBand, FTTH, Broadband internet, Bundle, BundleFTTH, '
                     'Cloud Hosting, Corporate VPN, DataSharing, Devices and accessories, Family, GSM, GigaWave, '
                     'ICT Services, IPTV, Life style Plans, MODEM, MPLS, NewCheck, OTT Plans, Postpaid, Prepaid, '
                     'Services GSM, VoIP Telephony, Voice',
    'Product_offer_price': ' its the price of the product',
    'pop_type': 'the possible values that can come Normal/CartLevel/Adhoc',
    'price_category': 'the possible values that can come ARP,Base Price ,Discount,Installation,Advance Rental',
    'price_mode': 'the possible values that can come :Recurring/Non-Recurring',
    'product_specification_type': 'the possible values that can come one ,two '
}


def is_valid_field(value: Any) -> bool:
    return value is not None and value != "" and value != "None"


# Function to check if all required fields are present
def is_complete_plan(product_message: Dict[str, Any]) -> bool:
    required_fields = [
        'price', 'validity', 'validity_time_period',
        'daily_limit', 'voice_unit', 'voice_unit_value',
        'data_unit_value', 'data_unit'
    ]
    return all(is_valid_field(product_message.get(field)) for field in required_fields)
@app.post("/process_message")
async def process_message(
        user_message: UserMessage,
        db: Session = Depends(get_db)
        #     session_id: str = Header(..., alias="session-id", description="Session ID")
):
    # Extract text from the new request format
    text = user_message.message["payload"]["text"]
    phonenum = user_message.sender['phoneNumber']
    print('------------', phonenum)
    print('TEXT', text)

    #     # Check if the message is "reset"
    #     if text.lower() == "reset":
    #         # Delete all entries for this session
    #         db.query(Message).filter(Message.session_id == session_id).delete()
    #         db.commit()

    #         # Return the reset confirmation
    #         return {"message": "reset done"}

    if text.lower() == "reset":
        # Delete all entries for this session
        db.query(Message).filter(Message.session_id == phonenum).delete()
        db.commit()

        # Return the reset confirmation with a consistent structure
        return {"current_response": {"currentMessage": {"payload": {"text": "reset done"}}}}

    # Store user message in DB
    db_message = Message(session_id=phonenum, role="user", content=text)
    db.add(db_message)
    db.commit()

    # Retrieve conversation history
    conversation_history = db.query(Message).filter(Message.session_id == phonenum).all()
    messages = [{"role": m.role, "content": m.content} for m in conversation_history]

    # Prepare request for conversation adapter
    conversation_request = {
        "conversationId": phonenum,
        "currentMessage": {
            "messageTime": datetime.now().isoformat(),
            "messageId": f"msg-{len(messages)}",
            "source": user_message.message["source"],
            "status": "success",
            "messageType": user_message.message["messageType"],
            "payload": {"text": text}
        },
        "sender": {
            "name": user_message.sender["name"],
            "phoneNumber": user_message.sender["phoneNumber"]
        },
        "previousMessages": [
            {
                "messageTime": datetime.now().isoformat(),
                "messageId": f"msg-{i}",
                "source": "ui" if m["role"] == "user" else "bpmn",
                "status": "success",
                "messageType": "text",
                "payload": {"text": m["content"]}
            } for i, m in enumerate(messages[:-1])
        ]
    }

    # Send to conversation adapter
    try:
        response = requests.post("http://localhost:8001/conversation", json=conversation_request,timeout=60)
        response.raise_for_status()
        conversation_response = response.json()
        print(f"the message from conversation response {conversation_response}")
    except requests.RequestException as e:
        error_message = f"Conversation adapter call failed: {str(e)}"
        db_message = Message(session_id=phonenum, role="system", content=error_message)
        db.add(db_message)
        db.commit()
        return {"error": error_message}

    # Function to check if a field is present and not empty


    if conversation_response.get("currentMessage", {}).get("messageType") == "product":
        product_message = conversation_response["currentMessage"].get("payload", {})
        print('---------------------', product_message)

        if is_complete_plan(product_message):
            # If we have a complete plan, add a user-friendly message
            user_friendly_message = f"Thank you for providing us the details to create your plan. The plan {product_message['product_name']} will be created "
            conversation_response["currentMessage"]["payload"]["text"] = user_friendly_message
            conversation_response["currentMessage"]["messageType"] = "text"
        else:
            # If the plan is incomplete, proceed with the missing info logic
            required_fields = [
                'product_name', 'product_description', 'product_family',
                'product_group', 'Product_offer_price', 'pop_type',
                'price_category', 'price_mode', 'product_specification_type'
            ]
            # required_fields = [
            #     'product_name', 'product_description'
            # ]
            missing_fields = [field for field in required_fields if not is_valid_field(product_message.get(field))]

            if missing_fields:
                # Construct the status string
                desc = field_dict.get(missing_fields[0])
                status = ", ".join(missing_fields) + " is missed"

                status = f" the field {missing_fields[0]}. is missing Please provide it . {desc}"

                # Update the conversation_response
                conversation_response["currentMessage"]["status"] = status

                try:
                    missing_info_request = {
                        "conversationId": phonenum,
                        "currentMessage": conversation_response["currentMessage"],
                        "sender": conversation_request["sender"],
                        "previousMessages": conversation_request["previousMessages"]
                    }
                    print(missing_info_request)

                    missing_info_response = requests.post(
                        "http://localhost:80001/handle_missing_info",
                        json=missing_info_request,
                        timeout=30
                    )
                    missing_info_response.raise_for_status()
                    missing_info_data = missing_info_response.json()
                    payload_text = missing_info_data["currentMessage"]["payload"]["text"].strip('"')
                    
                    # Store missing info response in DB
                    db_message = Message(session_id=phonenum, role="assistant", content=payload_text)
                    db.add(db_message)
                    db.commit()

                    # Update conversation_response with the missing info response
                    conversation_response = missing_info_data

                except requests.RequestException as e:
                    error_message = f"Missing info service call failed: {str(e)}"
                    logger.error(f"Error calling missing info service: {error_message}")
                    db_message = Message(session_id=phonenum, role="system", content=error_message)
                    db.add(db_message)
                    db.commit()
                    # Instead of returning error, continue with the original conversation_response
                    logger.info("Continuing with original conversation response due to missing info service failure")
            else:
                conversation_response.get("currentMessage").get('payload')['text'] = "product created successfully "
                return {"current_response": conversation_response}

    # Store the final response in DB
    # db_message = Message(session_id=phonenum, role="assistant", content=json.dumps(conversation_response))
    # db.add(db_message)
    # db.commit()

    # Return all previous requests with current
    full_conversation = db.query(Message).filter(Message.session_id == phonenum).all()
    conversation = [
        {
            "role": m.role,
            "content": json.loads(m.content) if m.role == "assistant" and m.content.startswith("{") else m.content
        }
        for m in full_conversation
    ]
    print(f"returning {len(conversation)} messages")

    return {"conversation": conversation, "current_response": conversation_response}




In [6]:
@app.post("/process_message")
async def process_message(
        user_message: UserMessage,
        db: Session = Depends(get_db)
        #     session_id: str = Header(..., alias="session-id", description="Session ID")
):
    # Extract text from the new request format
    text = user_message.message["payload"]["text"]
    phonenum = user_message.sender['phoneNumber']
    print('------------', phonenum)
    print('TEXT', text)

    #     # Check if the message is "reset"
    #     if text.lower() == "reset":
    #         # Delete all entries for this session
    #         db.query(Message).filter(Message.session_id == session_id).delete()
    #         db.commit()

    #         # Return the reset confirmation
    #         return {"message": "reset done"}

    if text.lower() == "reset":
        # Delete all entries for this session
        db.query(Message).filter(Message.session_id == phonenum).delete()
        db.commit()

        # Return the reset confirmation with a consistent structure
        return {"current_response": {"currentMessage": {"payload": {"text": "reset done"}}}}

    # Store user message in DB
    db_message = Message(session_id=phonenum, role="user", content=text)
    db.add(db_message)
    db.commit()

    # Retrieve conversation history
    conversation_history = db.query(Message).filter(Message.session_id == phonenum).all()
    messages = [{"role": m.role, "content": m.content} for m in conversation_history]
    print(messages)
    # Prepare request for conversation adapter
    conversation_request = {
        "conversationId": phonenum,
        "currentMessage": {
            "messageTime": datetime.now().isoformat(),
            "messageId": f"msg-{len(messages)}",
            "source": user_message.message["source"],
            "status": "success",
            "messageType": user_message.message["messageType"],
            "payload": {"text": text}
        },
        "sender": {
            "name": user_message.sender["name"],
            "phoneNumber": user_message.sender["phoneNumber"]
        },
        "previousMessages": [
            {
                "messageTime": datetime.now().isoformat(),
                "messageId": f"msg-{i}",
                "source": "ui" if m["role"] == "user" else "bpmn",
                "status": "success",
                "messageType": "text",
                "payload": {"text": m["content"]}
            } for i, m in enumerate(messages[:-1])
        ]
    }

    # Send to conversation adapter
    try:
        response = requests.post("http://localhost:8001/conversation", json=conversation_request,timeout=60)
        response.raise_for_status()
        conversation_response = response.json()
        print(f"the message from conversation response {conversation_response}")
    except requests.RequestException as e:
        error_message = f"Conversation adapter call failed: {str(e)}"
        db_message = Message(session_id=phonenum, role="system", content=error_message)
        db.add(db_message)
        db.commit()
        return {"error": error_message}

    # Function to check if a field is present and not empty


    if conversation_response.get("currentMessage", {}).get("messageType") == "product":
        product_message = conversation_response["currentMessage"].get("payload", {})
        print('---------------------', product_message)

        if is_complete_plan(product_message):
            # If we have a complete plan, add a user-friendly message
            user_friendly_message = f"Thank you for providing us the details to create your plan. The plan {product_message['product_name']} will be created "
            conversation_response["currentMessage"]["payload"]["text"] = user_friendly_message
            conversation_response["currentMessage"]["messageType"] = "text"
        else:
            # If the plan is incomplete, proceed with the missing info logic
            required_fields = [
                'product_name', 'product_description', 'product_family',
                'product_group', 'Product_offer_price', 'pop_type',
                'price_category', 'price_mode', 'product_specification_type'
            ]
            # required_fields = [
            #     'product_name', 'product_description'
            # ]
            missing_fields = [field for field in required_fields if not is_valid_field(product_message.get(field))]

            if missing_fields:
                # Construct the status string
                desc = field_dict.get(missing_fields[0])
                status = ", ".join(missing_fields) + " is missed"

                status = f" the field {missing_fields[0]}. is missing Please provide it . {desc}"

                # Update the conversation_response
                conversation_response["currentMessage"]["status"] = status

                try:
                    missing_info_request = {
                        "conversationId": phonenum,
                        "currentMessage": conversation_response["currentMessage"],
                        "sender": conversation_request["sender"],
                        "previousMessages": conversation_request["previousMessages"]
                    }
                    print(missing_info_request)

                    missing_info_response = requests.post(
                        "http://localhost:80001/handle_missing_info",
                        json=missing_info_request,
                        timeout=30
                    )
                    missing_info_response.raise_for_status()
                    missing_info_data = missing_info_response.json()
                    payload_text = missing_info_data["currentMessage"]["payload"]["text"].strip('"')
                    
                    # Store missing info response in DB
                    db_message = Message(session_id=phonenum, role="assistant", content=payload_text)
                    db.add(db_message)
                    db.commit()

                    # Update conversation_response with the missing info response
                    conversation_response = missing_info_data

                except requests.RequestException as e:
                    error_message = f"Missing info service call failed: {str(e)}"
                    logger.error(f"Error calling missing info service: {error_message}")
                    db_message = Message(session_id=phonenum, role="system", content=error_message)
                    db.add(db_message)
                    db.commit()
                    # Instead of returning error, continue with the original conversation_response
                    logger.info("Continuing with original conversation response due to missing info service failure")
            else:
                conversation_response.get("currentMessage").get('payload')['text'] = "product created successfully "
                return {"current_response": conversation_response}

    # Store the final response in DB
    # db_message = Message(session_id=phonenum, role="assistant", content=json.dumps(conversation_response))
    # db.add(db_message)
    # db.commit()

    # Return all previous requests with current
    full_conversation = db.query(Message).filter(Message.session_id == phonenum).all()
    conversation = [
        {
            "role": m.role,
            "content": json.loads(m.content) if m.role == "assistant" and m.content.startswith("{") else m.content
        }
        for m in full_conversation
    ]
    print(f"returning {len(conversation)} messages")

    return {"conversation": conversation, "current_response": conversation_response}

In [7]:
# Example usage
db = get_db()

request = {
  "message": {
    "source": "ui",
    "messageType": "text",
    "payload": {
      "text": "hi"
    }
  },
  "sender": {
    "name": "USER123",
    "phoneNumber": "9e091"
  }
}

response = process_message(request, db)
print(json.dumps(response, indent=2))
db.close()

TypeError: Object of type coroutine is not JSON serializable