# In [None]:  (Jupyter notebook cell marker left over from export)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests
import base64
import time
import random
from bs4 import BeautifulSoup
import os
import json
import logging
from typing import Optional, Dict, List, Union

################################################################################################
# Configuration constants.
#
# NOTE(review): the string literals below are real-looking credentials that are
# committed to source. Each sensitive value can be supplied through an
# environment variable instead; the literals are kept only as
# backward-compatible fallbacks and should be rotated and removed.
# The site account deliberately uses APPOINTMENT_* variable names because
# plain USERNAME is already defined by the operating system on Windows.
BAIDU_API_KEY = os.environ.get("BAIDU_API_KEY", "pnBkPfBD2REvCbjhgOmHd9wo")
BAIDU_SECRET_KEY = os.environ.get("BAIDU_SECRET_KEY", "pQ2HEtOz5Vqlt5VylH158aN82QO0Hh57")
TOKEN_CACHE_FILE = "baidu_token_cache.txt"
TOKEN_EXPIRE_TIME = 30 * 24 * 3600  # 30 days, in seconds

LOGIN_URL = "https://www.dzmyy.com.cn/Account/LogOn"
SKIN_DEPARTMENT_URL = "https://www.dzmyy.com.cn/Interactions/SchedulingAppointments/OPDoctorIndex?OPdepartmentId=270"
USERNAME = os.environ.get("APPOINTMENT_USERNAME", "18811787891")
PASSWORD = os.environ.get("APPOINTMENT_PASSWORD", "18811787891xpc")
# Doctor whose schedule rows are considered; an alternative used before: "袁玲玲"
DOCTOR_NAME = os.environ.get("APPOINTMENT_DOCTOR_NAME", "屈双擎")
MAX_ATTEMPT_TIME = 90  # total time budget for booking attempts, in seconds
RETRY_DELAY = 0.5  # pause between booking attempts, in seconds

