In [1]:
# notebooks/chat_agent.ipynb
import sys
import os
sys.path.append('/home/vincent/ixome')
from agents.chat_agent import ChatAgent
import asyncio

async def test():
    """Send one sample text query through ``ChatAgent`` and print the reply.

    ``process_input`` is awaited only when it actually returns an awaitable,
    so this smoke test works both with a coroutine-based implementation and
    with a plain synchronous ``process_input`` that returns a ``str``
    (awaiting a ``str`` would raise ``TypeError``).
    """
    import inspect  # local import: only this helper needs it

    agent = ChatAgent()
    response = agent.process_input("text", "My TV has no sound.")
    # Only await real awaitables; a synchronous result is used as-is.
    if inspect.isawaitable(response):
        response = await response
    print(f"Text Response: {response}")

# Run the smoke test to completion on a fresh event loop.
# NOTE(review): asyncio.run() raises RuntimeError when an event loop is already
# running in the current thread (presumably the case inside a Jupyter kernel);
# in a notebook cell, `await test()` is the usual alternative — confirm.
asyncio.run(test())

import logging
from pinecone import Pinecone
from google.cloud import speech, vision
from langgraph.graph import Graph
from typing import Dict, Any
from core.config import PINECONE_API_KEY, GOOGLE_CREDENTIALS_PATH  # Assume config.py exists

