In [2]:
import aiosqlite
from pathlib import Path
# Version number embedded in the database file name (blockchain_v<N>_...).
DB_VERSION = 2
# Simulator blockchain database, resolved against the current user's home
# directory so the notebook is not tied to one machine's absolute path.
# NOTE(review): "hints" looks like the simulator name — adjust if yours differs.
DB_PATH = (
    Path.home()
    / ".chia"
    / "simulator"
    / "hints"
    / "db"
    / f"blockchain_v{DB_VERSION}_simulator0.sqlite"
)

In [3]:
from chia.consensus.block_record import BlockRecord
query = f"""SELECT block_record FROM full_blocks"""
async with aiosqlite.connect(DB_PATH) as db:
    async with db.execute(query) as cursor:
        async for row in cursor:
            block_record = BlockRecord.from_bytes(row[0])
            print(block_record.header_hash.hex())

In [4]:
def with_db_connection(f):
    """Decorator that supplies a fresh database connection to a coroutine.

    The wrapped coroutine function ``f`` must accept the connection as its
    first positional argument; callers of the decorated function omit it and
    pass only the remaining arguments.

    For every call the wrapper:
      * opens a new ``aiosqlite`` connection to ``DB_PATH``;
      * awaits ``f(db, *args, **kwargs)``;
      * commits if ``f`` returned normally, or rolls back and re-raises if it
        raised an ``Exception``;
      * always closes the connection.

    Returns:
        An async wrapper that returns whatever ``f`` returns.
    """
    import functools  # local import keeps this cell self-contained

    # functools.wraps preserves f's __name__/__doc__, so help(), `?` and
    # tracebacks show the decorated function rather than "with_connection".
    @functools.wraps(f)
    async def with_connection(*args, **kwargs):
        db = await aiosqlite.connect(DB_PATH)
        try:
            rv = await f(db, *args, **kwargs)
        except Exception:
            await db.rollback()
            raise
        else:
            # NOTE(review): the original author was unsure whether committing
            # is wanted here; kept so existing behaviour is unchanged.
            await db.commit()
        finally:
            await db.close()

        return rv

    return with_connection

In [5]:
@with_db_connection
async def get_height_async(db) -> Optional[uint32]:
    """Return the height of the current peak block, if one is recorded.

    Looks up the hash stored in ``current_peak`` under ``key = 0`` and then
    the ``height`` of the ``full_blocks`` row whose ``header_hash`` matches.

    Args:
        db: open database connection, supplied by ``with_db_connection``.

    Returns:
        The height wrapped in ``uint32``, or ``None`` when either lookup
        finds no row.
    """
    async with db.execute("SELECT hash FROM current_peak WHERE key = 0") as peak_cursor:
        peak = await peak_cursor.fetchone()
    if peak is None:
        return None

    height_sql = "SELECT height FROM full_blocks WHERE header_hash=?"
    async with db.execute(height_sql, (peak[0],)) as height_cursor:
        found = await height_cursor.fetchone()
    return None if found is None else uint32(found[0])
# Look up the current peak height; the decorator supplies the connection.
await get_height_async()

In [6]:
import zstd

@with_db_connection
async def get_block_record_by_height_async(db, height: uint32) -> Optional[BlockRecord]:
    """Load the block record stored at ``height``.

    Args:
        db: open database connection, supplied by ``with_db_connection``.
        height: value matched against the ``height`` column of ``full_blocks``.

    Returns:
        The ``BlockRecord`` decoded from the first matching row's
        ``block_record`` column, or ``None`` when no row matches.
    """
    sql = "SELECT block_record FROM full_blocks WHERE height=?"
    async with db.execute(sql, (height,)) as cur:
        found = await cur.fetchone()
    if found is None:
        return None
    return BlockRecord.from_bytes(found[0])

@with_db_connection
async def get_block_bytes_by_height_async(
    db, height: uint32
) -> Optional[bytes]:
    """Return the decompressed ``block`` column for the row at ``height``.

    Args:
        db: open database connection, supplied by ``with_db_connection``.
        height: value matched against the ``height`` column of ``full_blocks``.

    Returns:
        The bytes produced by zstd-decompressing the first matching row's
        ``block`` column, or ``None`` when no row matches.
    """
    query = "SELECT block FROM full_blocks WHERE height=?"
    async with db.execute(query, (height,)) as cursor:
        row = await cursor.fetchone()
    if row is None:
        return None
    # The stored value is zstd-compressed; callers receive the raw payload.
    return zstd.decompress(row[0])

async def get_block_by_height_async(height: uint32) -> Optional[FullBlock]:
    """Fetch and decode the full block stored at ``height``.

    Delegates the database read to ``get_block_bytes_by_height_async`` and
    decodes the returned bytes with ``FullBlock.from_bytes``.

    Returns:
        The decoded ``FullBlock``, or ``None`` when no block bytes were found.
    """
    raw = await get_block_bytes_by_height_async(height)
    return None if raw is None else FullBlock.from_bytes(raw)

def get_coin_record(row) -> CoinRecord:
    """Build a ``CoinRecord`` from a positional database row.

    The indices follow the column order used by
    ``get_coin_record_by_name_async``: confirmed_index (0), spent_index (1),
    coinbase (2), puzzle_hash (3), coin_parent (4), amount (5), timestamp (6).
    """
    # The amount column is decoded via uint64.from_bytes before use.
    coin = Coin(row[4], row[3], uint64.from_bytes(row[5]))
    return CoinRecord(coin, row[0], row[1], row[2], row[6])

@with_db_connection
async def get_coin_record_by_name_async(db, name: str):
    """Look up a coin record by its hex-encoded coin name.

    Args:
        db: open database connection, supplied by ``with_db_connection``.
        name: hex string converted with ``bytes32.from_hexstr`` and matched
            against the ``coin_name`` column of ``coin_record``.

    Returns:
        The result of ``get_coin_record`` for the first matching row, or
        ``None`` when no row matches.
    """
    sql = """SELECT confirmed_index, spent_index, coinbase, puzzle_hash, 
    coin_parent, amount, timestamp FROM coin_record WHERE coin_name=?"""

    coin_name = bytes32.from_hexstr(name)
    async with db.execute(sql, (coin_name,)) as cur:
        found = await cur.fetchone()
        return None if found is None else get_coin_record(found)

In [7]:
# Fetch the block record stored at height 2.
await get_block_record_by_height_async(2)

In [8]:
# Fetch and decode the full block stored at height 2.
await get_block_by_height_async(2)

In [9]:
# Look up a single coin record by its hex-encoded coin name.
await get_coin_record_by_name_async('c5149017be6a5af5b9b876165137b3b04ba50ba079f92320a063e344e73c9130')