################################################################################################
# 设置日志系统
def setup_logger(name: str, log_file: str = 'appointment.log') -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    A logger that already has handlers is returned untouched, so calling this
    repeatedly never stacks duplicate handlers. Otherwise the logger is set to
    INFO, detached from its ancestors (``propagate = False``) and given two
    INFO-level handlers sharing one format: a UTF-8 file handler writing to
    ``log_file`` followed by a console stream handler.

    Args:
        name: Name passed to ``logging.getLogger``.
        log_file: Path of the log file for the file handler.

    Returns:
        The (possibly freshly configured) logger.
    """
    log = logging.getLogger(name)

    if not log.handlers:
        log.setLevel(logging.INFO)
        # Keep records from being handed on to ancestor loggers as well.
        log.propagate = False

        line_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        sinks = (
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(),
        )
        for sink in sinks:
            sink.setLevel(logging.INFO)
            sink.setFormatter(line_format)
            log.addHandler(sink)

    return log

################################################################################################
# Create the module-level logger.
# Handlers left over from an earlier run in the same interpreter (for example a
# re-executed notebook cell) are detached AND closed before reconfiguring;
# merely emptying the handler list would leave the old log file handle open.
_appointment_logger = logging.getLogger('appointment')
for _stale_handler in list(_appointment_logger.handlers):
    _appointment_logger.removeHandler(_stale_handler)
    _stale_handler.close()
logger = setup_logger('appointment')

################################################################################################
def get_access_token() -> str:
    """Return a Baidu OCR access token, reusing a cached one while it is valid.

    The cache file ``TOKEN_CACHE_FILE`` holds a JSON object with the keys
    ``access_token`` and ``expire_time`` (epoch seconds). A cached token is
    reused only when it is non-empty and still has more than 300 seconds
    left. Any problem reading the cache is logged and treated as a cache miss.

    On a miss a new token is requested from the Baidu OAuth endpoint with the
    ``client_credentials`` grant, written back to the cache (best effort) and
    returned.

    Returns:
        The access token string.

    Raises:
        Exception: If the endpoint answers with a non-200 status or with a body
            that carries no ``access_token``. Errors raised by ``requests``
            itself (connection failure, timeout) propagate unchanged.
    """
    try:
        if os.path.exists(TOKEN_CACHE_FILE):
            with open(TOKEN_CACHE_FILE, 'r', encoding='utf-8') as f:
                cached_data = json.load(f)
            cached_token = cached_data.get('access_token')
            # Ignore an empty cached token and refresh five minutes early.
            if cached_token and time.time() < cached_data['expire_time'] - 300:
                logger.info("使用缓存的access token")
                return cached_token
    except Exception as e:
        logger.error(f"读取token缓存失败: {e}")

    logger.info("重新获取access token")
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": BAIDU_API_KEY,
        "client_secret": BAIDU_SECRET_KEY
    }

    # Without a timeout a stalled connection would block the caller forever.
    response = requests.post(url, params=params, timeout=10)
    if response.status_code != 200:
        raise Exception(f"获取AccessToken失败: {response.text}")

    result = response.json()
    access_token = result.get("access_token")
    if not access_token:
        # A 200 response without a token must not be cached or returned as None.
        raise Exception(f"获取AccessToken失败: {response.text}")

    # Prefer the lifetime reported by the server; fall back to the constant.
    try:
        lifetime = float(result.get("expires_in", TOKEN_EXPIRE_TIME))
    except (TypeError, ValueError):
        lifetime = TOKEN_EXPIRE_TIME
    expire_time = time.time() + lifetime

    try:
        with open(TOKEN_CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump({
                'access_token': access_token,
                'expire_time': expire_time
            }, f)
    except Exception as e:
        # Caching is best effort; the freshly obtained token is still usable.
        logger.error(f"保存token缓存失败: {e}")

    return access_token

################################################################################################
def recognize_captcha(image_path: Optional[str] = None, image_data: Optional[bytes] = None) -> Optional[str]:
    """Recognise the text of a captcha image via Baidu's general OCR endpoint.

    Exactly one image source is used: ``image_path`` takes precedence when it
    is truthy, otherwise ``image_data``. The image is base64-encoded and posted
    as a form field to ``general_basic``; the text of the first recognised
    line, stripped of surrounding whitespace, is returned.

    Args:
        image_path: Path of an image file to read.
        image_data: Raw image bytes, used when ``image_path`` is not given.

    Returns:
        The recognised text, or ``None`` when no input was supplied, the token
        could not be obtained, the HTTP call failed, or the response contained
        no ``words_result`` entries. Every failure is logged; no exception
        leaves this function.
    """
    try:
        # Validate and load the input first so a bad call costs no network trip.
        if image_path:
            with open(image_path, "rb") as f:
                raw_image = f.read()
        elif image_data:
            raw_image = image_data
        else:
            raise ValueError("必须提供image_path或image_data")
        image = base64.b64encode(raw_image).decode("utf-8")

        access_token = get_access_token()
        url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={access_token}"

        payload = {
            "image": image,
            "detect_direction": "false",
            "detect_language": "false",
            "paragraph": "false",
            "probability": "false"
        }

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }

        # Without a timeout a stalled connection would block the login flow forever.
        response = requests.post(url, headers=headers, data=payload, timeout=10)

        if response.status_code != 200:
            raise Exception(f"OCR API调用失败: {response.text}")

        result = response.json()
        words_result = result.get("words_result")
        if not words_result:
            logger.error(f"OCR识别失败: {result}")
            return None
        return words_result[0]["words"].strip()
    except Exception as e:
        logger.error(f"验证码识别过程发生错误: {e}")
        return None

################################################################################################
def extract_available_appointments(html_str: str, doctor_name: Optional[str] = None) -> List[Dict]:
    """Extract bookable schedule rows for one doctor from a schedule page.

    The first ``<table>`` in ``html_str`` is read. Its first row supplies the
    column headers (``<th>`` cells); every later row with ``<td>`` cells is
    turned into a ``{header: cell text}`` dict. A row is kept when its
    ``医生姓名`` column equals the wanted doctor and its last cell contains an
    ``<a class="dailySchedulingState_a_4">`` whose ``href`` includes
    ``/Interactions/SchedulingAppointments/OPCreate``. Kept rows get an extra
    ``预约链接`` key holding the absolute booking URL.

    Rows with fewer cells than headers are mapped over the cells they do have
    instead of aborting the whole extraction.

    Args:
        html_str: HTML source of the schedule page.
        doctor_name: Doctor to look for; defaults to the module-level
            ``DOCTOR_NAME`` when ``None``.

    Returns:
        A list of row dicts, empty when there is no table, no ``医生姓名``
        column, no matching row, or when parsing fails (the error is logged).
    """
    wanted_doctor = DOCTOR_NAME if doctor_name is None else doctor_name

    try:
        soup = BeautifulSoup(html_str, 'lxml')
        table = soup.find('table')
        if table is None:
            logger.warning("页面中未找到预约表格")
            return []

        header_row = table.find('tr')
        header_cells = header_row.find_all('th') if header_row is not None else []
        headers = [th.get_text(strip=True) for th in header_cells]

        if '医生姓名' not in headers:
            logger.warning("表格中未找到'医生姓名'列")
            return []

        available_entries = []
        for row in table.find_all('tr')[1:]:
            cells = row.find_all('td')
            if not cells:
                continue

            # zip() tolerates short rows (e.g. merged cells) without IndexError.
            data = {header: cell.get_text(strip=True) for header, cell in zip(headers, cells)}

            if data.get('医生姓名') != wanted_doctor:
                continue

            # The booking link, when present, sits in the row's last cell.
            link = cells[-1].find('a', class_='dailySchedulingState_a_4')
            href = link.get('href', '') if link else ''

            if '/Interactions/SchedulingAppointments/OPCreate' in href:
                data['预约链接'] = "https://www.dzmyy.com.cn" + href
                available_entries.append(data)

        return available_entries

    except Exception as e:
        logger.error(f"提取预约信息时发生错误: {e}")
        return []

################################################################################################
def login(driver: webdriver.Chrome) -> bool:
    """Log in to the site through the browser session ``driver``.

    Steps: open ``LOGIN_URL``, click the captcha image, screenshot it to
    ``captcha_local.png``, run OCR on that file, type the recognised code,
    then the user name and password, and click the login button. Randomised
    1-2 second pauses follow the captcha click and the code entry.

    Args:
        driver: Selenium Chrome driver to operate.

    Returns:
        ``True`` once the login button has been clicked, ``False`` when the
        captcha could not be recognised or any step raised (the error is
        logged).

    NOTE(review): success is reported right after the click; nothing in this
    function verifies that the site actually accepted the login.
    """
    def clickable(by, value):
        # Wait up to 10 seconds for the element to become clickable, then return it.
        return WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((by, value))
        )

    try:
        logger.info("开始登录流程")
        driver.get(LOGIN_URL)

        # Captcha: click the image first (presumably to refresh it — TODO
        # confirm), give it a moment, then capture it for OCR.
        captcha_element = clickable(By.ID, 'valiCodeLogOnCode')
        captcha_element.click()
        time.sleep(random.uniform(1, 2))
        captcha_element.screenshot("captcha_local.png")

        recognised_code = recognize_captcha(image_path='captcha_local.png')
        if not recognised_code:
            logger.error("验证码识别失败")
            return False
        logger.info(f"识别的验证码: {recognised_code}")

        # Type the recognised code into the captcha field.
        code_field = clickable(By.ID, 'LogOnCode')
        code_field.clear()
        code_field.send_keys(recognised_code)
        time.sleep(random.uniform(1, 2))

        # Locate both credential fields before typing into either.
        user_field = clickable(By.ID, 'UserName')
        secret_field = clickable(By.ID, 'Password_pwd')
        user_field.send_keys(USERNAME)
        secret_field.send_keys(PASSWORD)

        # Submit the form.
        clickable(By.CSS_SELECTOR, '#con_tableb_1 > input').click()

        logger.info("登录成功")
        return True

    except Exception as exc:
        logger.error(f"登录过程发生错误: {exc}")
        return False
################################################################################################    
def try_appointment(driver: webdriver.Chrome) -> bool:
    """Poll the department schedule and book the first available slot.

    Attempts repeat until one succeeds or ``MAX_ATTEMPT_TIME`` seconds have
    elapsed. Each attempt loads ``SKIN_DEPARTMENT_URL``, extracts the bookable
    rows for the configured doctor, opens the first row's booking link, clicks
    the first booking button on that page and then the confirm button.

    Every unsuccessful attempt — an exception or simply no open slot — is
    followed by a ``RETRY_DELAY`` pause, so the site is never polled in a
    tight loop.

    Args:
        driver: Selenium Chrome driver with a logged-in session.

    Returns:
        ``True`` once the confirm button has been clicked, ``False`` if the
        time budget ran out first.

    NOTE(review): success means the confirm click did not raise; the server's
    response to the booking is not checked here.
    """
    success = False
    start_time = time.time()
    attempt_count = 0

    while time.time() - start_time < MAX_ATTEMPT_TIME:
        attempt_count += 1
        logger.info(f"第 {attempt_count} 次尝试预约... (已用时 {int(time.time() - start_time)} 秒)")

        try:
            driver.get(SKIN_DEPARTMENT_URL)
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "table"))
            )

            available_appointments = extract_available_appointments(driver.page_source)

            logger.info(f"找到 {len(available_appointments)} 个可预约时段")
            if available_appointments:
                appointment = available_appointments[0]
                logger.info(f"日期: {appointment.get('日期', 'N/A')}")
                logger.info(f"时段: {appointment.get('时段', 'N/A')}")
                logger.info(f"预约链接: {appointment.get('预约链接', 'N/A')}")

                # Every extracted entry carries its booking link under this key.
                driver.get(appointment['预约链接'])
                time.sleep(random.uniform(0.5, 1))  # short human-like pause

                appointment_buttons = WebDriverWait(driver, 10).until(
                    EC.presence_of_all_elements_located((By.CSS_SELECTOR, "span.SchedulingState_icon4.outpatient[title='预约']"))
                )

                if appointment_buttons:
                    logger.info(f"找到 {len(appointment_buttons)} 个可预约按钮")
                    appointment_buttons[0].click()
                    logger.info("已点击第一个预约按钮")

                    confirm_button = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, "input[type='submit'][value='确认预约'].achedulingAppointment_btn"))
                    )
                    confirm_button.click()
                    logger.info("已确认预约")
                    success = True
                    logger.info("预约成功！")
                    break

        except Exception as e:
            logger.error(f"第 {attempt_count} 次尝试失败: {str(e)}")

        # Reached after a failed attempt and after a poll that found no slot:
        # pause before polling again, as long as there is time budget left.
        remaining = MAX_ATTEMPT_TIME - (time.time() - start_time)
        if remaining > 0:
            logger.info(f"等待 {RETRY_DELAY} 秒后重试... (剩余时间: {int(remaining)} 秒)")
            time.sleep(RETRY_DELAY)

    if not success:
        logger.error(f"在 {MAX_ATTEMPT_TIME} 秒内未能成功预约。共尝试 {attempt_count} 次。")

    return success

def main():
    """Run the whole flow: start Chrome, log in, then try to book.

    Any exception is logged rather than propagated. The browser, if it was
    started, is closed five seconds after the flow ends.
    """
    # Bind the name up front: if starting Chrome fails, the cleanup below must
    # still be able to test it instead of raising UnboundLocalError.
    driver = None
    try:
        # Configure the WebDriver.
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-blink-features=AutomationControlled')
        driver = webdriver.Chrome(options=options)

        # Only attempt a booking once login reports success.
        if login(driver):
            try_appointment(driver)

    except Exception as e:
        logger.error(f"系统运行发生错误: {e}")
    finally:
        if driver is not None:
            # To keep the browser open for inspection, swap the sleep for an
            # input() prompt.
            logger.info("5秒后自动关闭浏览器...")
            time.sleep(5)
            driver.quit()

# Run the booking flow only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()