# 导入必要的库
导入需要的库，包括SQLAlchemy和Alembic。

In [None]:
import enum
from dataclasses import Field as DataclassField
from datetime import datetime
from typing import (
    Any,
    ClassVar,
    cast,
    get_args,
    get_origin,
    get_type_hints,
)

import pendulum
import sqlalchemy as sa
from sqlalchemy import DateTime, String, func
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    MappedAsDataclass,
    mapped_column,
)

# from sqlalchemy.sql.type_api import TypeEngine
from sqlalchemy.util.typing import (
    de_optionalize_union_types,
    expand_unions,
    flatten_newtype,
    is_newtype,
    make_union_type,
)


class Mixin(MappedAsDataclass):
    """Dataclass mixin that adds ``created_at`` / ``updated_at`` timestamp columns.

    Both columns are timezone-aware, NOT NULL, and have a database-side
    default of ``now()``.  On the Python side they are filled by
    ``pendulum.now`` when not supplied; ``updated_at`` is additionally
    refreshed with ``pendulum.now`` on UPDATE.

    Both fields are keyword-only in the generated ``__init__``.  They carry a
    default, and dataclasses reject a positional field without a default
    that follows one with a default, so keeping them positional would make
    every subclass that declares a required column fail at class creation.
    """

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        # The time zone may be left unspecified, but then the runtime
        # environment's default time zone must be correct.
        default_factory=pendulum.now,
        kw_only=True,
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
        onupdate=pendulum.now,
        # The time zone may be left unspecified, but then the runtime
        # environment's default time zone must be correct.
        default_factory=pendulum.now,
        kw_only=True,
    )


class Base(MappedAsDataclass, DeclarativeBase):
    """Declarative dataclass base with eager defaults and NewType support.

    * ``eager_defaults`` is enabled so server-generated values are fetched
      during flush (see the citation below).
    * ``type_annotation_map`` maps ``enum.Enum`` to a non-native
      ``VARCHAR(255)`` enum and ``datetime`` to a timezone-aware ``DateTime``.
    * ``__init_subclass__`` registers every ``typing.NewType`` found inside a
      ``Mapped[...]`` annotation and writes resolved annotations back onto
      the dataclass fields.
    """

    # Cite from old version docs [https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#preventing-implicit-io-when-using-asyncsession]:
    #     The Column.server_default value on the XXX column will not be refreshed by default after an INSERT;
    #     instead, it is normally expired so that it can be loaded when needed.
    #
    #     Similar behavior applies to a column where the Column.default parameter is assigned to a SQL expression object.
    #
    #     To access this value with asyncio, it has to be refreshed within the flush process,
    #     which is achieved by setting the mapper.eager_defaults parameter on the mapping
    #
    # Also see:
    #     https://docs.sqlalchemy.org/en/20/orm/persistence_techniques.html#orm-server-defaults
    #     https://docs.sqlalchemy.org/en/20/orm/mapping_api.html#sqlalchemy.orm.Mapper.params.eager_defaults
    __mapper_args__ = {"eager_defaults": True}

    # ``TypeEngine`` is reached through the already-imported ``sa`` namespace:
    # this annotation is evaluated when the class body runs, and a bare
    # ``TypeEngine`` name is not imported in this module (NameError).
    type_annotation_map: ClassVar[dict[Any, sa.types.TypeEngine[Any]]] = {
        enum.Enum: sa.Enum(
            enum.Enum,
            native_enum=False,
            # By default it uses the length of the longest value.
            # The Enum.length parameter is used ** unconditionally ** for VARCHAR rendering
            # regardless of the Enum.native_enum parameter
            length=255,
        ),
        datetime: sa.DateTime(timezone=True),
    }

    def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
        """Register NewType annotations, then de-reference dataclass field types.

        Args:
            *args: Forwarded unchanged to the parent ``__init_subclass__``.
            **kwargs: Forwarded unchanged to the parent ``__init_subclass__``.

        Raises:
            TypeError: If the supertype of a ``NewType`` used inside a
                ``Mapped[...]`` annotation cannot be resolved to a
                SQLAlchemy type by the registry.
        """
        # scan all field declarations to register NewType
        type_annotations = get_type_hints(cls)

        for orig_ann in type_annotations.values():
            # Only ``Mapped[...]`` annotations describe columns.
            if get_origin(orig_ann) is not Mapped:
                continue

            ann_list: type = get_args(orig_ann)[0]

            # Drop ``None`` from optionals and visit each union member.
            for ann in expand_unions(de_optionalize_union_types(ann_list)):
                if not is_newtype(ann):
                    continue

                # Already mapped explicitly on the class; nothing to do.
                if ann in cls.type_annotation_map:
                    continue

                # Resolve the NewType's underlying supertype and register the
                # NewType itself under the same SQLAlchemy type.
                ann_supertype = flatten_newtype(ann)
                sa_type = cls.registry._resolve_type(  # pyright: ignore[reportPrivateUsage]  # noqa: SLF001
                    ann_supertype
                )
                if sa_type is None:
                    raise TypeError(
                        f"Type {ann_supertype} (from NewType {ann}) is not supported by SQLAlchemy."
                    )

                cls.registry.update_type_annotation_map({ann: sa_type})

        # If you use "from __future__ import annotations",
        # or use "Mapped['Xxx']" (Notice the inside quotes),
        # dataclasses.fields() will return ForwardRef.
        #
        # But pydantic currently cannot handle ForwardRef correctly
        # if you want to use models inside the BaseModel,
        # see https://github.com/pydantic/pydantic/issues/6849.
        #
        # So here writes back these dereferenced annotations
        # to make pydantic happy.

        # let SQLAlchemy to collect dataclass fields
        super().__init_subclass__(*args, **kwargs)

        # type_annotations is calculated by get_type_hints()
        # who tries its best to de-ForwardRef
        for field_name, orig_ann in type_annotations.items():
            # Skip annotations that did not become dataclass fields.
            if field_name not in cls.__dataclass_fields__:
                continue

            if get_origin(orig_ann) is Mapped:
                # get_args(Mapped[???]) == ???
                orig_ann = make_union_type(get_args(orig_ann)[0])  # noqa: PLW2901

            cast(DataclassField[Any], cls.__dataclass_fields__[field_name]).type = orig_ann


