In [1]:
from sqlalchemy import create_engine, Column, Integer, String, DateTime, func, UniqueConstraint
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime, timedelta

In [None]:

database_url="mysql+pymysql://vc_agent:aihuashen%402024@rm-2ze0q808gqplb1tz72o.mysql.rds.aliyuncs.com:3306/digital-life2"
database_url="mysql+pymysql://zxf_root:Zhf4233613%40@rm-2ze0793c6548pxs028o.mysql.rds.aliyuncs.com:3306/serverz"

#TODO 1 缺少对改动数据的同步
#TODO 2 使用逻辑删除
#TODO 3 对于push 的方法, 使用增加版本的方案进行, 而不是在原地上做改动

# --- 数据库配置 ---
# 请根据你的实际情况修改连接字符串
SOURCE_DB_URL = "mysql+pymysql://zxf_root:Zhf4233613%40@rm-2ze0793c6548pxs028o.mysql.rds.aliyuncs.com:3306/serverz"
TARGET_DB_URL = "mysql+pymysql://zxf_root:Zhf4233613%40@rm-2ze0793c6548pxs028o.mysql.rds.aliyuncs.com:3306/server2"

SYNC_TABLE_NAME = 'users'
SYNC_META_TABLE_NAME = 'sync_metadata'
BATCH_SIZE = 1000

# --- SQLAlchemy 模型定义 ---
Base = declarative_base()


class User(Base):
    """源数据库和目标数据库的User表模型"""
    __tablename__ = SYNC_TABLE_NAME
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255))
    email = Column(String(255))
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    # 如果目标数据库的User表需要，也可以添加唯一约束
    # __table_args__ = (UniqueConstraint('email'),) 

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

class SyncMetadata(Base):
    """用于存储同步元数据的表模型"""
    __tablename__ = SYNC_META_TABLE_NAME
    id = Column(Integer, primary_key=True, autoincrement=True)
    last_sync_time = Column(DateTime, default=datetime(1970, 1, 1))
    table_name = Column(String(255), unique=True)

    def __repr__(self):
        return f"<SyncMetadata(table_name='{self.table_name}', last_sync_time='{self.last_sync_time}')>"

# --- 数据库引擎和会话 ---
source_engine = create_engine(SOURCE_DB_URL, echo=False) # echo=True 会打印所有SQL语句
target_engine = create_engine(TARGET_DB_URL, echo=False)

SourceSession = sessionmaker(bind=source_engine)
TargetSession = sessionmaker(bind=target_engine)

def get_last_sync_time(target_session: TargetSession) -> datetime:
    """从目标数据库获取上次同步时间"""
    metadata_entry = target_session.query(SyncMetadata).filter_by(table_name=SYNC_TABLE_NAME).first()
    if metadata_entry:
        return metadata_entry.last_sync_time
    return datetime(1970, 1, 1) # 默认一个很早的时间

def update_last_sync_time(target_session: TargetSession, new_sync_time: datetime):
    """更新目标数据库的上次同步时间"""
    metadata_entry = target_session.query(SyncMetadata).filter_by(table_name=SYNC_TABLE_NAME).first()
    if metadata_entry:
        metadata_entry.last_sync_time = new_sync_time
    else:
        # 如果不存在，则创建
        new_metadata = SyncMetadata(table_name=SYNC_TABLE_NAME, last_sync_time=new_sync_time)
        target_session.add(new_metadata)
    target_session.commit()
    print(f"Updated last sync time to: {new_sync_time}")

