In [None]:
import sqlite3
from datetime import datetime, timedelta
import random
from typing import List, Dict, Optional

class Database:
    """Owns the SQLite connection and schema for the hospital system.

    The connection is opened on construction and all tables are created if
    missing. Every statement run through :meth:`execute_query` is committed
    immediately.
    """
    def __init__(self, db_name: str = "hospital.db"):
        """Connect to *db_name* and create any missing tables.

        Args:
            db_name: Path of the SQLite file; ``":memory:"`` gives a
                throwaway in-memory database.
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._initialize_database()

    def _initialize_database(self):
        """Create tables if they don't exist.

        Patient, doctor, appointment and bill identifiers are declared TEXT
        because the model classes in this file generate prefixed string IDs
        such as ``"P1234"``. SQLite treats an ``INTEGER PRIMARY KEY`` column
        as an alias for the rowid and rejects non-integer values with
        "datatype mismatch", so string IDs could never be inserted.

        Note: ``CREATE TABLE IF NOT EXISTS`` does not alter a table that
        already exists, so a database file created with the old schema keeps
        that schema.
        """
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS patients (
                patient_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                age INTEGER,
                gender TEXT,
                contact TEXT,
                address TEXT,
                registration_date TEXT
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS doctors (
                doctor_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                specialization TEXT,
                contact TEXT,
                available_days TEXT,
                consultation_fee REAL
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS appointments (
                appointment_id TEXT PRIMARY KEY,
                patient_id TEXT,
                doctor_id TEXT,
                date TEXT,
                time TEXT,
                status TEXT DEFAULT 'Scheduled',
                diagnosis TEXT,
                prescription TEXT,
                FOREIGN KEY (patient_id) REFERENCES patients(patient_id),
                FOREIGN KEY (doctor_id) REFERENCES doctors(doctor_id)
            )
        """)

        # No model class in this file writes to staff, so its integer key is
        # left as it was.
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS staff (
                staff_id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT,
                contact TEXT,
                hire_date TEXT
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS billing (
                bill_id TEXT PRIMARY KEY,
                patient_id TEXT,
                amount REAL,
                payment_status TEXT DEFAULT 'Pending',
                date TEXT,
                description TEXT,
                FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
            )
        """)

        self.conn.commit()

    def execute_query(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run *query* with bound *params*, commit, and return the cursor.

        The returned object is the single cursor shared by this instance, so
        its result set is replaced by the next call.
        """
        self.cursor.execute(query, params)
        self.conn.commit()
        return self.cursor

    def close(self):
        """Close the database connection"""
        self.conn.close()


class Person:
    """Common base carrying the two fields every person record shares."""
    def __init__(self, name: str, contact: str):
        # Both values are stored exactly as given; no validation happens here.
        self.name, self.contact = name, contact


class Patient(Person):
    """A hospital patient together with its persistence helpers."""
    def __init__(self, name: str, age: int, gender: str, contact: str, address: str):
        super().__init__(name, contact)
        self.age = age
        self.gender = gender
        self.address = address
        # Provisional identifier: "P" followed by a random four-digit number.
        suffix = random.randint(1000, 9999)
        self.patient_id = f"P{suffix}"
        # The registration date is the day the object is created.
        created = datetime.now()
        self.registration_date = created.strftime("%Y-%m-%d")
        # In-memory only; nothing in this class writes the history to the DB.
        self.medical_history = []

    def save_to_db(self, db: Database):
        """Insert this patient as a new row of the patients table."""
        values = (
            self.patient_id,
            self.name,
            self.age,
            self.gender,
            self.contact,
            self.address,
            self.registration_date,
        )
        db.execute_query("INSERT INTO patients VALUES (?, ?, ?, ?, ?, ?, ?)", values)

    @staticmethod
    def get_patient(db: Database, patient_id: str) -> Optional['Patient']:
        """Load the patient stored under *patient_id*, or return None."""
        row = db.execute_query(
            "SELECT * FROM patients WHERE patient_id=?", (patient_id,)
        ).fetchone()
        if not row:
            return None
        # Columns 1-5 are name, age, gender, contact, address in that order.
        loaded = Patient(row[1], row[2], row[3], row[4], row[5])
        # Replace the freshly generated values with the stored ones.
        loaded.patient_id = row[0]
        loaded.registration_date = row[6]
        return loaded

    def add_medical_record(self, diagnosis: str, prescription: str):
        """Append a record dated today to the in-memory medical history."""
        entry = {
            'date': datetime.now().strftime("%Y-%m-%d"),
            'diagnosis': diagnosis,
            'prescription': prescription,
        }
        self.medical_history.append(entry)


class Doctor(Person):
    """Represents a doctor in the hospital.

    ``available_days`` is a list of day names in memory and a single
    comma-separated string in the doctors table.
    """
    def __init__(self, name: str, specialization: str, contact: str, 
                 available_days: List[str], consultation_fee: float):
        super().__init__(name, contact)
        self.specialization = specialization
        self.available_days = available_days
        self.consultation_fee = consultation_fee
        # Provisional identifier: "D" followed by a random four-digit number.
        self.doctor_id = f"D{random.randint(1000, 9999)}"

    def save_to_db(self, db: Database):
        """Save doctor to database"""
        db.execute_query(
            "INSERT INTO doctors VALUES (?, ?, ?, ?, ?, ?)",
            (self.doctor_id, self.name, self.specialization, 
             self.contact, ','.join(self.available_days), self.consultation_fee)
        )

    @staticmethod
    def _split_days(raw: Optional[str]) -> List[str]:
        """Turn the stored comma-separated day string back into a list.

        A NULL column or an empty string yields ``[]``. Calling ``split`` on
        them directly would raise on None and return ``['']`` for ``''``, so
        a doctor saved with no available days would not load back unchanged.
        """
        return raw.split(',') if raw else []

    @staticmethod
    def _from_row(row) -> 'Doctor':
        """Build a Doctor from a ``SELECT * FROM doctors`` row."""
        doctor = Doctor(
            name=row[1],
            specialization=row[2],
            contact=row[3],
            available_days=Doctor._split_days(row[4]),
            consultation_fee=row[5]
        )
        # Replace the freshly generated ID with the stored one.
        doctor.doctor_id = row[0]
        return doctor

    @staticmethod
    def get_doctor(db: Database, doctor_id: str) -> Optional['Doctor']:
        """Retrieve a doctor from the database, or None if the ID is unknown"""
        cursor = db.execute_query("SELECT * FROM doctors WHERE doctor_id=?", (doctor_id,))
        doctor_data = cursor.fetchone()
        return Doctor._from_row(doctor_data) if doctor_data else None

    @staticmethod
    def get_doctors_by_specialization(db: Database, specialization: str) -> List['Doctor']:
        """Get all doctors with a specific specialization"""
        cursor = db.execute_query(
            "SELECT * FROM doctors WHERE specialization=?", (specialization,)
        )
        return [Doctor._from_row(doctor_data) for doctor_data in cursor.fetchall()]


class Appointment:
    """A booked slot linking one patient to one doctor."""
    def __init__(self, patient_id: str, doctor_id: str, date: str, time: str):
        # Provisional identifier: "A" followed by a random four-digit number.
        ticket = random.randint(1000, 9999)
        self.appointment_id = f"A{ticket}"
        self.patient_id, self.doctor_id = patient_id, doctor_id
        self.date, self.time = date, time
        self.status = "Scheduled"
        # Filled in once the appointment is completed.
        self.diagnosis = self.prescription = ""

    def save_to_db(self, db: Database):
        """Insert this appointment as a new row of the appointments table."""
        values = (
            self.appointment_id,
            self.patient_id,
            self.doctor_id,
            self.date,
            self.time,
            self.status,
        )
        db.execute_query(
            "INSERT INTO appointments (appointment_id, patient_id, doctor_id, date, time, status) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            values
        )

    def complete_appointment(self, db: Database, diagnosis: str, prescription: str):
        """Record the outcome on this object and on its database row."""
        self.status = "Completed"
        self.diagnosis = diagnosis
        self.prescription = prescription
        values = (self.status, self.diagnosis, self.prescription, self.appointment_id)
        db.execute_query(
            "UPDATE appointments SET status=?, diagnosis=?, prescription=? WHERE appointment_id=?",
            values
        )

    @staticmethod
    def _hydrate(row) -> 'Appointment':
        """Rebuild an Appointment from a ``SELECT * FROM appointments`` row."""
        loaded = Appointment(row[1], row[2], row[3], row[4])
        # Replace the freshly generated defaults with the stored values.
        loaded.appointment_id = row[0]
        loaded.status = row[5]
        # NULL text columns are surfaced as empty strings.
        loaded.diagnosis = row[6] or ""
        loaded.prescription = row[7] or ""
        return loaded

    @staticmethod
    def get_appointment(db: Database, appointment_id: str) -> Optional['Appointment']:
        """Load the appointment stored under *appointment_id*, or return None."""
        row = db.execute_query(
            "SELECT * FROM appointments WHERE appointment_id=?", (appointment_id,)
        ).fetchone()
        if not row:
            return None
        return Appointment._hydrate(row)

    @staticmethod
    def get_patient_appointments(db: Database, patient_id: str) -> List['Appointment']:
        """Return every appointment recorded for *patient_id*."""
        rows = db.execute_query(
            "SELECT * FROM appointments WHERE patient_id=?", (patient_id,)
        ).fetchall()
        return [Appointment._hydrate(row) for row in rows]


class Billing:
    """A single bill issued to a patient, with its persistence helpers."""
    def __init__(self, patient_id: str, amount: float, description: str):
        # Provisional identifier: "B" followed by a random four-digit number.
        number = random.randint(1000, 9999)
        self.bill_id = f"B{number}"
        self.patient_id = patient_id
        self.amount = amount
        self.payment_status = "Pending"
        # Bills are dated with the day they are created.
        issued = datetime.now()
        self.date = issued.strftime("%Y-%m-%d")
        self.description = description

    def save_to_db(self, db: Database):
        """Insert this bill as a new row of the billing table."""
        values = (
            self.bill_id,
            self.patient_id,
            self.amount,
            self.payment_status,
            self.date,
            self.description,
        )
        db.execute_query("INSERT INTO billing VALUES (?, ?, ?, ?, ?, ?)", values)

    def mark_as_paid(self, db: Database):
        """Flip the status to Paid on this object and on its database row."""
        self.payment_status = "Paid"
        values = (self.payment_status, self.bill_id)
        db.execute_query("UPDATE billing SET payment_status=? WHERE bill_id=?", values)

    @staticmethod
    def _hydrate(row) -> 'Billing':
        """Rebuild a Billing object from a ``SELECT * FROM billing`` row."""
        loaded = Billing(row[1], row[2], row[5])
        # Replace the freshly generated defaults with the stored values.
        loaded.bill_id = row[0]
        loaded.payment_status = row[3]
        loaded.date = row[4]
        return loaded

    @staticmethod
    def get_bill(db: Database, bill_id: str) -> Optional['Billing']:
        """Load the bill stored under *bill_id*, or return None."""
        row = db.execute_query(
            "SELECT * FROM billing WHERE bill_id=?", (bill_id,)
        ).fetchone()
        if not row:
            return None
        return Billing._hydrate(row)

    @staticmethod
    def get_patient_bills(db: Database, patient_id: str) -> List['Billing']:
        """Return every bill recorded for *patient_id*."""
        rows = db.execute_query(
            "SELECT * FROM billing WHERE patient_id=?", (patient_id,)
        ).fetchall()
        return [Billing._hydrate(row) for row in rows]


class Hospital:
    """Main hospital system class.

    Owns one :class:`Database` (default file) and offers convenience methods
    that create a model object, persist it, and return it.
    """
    def __init__(self):
        self.db = Database()
        self._initialize_sample_data()

    def _initialize_sample_data(self):
        """Insert three sample doctors when the doctors table is empty."""
        cursor = self.db.execute_query("SELECT COUNT(*) FROM doctors")
        if cursor.fetchone()[0] == 0:
            doctors = [
                Doctor("Dr. Smith", "Cardiology", "555-0101", ["Monday", "Wednesday", "Friday"], 150.0),
                Doctor("Dr. Johnson", "Pediatrics", "555-0102", ["Tuesday", "Thursday", "Saturday"], 120.0),
                Doctor("Dr. Williams", "Neurology", "555-0103", ["Monday", "Tuesday", "Friday"], 200.0)
            ]
            for doctor in doctors:
                doctor.save_to_db(self.db)

    def register_patient(self, name: str, age: int, gender: str, contact: str, address: str) -> Patient:
        """Create a patient, persist it, and return it."""
        patient = Patient(name, age, gender, contact, address)
        patient.save_to_db(self.db)
        return patient

    def schedule_appointment(self, patient_id: str, doctor_id: str, date: str, time: str) -> Appointment:
        """Create an appointment, persist it, and return it."""
        appointment = Appointment(patient_id, doctor_id, date, time)
        appointment.save_to_db(self.db)
        return appointment

    def generate_bill(self, patient_id: str, amount: float, description: str) -> Billing:
        """Create a bill, persist it, and return it."""
        bill = Billing(patient_id, amount, description)
        bill.save_to_db(self.db)
        return bill

    def get_doctor_schedule(self, doctor_id: str) -> List[Appointment]:
        """Get all appointments for a doctor, ordered by date and time.

        The returned objects carry the stored diagnosis and prescription
        (empty string when the column is NULL), matching what
        ``Appointment.get_appointment`` and
        ``Appointment.get_patient_appointments`` return.
        """
        cursor = self.db.execute_query(
            "SELECT * FROM appointments WHERE doctor_id=? ORDER BY date, time", (doctor_id,)
        )
        appointments = []
        for appointment_data in cursor.fetchall():
            appointment = Appointment(
                patient_id=appointment_data[1],
                doctor_id=appointment_data[2],
                date=appointment_data[3],
                time=appointment_data[4]
            )
            appointment.appointment_id = appointment_data[0]
            appointment.status = appointment_data[5]
            # These two were previously left at their "" defaults, silently
            # hiding the outcome of completed appointments.
            appointment.diagnosis = appointment_data[6] or ""
            appointment.prescription = appointment_data[7] or ""
            appointments.append(appointment)
        return appointments

    def close(self):
        """Close the database connection"""
        self.db.close()


# Example usage
if __name__ == "__main__":
    # End-to-end demo: register a patient, book and complete an appointment,
    # bill it, then list what was stored.
    hospital = Hospital()
    try:
        # Register a new patient
        patient = hospital.register_patient(
            name="John Doe",
            age=35,
            gender="Male",
            contact="555-1234",
            address="123 Main St"
        )
        print(f"Registered patient: {patient.name} (ID: {patient.patient_id})")

        # Get a doctor by specialization
        cardiologists = Doctor.get_doctors_by_specialization(hospital.db, "Cardiology")
        if cardiologists:
            doctor = cardiologists[0]
            print(f"Found cardiologist: {doctor.name} (ID: {doctor.doctor_id})")

            # Schedule an appointment two days from now
            appointment_date = (datetime.now() + timedelta(days=2)).strftime("%Y-%m-%d")
            appointment = hospital.schedule_appointment(
                patient_id=patient.patient_id,
                doctor_id=doctor.doctor_id,
                date=appointment_date,
                time="10:00"
            )
            print(f"Scheduled appointment: {appointment.date} at {appointment.time}")

            # Complete the appointment
            appointment.complete_appointment(
                db=hospital.db,
                diagnosis="High blood pressure",
                prescription="Medication XYZ, 1 tablet daily"
            )
            print(f"Appointment completed with diagnosis: {appointment.diagnosis}")

            # Generate a bill
            bill = hospital.generate_bill(
                patient_id=patient.patient_id,
                amount=doctor.consultation_fee,
                description=f"Consultation with {doctor.name}"
            )
            print(f"Generated bill: ${bill.amount} ({bill.description})")

            # Mark bill as paid
            bill.mark_as_paid(hospital.db)
            print(f"Bill payment status: {bill.payment_status}")

        # Get patient's appointments
        appointments = Appointment.get_patient_appointments(hospital.db, patient.patient_id)
        print("\nPatient's appointments:")
        for appt in appointments:
            print(f"- {appt.date} {appt.time}: {appt.status}")

        # Get patient's bills
        bills = Billing.get_patient_bills(hospital.db, patient.patient_id)
        print("\nPatient's bills:")
        for b in bills:
            print(f"- ${b.amount} ({b.description}): {b.payment_status}")
    finally:
        # Release the connection even when one of the steps above raised.
        hospital.close()

In [1]:
import sqlite3
from datetime import datetime
from tabulate import tabulate
import hashlib
import getpass
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
import re
from dataclasses import dataclass
import os
import secrets
import string

# --------------------------
# Security Utilities
# --------------------------
class SecurityUtils:
    """Stateless helpers for salts, password hashing and password generation."""

    @staticmethod
    def generate_salt(length: int = 16) -> str:
        """Return a random hex salt built from *length* random bytes.

        The string is twice as long as *length* because each byte becomes
        two hex digits.
        """
        salt = secrets.token_hex(length)
        return salt

    @staticmethod
    def hash_password(password: str, salt: str) -> str:
        """Derive a hex digest from *password* and *salt* with PBKDF2-HMAC-SHA256."""
        rounds = 100000
        password_bytes = password.encode('utf-8')
        # The salt is fed in as the UTF-8 bytes of its text form.
        salt_bytes = salt.encode('utf-8')
        derived = hashlib.pbkdf2_hmac('sha256', password_bytes, salt_bytes, rounds)
        return derived.hex()

    @staticmethod
    def verify_password(stored_hash: str, stored_salt: str, provided_password: str) -> bool:
        """Return True when *provided_password* hashes to *stored_hash*."""
        candidate = SecurityUtils.hash_password(provided_password, stored_salt)
        # Constant-time comparison of the two digests.
        return secrets.compare_digest(stored_hash, candidate)

    @staticmethod
    def generate_secure_password(length: int = 12) -> str:
        """Return a random password of *length* characters.

        Characters are drawn from ASCII letters, digits and ``!@#$%^&*()``.
        """
        pool = string.ascii_letters + string.digits + "!@#$%^&*()"
        picks = [secrets.choice(pool) for _ in range(length)]
        return ''.join(picks)

# --------------------------
# Database Layer (Singleton)
# --------------------------
class Database:
    """Singleton wrapper around the ``hospital.db`` SQLite connection.

    ``Database()`` always returns the same instance. The first call opens the
    connection and creates the schema; :meth:`close` discards the instance so
    a later ``Database()`` opens a fresh connection.
    """
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.conn = sqlite3.connect('hospital.db')
            try:
                instance.cursor = instance.conn.cursor()
                instance.initialize_db()
            except Exception:
                # Do not publish a half-initialised singleton: release the
                # connection and let the next Database() call start over.
                instance.conn.close()
                raise
            cls._instance = instance
        return cls._instance
    
    def initialize_db(self):
        """Create all tables if missing and seed the default admin account."""
        tables = [
            '''CREATE TABLE IF NOT EXISTS Departments (
                department_id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                description TEXT,
                head_doctor_id INTEGER
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Staff (
                staff_id INTEGER PRIMARY KEY AUTOINCREMENT,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                email TEXT UNIQUE,
                phone TEXT,
                address TEXT,
                hire_date TEXT NOT NULL,
                department_id INTEGER,
                position TEXT NOT NULL,
                salary REAL NOT NULL,
                FOREIGN KEY (department_id) REFERENCES Departments(department_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Doctors (
                doctor_id INTEGER PRIMARY KEY,
                specialization TEXT NOT NULL,
                license_number TEXT UNIQUE NOT NULL,
                FOREIGN KEY (doctor_id) REFERENCES Staff(staff_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Patients (
                patient_id INTEGER PRIMARY KEY AUTOINCREMENT,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                dob TEXT NOT NULL,
                gender TEXT CHECK(gender IN ('M', 'F', 'O')),
                blood_type TEXT,
                address TEXT,
                phone TEXT NOT NULL,
                email TEXT UNIQUE,
                registration_date TEXT NOT NULL
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Appointments (
                appointment_id INTEGER PRIMARY KEY AUTOINCREMENT,
                patient_id INTEGER NOT NULL,
                doctor_id INTEGER NOT NULL,
                appointment_date TEXT NOT NULL,
                start_time TEXT NOT NULL,
                end_time TEXT NOT NULL,
                status TEXT DEFAULT 'Scheduled' CHECK(status IN ('Scheduled', 'Completed', 'Cancelled')),
                notes TEXT,
                FOREIGN KEY (patient_id) REFERENCES Patients(patient_id),
                FOREIGN KEY (doctor_id) REFERENCES Doctors(doctor_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS MedicalRecords (
                record_id INTEGER PRIMARY KEY AUTOINCREMENT,
                patient_id INTEGER NOT NULL,
                doctor_id INTEGER NOT NULL,
                diagnosis TEXT NOT NULL,
                treatment TEXT,
                prescription TEXT,
                record_date TEXT NOT NULL,
                FOREIGN KEY (patient_id) REFERENCES Patients(patient_id),
                FOREIGN KEY (doctor_id) REFERENCES Doctors(doctor_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Billing (
                bill_id INTEGER PRIMARY KEY AUTOINCREMENT,
                patient_id INTEGER NOT NULL,
                amount REAL NOT NULL,
                date_issued TEXT NOT NULL,
                due_date TEXT NOT NULL,
                status TEXT DEFAULT 'Pending' CHECK(status IN ('Pending', 'Paid', 'Overdue')),
                payment_method TEXT,
                FOREIGN KEY (patient_id) REFERENCES Patients(patient_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Rooms (
                room_id INTEGER PRIMARY KEY AUTOINCREMENT,
                room_number TEXT UNIQUE NOT NULL,
                room_type TEXT NOT NULL,
                department_id INTEGER,
                status TEXT DEFAULT 'Available' CHECK(status IN ('Available', 'Occupied', 'Maintenance')),
                FOREIGN KEY (department_id) REFERENCES Departments(department_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Admissions (
                admission_id INTEGER PRIMARY KEY AUTOINCREMENT,
                patient_id INTEGER NOT NULL,
                room_id INTEGER NOT NULL,
                admission_date TEXT NOT NULL,
                discharge_date TEXT,
                reason TEXT NOT NULL,
                status TEXT DEFAULT 'Active' CHECK(status IN ('Active', 'Discharged', 'Transferred')),
                FOREIGN KEY (patient_id) REFERENCES Patients(patient_id),
                FOREIGN KEY (room_id) REFERENCES Rooms(room_id)
            )''',
            
            '''CREATE TABLE IF NOT EXISTS Users (
                user_id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                password_salt TEXT NOT NULL,
                staff_id INTEGER UNIQUE,
                role TEXT NOT NULL CHECK(role IN ('admin', 'doctor', 'nurse', 'receptionist', 'accountant')),
                last_login TEXT,
                FOREIGN KEY (staff_id) REFERENCES Staff(staff_id)
            )'''
        ]
        
        for table in tables:
            self.cursor.execute(table)
        
        # Create admin user if not exists
        self.cursor.execute("SELECT COUNT(*) FROM Users WHERE username='admin'")
        if self.cursor.fetchone()[0] == 0:
            salt = SecurityUtils.generate_salt()
            # NOTE(review): hard-coded default credential. Kept so existing
            # logins keep working, but it should be changed on first login or
            # supplied through configuration.
            password_hash = SecurityUtils.hash_password("admin123", salt)
            self.cursor.execute('''
            INSERT INTO Users (username, password_hash, password_salt, role)
            VALUES (?, ?, ?, ?)
            ''', ('admin', password_hash, salt, 'admin'))
        
        self.conn.commit()
    
    def execute_query(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Execute a SQL query with parameters and return the cursor.

        Nothing is committed here; call :meth:`commit` after writes. SQLite
        errors are printed and then re-raised.
        """
        try:
            return self.cursor.execute(query, params)
        except sqlite3.Error as e:
            print(f"Database error: {e}")
            raise
    
    def commit(self):
        """Commit changes to the database"""
        self.conn.commit()
    
    def close(self):
        """Close the connection and forget the singleton.

        Without the reset, every later ``Database()`` call would hand back
        this object with its closed connection.
        """
        self.conn.close()
        cls = type(self)
        if cls._instance is self:
            cls._instance = None
# --------------------------
# Domain Models
# --------------------------
@dataclass
class Department:
    """One row of the Departments table; fields follow the table's column order."""
    department_id: int
    name: str  # declared NOT NULL UNIQUE in the table
    description: str
    head_doctor_id: Optional[int]  # nullable; the table declares no FOREIGN KEY for it

@dataclass
class Staff:
    """One row of the Staff table; fields follow the table's column order."""
    staff_id: int
    first_name: str
    last_name: str
    email: str  # UNIQUE in the table
    phone: str
    address: str
    hire_date: str  # stored as TEXT
    department_id: Optional[int]  # references Departments(department_id)
    position: str
    salary: float

@dataclass
class Doctor(Staff):
    """A Staff record extended with the doctor-specific columns of the Doctors table.

    All Staff fields come first in the generated constructor, followed by the
    two fields declared here.
    """
    specialization: str
    license_number: str  # UNIQUE NOT NULL in the Doctors table

@dataclass
class Patient:
    """One row of the Patients table; fields follow the table's column order."""
    patient_id: int
    first_name: str
    last_name: str
    dob: str  # stored as TEXT
    gender: str  # the table's CHECK constraint allows 'M', 'F' or 'O'
    blood_type: Optional[str]
    address: str
    phone: str
    email: Optional[str]  # UNIQUE in the table
    registration_date: str  # stored as TEXT

@dataclass
class Appointment:
    """One row of the Appointments table; fields follow the table's column order."""
    appointment_id: int
    patient_id: int  # references Patients(patient_id)
    doctor_id: int  # references Doctors(doctor_id)
    appointment_date: str  # stored as TEXT
    start_time: str
    end_time: str
    status: str  # the table's CHECK allows 'Scheduled', 'Completed' or 'Cancelled'
    notes: Optional[str]

@dataclass
class MedicalRecord:
    """One row of the MedicalRecords table; fields follow the table's column order."""
    record_id: int
    patient_id: int  # references Patients(patient_id)
    doctor_id: int  # references Doctors(doctor_id)
    diagnosis: str  # NOT NULL in the table
    treatment: Optional[str]
    prescription: Optional[str]
    record_date: str  # stored as TEXT

@dataclass
class Billing:
    """One row of the Billing table; fields follow the table's column order."""
    bill_id: int
    patient_id: int  # references Patients(patient_id)
    amount: float
    date_issued: str  # stored as TEXT
    due_date: str  # stored as TEXT
    status: str  # the table's CHECK allows 'Pending', 'Paid' or 'Overdue'
    payment_method: Optional[str]

@dataclass
class Room:
    """One row of the Rooms table; fields follow the table's column order."""
    room_id: int
    room_number: str  # UNIQUE NOT NULL in the table
    room_type: str
    department_id: Optional[int]  # references Departments(department_id)
    status: str  # the table's CHECK allows 'Available', 'Occupied' or 'Maintenance'

@dataclass
class Admission:
    """One row of the Admissions table; fields follow the table's column order."""
    admission_id: int
    patient_id: int  # references Patients(patient_id)
    room_id: int  # references Rooms(room_id)
    admission_date: str  # stored as TEXT
    discharge_date: Optional[str]  # nullable column
    reason: str
    status: str  # the table's CHECK allows 'Active', 'Discharged' or 'Transferred'

@dataclass
class User:
    """Login account as stored in the Users table.

    UserRepository builds instances positionally with ``User(*row)``, so the
    field order must stay in step with the column lists of its SELECTs.
    """
    user_id: int  # AuthService.create_user passes None; UserRepository.add then sets it from lastrowid
    username: str
    password_hash: str  # hex digest produced by SecurityUtils.hash_password
    password_salt: str
    staff_id: Optional[int]  # references Staff(staff_id); UNIQUE in the table
    role: str  # the table's CHECK allows 'admin', 'doctor', 'nurse', 'receptionist' or 'accountant'
    last_login: Optional[str]  # AuthService.login writes "%Y-%m-%d %H:%M:%S" here

# --------------------------
# Repository Pattern
# --------------------------
class Repository(ABC):
    """Abstract CRUD interface that every concrete repository must implement."""

    @abstractmethod
    def get_by_id(self, id: int):
        """Look up a single entity by its primary key."""

    @abstractmethod
    def get_all(self) -> List:
        """Return every stored entity."""

    @abstractmethod
    def add(self, entity):
        """Persist a new entity."""

    @abstractmethod
    def update(self, entity):
        """Persist changes made to an existing entity."""

    @abstractmethod
    def delete(self, id: int):
        """Remove the entity with the given primary key."""

class UserRepository(Repository):
    """Repository for rows of the Users table, backed by the Database singleton."""

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user whose primary key is *user_id*, or None."""
        result = Database().execute_query('''
        SELECT user_id, username, password_hash, password_salt, staff_id, role, last_login
        FROM Users WHERE user_id=?
        ''', (user_id,))
        record = result.fetchone()
        if not record:
            return None
        return User(*record)
    
    def get_by_username(self, username: str) -> Optional[User]:
        """Return the user registered under *username*, or None."""
        result = Database().execute_query('''
        SELECT user_id, username, password_hash, password_salt, staff_id, role, last_login
        FROM Users WHERE username=?
        ''', (username,))
        record = result.fetchone()
        if not record:
            return None
        return User(*record)
    
    def get_all(self) -> List[User]:
        """Return every row of the Users table as a User object."""
        result = Database().execute_query('''
        SELECT user_id, username, password_hash, password_salt, staff_id, role, last_login
        FROM Users
        ''')
        users = []
        for record in result.fetchall():
            users.append(User(*record))
        return users
    
    def add(self, user: User) -> User:
        """Insert *user*, store the generated key on it, and return it."""
        store = Database()
        values = (
            user.username, user.password_hash, user.password_salt,
            user.staff_id, user.role,
        )
        result = store.execute_query('''
        INSERT INTO Users (username, password_hash, password_salt, staff_id, role)
        VALUES (?, ?, ?, ?, ?)
        ''', values)
        store.commit()
        # The key SQLite assigned to the new row.
        user.user_id = result.lastrowid
        return user
    
    def update(self, user: User):
        """Overwrite every stored column of the row identified by ``user.user_id``."""
        store = Database()
        values = (
            user.username, user.password_hash, user.password_salt,
            user.staff_id, user.role, user.last_login,
            user.user_id,
        )
        store.execute_query('''
        UPDATE Users SET 
            username=?, password_hash=?, password_salt=?,
            staff_id=?, role=?, last_login=?
        WHERE user_id=?
        ''', values)
        store.commit()
    
    def delete(self, user_id: int):
        """Delete the row whose primary key is *user_id*."""
        store = Database()
        store.execute_query("DELETE FROM Users WHERE user_id=?", (user_id,))
        store.commit()
    
    def change_password(self, user_id: int, new_password: str):
        """Store *new_password* for *user_id*, hashed with a newly generated salt."""
        fresh_salt = SecurityUtils.generate_salt()
        digest = SecurityUtils.hash_password(new_password, fresh_salt)
        
        store = Database()
        store.execute_query('''
        UPDATE Users SET password_hash=?, password_salt=?
        WHERE user_id=?
        ''', (digest, fresh_salt, user_id))
        store.commit()

# Other repositories would follow the same pattern...
# (DepartmentRepository, DoctorRepository, etc.)

# --------------------------
# Authentication Service
# --------------------------
class AuthService:
    """Login, account creation and password changes on top of UserRepository."""

    # 4-20 ASCII letters, digits or underscores. Applied with fullmatch():
    # with match() and a trailing "$", a name ending in "\n" would slip
    # through, because "$" also matches just before a final newline.
    _USERNAME_RE = re.compile(r'[a-zA-Z0-9_]{4,20}')
    # Same set as the CHECK constraint on Users.role.
    _VALID_ROLES = ('admin', 'doctor', 'nurse', 'receptionist', 'accountant')

    def __init__(self):
        self.user_repo = UserRepository()
    
    def login(self, username: str, password: str) -> Optional[User]:
        """Authenticate and return the user, or None on any failure.

        On success the user's ``last_login`` is set to the current local time
        and written back through the repository.
        """
        user = self.user_repo.get_by_username(username)
        
        if not user:
            return None
        
        if not SecurityUtils.verify_password(
            user.password_hash, 
            user.password_salt, 
            password
        ):
            return None
        
        # Update last login
        user.last_login = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.user_repo.update(user)
        
        return user
    
    def create_user(self, username: str, password: str, role: str, staff_id: Optional[int] = None) -> User:
        """Create a new system user with a salted password hash.

        Raises:
            ValueError: if the username format, password length or role is
                not acceptable.
        """
        if not self._validate_username(username):
            raise ValueError("Invalid username format")
        
        if not self._validate_password(password):
            raise ValueError("Password must be at least 8 characters")
        
        if role not in self._VALID_ROLES:
            raise ValueError("Invalid role")
        
        # Generate salt and hash password
        salt = SecurityUtils.generate_salt()
        password_hash = SecurityUtils.hash_password(password, salt)
        
        user = User(
            user_id=None,  # assigned by the repository on insert
            username=username,
            password_hash=password_hash,
            password_salt=salt,
            staff_id=staff_id,
            role=role,
            last_login=None
        )
        
        return self.user_repo.add(user)
    
    def change_password(self, user_id: int, current_password: str, new_password: str) -> bool:
        """Change a password after verifying the current one.

        Returns:
            False when the user does not exist or *current_password* is
            wrong, True once the new password has been stored.

        Raises:
            ValueError: if *new_password* is shorter than 8 characters.
        """
        user = self.user_repo.get_by_id(user_id)
        if not user:
            return False
        
        if not SecurityUtils.verify_password(
            user.password_hash,
            user.password_salt,
            current_password
        ):
            return False
        
        if not self._validate_password(new_password):
            raise ValueError("Password must be at least 8 characters")
        
        self.user_repo.change_password(user_id, new_password)
        return True
    
    def _validate_username(self, username: str) -> bool:
        """Return True when *username* is 4-20 letters, digits or underscores."""
        return AuthService._USERNAME_RE.fullmatch(username) is not None
    
    def _validate_password(self, password: str) -> bool:
        """Return True when *password* has at least 8 characters."""
        return len(password) >= 8

# --------------------------
# UI Layer (Console)
# --------------------------
class HospitalUI:
    """Console front end for logging in and managing system users.

    All account work is delegated to an ``AuthService`` instance; this class
    only handles prompts, menus, and printing results.
    """

    def __init__(self):
        """Create the auth service; nobody is logged in yet."""
        self.auth_service = AuthService()
        self.current_user = None

    def run(self):
        """Log in once, then serve the main menu until the user exits.

        Returns immediately when the single login attempt fails.
        """
        print("Hospital Management System")
        print("------------------------")

        if not self._login():
            return

        while True:
            self._show_main_menu()
            choice = input("Enter your choice: ")

            # Option 1 is only honoured for admins; for any other role it
            # falls through to "Invalid choice!".
            if choice == '1' and self.current_user.role == 'admin':
                self._user_management()
            elif choice == '2':
                self._change_password()
            elif choice.lower() == 'x':
                print("Exiting system...")
                break
            else:
                print("Invalid choice!")

    def _login(self) -> bool:
        """Prompt for credentials and store the authenticated user.

        Returns:
            True if ``auth_service.login`` returned a user, otherwise False.
        """
        print("\nLogin")
        username = input("Username: ")
        # getpass reads the password without echoing it to the terminal.
        password = getpass.getpass("Password: ")

        self.current_user = self.auth_service.login(username, password)
        if self.current_user:
            print(f"\nWelcome, {self.current_user.username} ({self.current_user.role})!")
            return True

        print("Invalid credentials!")
        return False

    def _show_main_menu(self):
        """Print the main menu; the user-management entry is shown to admins only."""
        print("\nMain Menu")
        if self.current_user.role == 'admin':
            print("1. User Management")
        print("2. Change Password")
        print("X. Exit")

    def _user_management(self):
        """Serve the user-management submenu until the admin chooses to go back."""
        while True:
            print("\nUser Management")
            print("1. List Users")
            print("2. Add User")
            print("3. Reset Password")
            print("4. Back to Main Menu")

            choice = input("Enter your choice: ")

            if choice == '1':
                self._list_users()
            elif choice == '2':
                self._add_user()
            elif choice == '3':
                self._reset_password()
            elif choice == '4':
                break
            else:
                print("Invalid choice!")

    def _list_users(self):
        """Print every user returned by the repository as a grid table."""
        users = self.auth_service.user_repo.get_all()

        if not users:
            print("No users found!")
            return

        headers = ["ID", "Username", "Role", "Staff ID", "Last Login"]
        # Falsy staff IDs and last-login values are replaced with placeholders.
        data = [
            [
                user.user_id,
                user.username,
                user.role,
                user.staff_id or "N/A",
                user.last_login or "Never",
            ]
            for user in users
        ]

        print("\nSystem Users:")
        print(tabulate(data, headers=headers, tablefmt="grid"))

    def _add_user(self):
        """Prompt for a new user's details and create the account (admin only).

        A random password is generated for the account. It is displayed only
        after the account has actually been created, so no password is shown
        for an account that failed validation.
        """
        print("\nAdd New User")
        username = input("Username: ")

        print("\nSelect role:")
        print("1. Admin")
        print("2. Doctor")
        print("3. Nurse")
        print("4. Receptionist")
        print("5. Accountant")
        role_choice = input("Enter choice (1-5): ")

        roles = {
            '1': 'admin',
            '2': 'doctor',
            '3': 'nurse',
            '4': 'receptionist',
            '5': 'accountant'
        }

        # Unrecognised choices fall back to 'receptionist'.
        role = roles.get(role_choice, 'receptionist')

        staff_id = None
        if role in ['doctor', 'nurse', 'receptionist', 'accountant']:
            staff_id = input("Staff ID (leave blank if not applicable): ")
            if staff_id == '':
                staff_id = None
            else:
                try:
                    staff_id = int(staff_id)
                except ValueError:
                    print("Invalid Staff ID!")
                    return

        # Generate a secure random password
        password = SecurityUtils.generate_secure_password()

        try:
            user = self.auth_service.create_user(username, password, role, staff_id)
        except ValueError as e:
            print(f"Error: {e}")
        else:
            print(f"User '{user.username}' created successfully with ID: {user.user_id}")
            print(f"\nGenerated password: {password}\n")
            print("Please provide this password to the user securely.")
            print("They should change it immediately after first login.")

    def _reset_password(self):
        """Give a user a newly generated temporary password (admin only).

        Only the conversion of the typed ID is reported as "Invalid User ID!";
        a ValueError raised while storing the new password is reported with
        its own message instead of being mistaken for a bad ID.
        """
        raw_user_id = input("Enter User ID to reset password: ")

        try:
            user_id = int(raw_user_id)
        except ValueError:
            print("Invalid User ID!")
            return

        user = self.auth_service.user_repo.get_by_id(user_id)
        if not user:
            print("User not found!")
            return

        # Generate a new secure password
        new_password = SecurityUtils.generate_secure_password()
        try:
            self.auth_service.user_repo.change_password(user_id, new_password)
        except ValueError as e:
            print(f"Error: {e}")
            return

        print(f"\nPassword reset successfully for user: {user.username}")
        print(f"New temporary password: {new_password}")
        print("The user should change this password immediately after login.")

    def _change_password(self):
        """Let the logged-in user change their own password.

        The new password is entered twice and must match before the auth
        service is asked to make the change.
        """
        print("\nChange Password")
        current_password = getpass.getpass("Current Password: ")
        new_password = getpass.getpass("New Password: ")
        confirm_password = getpass.getpass("Confirm New Password: ")

        if new_password != confirm_password:
            print("Passwords don't match!")
            return

        try:
            success = self.auth_service.change_password(
                self.current_user.user_id,
                current_password,
                new_password
            )
        except ValueError as e:
            # Raised by the service when the new password is rejected.
            print(f"Error: {e}")
            return

        if success:
            print("Password changed successfully!")
        else:
            print("Current password is incorrect!")

# --------------------------
# Main Application
# --------------------------
if __name__ == "__main__":
    # Script entry point: build the console UI and run it until the user exits.
    try:
        app = HospitalUI()
        app.run()
    except KeyboardInterrupt:
        # Ctrl+C at any prompt ends the session with a short message.
        print("\nApplication terminated by user")
    except Exception as e:
        # Top-level boundary: report the error text instead of a traceback.
        print(f"An error occurred: {e}")
    finally:
        # Ensure database connection is closed
        # NOTE(review): this calls Database() again and closes the result; it
        # only closes the connection the UI used if Database hands back a
        # shared instance — confirm against the Database definition.
        Database().close()

Hospital Management System
------------------------

Login
Invalid credentials!


In [None]:
import sqlite3
from datetime import datetime, timedelta
import random
from typing import List, Dict, Optional

class Database:
    """Thin wrapper around one SQLite connection holding the hospital schema.

    Creating a ``Database`` opens the file and creates the patients, doctors,
    appointments, staff and billing tables when they are missing.

    Patient, doctor, appointment and bill identifiers are declared TEXT
    because the model classes in this file generate prefixed string IDs such
    as "P1234". Declaring them ``INTEGER PRIMARY KEY`` makes the column an
    alias for SQLite's rowid, and inserting a non-numeric string into it
    fails with "datatype mismatch".

    Note: ``CREATE TABLE IF NOT EXISTS`` leaves existing tables untouched, so
    a database file created with an older schema keeps that schema.
    """

    def __init__(self, db_name: str = "hospital.db"):
        """Open (or create) the SQLite database ``db_name`` and ensure the tables exist.

        Args:
            db_name: Path handed to ``sqlite3.connect``; ":memory:" gives a
                throwaway in-memory database.
        """
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._initialize_database()

    def _initialize_database(self):
        """Create tables if they don't exist"""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS patients (
                patient_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                age INTEGER,
                gender TEXT,
                contact TEXT,
                address TEXT,
                registration_date TEXT
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS doctors (
                doctor_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                specialization TEXT,
                contact TEXT,
                available_days TEXT,
                consultation_fee REAL
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS appointments (
                appointment_id TEXT PRIMARY KEY,
                patient_id TEXT,
                doctor_id TEXT,
                date TEXT,
                time TEXT,
                status TEXT DEFAULT 'Scheduled',
                diagnosis TEXT,
                prescription TEXT,
                FOREIGN KEY (patient_id) REFERENCES patients(patient_id),
                FOREIGN KEY (doctor_id) REFERENCES doctors(doctor_id)
            )
        """)

        # No model class in this file writes staff rows with string IDs, so
        # staff_id stays an integer key.
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS staff (
                staff_id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT,
                contact TEXT,
                hire_date TEXT
            )
        """)

        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS billing (
                bill_id TEXT PRIMARY KEY,
                patient_id TEXT,
                amount REAL,
                payment_status TEXT DEFAULT 'Pending',
                date TEXT,
                description TEXT,
                FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
            )
        """)

        self.conn.commit()

    def execute_query(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run one parameterised SQL statement and commit.

        Args:
            query: SQL text using ``?`` placeholders.
            params: Values bound to the placeholders.

        Returns:
            The shared cursor, positioned on the statement's results. Because
            the cursor is shared, fetch the rows before issuing the next query.
        """
        self.cursor.execute(query, params)
        self.conn.commit()
        return self.cursor

    def close(self):
        """Close the database connection"""
        self.conn.close()


class Person:
    """Name and contact details shared by everyone in the hospital system."""

    def __init__(self, name: str, contact: str):
        """Store the person's name and contact string as given."""
        self.name, self.contact = name, contact


class Patient(Person):
    """A hospital patient, with helpers to store and load the record."""

    def __init__(self, name: str, age: int, gender: str, contact: str, address: str):
        """Build a patient with a generated ID and today's registration date.

        The ID is "P" followed by a random number from 1000 to 9999; nothing
        here checks it against IDs that already exist.
        """
        super().__init__(name, contact)
        self.age, self.gender, self.address = age, gender, address
        self.patient_id = "P" + str(random.randint(1000, 9999))
        today = datetime.now()
        self.registration_date = today.strftime("%Y-%m-%d")
        # In-memory only: save_to_db does not write this list.
        self.medical_history = []

    def save_to_db(self, db: Database):
        """Insert this patient as a new row in the ``patients`` table."""
        row = (
            self.patient_id,
            self.name,
            self.age,
            self.gender,
            self.contact,
            self.address,
            self.registration_date,
        )
        db.execute_query("INSERT INTO patients VALUES (?, ?, ?, ?, ?, ?, ?)", row)

    @staticmethod
    def get_patient(db: Database, patient_id: str) -> Optional['Patient']:
        """Load the patient with ``patient_id``, or return None if no row matches."""
        record = db.execute_query(
            "SELECT * FROM patients WHERE patient_id=?", (patient_id,)
        ).fetchone()
        if not record:
            return None

        # Column order: id, name, age, gender, contact, address, registration date.
        loaded = Patient(record[1], record[2], record[3], record[4], record[5])
        # Replace the values generated by the constructor with the stored ones.
        loaded.patient_id = record[0]
        loaded.registration_date = record[6]
        return loaded

    def add_medical_record(self, diagnosis: str, prescription: str):
        """Append a dated diagnosis/prescription entry to ``medical_history``."""
        entry = {
            'date': datetime.now().strftime("%Y-%m-%d"),
            'diagnosis': diagnosis,
            'prescription': prescription,
        }
        self.medical_history.append(entry)


class Doctor(Person):
    """A doctor record, with helpers to store it and to look doctors up."""

    def __init__(self, name: str, specialization: str, contact: str,
                 available_days: List[str], consultation_fee: float):
        """Build a doctor with a generated ID ("D" plus a random number 1000-9999)."""
        super().__init__(name, contact)
        self.specialization = specialization
        self.available_days = available_days
        self.consultation_fee = consultation_fee
        self.doctor_id = "D" + str(random.randint(1000, 9999))

    def save_to_db(self, db: Database):
        """Insert this doctor as a new row in the ``doctors`` table."""
        # The list of days is stored as one comma-separated string.
        row = (
            self.doctor_id,
            self.name,
            self.specialization,
            self.contact,
            ','.join(self.available_days),
            self.consultation_fee,
        )
        db.execute_query("INSERT INTO doctors VALUES (?, ?, ?, ?, ?, ?)", row)

    @staticmethod
    def _from_row(record) -> 'Doctor':
        """Rebuild a Doctor from a full ``doctors`` row (columns in table order)."""
        loaded = Doctor(
            name=record[1],
            specialization=record[2],
            contact=record[3],
            available_days=record[4].split(','),
            consultation_fee=record[5],
        )
        # Replace the ID generated by the constructor with the stored one.
        loaded.doctor_id = record[0]
        return loaded

    @staticmethod
    def get_doctor(db: Database, doctor_id: str) -> Optional['Doctor']:
        """Load the doctor with ``doctor_id``, or return None if no row matches."""
        record = db.execute_query(
            "SELECT * FROM doctors WHERE doctor_id=?", (doctor_id,)
        ).fetchone()
        return Doctor._from_row(record) if record else None

    @staticmethod
    def get_doctors_by_specialization(db: Database, specialization: str) -> List['Doctor']:
        """Return every doctor whose specialization equals ``specialization``."""
        records = db.execute_query(
            "SELECT * FROM doctors WHERE specialization=?", (specialization,)
        ).fetchall()
        return [Doctor._from_row(record) for record in records]


class Appointment:
    """One booking of a patient with a doctor on a given date and time."""

    def __init__(self, patient_id: str, doctor_id: str, date: str, time: str):
        """Create a 'Scheduled' appointment with a generated ID.

        The ID is "A" followed by a random number from 1000 to 9999.
        """
        self.appointment_id = "A" + str(random.randint(1000, 9999))
        self.patient_id, self.doctor_id = patient_id, doctor_id
        self.date, self.time = date, time
        self.status = "Scheduled"
        self.diagnosis = self.prescription = ""

    def save_to_db(self, db: Database):
        """Insert this appointment; diagnosis and prescription are not written."""
        row = (
            self.appointment_id,
            self.patient_id,
            self.doctor_id,
            self.date,
            self.time,
            self.status,
        )
        db.execute_query(
            "INSERT INTO appointments (appointment_id, patient_id, doctor_id, date, time, status) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )

    def complete_appointment(self, db: Database, diagnosis: str, prescription: str):
        """Set status to 'Completed', record the outcome, and update the stored row."""
        self.status = "Completed"
        self.diagnosis, self.prescription = diagnosis, prescription
        outcome = (self.status, self.diagnosis, self.prescription, self.appointment_id)
        db.execute_query(
            "UPDATE appointments SET status=?, diagnosis=?, prescription=? WHERE appointment_id=?",
            outcome,
        )

    @staticmethod
    def _from_row(record) -> 'Appointment':
        """Rebuild an Appointment from a full ``appointments`` row (table column order)."""
        loaded = Appointment(
            patient_id=record[1],
            doctor_id=record[2],
            date=record[3],
            time=record[4],
        )
        # Replace the values generated by the constructor with the stored ones.
        loaded.appointment_id = record[0]
        loaded.status = record[5]
        # NULL diagnosis/prescription columns become empty strings.
        loaded.diagnosis = record[6] or ""
        loaded.prescription = record[7] or ""
        return loaded

    @staticmethod
    def get_appointment(db: Database, appointment_id: str) -> Optional['Appointment']:
        """Load the appointment with ``appointment_id``, or return None if no row matches."""
        record = db.execute_query(
            "SELECT * FROM appointments WHERE appointment_id=?", (appointment_id,)
        ).fetchone()
        return Appointment._from_row(record) if record else None

    @staticmethod
    def get_patient_appointments(db: Database, patient_id: str) -> List['Appointment']:
        """Return every appointment stored for ``patient_id``."""
        records = db.execute_query(
            "SELECT * FROM appointments WHERE patient_id=?", (patient_id,)
        ).fetchall()
        return [Appointment._from_row(record) for record in records]


class Billing:
    """A bill issued to a patient, with helpers to store, load and settle it."""

    def __init__(self, patient_id: str, amount: float, description: str):
        """Create a 'Pending' bill dated today with a generated ID.

        The ID is "B" followed by a random number from 1000 to 9999.
        """
        self.bill_id = "B" + str(random.randint(1000, 9999))
        self.patient_id = patient_id
        self.amount = amount
        self.payment_status = "Pending"
        issued_on = datetime.now()
        self.date = issued_on.strftime("%Y-%m-%d")
        self.description = description

    def save_to_db(self, db: Database):
        """Insert this bill as a new row in the ``billing`` table."""
        row = (
            self.bill_id,
            self.patient_id,
            self.amount,
            self.payment_status,
            self.date,
            self.description,
        )
        db.execute_query("INSERT INTO billing VALUES (?, ?, ?, ?, ?, ?)", row)

    def mark_as_paid(self, db: Database):
        """Set the status to 'Paid' on this object and on its stored row."""
        self.payment_status = "Paid"
        change = (self.payment_status, self.bill_id)
        db.execute_query("UPDATE billing SET payment_status=? WHERE bill_id=?", change)

    @staticmethod
    def _from_row(record) -> 'Billing':
        """Rebuild a Billing from a full ``billing`` row (columns in table order)."""
        loaded = Billing(
            patient_id=record[1],
            amount=record[2],
            description=record[5],
        )
        # Replace the values generated by the constructor with the stored ones.
        loaded.bill_id = record[0]
        loaded.payment_status = record[3]
        loaded.date = record[4]
        return loaded

    @staticmethod
    def get_bill(db: Database, bill_id: str) -> Optional['Billing']:
        """Load the bill with ``bill_id``, or return None if no row matches."""
        record = db.execute_query(
            "SELECT * FROM billing WHERE bill_id=?", (bill_id,)
        ).fetchone()
        return Billing._from_row(record) if record else None

    @staticmethod
    def get_patient_bills(db: Database, patient_id: str) -> List['Billing']:
        """Return every bill stored for ``patient_id``."""
        records = db.execute_query(
            "SELECT * FROM billing WHERE patient_id=?", (patient_id,)
        ).fetchall()
        return [Billing._from_row(record) for record in records]


class Hospital:
    """Facade that owns the database and offers the common hospital operations."""

    def __init__(self):
        """Open the default database and seed sample doctors if there are none."""
        self.db = Database()
        self._initialize_sample_data()

    def _initialize_sample_data(self):
        """Insert three sample doctors when the doctors table is empty."""
        cursor = self.db.execute_query("SELECT COUNT(*) FROM doctors")
        if cursor.fetchone()[0] == 0:
            doctors = [
                Doctor("Dr. Smith", "Cardiology", "555-0101", ["Monday", "Wednesday", "Friday"], 150.0),
                Doctor("Dr. Johnson", "Pediatrics", "555-0102", ["Tuesday", "Thursday", "Saturday"], 120.0),
                Doctor("Dr. Williams", "Neurology", "555-0103", ["Monday", "Tuesday", "Friday"], 200.0)
            ]
            for doctor in doctors:
                doctor.save_to_db(self.db)

    def register_patient(self, name: str, age: int, gender: str, contact: str, address: str) -> Patient:
        """Create a patient, store it, and return the new ``Patient``."""
        patient = Patient(name, age, gender, contact, address)
        patient.save_to_db(self.db)
        return patient

    def schedule_appointment(self, patient_id: str, doctor_id: str, date: str, time: str) -> Appointment:
        """Create an appointment, store it, and return the new ``Appointment``.

        No check is made here that the patient or doctor exists, or that the
        slot is free.
        """
        appointment = Appointment(patient_id, doctor_id, date, time)
        appointment.save_to_db(self.db)
        return appointment

    def generate_bill(self, patient_id: str, amount: float, description: str) -> Billing:
        """Create a pending bill, store it, and return the new ``Billing``."""
        bill = Billing(patient_id, amount, description)
        bill.save_to_db(self.db)
        return bill

    def get_doctor_schedule(self, doctor_id: str) -> List[Appointment]:
        """Return a doctor's appointments ordered by date, then time.

        Each appointment carries its stored diagnosis and prescription (empty
        strings when the columns are NULL), matching what the ``Appointment``
        loaders return.
        """
        cursor = self.db.execute_query(
            "SELECT * FROM appointments WHERE doctor_id=? ORDER BY date, time", (doctor_id,)
        )
        appointments = []
        for appointment_data in cursor.fetchall():
            appointment = Appointment(
                patient_id=appointment_data[1],
                doctor_id=appointment_data[2],
                date=appointment_data[3],
                time=appointment_data[4]
            )
            # Replace the values generated by the constructor with the stored ones.
            appointment.appointment_id = appointment_data[0]
            appointment.status = appointment_data[5]
            appointment.diagnosis = appointment_data[6] or ""
            appointment.prescription = appointment_data[7] or ""
            appointments.append(appointment)
        return appointments

    def close(self):
        """Close the database connection"""
        self.db.close()


# Example usage
if __name__ == "__main__":
    # Walk through the main workflow: register, book, complete, bill, report.
    hospital = Hospital()
    try:
        # Register a new patient
        patient = hospital.register_patient(
            name="John Doe",
            age=35,
            gender="Male",
            contact="555-1234",
            address="123 Main St"
        )
        print(f"Registered patient: {patient.name} (ID: {patient.patient_id})")

        # Get a doctor by specialization
        cardiologists = Doctor.get_doctors_by_specialization(hospital.db, "Cardiology")
        if cardiologists:
            doctor = cardiologists[0]
            print(f"Found cardiologist: {doctor.name} (ID: {doctor.doctor_id})")

            # Schedule an appointment two days from now
            appointment_date = (datetime.now() + timedelta(days=2)).strftime("%Y-%m-%d")
            appointment = hospital.schedule_appointment(
                patient_id=patient.patient_id,
                doctor_id=doctor.doctor_id,
                date=appointment_date,
                time="10:00"
            )
            print(f"Scheduled appointment: {appointment.date} at {appointment.time}")

            # Complete the appointment
            appointment.complete_appointment(
                db=hospital.db,
                diagnosis="High blood pressure",
                prescription="Medication XYZ, 1 tablet daily"
            )
            print(f"Appointment completed with diagnosis: {appointment.diagnosis}")

            # Generate a bill for the doctor's consultation fee
            bill = hospital.generate_bill(
                patient_id=patient.patient_id,
                amount=doctor.consultation_fee,
                description=f"Consultation with {doctor.name}"
            )
            print(f"Generated bill: ${bill.amount} ({bill.description})")

            # Mark bill as paid
            bill.mark_as_paid(hospital.db)
            print(f"Bill payment status: {bill.payment_status}")

        # Get patient's appointments
        appointments = Appointment.get_patient_appointments(hospital.db, patient.patient_id)
        print("\nPatient's appointments:")
        for appt in appointments:
            print(f"- {appt.date} {appt.time}: {appt.status}")

        # Get patient's bills
        bills = Billing.get_patient_bills(hospital.db, patient.patient_id)
        print("\nPatient's bills:")
        for b in bills:
            print(f"- ${b.amount} ({b.description}): {b.payment_status}")
    finally:
        # Close the connection even when one of the steps above raised.
        hospital.close()
