In [1]:
from pybatchintory import configure
from pybatchintory import config as cfg
from pybatchintory import sql
from pybatchintory import main

In [19]:
import sqlite3

In [22]:
sqlite3.sqlite_version_info

(3, 37, 2)

In [14]:
from pydantic import BaseModel

class A(BaseModel):
    a: str
    b: int
    c: int = 1

In [15]:
a = A(a="asd", b=3)

In [17]:
a.dict()

{'a': 'asd', 'b': 3, 'c': 1}

In [2]:
import sqlalchemy as sa
import pandas as pd

In [3]:
engine_inventory = sa.create_engine('sqlite:///inventory.db')
engine_meta = sa.create_engine('sqlite:///meta.db')

In [4]:
with engine_inventory.begin() as conn:
    conn.execute(sa.text("attach 'test_inventory.db' as test_pybatchintory"))
    
with engine_meta.begin() as conn:
    conn.execute(sa.text("attach 'test_meta.db' as test_pybatchintory"))

In [5]:
metadata = sa.MetaData()

sql.models.generate_inventory_table(
    name="test_inventory",
    metadata=metadata,
    schema="test_pybatchintory"
)

metadata.create_all(bind=engine_inventory, checkfirst=True)

In [6]:
df_meta = pd.DataFrame({
    "id":range(10),
    "file":[f"f{x}" for x in range(10)],
    "size":range(0, 20, 2)
})

df_meta.to_sql("test_meta", con=engine_meta, schema="test_pybatchintory", if_exists="replace", index=False)

10

In [8]:
df_inventory = pd.DataFrame({
    "id": [1, 2],
    "meta_table": ["test_meta", "test_meta"],
    "job": ["j1", "j2"],
    "job_identifier": ["j1_1", "j2_1"],
    "job_result": ["{file: 1}", "{file: 2}"],
    "processing_start": [pd.Timestamp("2023-01-01 10:00:00"),
                         pd.Timestamp("2023-01-01 12:00:00")],
    "processing_end": [pd.Timestamp("2023-01-01 11:00:00"),
                       pd.Timestamp("2023-01-01 13:00:00")],
    "meta_id_start": [0, 5],
    "meta_id_end": [4, 8],
    "weight": [8, 23],
    "count": [5, 4],
    "attempt": [1, 1],
    "status": ["succeeded", "running"]
})

df_inventory.to_sql("test_inventory", con=engine_inventory, if_exists="append", schema="test_pybatchintory", index=False)

2

In [9]:
configure(
    settings=dict(
        INVENTORY_CONN="sqlite:///inventory.db",
        INVENTORY_CONN_SCHEMA="test_pybatchintory",
        INVENTORY_TABLE_NAME="test_inventory",
        META_CONN="sqlite:///meta.db",
        META_CONN_SCHEMA="test_pybatchintory",
        META_TABLE_NAME="test_meta",
        META_COLS={"uid": "id", "file": "file", "weight": "size"},
    ),
    initialize_db=False
)

sql.db = sql.initialize(engine_inventory=engine_inventory,
                        engine_meta=engine_meta)

In [10]:
t_inventory = sql.db.table_inventory

values = {"job": "j3",
          "meta_table": "test_meta",
          "meta_id_start": 3,
          "meta_id_end": 7,
          "count": 5}

stmt = sa.insert(t_inventory).values(values)

In [11]:
with sql.db.engine_inventory.connect() as conn:
    res = conn.execute(stmt)
    conn.commit()

In [12]:
res.inserted_primary_key[0]

3

In [None]:
res.

In [9]:
main.acquire_batch(job="j1", id_min=12, batch_count=2, batch_weight=1000)

max_meta_id_from_inventory: 4
max_meta_id_from_meta: 9
User provided `id_min` (12) is greater than max id from meta table (9).


In [6]:
sql.crud.get_meta_id_range_from_meta(id_min=0, weight=6, count=2)

ItemIdRange(id_min=0, id_max=1, count=2, weight=2.0)

In [8]:
sql.crud.get_max_meta_id_from_backend("test_meta", "j1")

4

In [11]:
main.get_meta_id_range(meta_table="test_meta", job="j1")