def sync_data():
    """执行数据同步逻辑"""
    source_session = SourceSession()
    target_session = TargetSession()

    try:
        # 确保目标数据库有 sync_metadata 表，仅在第一次运行时可能需要
        # Base.metadata.create_all(target_engine, tables=[SyncMetadata.__table__]) 
        # 如果表已经存在，这行代码是安全的，不会重复创建

        last_sync_time = get_last_sync_time(target_session)
        print(f"Starting sync for '{SYNC_TABLE_NAME}' from: {last_sync_time}")

        processed_count = 0
        current_batch_max_updated_at = last_sync_time
        
        while True:
            # 查询源数据库中自上次同步以来有更新或新增的记录
            # 使用 `User` 模型，它会对应到 source_db.users
            # 注意：如果 source_db 和 target_db 结构完全相同，可以直接使用一个 User 模型
            # 如果不同，需要为每个数据库定义独立的模型或调整查询逻辑
            
            # 使用 filter 过滤 updated_at
            # 使用 order_by 确保有序处理，避免遗漏
            # 使用 limit 限制批次大小
            records_to_sync = source_session.query(User)\
                                             .filter(User.updated_at > last_sync_time)\
                                             .order_by(User.updated_at.asc(), User.id.asc())\
                                             .limit(BATCH_SIZE)\
                                             .all()

            if not records_to_sync:
                break # 没有更多记录了

            # 准备要插入或更新到目标数据库的数据
            for record in records_to_sync:
                # 查找目标数据库中是否存在该ID的记录
                # 这里的 `User` 模型会对应到 target_db.users
                target_user = target_session.query(User).filter_by(id=record.id).first()

                if target_user:
                    # 如果存在，则更新
                    target_user.name = record.name
                    target_user.email = record.email
                    target_user.created_at = record.created_at
                    target_user.updated_at = record.updated_at
                else:
                    # 如果不存在，则添加新记录
                    # 注意：这里需要创建一个新的User实例，而不是直接添加源数据库的record对象
                    new_user = User(
                        id=record.id,
                        name=record.name,
                        email=record.email,
                        created_at=record.created_at,
                        updated_at=record.updated_at
                    )
                    target_session.add(new_user)
                
                # 记录当前批次最大的 updated_at
                if record.updated_at > current_batch_max_updated_at:
                    current_batch_max_updated_at = record.updated_at

            target_session.commit() # 提交当前批次的变更
            processed_count += len(records_to_sync)
            print(f"Processed {len(records_to_sync)} records. Total processed: {processed_count}")

            # 更新 last_sync_time，为确保下次查询从正确的时间点开始，
            # 可以使用当前批次中最大的 updated_at + 1微秒，以避免重复查询同一时间戳的记录。
            # 更严谨的做法是，如果 updated_at 相同，则使用 ID 进行辅助排序，并记录 ID。
            # 为了简化，这里直接使用 updated_at
            last_sync_time = current_batch_max_updated_at + timedelta(microseconds=1) 
            
            if len(records_to_sync) < BATCH_SIZE: # 如果查询到的记录数小于批次大小，说明已经处理完所有符合条件的记录
                break

        if processed_count > 0:
            # 最终更新last_sync_time到数据库，确保记录的是所有已处理记录中最新的一个
            update_last_sync_time(target_session, current_batch_max_updated_at + timedelta(microseconds=1))
        else:
            print("No new records to sync.")

    except Exception as e:
        target_session.rollback() # 出现错误时回滚目标数据库的事务
        print(f"An error occurred during sync: {e}")
    finally:
        source_session.close()
        target_session.close()



In [3]:
if __name__ == "__main__":
    # 第一次运行时可以创建 sync_metadata 表
    Base.metadata.create_all(target_engine, tables=[SyncMetadata.__table__]) 
    sync_data()

Starting sync for 'users' from: 1970-01-01 00:00:00
Processed 1 records. Total processed: 1
Updated last sync time to: 2025-10-20 10:23:01.000001


In [16]:
sync_data()

Starting sync for 'users' from: 2025-10-20 10:40:41
No new records to sync.


In [3]:

Base.metadata.create_all(source_engine)

In [9]:
Base.metadata.create_all(target_engine)


In [4]:
from sqlalchemy.orm import sessionmaker

from contextlib import contextmanager
@contextmanager
def create_session(engine):
    # 5. 创建会话 (Session)
    # Session 是与数据库交互的主要接口，它管理着你的对象和数据库之间的持久化操作
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session

    except Exception as e:
        print(f"An error occurred: {e}")
        session.rollback() # 发生错误时回滚事务
    finally:
        session.close() # 关闭会话，释放资源




In [None]:

class User(Base):
    """源数据库和目标数据库的User表模型"""
    __tablename__ = SYNC_TABLE_NAME
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255))
    email = Column(String(255))
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())

    # 如果目标数据库的User表需要，也可以添加唯一约束
    # __table_args__ = (UniqueConstraint('email'),) 

    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"


In [5]:
from datetime import datetime

In [14]:
with create_session(source_engine) as session:
            
    prompt1 = User(name = "厕纸1",
                   email = "8223@qq.com",
                   created_at = datetime.now(),
                   updated_at = datetime.now()
                    )

    session.add(prompt1)
    session.commit() # 提交事务，将数据写入数据库
