In [None]:
import sqlite3

# 使用上下文管理器自动处理连接和事务提交/回滚
with sqlite3.connect('example.db') as conn:
    cursor = conn.cursor()

    # 创建表
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT
        )
    """)

    # 插入数据（推荐使用参数化方式）
    names = [('Alice',), ('Bob',), ('Charlie',), ('David',), ('Eve',)]
    cursor.executemany("INSERT INTO users (name) VALUES (?)", names)

    # 执行查询并获取所有数据
    cursor.execute("SELECT * FROM users")
    rows = cursor.fetchall()

    # 可选：打印数据验证是否写入成功
    print("当前数据库中的用户：")
    for row in rows:
        print(row)

In [None]:
import sqlite3
with sqlite3.connect('example.db') as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    # cursor.fetchall()
    for row in cursor.fetchall():
        print(row)

In [None]:
import sqlite3

size = 10
idx = 1
db_path = r"c:\火车采集器V10.27\Data\233\SpiderResult.db3"
table = "Content"
with sqlite3.connect(db_path) as conn:
    cursor = conn.cursor()

    # 每次读取前重新执行查询，确保游标位于结果集开头
    # cursor.execute()方法返回的是游标对象(执行后，cursor 对象会包含查询的结果集)
    cursor.execute(f"SELECT * FROM {table}")

    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            print("所有数据已读取完毕。")
            break

        print(f"读取第 {idx} 批次数据:")
        print(f"Batch : {idx}")
        for row in rows:
            print(row)
        idx += 1
cursor.close()
conn.close()


In [None]:
import sqlite3
import time

size = 10
idx = 1
db_path = r"c:\火车采集器V10.27\Data\250\SpiderResult.db3"
table = "Content"
with sqlite3.connect(db_path) as conn:
    cursor = conn.cursor()

    # 每次读取前重新执行查询，确保游标位于结果集开头
    # cursor.execute()方法返回的是游标对象(执行后，cursor 对象会包含查询的结果集)
    cursor.execute(f"SELECT * FROM {table}")

    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            print("所有数据已读取完毕。")
            break

        print(f"读取第 {idx} 批次数据:")
        print(f"Batch : {idx}")
        for row in rows:
            print(row)
        idx += 1
        time.sleep(1)

cursor.close()
conn.close()

In [None]:
# 方案：采用limit+offset的方式分批查询
import sqlite3
import time

db=r'c:\火车采集器V10.27\Data\248\SpiderResult.db3'
table='Content'
conn = sqlite3.connect(db)
cursor = conn.cursor()

batch_size = 100  # 每次读取的记录数
offset = 0

while True:
    query = f"SELECT * FROM {table} LIMIT {batch_size} OFFSET {offset}"
    cursor.execute(query)
    rows = cursor.fetchall()
    
    if not rows:
        break  # 所有数据已读完
    
    # 处理当前 batch 的数据
    for row in rows:
        # 处理每一行
        # pass
        print(row)
    
    offset += batch_size
    time.sleep(1)

cursor.close()
conn.close()