ItemIdRange(id_min=5, id_max=9, weight=5.0)

In [2]:
import pandas as pd

from sqlalchemy import create_engine

from sqlalchemy import select, func, and_, column
from typing import Optional
from pydantic import BaseModel

In [3]:
class ItemIdRange(BaseModel):
    """Resembles the computed item id range which 
    constitutes a batch of data items.
    
    """
    
    id_min: int
    id_max: int
    weight: float
        
    class Config:
        orm_mode=True

In [5]:
def get_max_item_id(job: str, conn) -> int:
    """Given a job identifier, retrieve the highest item id that
    has been previously processed.
    
    """
    
    inventory = sql.db.table_inventory

    max_val = func.max(inventory.c.range_id_end)
    value_or_zero = func.coalesce(max_val, 0)
    where = inventory.c.job_identifier == job

    stmt = select(value_or_zero).where(where)
    return conn.execute(stmt).scalar()

In [3]:
with sql.db.engine_backend.connect() as conn:
    print(get_max_item_id("asd", conn))

NameError: name 'get_max_item_id' is not defined

In [4]:
conn

<sqlalchemy.engine.base.Connection at 0x25a5d1aa1a0>

In [23]:
def get_id_range_from_meta(
    conn,
    range_id_min: int = 0,
    range_id_max: Optional[int] = None,
    weight: Optional[int] = None,
):
    """Compute the min and max id values given provided parameters
    to constitute a batch of data items.
    
    """

    if cfg.settings.META_COLS.weight and weight:
        item_id_range = _get_item_id_range_per_weight(
            conn=conn,
            range_id_min=range_id_min,
            range_id_max=range_id_max,
            weight=weight,
        )
        
    else:
        item_id_range = _get_item_id_range_per_row_count(
            conn=conn,
            range_id_min=range_id_min,
            range_id_max=range_id_max,
            weight=weight,
        )

    return item_id_range


def _get_item_id_range_per_weight(
    conn,
    range_id_min: int,
    range_id_max: Optional[int] = None,
    weight: Optional[int] = None,
):
    
    t_meta = sql.db.table_meta
    c_id = t_meta.c[cfg.settings.META_COLS.uid]
    c_weight = t_meta.c[cfg.settings.META_COLS.weight]

    cumsum = func.sum(c_weight).over(order_by=c_id).label("cumsum")
    where_clauses = [c_id >= range_id_min]
    if range_id_max:
        where.append(c_id < range_id_max)
    where = and_(*where_clauses)

    cte = select(c_id.label("id"), cumsum).where(where).order_by(c_id).cte("cumsum")

    main_query = (
        select(
            func.min(cte.c.id).label("id_min"),
            func.max(cte.c.id).label("id_max"),
            func.max(cte.c.cumsum).label("weight"),
        )
        .where(cte.c.cumsum <= weight)
        .select_from(cte)
    )

    return ItemIdRange.from_orm(conn.execute(main_query).fetchone())


def _get_item_id_range_per_row_count(
    conn,
    range_id_min: int,
    range_id_max: Optional[int] = None,
    weight: Optional[int] = None,
):
    rank = func.rank().over(order_by=items.c.id).label("rank")
    where_clauses = [items.c.id >= range_id_min]
    if range_id_max:
        where.append(items.c.id < range_id_max)
    where = and_(*where_clauses)

    cte = select(items.c.id, rank).where(where).order_by(items.c.id).cte("rank")

    main_query = select(
            func.min(cte.c.id).label("id_min"),
            func.max(cte.c.id).label("id_max"),
            func.count().label("weight"),
        )
    
    if weight:
        main_query = main_query.where(cte.c.rank <= weight)
    
    main_query = main_query.select_from(cte)
    
    return ItemIdRange.from_orm(conn.execute(main_query).fetchone())

In [24]:
with sql.db.engine_meta.connect() as conn:
    print(get_item_id_range(conn, weight=6))

id_min=0 id_max=3 weight=6.0


In [5]:
engine_backend = create_engine('sqlite:///backend.db')
engine_items = create_engine('sqlite:///meta.db')

