In [8]:
import sqlite3
from typing import Literal, cast
from invokeai.app.services.image_records.image_records_common import (
    IMAGE_DTO_COLS,
    ImageRecord,
    deserialize_image_record,
)
from invokeai.app.services.shared.sqlite.sqlite_common import SQLiteDirection


def get_images(
    from_image_name: str | None = None,  # omit for first page
    count: int = 10,
    board_id: str | None = None,
    category: Literal["images", "assets"] = "images",
    starred_first: bool = False,
    order_dir: SQLiteDirection = SQLiteDirection.Descending,
    search_term: str | None = None,
    db_path: str = "/home/bat/invokeai-4.0.0/databases/invokeai.db",
) -> list[ImageRecord]:
    """Fetch one page of non-intermediate images using keyset pagination.

    Rows are ordered by ``created_at`` then ``image_name`` in ``order_dir``;
    when ``starred_first`` is set, ``starred`` (always descending) is the
    leading sort key.

    Args:
        from_image_name: Name of the cursor image. Only rows that sort strictly
            after this image are returned (the cursor image itself is
            excluded). Omit to get the first page.
        count: Maximum number of rows to return.
        board_id: Restrict to images on this board. When falsy, only images
            that are not on any board are returned.
        category: ``"images"`` selects the ``general`` image category;
            ``"assets"`` selects ``control``, ``mask``, ``user`` and ``other``.
        starred_first: Sort starred images ahead of unstarred ones.
        order_dir: Sort direction for ``created_at`` and ``image_name``.
        search_term: When given, only images whose ``metadata`` column matches
            ``LIKE %term%`` are returned.
        db_path: Path of the SQLite database file to query.

    Returns:
        The matching rows, deserialized with ``deserialize_image_record``.

    Raises:
        ValueError: If ``category`` is neither ``"images"`` nor ``"assets"``.
    """
    # Validate before opening the database so a bad argument never leaves a connection open.
    if category == "images":
        category_clause = "AND images.image_category = 'general'"
    elif category == "assets":
        category_clause = "AND images.image_category IN ('control', 'mask', 'user', 'other')"
    else:
        raise ValueError(f"Invalid category: {category}")

    # `params` MUST be filled in the same order the `?` placeholders appear in the final SQL text.
    clauses: list[str] = []
    params: list[int | str | bool] = []

    if from_image_name:
        # The CTE is the first thing in the statement, so its placeholder is bound first.
        clauses.append(
            """
            WITH image_keyset AS (
                SELECT created_at,
                       image_name,
                       starred
                FROM images
                WHERE image_name = ?
            )
            """
        )
        params.append(from_image_name)

    clauses.append(
        f"""
        SELECT {IMAGE_DTO_COLS}
        FROM images
        LEFT JOIN board_images ON board_images.image_name = images.image_name
        WHERE images.is_intermediate = FALSE
        """
    )

    if board_id:
        clauses.append("AND board_images.board_id = ?")
        params.append(board_id)
    else:
        clauses.append("AND board_images.board_id IS NULL")

    clauses.append(category_clause)

    if search_term:
        clauses.append("AND images.metadata LIKE ?")
        params.append(f"%{search_term.lower()}%")

    if from_image_name:
        # "After the cursor" means smaller keys for a descending sort and larger keys for an
        # ascending one. A strict comparison keeps the cursor image out of the results; using
        # `<=` / `>=` instead would include it.
        comparator = "<" if order_dir == SQLiteDirection.Descending else ">"
        tail_condition = f"""
            (images.created_at, images.image_name) {comparator} (
                (SELECT created_at FROM image_keyset),
                (SELECT image_name FROM image_keyset)
            )
        """
        if starred_first:
            # `starred` always sorts DESC regardless of `order_dir`, so it cannot share a single
            # row-value comparison with the other columns; spell out the lexicographic rule.
            clauses.append(
                f"""
                AND (
                    images.starred < (SELECT starred FROM image_keyset)
                    OR (
                        images.starred = (SELECT starred FROM image_keyset)
                        AND {tail_condition}
                    )
                )
                """
            )
        else:
            clauses.append(f"AND {tail_condition}")

    if starred_first:
        clauses.append(
            f"ORDER BY images.starred DESC, images.created_at {order_dir.value}, images.image_name {order_dir.value}"
        )
    else:
        clauses.append(f"ORDER BY images.created_at {order_dir.value}, images.image_name {order_dir.value}")

    clauses.append("LIMIT ?;")
    params.append(count)

    final_query = "\n".join(clauses)

    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(final_query, tuple(params))
        result = cast(list[sqlite3.Row], cursor.fetchall())
    finally:
        # sqlite3 connections are not closed automatically; release the file handle explicitly.
        conn.close()

    return [deserialize_image_record(dict(r)) for r in result]


# Demo: fetch the first page, then query again using the first returned image as the cursor.
kwargs = {"starred_first": False}

images = get_images(**kwargs)
print("first query")
for image in images:
    print(image.image_name, image.starred)

print("\nnext query, starting from the second image")
# An empty first page has no image to use as a cursor, so skip the second query instead of
# raising IndexError on `images[0]`.
images_2 = get_images(from_image_name=images[0].image_name, **kwargs) if images else []
for image in images_2:
    print(image.image_name, image.starred)

first query
36e62fec-5c3a-4b28-867b-9029fb6d2319.png False
c7f4f4b8-7ce6-4594-abf6-3f5e13fb7fe9.png False
d8f57fda-5084-4d87-8668-06fb300282e4.png False
a2fd7b8b-bbe5-4629-9d46-000f99b64931.png False
c0880bc1-5f7a-452b-acea-53a261f4c0c4.png False
0ad957df-c341-48e3-b384-f656985c2722.png False
8c788d82-c81c-4ffe-bf6b-bdad601c5add.png False
9b1179a0-09a0-4430-918d-60b618ff040c.png False
c8ad6a32-75db-4d8b-a865-066365fa1563.png False
e5eb1c19-8c69-4d29-a447-fbc2d649334a.png False

next query, starting from the second image
36e62fec-5c3a-4b28-867b-9029fb6d2319.png False
c7f4f4b8-7ce6-4594-abf6-3f5e13fb7fe9.png False
d8f57fda-5084-4d87-8668-06fb300282e4.png False
a2fd7b8b-bbe5-4629-9d46-000f99b64931.png False
c0880bc1-5f7a-452b-acea-53a261f4c0c4.png False
0ad957df-c341-48e3-b384-f656985c2722.png False
8c788d82-c81c-4ffe-bf6b-bdad601c5add.png False
9b1179a0-09a0-4430-918d-60b618ff040c.png False
c8ad6a32-75db-4d8b-a865-066365fa1563.png False
e5eb1c19-8c69-4d29-a447-fbc2d649334a.png False
