In [1]:
import pandas as pd
import duckdb
from openpyxl import load_workbook

In [2]:
# df = pd.read_excel('./20251029130203.xlsx', chunksize=100000)
# df

In [3]:
# wb = load_workbook('./20251029130203.xlsx',read_only=True)
# ws = wb.active
# ws

In [4]:
# # Manually read rows in chunks
# chunk_size = 100
# rows = []
# for i, row in enumerate(ws.iter_rows(values_only=True), 1):
#     rows.append(row)
#     if i % chunk_size == 0:
#         df = pd.DataFrame(rows[1:], columns=rows[0])  # Assuming first row is header
#         # Process chunk
#         rows = []
#         break

In [5]:
# df.info()

In [6]:
# df

In [8]:
"""
醫療檢查數據 DuckDB 流式導入 - 修復版本
支持所有 DuckDB 版本（0.8+）
不依賴 appender() API，使用通用的 INSERT INTO SELECT

內存占用恆定：O(batch_size)，不隨文件大小增長
"""

import duckdb
from openpyxl import load_workbook
from pathlib import Path
from typing import Generator, List, Tuple, Optional, Dict
from datetime import datetime
import logging
import json

# ============= 配置 =============

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# 配置參數
BATCH_SIZE = 10000  # 每批處理的行數（根據內存調整）
DATE_FORMATS = {
    '生日': '%Y%m%d',
    '開單日期/時間': '%Y-%m-%d %H:%M:%S',
    '報到時間': '%Y-%m-%d %H:%M:%S',
    '報告認證時間': '%Y-%m-%d %H:%M:%S',
}

# ============= 數據結構 =============

class ImportProgress:
    """導入進度"""
    
    def __init__(self, total_rows: Optional[int] = None):
        self.total_rows = total_rows
        self.processed_rows = 0
        self.inserted_rows = 0
        self.failed_rows = 0
        self.start_time = datetime.now()
    
    @property
    def progress_percent(self) -> float:
        if self.total_rows is None or self.total_rows == 0:
            return 0.0
        return (self.processed_rows / self.total_rows) * 100
    
    @property
    def elapsed_seconds(self) -> float:
        return (datetime.now() - self.start_time).total_seconds()
    
    def print_status(self):
        """打印進度狀態"""
        percent = self.progress_percent
        processed = self.processed_rows
        total = self.total_rows
        elapsed = self.elapsed_seconds
        
        # 進度條
        bar_length = 40
        filled = int(bar_length * percent / 100)
        bar = '█' * filled + '░' * (bar_length - filled)
        
        # 估計剩餘時間
        if processed > 0 and total is not None:
            avg_time_per_row = elapsed / processed
            remaining_rows = total - processed
            remaining_time = avg_time_per_row * remaining_rows
            remaining_str = f", 預計剩餘: {remaining_time:.1f}s"
        else:
            remaining_str = ""
        
        print(f"\r  進度: [{bar}] {percent:6.2f}% ({processed}/{total or '?'}) "
              f"已插入: {self.inserted_rows}, 失敗: {self.failed_rows}, "
              f"耗時: {elapsed:.1f}s{remaining_str}", end='', flush=True)


# ============= 流式讀取器 =============

