**В одной странице/чанке N записей**
- Данные постоянно находятся в памяти в чанках (страницах)
- Данные в тоже самое время хранятся в файле - для обеспечения персистентности
- Пишется транзакционный лог
- Страница / Чанк
  - на странице должно располагаться N объектов
  - страница после заполнения N объектами при записи на диск дополняться свободным пространством для оптимизации update
  - при сериализации размер страницы дополняется пустым пространством, так, чтобы размер сериализованной страницы был кратным 1024 Байт
    - поэтому все "страницы" имеют разный размер, который кратен 1024 байта, но который фиксируется после заполнения страницы полностью N объектами
    - страница, которая еще не сброшена на диск, и которая является последней - является "горячей"
  - при обновлении страница помечается как грязная
  - если из страницы был удален объект или перемещен на другую страницу в результате обновления,
    то такая страница помещается в кучу страниц со свободным местом, и может быть использована
    для добавления новой записи (переиспользование пространства)
  - не следует помещать на страницу больщое количетво объектов - они могут стать слишком большими и будут медленно обновляться
  

**Каждая запись в отдельном чанке кратном 1024 байт?**
- Плюсы:
  - Почти всегда есть место для изменения записи.
  - Алгоритм накатки транзакционного лога очень прост.
  - Сброс на диск грязных чанков - прост
  - Переиспользование свободного пространста реализовать легко
    

In [1]:
import heapq
import msgspec
import aiofiles
import aiohttp
import asyncio
import numpy
import datetime
import time
import os
import binascii
import json

from tqdm.auto import tqdm
from typing import Any, List, Tuple

import gc

In [2]:
# ls -la
! mkdir ../data
! ls .. -la


mkdir: cannot create directory ‘../data’: File exists
total 56
drwxr-xr-x 9 vfuga vfuga 4096 Mar 22 19:38 .
drwxr-xr-x 5 vfuga vfuga 4096 Mar 19 01:33 ..
drwxr-xr-x 8 vfuga vfuga 4096 Mar 21 17:17 .git
-rw-r--r-- 1 vfuga vfuga   57 Mar 22 19:38 .gitignore
drwxr-xr-x 2 vfuga vfuga 4096 Mar 21 02:55 .ipynb_checkpoints
drwxr-xr-x 2 vfuga vfuga 4096 Feb 10 19:59 .vscode
drwxr-xr-x 3 vfuga vfuga 4096 Mar 24 02:45 POC-and-TESTS
-rw-r--r-- 1 vfuga vfuga 7669 Mar 19 03:42 README.md
drwxr-xr-x 2 vfuga vfuga 4096 Mar 24 01:21 data
drwxr-xr-x 4 vfuga vfuga 4096 Mar 21 02:51 dkvs
-rw-r--r-- 1 vfuga vfuga  611 Mar 22 22:22 environment-py312.yml
-rw-r--r-- 1 vfuga vfuga  819 Feb 11 04:11 main.py
drwxr-xr-x 2 vfuga vfuga 4096 Feb 22 19:38 test


