In [None]:
import os
import typing
from typing import Optional

In [None]:
import sqlalchemy as sa
from devtools import debug
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
from pydantic import BaseModel, Field, NoneStr
from datetime import datetime, timedelta
from faker import Faker

In [None]:
# Engine for the Oracle database used throughout this notebook.
# DATABASE_URL, when set in the environment, overrides the built-in DSN so that
# credentials do not have to be edited into the notebook itself.
# NOTE(review): the fallback still embeds a username/password; drop the default
# (and rotate the password) once every environment exports DATABASE_URL.
engine = sa.create_engine(
    os.environ.get("DATABASE_URL", "oracle+cx_oracle://jibon:sys123@LOCAL"),
    echo=True,  # log the SQL emitted by the engine
)

### DECLARATIVE BASE STYLE 2.0


In [None]:
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this notebook inherit from."""

In [None]:
# Standalone MetaData collection; the reflected Table objects defined later in
# this notebook are registered on it.
metadata_obj = sa.MetaData()

In [None]:
class Customer(Base):
    """ORM mapping for the ``customers`` table.

    Attributes are declared in the SQLAlchemy 2.0 typed style
    (``Mapped[...]`` + ``mapped_column``).
    """

    __tablename__ = "customers"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(sa.String(30))
    # The column is nullable, so the Python-side type must admit None as well.
    age: Mapped[Optional[int]] = mapped_column(nullable=True)
    # Stored as text (String(30)), not as a numeric column.
    salary: Mapped[str] = mapped_column(sa.String(30))
    address: Mapped[str] = mapped_column(sa.String(80))

    def __repr__(self) -> str:
        """Return a short debug representation (id, name and salary only)."""
        return f"Customer(id={self.id!r}, name={self.name!r}, salary={self.salary!r})"

#### Reflection Of Existing Table


In [None]:
# Table object for the existing "coordinators" table. autoload_with=engine asks
# SQLAlchemy to reflect the table definition from the database; the "id" column
# is additionally declared by hand as the integer primary key.
tbl_coordinator = sa.Table(
    "coordinators",
    metadata_obj,
    sa.Column("id", sa.Integer, primary_key=True),
    autoload_with=engine,
)

# Table object for the existing "demo_orders" table, reflected via
# autoload_with=engine. Two columns are declared by hand on top of the
# reflected definition.
tbl_order = sa.Table(
    "demo_orders",
    metadata_obj,
    # Database column "order_id" is exposed under the key "id" on the Table.
    sa.Column("order_id", sa.Integer, primary_key=True, key="id"),
    # Explicit foreign key pointing at Customer.id.
    sa.Column("customer_id", sa.Integer, sa.ForeignKey(Customer.id)),
    autoload_with=engine,
)

#### Declarative Class of Existing Reflected Tables


In [None]:
class CoordinatorModel(Base):
    """Declarative class mapped onto the reflected ``tbl_coordinator`` table."""

    __table__ = tbl_coordinator

    def __repr__(self):
        """Return a debug string showing id, name and created_by."""
        return (
            f"Coordinator(id={self.id!r}, "
            f"name={self.name!r}, "
            f"created_by={self.created_by!r})"
        )


class Order(Base):
    """Declarative class mapped onto the reflected ``tbl_order`` table."""

    # customer_id and its foreign key are declared on tbl_order itself, so no
    # mapped_column is repeated here.
    __table__ = tbl_order

    def __repr__(self):
        """Return a debug string showing id, user_name and order_timestamp."""
        return (
            f"Order(id={self.id!r}, "
            f"user_name={self.user_name!r}, "
            f"timestamp={self.order_timestamp!r})"
        )

### FAKER DATA


In [None]:
# Shared Faker instance used by the fake-data generator functions below.
# NOTE(review): no seed is set, so generated values presumably differ between
# runs — consider seeding Faker if reproducible dummy data is wanted.
fake = Faker()

In [None]:
class CustomerSchema(BaseModel):
    # Pydantic schema for one customer record.
    # Every field is optional and defaults to None.
    id: Optional[int] = None
    name: Optional[str] = None
    age: Optional[int] = None
    address: Optional[str] = None
    salary: Optional[float] = None

In [None]:
class OrderSchema(BaseModel):
    # Pydantic schema for one order record.
    # Every field is optional and defaults to None.
    customer_id: Optional[int] = None
    order_total: Optional[float] = None
    order_timestamp: Optional[datetime] = None
    user_name: Optional[str] = None
    tags: Optional[str] = None

    class Config:
        # Model-level pydantic configuration.
        allow_population_by_field_name = True

In [None]:
def generate_fake_customer(id: int) -> Customer:
    """Build a ``Customer`` populated with Faker-generated values.

    The values are first passed through ``CustomerSchema``; the schema's dict
    form is then unpacked into the ``Customer`` constructor.

    Args:
        id: Value assigned to the new customer's ``id``.

    Returns:
        A new ``Customer`` instance; this function does not add it to a session.
    """
    fields = {
        "id": id,
        "name": fake.name(),
        "age": fake.random_int(min=18, max=100),
        "address": fake.address(),
        "salary": fake.random_int(min=10000, max=1000000),
    }
    schema = CustomerSchema(**fields)
    return Customer(**schema.dict())


def generate_fake_order(customer_id: int) -> Order:
    """Build an ``Order`` for ``customer_id`` with Faker-generated values.

    The order date is drawn between today and one year ahead and stored as a
    datetime at midnight of that day.

    Args:
        customer_id: Value assigned to the order's ``customer_id``.

    Returns:
        A new ``Order`` instance; this function does not add it to a session.
    """
    order_day = fake.date_between(start_date="today", end_date="+1y")
    # Midnight at the start of the generated day.
    order_moment = datetime.combine(order_day, datetime.min.time())
    schema = OrderSchema(
        customer_id=customer_id,
        order_total=fake.random_int(min=100, max=55000),
        order_timestamp=order_moment,
        user_name=fake.name(),
        tags=fake.lexify(text="??????????"),
    )
    return Order(**schema.dict())

In [None]:
## GENERATE DUMMY DATA
# customers = [generate_fake_customer(i) for i in range(1001, 20000)]

#### ADDING FAKE DATA TO TABLE


In [None]:
# with sa.orm.Session(bind=engine) as session:
#     session.add_all(customers)
#     session.commit()

In [None]:
# with sa.orm.Session(bind=engine) as session:
#     stmt = sa.select(Customer).order_by(sa.desc(Customer.id))

#     for customer in session.scalars(stmt).all():
#         # debug(key, customer)
#         session.add(generate_fake_order(customer.id))

#     session.commit()
# customers = session.execute(stmt).scalars().all()

# for key, customer in enumerate(customers):
#     debug(generate_fake_order(key, customer.id))

In [None]:
# Normal select: fetch at most 10 Order rows and dump them.
with Session(bind=engine) as session:
    stmt = sa.select(Order).limit(10)
    # NOTE: despite its name, _customers holds Order instances.
    _customers = session.scalars(stmt).all()

    debug(_customers)

In [None]:
class CustomerOrder(BaseModel):
    # Pydantic model for one order joined with its customer's name.
    # order_id / order_amount / order_date are required and carry aliases;
    # name is optional.
    order_id: int = Field(alias="id")
    order_amount: float = Field(alias="order_total")
    order_date: datetime = Field(alias="order_timestamp")
    name: Optional[str] = None

    class Config:
        # Model-level pydantic configuration.
        orm_mode = True
        allow_population_by_field_name = True

In [None]:
# Join Select

with Session(bind=engine) as session:
    order_cols = tbl_order.c

    # Orders of customer 20 LEFT OUTER JOINed to Customer, newest order id
    # first, capped at 10 rows.
    stmt = (
        sa.select(tbl_order, Customer)
        .join_from(tbl_order, Customer, isouter=True)
        .where(order_cols.customer_id == 20)
        .order_by(sa.desc(order_cols.id))
        .limit(10)
    )

    rows = session.execute(stmt).all()
    # Convert each result row into a plain dict for display.
    _customer_orders = [row._asdict() for row in rows]

    debug(_customer_orders)

In [None]:

# %%
# Session bound to the engine and kept open across the following cells; it is
# closed explicitly by the final db.close() call.
db = Session(bind=engine)

In [None]:
from sqlalchemy.inspection import inspect
from sqlalchemy.sql.elements import Cast
from typing import Any, List, cast


# Each filter is "<column>:<substring>"; each sorting entry maps a column name
# to a direction ("desc" in any letter case sorts descending, anything else
# ascending).
filter_params = ["user_name:roberts"]
sorting_params = [{'id': 'asc'}]

query = db.query(Order)

# Column keys mapped on Order; filters/sorts naming anything else are ignored.
order_column_keys = set(inspect(Order).columns.keys())

conditions: List[Any] = []

# Filtering: collect one case-insensitive "contains" condition per valid filter.
for filter_param in filter_params:
    key, separator, needle = filter_param.partition(":")
    # Skip entries without a ":" separator or naming an unknown column.
    if not separator or key not in order_column_keys:
        continue
    conditions.append(
        sa.cast(getattr(Order, key), sa.String(30)).ilike("%" + needle + "%")
    )

# Apply the OR exactly once, after the loop. Calling query.filter() on every
# iteration ANDs the successive criteria together, which cancels the OR as soon
# as more than one filter is supplied.
if conditions:
    query = query.filter(sa.or_(*conditions))

# Sorting: each valid entry appends one ORDER BY term, in the order given.
for sorting in sorting_params:
    for key, direction in sorting.items():
        if key not in order_column_keys:
            continue
        column = getattr(Order, key)
        if direction.upper() == 'DESC':
            query = query.order_by(sa.desc(column))
        else:
            query = query.order_by(sa.asc(column))

# Skip the first 30 matches and fetch the next 10.
result = query.offset(30).limit(10).all()

debug(result)

In [None]:
# Close the session that was opened earlier with Session(bind=engine).
db.close()