class User(Base, Mixin):
    """ORM dataclass model for the ``sys_user`` table."""

    __tablename__ = "sys_user"

    # Annotated as ``str`` to agree with the explicit ``String(50)`` column
    # type; the column definition itself is unchanged.
    id: Mapped[str] = mapped_column(String(50), primary_key=True)
    username: Mapped[str] = mapped_column(String(50))
    # Explicit length: some backends (MySQL among them) refuse to emit
    # CREATE TABLE for a VARCHAR column that has no length.
    email: Mapped[str] = mapped_column(String(255))

In [2]:
from datetime import datetime

import pendulum
from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, MappedAsDataclass, mapped_column, relationship


class TimeMixin(MappedAsDataclass):
    """Dataclass mixin declaring the ``create_user`` and ``update_user`` columns.

    The timestamp columns are currently disabled (kept commented out below).
    """

    # Integer column with no default, so it is a required ``__init__`` argument.
    create_user: Mapped[int] = mapped_column()
    # Optional integer column; defaults to None and is excluded from ``__init__``.
    update_user: Mapped[int | None] = mapped_column(default=None, init=False)
    # created_at: Mapped[datetime] = mapped_column(
    #     DateTime(timezone=True),
    #     nullable=False,
    #     server_default=func.now(),
    #     default_factory=pendulum.now,  # time zone may be omitted, but the runtime's default zone must then be correct
    # )
    # updated_at: Mapped[datetime] = mapped_column(
    #     DateTime(timezone=True),
    #     nullable=False,
    #     server_default=func.now(),
    #     onupdate=pendulum.now,
    #     default_factory=pendulum.now,  # time zone may be omitted, but the runtime's default zone must then be correct
    # )


class Base(DeclarativeBase):
    """Plain declarative base shared by the models defined in this cell."""