In [3]:
class FileStorage():
    ROUND_FOR = 1024
    FREE_SPACE_FACTOR = 2

    def __init__(self, file_path_name: str) -> None:
        self.lock_append = asyncio.Lock()
        self.lock_read = asyncio.Lock()
        self.lock_update = asyncio.Lock()        
        self.file_path_name = file_path_name
        self.txId = 0 # Номер транзакции
        self.f_read: aiofiles.threadpool.binary.AsyncBufferedIOBase = None
        self.f_append: aiofiles.threadpool.binary.AsyncBufferedIOBase = None
        self.f_update: aiofiles.threadpool.binary.AsyncBufferedIOBase = None

    def adjust_size(self, sz: int) -> int:
        return ((sz + self.ROUND_FOR * self.FREE_SPACE_FACTOR) // self.ROUND_FOR ) * self.ROUND_FOR - sz - 8 - 8 - 8 - 4

    async def before(self, bytes) -> None:
        pass

    async def read(self) -> bytes | None:
        await self.lock_read.acquire()
        try:
            offset = await self.f_read.tell()  # self.f.seek(0, os.SEEK_CUR)
            header = await self.f_read.read(8 + 8 + 4)
            if len(header) == 0:
                return None
            data_len = int.from_bytes(header[:8], 'big', signed=False)
            data_txId = int.from_bytes(header[8:16], 'big', signed=False)
            data_crc32 = int.from_bytes(header[16:20], 'big', signed=False)
            # print(offset, data_len, data_txId, data_crc32)
            data = await self.f_read.read(data_len)
            return data
        finally:
            self.lock_read.release()
        return None

    async def append(self, data: bytes) -> int:
        await self.lock_append.acquire()
        try:
            offset: int = await self.f_append.tell()
            data_len: int = len(data)
            self.txId: int = (data_txId := self.txId + 1)
            data_crc32: int = binascii.crc32(data)

            res = await self.f_append.write(
                data_len.to_bytes(8, byteorder='big', signed=False) + 
                self.txId.to_bytes(8, byteorder='big', signed=False) +
                data_crc32.to_bytes(4, byteorder='big', signed=False) +
                data
            )
            await self.f_append.flush()
            return (offset, res)
        except Exception as ex:
            print(ex)
        finally:
            self.lock_append.release()
        return 0
    
    async def open(self):
        try:
            print("open file...")
            self.f_append = await aiofiles.open(self.file_path_name, "ab")
        except IOError:
            try:
                self.f_append = await aiofiles.open(self.file_path_name, "ab")
                print("reopen...")
            except Exception as ex:
                print("Exception:", ex)
                raise ex
        try:
            self.f_read = await aiofiles.open(self.file_path_name, "rb")            
            self.f_update = await aiofiles.open(self.file_path_name, "rb+")
            print(self.f_read)
            print(self.f_update)
            print(self.f_append)
        except Exceprion as ex:
            print("exception:", ex)
            raise ex

    async def close(self) -> None:
        if self.f_append.closed:
            pass
        else:
            await self.f_append.flush()
            await self.f_append.close()
        
        if self.f_update.closed:
            pass
        else:
            await self.f_update.flush()
            await self.f_update.close()

        if self.f_read.closed:
            pass
        else:
            await self.f_read.close()
    
        return (self.f_append.closed and self.f_read.closed and self.f_update.closed)

    

In [4]:
storage = FileStorage("../data/0000.dat")
await storage.open()


open file...
<aiofiles.threadpool.binary.AsyncBufferedReader object at 0x7f5c3c58d640> wrapping <_io.BufferedReader name='../data/0000.dat'>
<aiofiles.threadpool.binary.AsyncBufferedReader object at 0x7f5c3c4d7590> wrapping <_io.BufferedRandom name='../data/0000.dat'>
<aiofiles.threadpool.binary.AsyncBufferedIOBase object at 0x7f5c3c4d7dd0> wrapping <_io.BufferedWriter name='../data/0000.dat'>


In [5]:
async def get_and_save() -> bytes:
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8800/config/generate/individual', verify_ssl=False) as resp:
            # print(resp.status)
            start_time = time.time()
            msg =  msgspec.msgpack.encode(msgspec.json.decode(await resp.text()))
            return msg

# for i in tqdm(range(1000000)):
#     await storage.append(await get_and_save())


In [6]:
await storage.f_read.seek(0, os.SEEK_SET)

0

In [7]:
r = await storage.read()
print(json.dumps(msgspec.msgpack.decode(r), ensure_ascii=False, indent=2, sort_keys=True))

{
  "baseType": null,
  "birthDate": "1968-07-12",
  "characteristic": [
    {
      "characteristicRelationship": [
        {
          "id": "d50751cb-4c20-46df-91bd-d5de93df5259",
          "relationshipType": "GeCftAqDkS_s2os0kE3gCOU4nWjWfA9M0qg5mxMORBw"
        }
      ],
      "id": "1ad43ac4-02c4-46ef-b45f-6f54158f1d84",
      "name": "4f0f0c0a-a2c4-4c68-b407-0b27b8e19f90",
      "type": null,
      "value": "f9562f1a-7503-49d0-aa37-089d52cef0ee",
      "valueType": null
    }
  ],
  "citizenship": "Российская Федерация",
  "contactMedium": [
    {
      "baseType": "-t0bRzo8vxKfubr0_POQfTaBiP1loaasCOLU3Xe8hzU",
      "email": {
        "baseType": null,
        "email": "nestor06@example.net",
        "type": "c54badfa-05f9-4a12-ad38-8b190fb1d2c5"
      },
      "faxNumber": {
        "baseType": null,
        "faxNumber": null,
        "type": "8188d81c-efe4-4adb-ad89-8fe7556ea964"
      },
      "geographicAddress": {
        "city": null,
        "cityType": null,
        "c

In [10]:
await storage.f_read.seek(0, os.SEEK_SET)

start = datetime.datetime.now()
i = 0
while (r:= await storage.read()) is not None:
    pass
    
datetime.datetime.now() - start


datetime.timedelta(seconds=488, microseconds=609966)

In [11]:
1000000/448

2232.1428571428573

In [14]:
from aiologger.loggers.json import JsonLogger
logger = JsonLogger.with_default_handlers()

In [16]:
await logger.info("Im a string")
# {"logged_at": "2018-06-14T09:34:56.482817", "line_number": 9, "function": "main", "level": "INFO", "file_path": "/Users/diogo.mmartins/Library/Preferences/PyCharm2018.1/scratches/scratch_47.py", "msg": "Im a string"}

await logger.info({
    'date_objects': datetime.datetime.now(),
    'exceptions': KeyError("Boooom"),
    'types': JsonLogger
})
# {"logged_at": "2018-06-14T09:34:56.483000", "line_number": 13, "function": "main", "level": "INFO", "file_path": "/Users/diogo.mmartins/Library/Preferences/PyCharm2018.1/scratches/scratch_47.py", "msg": {"date_objects": "2018-06-14T09:34:56.482953", "exceptions": "Exception: KeyError('Boooom',)", "types": "<JsonLogger aiologger-json (DEBUG)>"}}
