In [1]:
import collections.abc
collections.Mapping = collections.abc.Mapping
%pip install experta
from experta import Fact, Rule, KnowledgeEngine, NOT, W  # Fix missing imports

class Diagnose(KnowledgeEngine):
    @Rule(Fact(symptoms="fever"), Fact(symptoms="cough"), Fact(symptoms="body_aches"))
    def flu(self):
        print("You may have flu")

    @Rule(Fact(symptoms="sneezing"), Fact(symptoms="runny_nose"))
    def cold(self):
        print("You may have a cold")

    @Rule(NOT(Fact(symptoms=W())), salience=-1)
    def no_symptoms(self):
        print("You have no symptoms")
# Create the engine
engine = Diagnose()

# Reset and input the symptoms
engine.reset()
print("Enter the symptoms you have. Type 'done' when finished.")

while True:
    symptom = input("Symptom: ").strip().lower()
    if symptom == "done":
        break
    engine.declare(Fact(symptoms=symptom))

# Run the engine
engine.run()


[notice] A new release of pip is available: 24.3.1 -> 25.0
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.
Enter the symptoms you have. Type 'done' when finished.
You may have flu


In [8]:
import collections
import json
from datetime import datetime, timedelta

class KnowledgeBase:
    def __init__(self):
        # Expanded list of diseases with symptoms, treatments, severity thresholds, and time considerations
        self.diseases = {
            "Common Cold": {
                "symptoms": ["sneezing", "runny_nose", "sore_throat"],
                "treatment": "Rest and fluids",
                "severity_threshold": 1,  # Mild
                "duration_range": (1, 7)  # Lasts 1-7 days
            },
            "Flu": {
                "symptoms": ["fever", "cough", "body_aches", "fatigue"],
                "treatment": "Rest, hydration, and fever reducers",
                "severity_threshold": 2,  # Moderate
                "duration_range": (3, 14)  # Lasts 3-14 days
            },
            "Pneumonia": {
                "symptoms": ["fever", "cough", "shortness_of_breath", "chest_pain"],
                "treatment": "Antibiotics and hospitalization if severe",
                "severity_threshold": 3,  # Severe
                "duration_range": (7, 30)  # Lasts 7-30 days
            },
            "Tuberculosis": {
                "symptoms": ["cough", "weight_loss", "night_sweats", "fatigue"],
                "treatment": "Long-term antibiotics",
                "severity_threshold": 3,  # Severe
                "duration_range": (30, 180)  # Lasts 1-6 months
            },
            "COVID-19": {
                "symptoms": ["fever", "cough", "fatigue", "shortness_of_breath", "loss_of_taste"],
                "treatment": "Rest, isolation, and medical support if severe",
                "severity_threshold": 3,  # Severe
                "duration_range": (5, 21)  # Lasts 5-21 days
            }
        }

        # Symptom-to-disease relationships (inverse mapping)
        self.relationships = collections.defaultdict(list)
        for disease, data in self.diseases.items():
            for symptom in data["symptoms"]:
                self.relationships[symptom].append(disease)

        # Load patient history from a file (persistent storage)
        self.history_file = "patient_history.json"
        self.patient_history = self.load_patient_history()

    def load_patient_history(self):
        """Load patient history from file."""
        try:
            with open(self.history_file, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def save_patient_history(self):
        """Save patient history to file."""
        with open(self.history_file, "w") as f:
            json.dump(self.patient_history, f, indent=4)

    def update_patient_history(self, patient_name, symptoms, diagnosis):
        """Update patient history with new symptoms and diagnosis."""
        if patient_name not in self.patient_history:
            self.patient_history[patient_name] = []

        self.patient_history[patient_name].append({
            "date": datetime.now().strftime("%Y-%m-%d"),
            "symptoms": symptoms,
            "diagnosis": diagnosis
        })
        self.save_patient_history()

    def get_disease_probabilities(self, symptoms, durations, patient_name):
        """
        Given a list of symptoms and their durations, return the probability of each disease.
        Adjust probabilities if the disease has appeared in the patient's history.
        """
        possible = collections.defaultdict(int)

        for symptom in symptoms:
            for disease in self.relationships.get(symptom, []):
                possible[disease] += 1  # Count occurrences

        # Calculate probabilities considering duration alignment and history
        disease_probabilities = {}
        for disease, count in possible.items():
            required_symptoms = self.diseases[disease]["symptoms"]
            min_duration, max_duration = self.diseases[disease]["duration_range"]

            avg_symptom_duration = sum(durations) / len(durations)
            duration_match = min_duration <= avg_symptom_duration <= max_duration

            probability = round((count / len(required_symptoms)) * 100, 2)
            if duration_match:
                probability += 10  # Bonus if duration aligns

            # Adjust probability based on patient history (increase risk for recurring conditions)
            if patient_name in self.patient_history:
                past_diagnoses = [entry["diagnosis"] for entry in self.patient_history[patient_name]]
                if disease in past_diagnoses:
                    probability += 15  # Increase probability if disease has been previously diagnosed

            disease_probabilities[disease] = probability

        return dict(sorted(disease_probabilities.items(), key=lambda x: x[1], reverse=True))

    def determine_severity(self, disease, matched_symptoms_count):
        """Determines severity based on the number of matching symptoms and the disease's severity threshold."""
        severity_threshold = self.diseases[disease]["severity_threshold"]

        if matched_symptoms_count < severity_threshold:
            return "Mild"
        elif matched_symptoms_count == severity_threshold:
            return "Moderate"
        else:
            return "Severe"

    def diagnose(self, patient_name, symptoms, durations):
        """Returns the most probable diseases, their severity levels, case explanation, and treatment recommendations."""
        probabilities = self.get_disease_probabilities(symptoms, durations, patient_name)

        if not probabilities:
            return ("No matching diagnosis found.", 
                "Your symptoms do not strongly match any known conditions in the system.", 
                "Consult a doctor if symptoms persist or worsen.")

        result = f"\n📌 **Diagnosis for {patient_name}:**\n"
        explanation = f"🔍 The system analyzed your symptoms and matched them with conditions in the knowledge base. "
        recommendation = ""

        for disease, probability in probabilities.items():
            treatment = self.diseases[disease]["treatment"]
            matched_symptoms_count = round((probability / 100) * len(self.diseases[disease]["symptoms"]))
            severity = self.determine_severity(disease, matched_symptoms_count)
            
            result += f"- {disease} ({probability}% probability, {severity} case): {treatment}\n"
        
        # Personalized recommendation based on severity
        if severity == "Mild":
            recommendation = "✅ **Advice:** This condition is mild. Home care (rest, fluids) is sufficient. If symptoms persist beyond expected duration, seek medical advice."
        elif severity == "Moderate":
            recommendation = "⚠️ **Advice:** Your condition requires **medical consultation**. Visit a doctor if symptoms do not improve within a few days."
        else:
            recommendation = "🚨 **Urgent Attention Needed!** Your symptoms indicate a **severe condition**. Seek **immediate medical help**."

        # Update patient history
        self.update_patient_history(patient_name, symptoms, list(probabilities.keys())[0])

        return result, explanation, recommendation

# User interaction
kb = KnowledgeBase()

print("\n🔷 **Medical Diagnosis System with Patient History Tracking** 🔷")
patient_name = input("\n👤 **Enter your name:** ").strip()

# Display available symptoms
print("\n📌 **Available Symptoms:**")
print(", ".join(kb.relationships.keys()))

# Prompt user for symptoms
print("\n✏️ **Enter the symptoms you have (comma-separated):**")
user_input = input("> ").lower().replace(" ", "_").split(',')
symptoms = [s.strip() for s in user_input if s.strip() in kb.relationships]

# Validate user input
if not symptoms:
    print("\n⚠️ No valid symptoms entered. Please enter symptoms from the provided list.")
    exit()

# Get symptom durations
durations = []  # List to store symptom durations
print("\n⏳ **For each symptom, enter the number of days you have experienced it:**")

for i in range(len(symptoms)):  # Ensure all symptoms are processed
    while True:
        try:
            duration_input = input(f"🩺 Duration of {symptoms[i]} (in days): ").strip()
            duration = float(duration_input)  # Allow decimals
            if duration < 0:
                raise ValueError("Duration cannot be negative.")
            durations.append(duration)
            break  # Exit loop if input is valid
        except ValueError as e:
            print(f"⚠️ Invalid input! {e}. Please enter a valid number (e.g., 3 or 4.5).")

# Debugging: Print all symptoms and durations to check correctness
print("\n🔍 **Entered Symptoms & Durations:**")
for symptom, duration in zip(symptoms, durations):
    print(f"- {symptom}: {duration} days")

# Get diagnosis
diagnosis_result, explanation, recommendation = kb.diagnose(patient_name, symptoms, durations)

# Display results
print("\n🔎 **Diagnosis Results:**")
print(diagnosis_result)

print("\n📖 **Case Explanation:**")
print(explanation)

print("\n💡 **Recommendation:**")
print(recommendation)



🔷 **Medical Diagnosis System with Patient History Tracking** 🔷



📌 **Available Symptoms:**
sneezing, runny_nose, sore_throat, fever, cough, body_aches, fatigue, shortness_of_breath, chest_pain, weight_loss, night_sweats, loss_of_taste

✏️ **Enter the symptoms you have (comma-separated):**

⏳ **For each symptom, enter the number of days you have experienced it:**

🔍 **Entered Symptoms & Durations:**
- sneezing: 4.0 days
- runny_nose: 2.0 days
- sore_throat: 5.0 days
- fever: 7.0 days
- cough: 10.0 days
- body_aches: 12.0 days
- fatigue: 3.0 days
- shortness_of_breath: 1.0 days
- chest_pain: 2.0 days
- weight_loss: 1.0 days
- night_sweats: 2.0 days
- loss_of_taste: 2.0 days

🔎 **Diagnosis Results:**

📌 **Diagnosis for yanet:**
- Common Cold (125.0% probability, Severe case): Rest and fluids
- Flu (110.0% probability, Severe case): Rest, hydration, and fever reducers
- Pneumonia (100.0% probability, Severe case): Antibiotics and hospitalization if severe
- COVID-19 (100.0% probability, Severe case): Rest, isolation, and medical support if severe
- Tub