class User(Base):
    """ORM model for the ``user_account`` table; owns a list of ``Address`` rows."""

    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    fullname: Mapped[str | None] = mapped_column(String(30), default=None)
    # Mirrors ``Address.user``; the cascade includes delete-orphan.
    addresses: Mapped[list["Address"]] = relationship(
        cascade="all, delete-orphan",
        back_populates="user",
    )

    def __repr__(self) -> str:
        """Return a debug representation showing id, name and fullname."""
        shown = f"id={self.id!r}, name={self.name!r}, fullname={self.fullname!r}"
        return f"User({shown})"


class Address(Base):
    """ORM model for the ``address`` table; each row belongs to one ``User``."""

    __tablename__ = "address"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Explicit length: some backends (MySQL among them) refuse to emit
    # CREATE TABLE for a VARCHAR column that has no length.
    email_address: Mapped[str] = mapped_column(String(255))
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    # Mirrors ``User.addresses``.
    user: Mapped["User"] = relationship(back_populates="addresses")

    def __repr__(self) -> str:
        """Return a debug representation showing id and email_address."""
        return f"Address(id={self.id!r}, email_address={self.email_address!r})"

In [3]:
import os

from sqlalchemy import create_engine

# The connection URL embeds credentials, so allow it to be supplied through
# the DATABASE_URL environment variable; the literal below is only the
# local-development fallback and keeps the previous behaviour when unset.
engine = create_engine(
    os.environ.get("DATABASE_URL", "mysql+pymysql://root:barn@localhost:3306/test_db"),
    echo=True,  # log every emitted SQL statement
)

In [4]:
# Create the tables registered on ``Base.metadata`` using ``engine``.
Base.metadata.create_all(engine)

2025-02-24 16:25:34,774 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2025-02-24 16:25:34,775 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,777 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2025-02-24 16:25:34,778 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,780 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2025-02-24 16:25:34,780 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,783 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-02-24 16:25:34,783 INFO sqlalchemy.engine.Engine DESCRIBE `test_db`.`user_account`
2025-02-24 16:25:34,784 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,775 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,777 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2025-02-24 16:25:34,778 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-02-24 16:25:34,780 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2025-02-24 16:25:34,780 INFO sqlalchemy.engine.Engi

In [None]:
import sqlalchemy as sa
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Seed three users, two of them with addresses, and commit them in a
    # single transaction.  To inspect existing rows instead, iterate over
    # session.execute(sa.select(User)).scalars().all() and print each one.
    sandy = User(
        name="sandy",
        fullname="Sandy Cheeks",
        addresses=[
            Address(email_address="sandy@sqlalchemy.org"),
            Address(email_address="sandy@squirrelpower.org"),
        ],
    )
    patrick = User(name="patrick", fullname="Patrick Star")
    spongebob = User(
        name="spongebob",
        fullname="Spongebob Squarepants",
        addresses=[Address(email_address="spongebob@sqlalchemy.org")],
    )

    # Order handed to the session is unchanged: spongebob, sandy, patrick.
    session.add_all([spongebob, sandy, patrick])
    session.commit()

In [42]:
import sqlalchemy as sa
from sqlalchemy.orm import Session

with Session(engine) as session:
    # With "\" declared as the escape character, "\%" is a literal percent
    # sign, so the pattern matches names containing "%" anywhere.
    stmt = sa.select(User).where(User.name.like(r"%\%%", escape="\\"))
    xxx = session.execute(stmt).scalars().all()
    for x in xxx:
        print(x)

2025-02-24 16:36:34,218 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-02-24 16:36:34,221 INFO sqlalchemy.engine.Engine SELECT user_account.id, user_account.name, user_account.fullname 
FROM user_account 
WHERE user_account.name LIKE %(name_1)s ESCAPE '\\'
2025-02-24 16:36:34,223 INFO sqlalchemy.engine.Engine [cached since 16.81s ago] {'name_1': '%\\%%'}
User(id=4, name='spong%ebob', fullname='Spongebob Squarepants')
2025-02-24 16:36:34,230 INFO sqlalchemy.engine.Engine ROLLBACK


  xxx = session.execute(sa.select(User).where(User.name.like("%\%%", escape="\\"))).scalars().all()
