In [None]:
# medical 분야의 챗봇

# 지금까지는 1차 질의에 추가해서, 질의하는 방법을 배웠지만,
# 본예제는 1차질의에 대한 답변을 재검증을 하지 위한 2차 질의를 하는 방법이다.
# ***** 영어로는 결과가 잘된다.

# agent 사용 예제
# 이 예제에서 배우고자하는 것은, 
#  1) 사용자가 질의하면, 프롬프트를 추가해서, llm에 질의를 하고,
#  2) llm의 응답이 나오면, 검증을 하기위해서, 검증 프롬프트를 추가해서 또 llm에 질의해서, 응답을 얻도록 하는 방법이다.


# "/test_multi_agent"에 파일이 들어 있음.

# 주요기술
# 1) streamlit
# 2) ollama
# 3) python
# 4) Asyncio

# SOURCE: 
#   https://medium.com/the-ai-forum/building-a-multi-agent-ai-system-from-scratch-for-medical-text-processing-dc6f10fc5f04

In [None]:
# 동작흐름 (workflow)

# 1) 사용자 UI: Streamlit으로 UI로 사용
# 2) 업무(summarize,write_article,sanitize)전달: Agent Manager가 task request를 수신받고, 
#    이를 해당하는 main agent로 전달
# 3) 처리: main agent는 입력 데이타를 llama로 처리
# 4) 검증: 출력을 해당하는 검증 agent로 보내고, 정확성과 질을 평가(5개 점수), 

# 결과 화면: 사용자에 결과를 보여주고, 외부출력 또는 main agent로 되돌아가도록 옵션제공