class StreamingExcelReader:
    """
    流式 Excel 讀取器
    逐行返回，內存只保存當前行
    """
    
    def __init__(self, file_path: str):
        self.file_path = file_path
        self.wb = None
        self.ws = None
        self.headers = None
        self.total_rows = 0
        
    def open(self) -> Tuple[List[str], int]:
        """打開 Excel 文件，讀取表頭和行數"""
        logger.info(f"打開 Excel 文件: {self.file_path}")
        
        self.wb = load_workbook(self.file_path, read_only=True, data_only=True)
        self.ws = self.wb.active
        
        # 讀取表頭
        header_row = next(self.ws.iter_rows(values_only=True))
        self.headers = [str(col) if col else f"col_{i}" for i, col in enumerate(header_row)]
        
        logger.info(f"表頭: {self.headers}")
        return self.headers, self.total_rows
    
    def count_total_rows(self) -> int:
        """計算總行數（不含表頭）"""
        if self.total_rows > 0:
            return self.total_rows
            
        logger.info("計算總行數...")
        count = -1
        
        wb = load_workbook(self.file_path, read_only=True, data_only=True)
        ws = wb.active
        for _ in ws.iter_rows():
            count += 1
        wb.close()
        
        self.total_rows = count
        logger.info(f"總行數: {self.total_rows}")
        return self.total_rows
    
    def read_batches(self, batch_size: int = BATCH_SIZE) -> Generator[List[Dict], None, None]:
        """分批讀取 Excel，返回字典列表"""
        batch = []
        
        # 跳過表頭行
        rows_iter = self.ws.iter_rows(values_only=True)
        next(rows_iter)
        
        for row_values in rows_iter:
            # 轉換為字典
            row_dict = {}
            for i, header in enumerate(self.headers):
                value = row_values[i] if i < len(row_values) else None
                row_dict[header] = value
            
            batch.append(row_dict)
            
            # 當達到批次大小時返回
            if len(batch) >= batch_size:
                yield batch
                batch = []
        
        # 返回最後的不足批次大小的數據
        if batch:
            yield batch
    
    def close(self):
        """關閉文件"""
        if self.wb:
            self.wb.close()
            logger.info("Excel 文件已關閉")


# ============= 數據處理器 =============

class DataBatchProcessor:
    """批次數據處理器"""
    
    def __init__(self):
        self.valid_count = 0
        self.failed_count = 0
    
    def process_batch(self, batch: List[Dict]) -> Tuple[List[Dict], List[str]]:
        """處理一批數據，返回(清洗後數據, 錯誤列表)"""
        processed_batch = []
        errors = []
        
        for row_idx, row in enumerate(batch):
            try:
                processed_row = self._process_row(row)
                processed_batch.append(processed_row)
                self.valid_count += 1
            except Exception as e:
                errors.append(f"行 {row_idx}: {str(e)}")
                self.failed_count += 1
        
        return processed_batch, errors
    
    def _process_row(self, row: Dict) -> Dict:
        """處理單行數據"""
        processed = {}
        
        # 直接映射欄位，進行基本清洗
        fields = {
            '檢查流水號': 'exam_id',
            '病歷號': 'medical_record_no',
            '申請單號': 'application_order_no',
            '姓名': 'patient_name',
            '性別': 'patient_gender',
            '生日': 'patient_birth_date',
            '年齡': 'patient_age',
            '狀態': 'exam_status',
            '來源': 'exam_source',
            '檢查室': 'exam_room',
            '檢查項目': 'exam_item',
            '檢查描述': 'exam_description',
            '檢查儀器': 'exam_equipment',
            '儀器': 'equipment_type',
            '開單日期/時間': 'order_datetime',
            '報到時間': 'check_in_datetime',
            '報告認證時間': 'report_certification_datetime',
            '認證醫師': 'certified_physician',
        }
        
        for col_cn, col_en in fields.items():
            value = row.get(col_cn)
            
            # 處理空值
            if value is None or (isinstance(value, str) and value.strip() == ''):
                processed[col_en] = None
            else:
                # 字符串清洗
                if isinstance(value, str):
                    processed[col_en] = value.strip()
                else:
                    processed[col_en] = str(value).strip()
        
        # 驗證
        if not processed.get('exam_id'):
            raise ValueError("檢查流水號不能為空")
        
        return processed


# ============= DuckDB 插入器 =============