In [9]:
df_meta = pd.DataFrame({
    "id":range(10),
    "file":[f"f{x}" for x in range(10)],
    "size":range(0, 20, 2),
    "import_ts":pd.date_range(start="2023-01-01", periods=10, freq="1d")
})

NameError: name 'pd' is not defined

In [7]:
df_meta.to_sql("meta_data", engine_items, index=False, if_exists="replace")

10

In [15]:
from sqlalchemy import Table, Column, Integer, String, DateTime, Enum, Text, JSON, MetaData

metadata_backend = MetaData()
metadata_items = MetaData()

inventory = Table('inventory', metadata_backend,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('job_identifier', String, nullable=False),
    Column('job_result', JSON),
    Column('processing_start', DateTime, nullable=False),
    Column('processing_end', DateTime),
    Column('range_id_start', Integer, nullable=False),
    Column('range_id_end', Integer, nullable=False),
    Column('weight', Integer, nullable=False),
    Column('attempt', Integer, nullable=False),
    Column('status', Enum('running', 'succeeded', 'failed'), nullable=False)
)

inventory_logs = Table('inventory_logs', metadata_backend,
    Column('id', Integer, primary_key=True),
    Column('attempt', Integer, primary_key=True),
    Column('processing_start', DateTime, nullable=False),
    Column('processing_end', DateTime),
    Column('status', Enum('running', 'succeeded', 'failed'), nullable=False),
    Column('logging', String),
    Column('config', JSON)
)

items = Table('items', metadata_items,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('file', Text),
    Column('size', Integer),
    Column('imported_ts', DateTime)
)

#items.drop(bind=engine_items)

In [16]:
t = Table("items", MetaData(), autoload_with=engine_items)

In [17]:
t

Table('items', MetaData(), Column('id', BIGINT(), table=<items>), Column('file', TEXT(), table=<items>), Column('size', BIGINT(), table=<items>), Column('import_ts', DATETIME(), table=<items>), schema=None)

In [8]:
inventory.c.id

Column('id', Integer(), table=<inventory>, primary_key=True, nullable=False)

In [6]:
type(MetaData)

type

In [7]:
#metadata_backend.drop_all(bind=engine_backend, checkfirst=True)
#metadata_items.drop_all(bind=engine_items, checkfirst=True)

metadata_backend.create_all(bind=engine_backend, checkfirst=True)
metadata_items.create_all(bind=engine_items, checkfirst=True)

In [27]:
with engine_items.connect() as conn:
    r = get_item_id_range(conn, weight=5, weight_col="size")
    print(r)

id_min=0 id_max=2 weight=3.0


In [129]:
with engine_items.connect() as conn:
    r = get_item_id_range(conn, weight_col="size", weight=10)
    print(r.fetchone()._asdict())

{'id_min': 2, 'id_max': 4, 'weight': 9}


In [89]:
weight_col = "size"

cumulated_sum = select(
    items.c.id,
    func.sum(items.c[weight_col]).over(order_by=items.c.id).label('cumsum')
).where(
    and_(items.c.id > 1, items.c.id < 10)
).cte('cumulated_sum')

In [105]:
main_query = select(
    func.max(cumulated_sum.c.id).label('id_max'),
    func.min(cumulated_sum.c.id).label('id_min'),
    func.max(cumulated_sum.c.cumsum).label('weight')
).where(
    cumulated_sum.c.cumsum < 1
).select_from(
    cumulated_sum
)

In [106]:
with engine_items.connect() as conn:
    print(conn.execute(main_query).all())

[(None, None, None)]


In [None]:
Please translate the following SQL into sqlalchemy core statements:

"""
with cumulated_sum as (
    SELECT 
        id, sum(weight_col) over (order by id) as cumsum 
    FROM 
        items
    WHERE
        id > 1 AND
        id < 10
    )

SELECT 
    max(id) as id_max, 
    min(id) as id_min, 
    max(cumsum) as weight
FROM 
    cumulated_sum
WHERE 
    cumsum / 3 = 0

"""

In [69]:
with engine_backend.connect() as conn:
    max_item_id = get_max_item_id("dummy", conn)

In [70]:
max_item_id

0