In [None]:
# 读取excel文件并创建sqlite数据库
import pandas as pd
import re
from sqlalchemy import create_engine, types
from sqlalchemy.types import Integer, Float, DateTime, Text

def clean_identifier(name):
    """清理表名和列名中的非法字符"""
    # 替换非字母数字字符为下划线，并去除首尾下划线
    return re.sub(r'[^a-zA-Z0-9_]', '_', name).strip('_')

def excel_to_sqlite(excel_path, db_path):
    # 创建数据库引擎
    engine = create_engine(f'sqlite:///{db_path}')
    
    # 读取所有 Sheet 页
    sheets = pd.read_excel(excel_path, sheet_name=None)
    
    for sheet_name, df in sheets.items():
        # 清理表名
        table_name = clean_identifier(sheet_name)
        
        # 清理列名
        df.columns = [clean_identifier(col) for col in df.columns]
        
        # 自动检测数据类型
        type_mapping = {}
        for column in df.columns:
            dtype = df[column].dtype
            
            if pd.api.types.is_integer_dtype(dtype):
                type_mapping[column] = Integer()
            elif pd.api.types.is_float_dtype(dtype):
                type_mapping[column] = Float()
            elif pd.api.types.is_datetime64_any_dtype(dtype):
                type_mapping[column] = DateTime()
            else:
                # 处理混合类型或文本类型
                type_mapping[column] = Text()
        
        # 写入数据库
        df.to_sql(
            name=table_name,
            con=engine,
            index=False,
            if_exists='replace',
            dtype=type_mapping
        )
        print(f'表 {table_name} 创建成功，包含 {len(df.columns)} 列')


excel_to_sqlite(
    excel_path='Excel.xlsx',  # 替换为你的 Excel 文件路径
    db_path='dts.db'          # 指定输出的 SQLite 数据库路径
)

In [None]:
# 函数：查询SQLite数据库
import sqlite3
from tabulate import tabulate

def query_sqlite(db_path, sql):
    """执行SQL查询并返回格式化结果"""
    try:
        # 连接数据库
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # 执行查询
        cursor.execute(sql)
        
        # 获取结果
        results = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        
        # 格式化输出
        if results:
            print(f"找到 {len(results)} 条记录\n")
            print(tabulate(results, headers=columns, tablefmt="grid"))
        else:
            print("查询成功，但未找到匹配记录")
            
    except sqlite3.Error as e:
        print(f"数据库错误：{str(e)}")
    except Exception as e:
        print(f"运行时错误：{str(e)}")
    finally:
        if conn:
            conn.close()

In [None]:
# SQL 查询每个人创建的问题数量
query_sqlite('dts.db', "SELECT creator, count(1) as dts_number FROM Sheet1 GROUP BY creator ORDER BY dts_number DESC")

In [None]:
# SQL 查询不同阶段的问题个数
query_sqlite('dts.db', "SELECT stage, count(1) FROM Sheet1 GROUP BY stage")