class DuckDBStreamInserter:
    """DuckDB 流式插入器（相容所有版本）"""
    
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.con = None
        self.table_name = 'medical_examinations_fact'
        self.inserted_count = 0
    
    def setup(self) -> None:
        """設置數據庫和表"""
        logger.info(f"連接到 DuckDB: {self.db_path}")
        self.con = duckdb.connect(self.db_path)
        
        self._create_table()
        logger.info("表設置完成")
    
    def _create_table(self) -> None:
        """創建優化的表結構"""
        create_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.table_name} (
            exam_id VARCHAR,
            medical_record_no VARCHAR,
            application_order_no VARCHAR,
            patient_name VARCHAR,
            patient_gender VARCHAR,
            patient_birth_date VARCHAR,
            patient_age VARCHAR,
            exam_status VARCHAR,
            exam_source VARCHAR,
            exam_room VARCHAR,
            exam_item VARCHAR,
            exam_description VARCHAR,
            exam_equipment VARCHAR,
            equipment_type VARCHAR,
            order_datetime VARCHAR,
            check_in_datetime VARCHAR,
            report_certification_datetime VARCHAR,
            certified_physician VARCHAR,
            data_load_time TIMESTAMP DEFAULT current_timestamp
        );
        """
        
        try:
            self.con.execute(create_sql)
            logger.info(f"表 '{self.table_name}' 已創建/驗證")
        except Exception as e:
            logger.info(f"表已存在: {e}")
    
    def insert_batch(self, batch: List[Dict]) -> int:
        """
        插入一批數據（使用臨時表 + INSERT INTO SELECT）
        返回成功插入的行數
        """
        if not batch:
            return 0
        
        try:
            # 建立臨時表
            import pandas as pd
            df = pd.DataFrame(batch)
            
            # 註冊臨時表
            self.con.register('temp_batch', df)
            
            # 使用 INSERT INTO SELECT
            insert_sql = f"""
            INSERT INTO {self.table_name}
            SELECT 
                exam_id,
                medical_record_no,
                application_order_no,
                patient_name,
                patient_gender,
                CAST(patient_birth_date AS VARCHAR),
                patient_age,
                exam_status,
                exam_source,
                exam_room,
                exam_item,
                exam_description,
                exam_equipment,
                equipment_type,
                CAST(order_datetime AS VARCHAR),
                CAST(check_in_datetime AS VARCHAR),
                CAST(report_certification_datetime AS VARCHAR),
                certified_physician,
                current_timestamp as data_load_time
            FROM temp_batch
            """
            
            self.con.execute(insert_sql)
            inserted = len(batch)
            self.inserted_count += inserted
            
            # 清理臨時表
            self.con.unregister('temp_batch')
            
            return inserted
            
        except Exception as e:
            logger.error(f"批量插入失敗: {e}")
            return 0
    
    def close(self) -> None:
        """關閉連接"""
        if self.con:
            self.con.close()
            logger.info(f"DuckDB 連接已關閉，共插入 {self.inserted_count} 行")
    
    def get_statistics(self) -> Dict:
        """獲取統計信息"""
        con = duckdb.connect(self.db_path)
        result = con.execute(f"""
            SELECT
                COUNT(*) as total_rows,
                COUNT(DISTINCT patient_name) as unique_patients,
                COUNT(DISTINCT exam_room) as exam_rooms,
                MIN(order_datetime) as first_exam,
                MAX(order_datetime) as last_exam
            FROM {self.table_name}
        """).fetchall()
        con.close()
        
        row = result[0] if result else (0, 0, 0, None, None)
        return {
            'total_rows': row[0] or 0,
            'unique_patients': row[1] or 0,
            'exam_rooms': row[2] or 0,
            'first_exam': str(row[3]) if row[3] else 'N/A',
            'last_exam': str(row[4]) if row[4] else 'N/A',
        }


# ============= 主控制器 =============

class StreamingImporter:
    """流式導入主控制器"""
    
    def __init__(self, excel_file: str, db_file: str, batch_size: int = BATCH_SIZE):
        self.excel_file = excel_file
        self.db_file = db_file
        self.batch_size = batch_size
        
        self.reader = StreamingExcelReader(excel_file)
        self.processor = DataBatchProcessor()
        self.inserter = DuckDBStreamInserter(db_file)
        
        self.progress = None
    
    def import_all(self, skip_row_count: bool = False) -> Dict:
        """執行完整的流式導入流程"""
        logger.info("=" * 70)
        logger.info("開始流式導入")
        logger.info("=" * 70)
        
        # 打開 Excel
        headers, _ = self.reader.open()
        
        # 初始化進度
        if not skip_row_count:
            total_rows = self.reader.count_total_rows()
        else:
            total_rows = None
            logger.info("跳過行數計算，進度百分比將不可用")
        
        self.progress = ImportProgress(total_rows=total_rows)
        
        # 設置 DuckDB
        self.inserter.setup()
        
        # 流式處理批次
        logger.info(f"\n開始分批處理 (批次大小: {self.batch_size})")
        print()  # 為進度條預留空行
        
        batch_num = 0
        total_errors = []
        
        try:
            for batch in self.reader.read_batches(batch_size=self.batch_size):
                batch_num += 1
                
                # 處理批次
                processed_batch, batch_errors = self.processor.process_batch(batch)
                total_errors.extend(batch_errors)
                
                # 插入批次
                inserted = self.inserter.insert_batch(processed_batch)
                
                # 更新進度
                self.progress.processed_rows += len(batch)
                self.progress.inserted_rows += inserted
                self.progress.failed_rows += len(batch_errors)
                
                # 顯示進度
                self.progress.print_status()
                
                logger.info(f"\n批次 {batch_num}: 處理 {len(batch)} 行, "
                          f"有效 {len(processed_batch)} 行, "
                          f"失敗 {len(batch_errors)} 行")
        
        finally:
            self.reader.close()
            self.inserter.close()
        
        print()  # 進度條後的換行
        logger.info("\n" + "=" * 70)
        
        # 生成報告
        return self._generate_report(total_errors)
    
    def _generate_report(self, errors: List[str]) -> Dict:
        """生成導入報告"""
        stats = self.inserter.get_statistics()
        elapsed = self.progress.elapsed_seconds
        
        report = {
            'status': 'success' if self.progress.failed_rows == 0 else 'partial',
            'total_processed': self.progress.processed_rows,
            'total_inserted': self.progress.inserted_rows,
            'total_failed': self.progress.failed_rows,
            'elapsed_seconds': elapsed,
            'rows_per_second': self.progress.inserted_rows / elapsed if elapsed > 0 else 0,
            'database_stats': stats,
            'total_errors': len(errors),
            'sample_errors': errors[:10],
        }
        
        self._print_report(report)
        return report
    
    def _print_report(self, report: Dict) -> None:
        """打印導入報告"""
        print(f"""