![qa image](https://miro.medium.com/v2/resize:fit:1100/format:webp/1*w-BRVA6HoeZG_piPU60LkA.jpeg)


In [1]:
# Create Agents
# 사전 질의프롬프트와 사후 검증 프롬프트를 추가해서 질의하는 템플릿

# 파일명 : base_agent.py

from abc import ABC, abstractmethod
from typing import Any, Dict
import ollama

class BaseAgent(ABC):
    def __init__(self, model_name: str = "llama3.2"): # "llama3.2:3b"):
        self.model_name = model_name

    # llm에 질의하는 함수
    async def get_completion(self, prompt: str) -> str:
        try:
            response = ollama.chat(model=self.model_name, messages=[
                {'role': 'user', 'content': prompt}
            ])
            return response['message']['content']
        except Exception as e:
            raise Exception(f"Error getting completion: {str(e)}")

class MainAgent(BaseAgent):
    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        pass

class ValidatorAgent(BaseAgent):
    @abstractmethod
    async def validate(self, input_data: Any, output_data: Any) -> Dict[str, bool]:
        pass 


In [2]:
# 파일명 :  main_agent.py

# 사전 질의 프롬프트를 추가해서, 질의하는 모듈

from typing import Any, Dict
#from .base_agent import MainAgent

class SummarizeAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        # 요약을 llm에 해달라고 추가하는 프롬프트
        prompt = f"Summarize the following medical text:\n\n{input_data}"
        summary = await self.get_completion(prompt)
        return {"summary": summary}

class WriteArticleAgent(MainAgent):
    async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
        # article을 만들어 달라고 추가하는 프롬프트
        prompt = f"""Write a research article with the following:
        Topic: {input_data['topic']}
        Key points: {input_data['key_points']}"""
        article = await self.get_completion(prompt)
        return {"article": article}

class SanitizeDataAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        # PHI에서 가져온 데이타에서, 다음 각해당하는 사항에 대해서, 코드(약자)로 교체해달라는 프롬프트
        prompt = """Mask all Protected Health Information (PHI) in the following text. 
        Replace with appropriate masks:
        - Patient names with [PATIENT_NAME]
        - Doctor/Provider names with [PROVIDER_NAME]
        - Dates with [DATE]
        - Locations/Addresses with [LOCATION]
        - Phone numbers with [PHONE]
        - Email addresses with [EMAIL]
        - Medical record numbers with [MRN]
        - Social Security numbers with [SSN]
        - Device identifiers with [DEVICE_ID]
        - Any other identifying numbers with [ID]
        - Physical health conditions with [HEALTH_CONDITION]
        - Medications with [MEDICATION]
        - Lab results with [LAB_RESULT]
        - Vital signs with [VITAL_SIGN]
        - Procedures with [PROCEDURE]

        Text to mask:\n\n""" + input_data
        sanitized_data = await self.get_completion(prompt)
        return {"sanitized_data": sanitized_data} 

In [3]:
# 파일명 :  validator_agent.py

# 사후, 검증을 위해 추가하는 프롬프트를 만들어 추가해서, 질의

from typing import Any, Dict
#from .base_agent import ValidatorAgent

class SummarizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        # 응답결과에 대해서, 원래 질의에 정확한지를 평가하기 위해서, 5개로 점수 등을 제공해달라는 프롬프트
        prompt = f"""Evaluate if this summary accurately represents the original text:
        Original: {input_data}
        Summary: {output_data['summary']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class RefinerAgent(ValidatorAgent):
    async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Review this research article for quality and accuracy:
        Article: {output_data['article']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class SanitizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
        Masked text: {output_data['sanitized_data']}
        
        Check for any unmasked:
        - Patient names
        - Doctor/Provider names
        - Dates
        - Locations/Addresses
        - Phone numbers
        - Email addresses
        - Medical record numbers
        - Social Security numbers
        - Device identifiers
        - Other identifying numbers
        - Physical health conditions
        - Medications
        - Lab results
        - Vital signs
        - Procedures
        
        Provide:
        1. A score out of 5 (where 5 means all PHI properly masked)
        2. 'valid' or 'invalid'
        3. List any found unmasked PHI
        
        Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result} 


In [4]:
# 파일명 : logger.py

# 로그 출력

import logging
from datetime import datetime
from typing import Any, Dict

class Logger:
    def __init__(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('medical_ai_agents.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_input(self, task_type: str, input_data: Any):
        self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")

    def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, bool]):
        self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
        self.logger.info(f"Validation result: {validation['is_valid']}")

    def log_error(self, task_type: str, error_message: str):
        self.logger.error(f"Task: {task_type} - Error: {error_message}") 

In [5]:
# Core Functions

# 파일명 :  agent_manager.py

# agent manager

from typing import Dict, Any
#from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent
#from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent
#from core.logger import Logger

class AgentManager:
    def __init__(self):
        self.logger = Logger()
        
        # Initialize main agents
        self.summarize_agent = SummarizeAgent()
        self.write_article_agent = WriteArticleAgent()
        self.sanitize_agent = SanitizeDataAgent()
        
        # Initialize validator agents
        self.summarize_validator = SummarizeValidatorAgent()
        self.refiner_agent = RefinerAgent()
        self.sanitize_validator = SanitizeValidatorAgent()

    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
        try:
            self.logger.log_input(task_type, input_data)
            
            if task_type == "summarize":
                result = await self.summarize_agent.process(input_data)
                validation = await self.summarize_validator.validate(input_data, result)
            
            elif task_type == "write_article":
                result = await self.write_article_agent.process(input_data)
                validation = await self.refiner_agent.validate(input_data, result)
            
            elif task_type == "sanitize":
                result = await self.sanitize_agent.process(input_data)
                validation = await self.sanitize_validator.validate(input_data, result)
            else:
                raise ValueError(f"Unknown task type: {task_type}")

            self.logger.log_output(task_type, result, validation)
            return {"result": result, "validation": validation}

        except Exception as e:
            self.logger.log_error(task_type, str(e))
            raise 

In [6]:
# 파일명 :  app.py

import streamlit as st
import asyncio
#from core.agent_manager import AgentManager

# Set page configuration with custom theme
st.set_page_config(
    page_title="Medical AI Agents",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for styling
st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        color: white;
        text-align: center;
        padding: 1.5rem;
        margin-bottom: 1rem;
        font-weight: bold;
        background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);
        border-radius: 10px;
        box-shadow: 0 2px 10px rgba(30,136,229,0.2);
    }
    .sub-header {
        font-size: 1.8rem;
        color: #0D47A1;
        padding: 0.5rem 0;
        border-bottom: 2px solid #1E88E5;
        margin-bottom: 1rem;
    }
    .task-container {
        background-color: #F8F9FA;
        padding: 2rem;
        border-radius: 10px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .result-box {
        background-color: white;
        padding: 1.5rem;
        border-radius: 8px;
        border-left: 4px solid #1E88E5;
        margin: 1rem 0;
    }
    .validation-box {
        padding: 1rem;
        border-radius: 8px;
        margin-top: 1rem;
    }
    .stButton>button {
        background-color: #1E88E5;
        color: white;
        border-radius: 25px;
        padding: 0.5rem 2rem;
        border: none;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        transition: all 0.3s ease;
    }
    .stButton>button:hover {
        background-color: #1565C0;
        box-shadow: 0 4px 8px rgba(0,0,0,0.2);
        transform: translateY(-2px);
    }
    .stTextArea>div>div {
        border-radius: 8px;
        border: 2px solid #E3F2FD;
    }
    .sidebar-content {
        padding: 1rem;
        background-color: #F8F9FA;
        border-radius: 8px;
    }
    </style>
    """, unsafe_allow_html=True)

@st.cache_resource
def get_agent_manager():
    return AgentManager()