class ChatAgent:
    """Troubleshooting chat agent wired together as a LangGraph pipeline.

    The pipeline is: ``input`` -> one of ``text_processing`` /
    ``voice_processing`` / ``video_processing`` (chosen by
    ``state["input_type"]``) -> ``issue_identification`` ->
    ``solution_retrieval`` -> ``response_generation``.

    Every node receives the shared state dict, adds its own key
    (``processed_input``, ``issue``, ``solution``, ``response``) and returns
    the same dict.
    """

    # Maps each supported ``input_type`` to the node that pre-processes it.
    # Shared by the graph router and by ``process_input`` validation so the
    # two can never drift apart.
    _INPUT_ROUTES = {
        "text": "text_processing",
        "voice": "voice_processing",
        "video": "video_processing",
    }

    def __init__(self):
        """Create the logger, the Pinecone / Google clients and the compiled graph."""
        self._configure_logger()

        # Initialize Pinecone
        self.pc = Pinecone(api_key=PINECONE_API_KEY)
        self.index = self.pc.Index("troubleshooter-index")

        # Initialize Google APIs
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GOOGLE_CREDENTIALS_PATH
        self.speech_client = speech.SpeechClient()
        self.vision_client = vision.ImageAnnotatorClient()

        # Set up LangGraph
        self.graph = Graph()
        self.graph.add_node("input", self.input_node)
        self.graph.add_node("text_processing", self.text_processing_node)
        self.graph.add_node("voice_processing", self.voice_processing_node)
        self.graph.add_node("video_processing", self.video_processing_node)
        self.graph.add_node("issue_identification", self.issue_identification_node)
        self.graph.add_node("solution_retrieval", self.solution_retrieval_node)
        self.graph.add_node("response_generation", self.response_generation_node)

        # Route from "input" to the processing node matching the input type.
        self.graph.add_conditional_edges(
            "input",
            lambda state: state["input_type"],
            dict(self._INPUT_ROUTES),
        )
        for processing_node in self._INPUT_ROUTES.values():
            self.graph.add_edge(processing_node, "issue_identification")
        self.graph.add_edge("issue_identification", "solution_retrieval")
        self.graph.add_edge("solution_retrieval", "response_generation")
        self.graph.set_entry_point("input")
        self.graph.set_finish_point("response_generation")
        self.app = self.graph.compile()

    def _configure_logger(self) -> None:
        """Set ``self.logger`` to the module logger, attaching a handler only once.

        ``logging.getLogger(__name__)`` returns the same logger object for
        every instance, so adding a handler unconditionally would print each
        message once per ``ChatAgent`` ever created.
        """
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(handler)

    def input_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Log the incoming request and pass the state through unchanged."""
        self.logger.info(
            "Received input: type=%s, data=%s",
            state.get('input_type'),
            state.get('input_data'),
        )
        return state

    def text_processing_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Copy the raw text from ``input_data`` into ``processed_input``."""
        state["processed_input"] = state.get("input_data", "")
        self.logger.info("Processed text input: %s", state["processed_input"])
        return state

    def voice_processing_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Transcribe the audio file named by ``input_data`` into ``processed_input``.

        The file is sent to the Speech client as LINEAR16 / 16 kHz / en-US.
        On failure ``processed_input`` is set to a short status string
        ("No audio file provided", "No speech detected" or
        "Error processing voice") instead of raising.
        """
        audio_file = state.get("input_data", "")
        if not (audio_file and os.path.exists(audio_file)):
            state["processed_input"] = "No audio file provided"
            return state

        try:
            with open(audio_file, 'rb') as f:
                content = f.read()
            audio = speech.RecognitionAudio(content=content)
            config = speech.RecognitionConfig(
                encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=16000,
                language_code='en-US'
            )
            response = self.speech_client.recognize(config=config, audio=audio)
            # Each result covers a consecutive portion of the audio; keep the
            # top alternative of every portion rather than only the first one,
            # otherwise longer recordings are truncated.
            transcripts = [
                result.alternatives[0].transcript.strip()
                for result in response.results
                if result.alternatives
            ]
            if transcripts:
                state["processed_input"] = " ".join(transcripts)
                self.logger.info("Processed voice input: %s", state["processed_input"])
            else:
                state["processed_input"] = "No speech detected"
        except Exception as e:
            # Best-effort node: report the failure through the state instead
            # of aborting the whole graph run.
            self.logger.error("Error processing voice: %s", e)
            state["processed_input"] = "Error processing voice"
        return state

    def video_processing_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Label the file named by ``input_data`` and store the labels as text.

        Placeholder implementation: the raw file bytes are submitted to the
        Vision client as a single image; real video support still needs frame
        extraction. On failure ``processed_input`` is set to
        "No video file provided" or "Error processing video".
        """
        video_file = state.get("input_data", "")
        if not (video_file and os.path.exists(video_file)):
            state["processed_input"] = "No video file provided"
            return state

        try:
            # Placeholder: Extract a frame or analyze video
            # For now, treat as an image for compatibility with your code
            with open(video_file, 'rb') as f:
                content = f.read()
            image = vision.Image(content=content)  # Note: Needs video frame extraction
            response = self.vision_client.label_detection(image=image)
            # The Vision API reports per-image failures in ``response.error``
            # rather than raising; surface them instead of silently producing
            # an empty label list.
            if response.error.message:
                raise RuntimeError(response.error.message)
            labels = response.label_annotations
            state["processed_input"] = ", ".join(label.description for label in labels)
            self.logger.info("Processed video input: %s", state["processed_input"])
        except Exception as e:
            # Best-effort node: report the failure through the state.
            self.logger.error("Error processing video: %s", e)
            state["processed_input"] = "Error processing video"
        return state

    def issue_identification_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Classify ``processed_input`` into an issue key via keyword matching.

        Sets ``state["issue"]`` to one of ``no_sound``, ``tv_not_turning_on``,
        ``settings_issue``, ``error_code`` or ``unknown``. Matching is
        case-insensitive and the first matching rule wins.
        """
        processed_input = state.get("processed_input", "").lower()
        self.logger.info("Identifying issue from: %s", processed_input)
        if "no sound" in processed_input:
            state["issue"] = "no_sound"
        elif "tv not turning on" in processed_input:
            state["issue"] = "tv_not_turning_on"
        elif "settings" in processed_input:
            state["issue"] = "settings_issue"
        elif "flashing light" in processed_input or "error code" in processed_input:
            state["issue"] = "error_code"
        else:
            state["issue"] = "unknown"
        self.logger.info("Identified issue: %s", state["issue"])
        return state

    def solution_retrieval_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Fill ``state["solution"]`` from Pinecone, else from a built-in table.

        If the Pinecone query fails or returns no matches, the solution is
        looked up by ``state["issue"]`` in a hard-coded table; unknown or
        missing issues get a generic "not recognized" message.
        """
        # Try Pinecone first
        try:
            # NOTE(review): this constant vector is a placeholder, so the top
            # match does not depend on the identified issue. Replace it with
            # an embedding of the processed input.
            query_vector = [0.1] * 1536  # Placeholder; replace with actual embedding
            results = self.index.query(vector=query_vector, top_k=1, include_metadata=True)
            if results['matches']:
                state["solution"] = results['matches'][0]['metadata'].get('solution', "No solution found")
                self.logger.info("Retrieved solution from Pinecone: %s", state["solution"])
                return state
        except Exception as e:
            # Deliberate best-effort: any Pinecone failure falls through to
            # the hard-coded table below.
            self.logger.error("Pinecone query failed: %s", e)

        # Fallback to hardcoded solutions
        solutions = {
            "no_sound": "Please check if the sound system is turned on and cables are connected.",
            "tv_not_turning_on": "Please check the power cable and ensure the TV is plugged in.",
            "settings_issue": "Navigate to the settings menu and verify the correct input source is selected.",
            "error_code": "The flashing light indicates an error; please note the pattern and consult the device manual."
        }
        # ``.get`` on the state as well: a missing "issue" key is treated like
        # an unrecognized issue instead of raising KeyError.
        state["solution"] = solutions.get(
            state.get("issue", "unknown"),
            "Issue not recognized. Please provide more details.",
        )
        self.logger.info("Retrieved fallback solution: %s", state["solution"])
        return state

    def response_generation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Copy the retrieved solution into ``state["response"]``."""
        state["response"] = state.get("solution", "No solution found.")
        self.logger.info("Generated response: %s", state["response"])
        return state

    def process_input(self, input_type: str, input_data: str) -> str:
        """Main method to process user input and return a response.

        Args:
            input_type: One of ``"text"``, ``"voice"`` or ``"video"``.
            input_data: The message text for ``"text"``; a file path for
                ``"voice"`` and ``"video"``.

        Returns:
            The response string produced by the graph.

        Raises:
            ValueError: If ``input_type`` is not a supported type. Checked up
                front so the caller gets a clear message instead of a failure
                inside the graph router.
        """
        if input_type not in self._INPUT_ROUTES:
            raise ValueError(
                f"Unsupported input_type {input_type!r}; "
                f"expected one of {sorted(self._INPUT_ROUTES)}"
            )
        state = {"input_type": input_type, "input_data": input_data}
        result = self.app.invoke(state)
        return result["response"]

if __name__ == "__main__":
    # Local smoke test: push one sample of each supported input kind through
    # a single agent instance and print what comes back.
    agent = ChatAgent()
    # (label, input type, payload) — the voice and video payloads are
    # placeholder paths and require real files to produce a transcription
    # or labels.
    samples = (
        ("Text", "text", "My TV has no sound."),
        ("Voice", "voice", "/path/to/audio_file.wav"),
        ("Video", "video", "/path/to/video_file.mp4"),
    )
    for label, kind, payload in samples:
        response = agent.process_input(kind, payload)
        print(f"{label} Response: {response}")

Exception: The official Pinecone python package has been renamed from `pinecone-client` to `pinecone`. Please remove `pinecone-client` from your project dependencies and add `pinecone` instead. See the README at https://github.com/pinecone-io/pinecone-python-client for more information on using the python SDK.