╔════════════════════════════════════════════════════════════════════╗
║                      導入完成報告                                  ║
╚════════════════════════════════════════════════════════════════════╝

【處理統計】
  總處理行數:    {report['total_processed']:>10,}
  成功插入行數:  {report['total_inserted']:>10,}
  失敗行數:      {report['total_failed']:>10,}
  成功率:        {100 * report['total_inserted'] / report['total_processed'] if report['total_processed'] > 0 else 0:>9.1f}%

【性能指標】
  總耗時:        {report['elapsed_seconds']:>10.1f} 秒
  吞吐量:        {report['rows_per_second']:>10.1f} 行/秒
  內存占用:      恆定 (批次大小: {self.batch_size})

【數據庫統計】
  總記錄數:      {report['database_stats']['total_rows']:>10,}
  獨特患者數:    {report['database_stats']['unique_patients']:>10,}
  檢查室數:      {report['database_stats']['exam_rooms']:>10,}
  首次檢查:      {report['database_stats']['first_exam']:>10}
  最後檢查:      {report['database_stats']['last_exam']:>10}

【數據庫位置】
  {self.db_file}

【狀態】
  {report['status'].upper()}
""")
        
        if report['sample_errors']:
            print(f"【前 {len(report['sample_errors'])} 個錯誤】")
            for i, error in enumerate(report['sample_errors'][:5], 1):
                print(f"  {i}. {error}")
            if report['total_errors'] > 5:
                print(f"  ... 還有 {report['total_errors'] - 5} 個錯誤")


# ============= 使用示例 =============

def main():
    """主程序"""
    
    excel_file = './20251029130203.xlsx'
    db_file = './medical_exams_streaming.duckdb'
    
    if not Path(excel_file).exists():
        logger.error(f"❌ 找不到 Excel 文件: {excel_file}")
        return
    
    # 清理舊數據庫（可選）
    if Path(db_file).exists():
        logger.info(f"刪除舊數據庫: {db_file}")
        Path(db_file).unlink()
    
    # 創建導入器
    importer = StreamingImporter(
        excel_file=excel_file,
        db_file=db_file,
        batch_size=10000  # 根據內存調整
    )
    
    # 執行導入
    report = importer.import_all(skip_row_count=False)
    


if __name__ == "__main__":
    main()

2025-10-29 14:57:13,781 - INFO - 刪除舊數據庫: ./medical_exams_streaming.duckdb
2025-10-29 14:57:13,782 - INFO - 開始流式導入
2025-10-29 14:57:13,783 - INFO - 打開 Excel 文件: ./20251029130203.xlsx
2025-10-29 14:57:29,781 - INFO - 表頭: ['狀態', '病歷號', '姓名', '性別', '年齡', '生日', '來源', '檢查室', '申請單號', '檢查項目', '檢查描述', '開單日期/時間', '儀器', '檢查儀器', '報到時間', '檢查流水號', '報告認證時間', '認證醫師']
2025-10-29 14:57:29,781 - INFO - 計算總行數...
2025-10-29 14:58:28,064 - INFO - 總行數: 470467
2025-10-29 14:58:28,064 - INFO - 連接到 DuckDB: ./medical_exams_streaming.duckdb
2025-10-29 14:58:28,147 - INFO - 表 'medical_examinations_fact' 已創建/驗證
2025-10-29 14:58:28,148 - INFO - 表設置完成
2025-10-29 14:58:28,148 - INFO - 
開始分批處理 (批次大小: 10000)



  進度: [░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]   2.13% (10000/470467) 已插入: 10000, 失敗: 0, 耗時: 1.0s, 預計剩餘: 45.9s

2025-10-29 14:58:29,061 - INFO - 
批次 1: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]   4.25% (20000/470467) 已插入: 20000, 失敗: 0, 耗時: 1.9s, 預計剩餘: 43.5s

2025-10-29 14:58:29,994 - INFO - 
批次 2: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]   6.38% (30000/470467) 已插入: 30000, 失敗: 0, 耗時: 3.0s, 預計剩餘: 43.5s

2025-10-29 14:58:31,028 - INFO - 
批次 3: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]   8.50% (40000/470467) 已插入: 40000, 失敗: 0, 耗時: 3.9s, 預計剩餘: 42.2s

2025-10-29 14:58:31,986 - INFO - 
批次 4: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  10.63% (50000/470467) 已插入: 50000, 失敗: 0, 耗時: 4.9s, 預計剩餘: 41.0s

2025-10-29 14:58:32,945 - INFO - 
批次 5: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  12.75% (60000/470467) 已插入: 60000, 失敗: 0, 耗時: 5.8s, 預計剩餘: 39.9s

2025-10-29 14:58:33,895 - INFO - 
批次 6: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  14.88% (70000/470467) 已插入: 70000, 失敗: 0, 耗時: 6.8s, 預計剩餘: 39.0s

2025-10-29 14:58:34,879 - INFO - 
批次 7: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  17.00% (80000/470467) 已插入: 80000, 失敗: 0, 耗時: 8.1s, 預計剩餘: 39.4s

2025-10-29 14:58:36,138 - INFO - 
批次 8: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  19.13% (90000/470467) 已插入: 90000, 失敗: 0, 耗時: 9.0s, 預計剩餘: 38.2s

2025-10-29 14:58:37,095 - INFO - 
批次 9: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  21.26% (100000/470467) 已插入: 100000, 失敗: 0, 耗時: 10.0s, 預計剩餘: 37.1s

2025-10-29 14:58:38,089 - INFO - 
批次 10: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  23.38% (110000/470467) 已插入: 110000, 失敗: 0, 耗時: 10.9s, 預計剩餘: 35.9s

2025-10-29 14:58:39,013 - INFO - 
批次 11: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  25.51% (120000/470467) 已插入: 120000, 失敗: 0, 耗時: 11.9s, 預計剩餘: 34.9s

2025-10-29 14:58:40,013 - INFO - 
批次 12: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  27.63% (130000/470467) 已插入: 130000, 失敗: 0, 耗時: 12.9s, 預計剩餘: 33.8s

2025-10-29 14:58:40,980 - INFO - 
批次 13: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  29.76% (140000/470467) 已插入: 140000, 失敗: 0, 耗時: 13.9s, 預計剩餘: 32.8s

2025-10-29 14:58:41,963 - INFO - 
批次 14: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░]  31.88% (150000/470467) 已插入: 150000, 失敗: 0, 耗時: 14.8s, 預計剩餘: 31.6s

2025-10-29 14:58:42,856 - INFO - 
批次 15: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░]  34.01% (160000/470467) 已插入: 160000, 失敗: 0, 耗時: 16.1s, 預計剩餘: 31.2s

2025-10-29 14:58:44,149 - INFO - 
批次 16: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████░░░░░░░░░░░░░░░░░░░░░░░░░░]  36.13% (170000/470467) 已插入: 170000, 失敗: 0, 耗時: 17.1s, 預計剩餘: 30.2s

2025-10-29 14:58:45,150 - INFO - 
批次 17: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████░░░░░░░░░░░░░░░░░░░░░░░░░]  38.26% (180000/470467) 已插入: 180000, 失敗: 0, 耗時: 18.0s, 預計剩餘: 29.0s

2025-10-29 14:58:46,065 - INFO - 
批次 18: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████░░░░░░░░░░░░░░░░░░░░░░░░]  40.39% (190000/470467) 已插入: 190000, 失敗: 0, 耗時: 19.1s, 預計剩餘: 28.1s

2025-10-29 14:58:47,115 - INFO - 
批次 19: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████░░░░░░░░░░░░░░░░░░░░░░░]  42.51% (200000/470467) 已插入: 200000, 失敗: 0, 耗時: 20.1s, 預計剩餘: 27.2s

2025-10-29 14:58:48,173 - INFO - 
批次 20: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████░░░░░░░░░░░░░░░░░░░░░░░]  44.64% (210000/470467) 已插入: 210000, 失敗: 0, 耗時: 21.0s, 預計剩餘: 26.1s

2025-10-29 14:58:49,099 - INFO - 
批次 21: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████░░░░░░░░░░░░░░░░░░░░░░]  46.76% (220000/470467) 已插入: 220000, 失敗: 0, 耗時: 22.0s, 預計剩餘: 25.1s

2025-10-29 14:58:50,107 - INFO - 
批次 22: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████░░░░░░░░░░░░░░░░░░░░░]  48.89% (230000/470467) 已插入: 230000, 失敗: 0, 耗時: 23.0s, 預計剩餘: 24.0s

2025-10-29 14:58:51,032 - INFO - 
批次 23: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████░░░░░░░░░░░░░░░░░░░░]  51.01% (240000/470467) 已插入: 240000, 失敗: 0, 耗時: 24.3s, 預計剩餘: 23.3s

2025-10-29 14:58:52,334 - INFO - 
批次 24: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████████░░░░░░░░░░░░░░░░░░░]  53.14% (250000/470467) 已插入: 250000, 失敗: 0, 耗時: 25.2s, 預計剩餘: 22.2s

2025-10-29 14:58:53,256 - INFO - 
批次 25: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████░░░░░░░░░░░░░░░░░░]  55.26% (260000/470467) 已插入: 260000, 失敗: 0, 耗時: 26.3s, 預計剩餘: 21.3s

2025-10-29 14:58:54,324 - INFO - 
批次 26: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████░░░░░░░░░░░░░░░░░░]  57.39% (270000/470467) 已插入: 270000, 失敗: 0, 耗時: 27.1s, 預計剩餘: 20.2s

2025-10-29 14:58:55,208 - INFO - 
批次 27: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████░░░░░░░░░░░░░░░░░]  59.52% (280000/470467) 已插入: 280000, 失敗: 0, 耗時: 28.2s, 預計剩餘: 19.2s

2025-10-29 14:58:56,242 - INFO - 
批次 28: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████░░░░░░░░░░░░░░░░]  61.64% (290000/470467) 已插入: 290000, 失敗: 0, 耗時: 29.1s, 預計剩餘: 18.1s

2025-10-29 14:58:57,142 - INFO - 
批次 29: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████████████░░░░░░░░░░░░░░░]  63.77% (300000/470467) 已插入: 300000, 失敗: 0, 耗時: 30.1s, 預計剩餘: 17.1s

2025-10-29 14:58:58,175 - INFO - 
批次 30: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████████░░░░░░░░░░░░░░]  65.89% (310000/470467) 已插入: 310000, 失敗: 0, 耗時: 31.0s, 預計剩餘: 16.1s

2025-10-29 14:58:59,084 - INFO - 
批次 31: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████████░░░░░░░░░░░░░]  68.02% (320000/470467) 已插入: 320000, 失敗: 0, 耗時: 32.7s, 預計剩餘: 15.4s

2025-10-29 14:59:00,811 - INFO - 
批次 32: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████████░░░░░░░░░░░░]  70.14% (330000/470467) 已插入: 330000, 失敗: 0, 耗時: 33.8s, 預計剩餘: 14.4s

2025-10-29 14:59:01,871 - INFO - 
批次 33: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████████░░░░░░░░░░░░]  72.27% (340000/470467) 已插入: 340000, 失敗: 0, 耗時: 34.7s, 預計剩餘: 13.3s

2025-10-29 14:59:02,768 - INFO - 
批次 34: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████████████████░░░░░░░░░░░]  74.39% (350000/470467) 已插入: 350000, 失敗: 0, 耗時: 35.8s, 預計剩餘: 12.3s

2025-10-29 14:59:03,818 - INFO - 
批次 35: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████████████░░░░░░░░░░]  76.52% (360000/470467) 已插入: 360000, 失敗: 0, 耗時: 36.7s, 預計剩餘: 11.3s

2025-10-29 14:59:04,761 - INFO - 
批次 36: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████████████░░░░░░░░░]  78.65% (370000/470467) 已插入: 370000, 失敗: 0, 耗時: 37.8s, 預計剩餘: 10.3s

2025-10-29 14:59:05,828 - INFO - 
批次 37: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████████████░░░░░░░░]  80.77% (380000/470467) 已插入: 380000, 失敗: 0, 耗時: 38.7s, 預計剩餘: 9.2s

2025-10-29 14:59:06,736 - INFO - 
批次 38: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████████████████████░░░░░░░]  82.90% (390000/470467) 已插入: 390000, 失敗: 0, 耗時: 39.6s, 預計剩餘: 8.2s

2025-10-29 14:59:07,629 - INFO - 
批次 39: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████████████████░░░░░░]  85.02% (400000/470467) 已插入: 400000, 失敗: 0, 耗時: 40.9s, 預計剩餘: 7.2s

2025-10-29 14:59:09,004 - INFO - 
批次 40: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████████████████░░░░░░]  87.15% (410000/470467) 已插入: 410000, 失敗: 0, 耗時: 41.8s, 預計剩餘: 6.2s

2025-10-29 14:59:09,911 - INFO - 
批次 41: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████████████████░░░░░]  89.27% (420000/470467) 已插入: 420000, 失敗: 0, 耗時: 42.8s, 預計剩餘: 5.1s

2025-10-29 14:59:10,820 - INFO - 
批次 42: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████████████████░░░░]  91.40% (430000/470467) 已插入: 430000, 失敗: 0, 耗時: 43.8s, 預計剩餘: 4.1s

2025-10-29 14:59:11,904 - INFO - 
批次 43: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [█████████████████████████████████████░░░]  93.52% (440000/470467) 已插入: 440000, 失敗: 0, 耗時: 44.7s, 預計剩餘: 3.1s

2025-10-29 14:59:12,813 - INFO - 
批次 44: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [██████████████████████████████████████░░]  95.65% (450000/470467) 已插入: 450000, 失敗: 0, 耗時: 45.6s, 預計剩餘: 2.1s

2025-10-29 14:59:13,713 - INFO - 
批次 45: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████████████████████░]  97.78% (460000/470467) 已插入: 460000, 失敗: 0, 耗時: 46.7s, 預計剩餘: 1.1s

2025-10-29 14:59:14,813 - INFO - 
批次 46: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [███████████████████████████████████████░]  99.90% (470000/470467) 已插入: 470000, 失敗: 0, 耗時: 47.6s, 預計剩餘: 0.0s

2025-10-29 14:59:15,713 - INFO - 
批次 47: 處理 10000 行, 有效 10000 行, 失敗 0 行


  進度: [████████████████████████████████████████] 100.00% (470467/470467) 已插入: 470467, 失敗: 0, 耗時: 47.8s, 預計剩餘: 0.0s

2025-10-29 14:59:15,854 - INFO - 
批次 48: 處理 467 行, 有效 467 行, 失敗 0 行
2025-10-29 14:59:15,855 - INFO - Excel 文件已關閉
2025-10-29 14:59:16,252 - INFO - DuckDB 連接已關閉，共插入 470467 行
2025-10-29 14:59:16,252 - INFO - 




╔════════════════════════════════════════════════════════════════════╗
║                      導入完成報告                                  ║
╚════════════════════════════════════════════════════════════════════╝

【處理統計】
  總處理行數:       470,467
  成功插入行數:     470,467
  失敗行數:               0
  成功率:            100.0%

【性能指標】
  總耗時:              48.2 秒
  吞吐量:            9755.2 行/秒
  內存占用:      恆定 (批次大小: 10000)

【數據庫統計】
  總記錄數:         470,467
  獨特患者數:       162,116
  檢查室數:              12
  首次檢查:      2018-08-15 11:57:53
  最後檢查:      2025-10-28 22:49:01

【數據庫位置】
  ./medical_exams_streaming.duckdb

【狀態】
  SUCCESS



In [9]:
#     # 保存報告
#     report_file = db_file.replace('.duckdb', '_report.json')
#     with open(report_file, 'w', encoding='utf-8') as f:
#         json.dump(report, f, ensure_ascii=False, indent=2, default=str)
#     logger.info(f"報告已保存: {report_file}")
    
#     # 導入後查詢示例
#     logger.info("\n" + "=" * 70)
#     logger.info("查詢示例")
#     logger.info("=" * 70)
    
#     con = duckdb.connect(db_file)
    
#     # 查詢 1: 按來源統計
#     logger.info("\n【查詢1】按檢查來源統計:")
#     result = con.execute("""
#         SELECT exam_source, COUNT(*) as count
#         FROM medical_examinations_fact
#         GROUP BY exam_source
#         ORDER BY count DESC
#     """).fetchall()
    
#     for source, count in result:
#         print(f"  {source}: {count:,}")
    
#     # 查詢 2: 按儀器統計
#     logger.info("\n【查詢2】按檢查儀器統計:")
#     result = con.execute("""
#         SELECT equipment_type, COUNT(*) as count
#         FROM medical_examinations_fact
#         GROUP BY equipment_type
#         ORDER BY count DESC
#     """).fetchall()
    
#     for equipment, count in result:
#         print(f"  {equipment}: {count:,}")
    
#     con.close()
    
#     logger.info("\n✅ 流式導入完成！")