def show_results_page(result_data):
    st.markdown("<h1 class='main-header'>Final Validated Content</h1>", unsafe_allow_html=True)
    
    # Add a subheader based on the content type
    if "summary" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Medical Text Summary</h2>", unsafe_allow_html=True)
    elif "article" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Research Article</h2>", unsafe_allow_html=True)
    elif "sanitized_data" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Redacted PHI Content</h2>", unsafe_allow_html=True)
    
    # Display content in a styled box
    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
    if "summary" in result_data["result"]:
        st.write(result_data["result"]["summary"])
    elif "article" in result_data["result"]:
        st.write(result_data["result"]["article"])
    elif "sanitized_data" in result_data["result"]:
        st.write(result_data["result"]["sanitized_data"])
    st.markdown("</div>", unsafe_allow_html=True)
    
    # Action buttons in columns
    col1, col2, col3 = st.columns([1, 1, 1])
    with col2:
        # Export button
        if st.button("📥 Export Results"):
            export_data = ""
            if "summary" in result_data["result"]:
                export_data = result_data["result"]["summary"]
            elif "article" in result_data["result"]:
                export_data = result_data["result"]["article"]
            elif "sanitized_data" in result_data["result"]:
                export_data = result_data["result"]["sanitized_data"]
                
            st.download_button(
                label="💾 Download Content",
                data=export_data,
                file_name="final_content.txt",
                mime="text/plain"
            )
    
    with col3:
        # Return button
        if st.button("🏠 Return to Main Page"):
            st.session_state.show_results = False
            st.rerun()

def main():
    # Sidebar styling
    with st.sidebar:
        st.markdown("<h2 style='text-align: center; color: #1E88E5;'>Tasks</h2>", unsafe_allow_html=True)
        st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)
        task_type = st.radio(
            "",  # Empty label as we're using custom header
            ["summarize", "write_article", "Redact PHI"],
            format_func=lambda x: {
                "summarize": "📝 Summarize Medical Text",
                "write_article": "📚 Write Research Article",
                "Redact PHI": "🔒 Redact PHI"
            }[x]
        )
        st.markdown("</div>", unsafe_allow_html=True)
    
    # Main content - Single header for the entire page
    st.markdown("<h1 class='main-header'>Medical Multi-Agent System</h1>", unsafe_allow_html=True)
    
    # Initialize session state
    if 'show_results' not in st.session_state:
        st.session_state.show_results = False
    if 'result_data' not in st.session_state:
        st.session_state.result_data = None
    
    if st.session_state.show_results:
        show_results_page(st.session_state.result_data)
        return
    
    agent_manager = get_agent_manager()
    
    # Task containers with consistent styling
    st.markdown("<div class='task-container'>", unsafe_allow_html=True)
    
    if task_type == "summarize":
        st.markdown("<h2 class='sub-header'>📝 Summarize Medical Text</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to summarize", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("🔄 Generate Summary"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("summarize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Summary")
                    st.write(result["result"]["summary"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "write_article":
        st.markdown("<h2 class='sub-header'>📚 Write Research Article</h2>", unsafe_allow_html=True)
        topic = st.text_input("Enter research topic")
        key_points = st.text_area("Enter key points (one per line)", height=150)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("📝 Generate Article"):
                with st.spinner("Processing..."):
                    input_data = {"topic": topic, "key_points": key_points}
                    result = asyncio.run(agent_manager.process_task("write_article", input_data))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Article")
                    st.write(result["result"]["article"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "Redact PHI":
        st.markdown("<h2 class='sub-header'>🔒 Redact Protected Health Information (PHI)</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to redact PHI", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("🔐 Redact PHI"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("sanitize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Redacted Text")
                    st.write(result["result"]["sanitized_data"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    st.markdown("</div>", unsafe_allow_html=True)

#if __name__ == "__main__":
#    main() 

2024-12-08 13:23:11.761 
  command:

    streamlit run /home/mhkwon/anaconda3/envs/mychatbot/lib/python3.9/site-packages/ipykernel_launcher.py [ARGUMENTS]


In [11]:
main()

2024-11-18 09:07:21.616 `label` got an empty value. This is discouraged for accessibility reasons and may be disallowed in the future by raising an exception. Please provide a non-empty label and hide it with label_visibility if needed.


In [None]:
!streamlit run /home/mhkwon/anaconda3/envs/mychatbot/lib/python3.9/site-packages/ipykernel_launcher.py


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://192.168.0.34:8501[0m
[34m  External URL: [0m[1mhttp://59.8.137.123:8501[0m
[0m
[IPKernelApp] ERROR | Unable to initialize signal:
Traceback (most recent call last):
  File "/home/mhkwon/anaconda3/envs/mychatbot/lib/python3.9/site-packages/ipykernel/kernelapp.py", line 701, in initialize
    self.init_signal()
  File "/home/mhkwon/anaconda3/envs/mychatbot/lib/python3.9/site-packages/ipykernel/kernelapp.py", line 545, in init_signal
    signal.signal(signal.SIGINT, signal.SIG_IGN)
  File "/home/mhkwon/anaconda3/envs/mychatbot/lib/python3.9/signal.py", line 56, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread of the main interpreter
