In [1]:
!pip install duckdb polars pydantic

Defaulting to user installation because normal site-packages is not writeable
Collecting polars
  Obtaining dependency information for polars from https://files.pythonhosted.org/packages/39/e7/1879249e826b17a2f1c5c2629c0d2b58a170606dace66f072d52a2ad1a7b/polars-0.19.17-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading polars-0.19.17-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Collecting pydantic
  Obtaining dependency information for pydantic from https://files.pythonhosted.org/packages/0a/2b/64066de1c4cf3d4ed623beeb3bbf3f8d0cc26661f1e7d180ec5eb66b75a5/pydantic-2.5.2-py3-none-any.whl.metadata
  Downloading pydantic-2.5.2-py3-none-any.whl.metadata (65 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.2/65.2 kB[0m [31m994.8 kB/s[0m eta [36m0:00:00[0m31m1.1 MB/s[0m eta [36m0:00:01[0m
[?25hCollecting annotated-types>=0.4.0 (from pydantic)
  Obtaining dependency information for annotated-typ

In [7]:
import sqlite3

from operator import itemgetter

from itertools import zip_longest
from random import randint, choice, uniform
from typing import List, Dict, NamedTuple, Tuple
from pydantic import BaseModel

import duckdb
import polars as pl
import psutil
import time
import sys
import tracemalloc
from abc import ABC, abstractmethod

import logging



log = logging.getLogger(__name__)

In [21]:
class WarehouseType(NamedTuple):
    polars = 'polars'
    duckdb = 'duckdb'
    sqlite3 = 'sqlite3'
    dictdb = 'dictdb'


class TotalTestResult(BaseModel):
    storage_name: str
    avg_time: float
    avg_memory: float


class AbstractWarehouse(ABC):
    @abstractmethod
    def load_schema(self, *args):
        pass

    @abstractmethod
    def read_data(self, *args):
        pass

    @abstractmethod
    def add_row(self, *args):
        pass


class Polars(AbstractWarehouse):
    def __init__(self):
        self.DB = {}

    def load_schema(self, schema: Dict[str, Tuple[dict, list]]):
        for table in schema:
            cols = schema[table][0]
            self.DB[table] = pl.DataFrame(schema=cols)

    def read_data(self, table_name: str, sort: Dict, filter: Dict, fetchall=False):
        filter_expr = None
        for k, v in filter.items():
            cond = pl.col(k) == v
            if filter_expr is None:
                filter_expr = cond
            else:
                filter_expr &= cond

        if filter:
            filter_result = self.DB[table_name].filter(filter_expr)
        else:
            filter_result = self.DB

        sort_cols = list(sort.keys())
        order = [elem == 'desc' for elem in sort.values()]
        if len(order) == 1:
            order = order[0]

        if sort:
            sort_result = filter_result.sort(*sort_cols, descending=order)
        else:
            sort_result = filter_result

        if fetchall:
            return sort_result.to_dicts()

        if not sort_result.is_empty():
            return sort_result[0].to_dicts()[0]

    def add_row(self, data: dict, table_name: str):
        cols, indx = tables_schema[table_name]
        schema = self.DB[table_name].schema
        self.DB[table_name] = pl.concat([self.DB[table_name], pl.from_dict(data, schema=schema)], rechunk=True,
                                        how="diagonal")


class SQLite3(AbstractWarehouse):
    types = {
        str: 'text',
        'numeric': 'numeric',
        int: 'int',
        float: 'numeric'
    }

    def __init__(self):
        self.conn = sqlite3.connect(':memory:')

    def load_schema(self, schema: Dict[str, Tuple[dict, list]]):
        for table in schema:
            table_schema = schema[table]
            cols = table_schema[0]
            indexes = set(i[0] for i in table_schema[1] if i[0])

            cols_expr = ', '.join([f'\'{c}\' {self.types[cols[c]]}' for c in cols])
            self.conn.execute(f"DROP TABLE IF EXISTS {table}")
            self.conn.execute(f"CREATE TABLE {table} ({cols_expr})")

            for idx in indexes:
                idx_name = "_".join(idx)
                idx_list = ", ".join(idx)
                self.conn.execute(f"create index {table}_{idx_name}_idx on {table} ({idx_list})")
        self.conn.commit()

    def read_data(self, table_name, sort: Dict, filter: Dict, fetchall: bool = False):
        filter_conds = [
            f'({k}=${k} or {k} is null)' if isinstance(v, list) else (f'{k} is null' if v is None else f'{k}=${k}')
            for
            k, v in filter.items()]
        query = f"SELECT * FROM {table_name} {'WHERE' if filter_conds else ''} " + ' AND '.join(filter_conds)
        sort_conds = [f'{field_sort} {order_sort}' for field_sort, order_sort in sort.items()]
        args = filter
        if sort_conds:
            query += f""" ORDER BY {','.join(sort_conds)}"""
        if fetchall:
            result = self.conn.execute(query, args).fetchall()
            if result:
                result = [dict(zip(tables_schema[table_name][0].keys(), r)) for r in result]
        else:
            result = self.conn.execute(query, args).fetchone()
            if result:
                result = dict(zip(tables_schema[table_name][0].keys(), result))
        return result

    def add_row(self, data: dict, table_name: str):
        val_expr = ','.join(['?'] * len(data.keys()))
        cols = sorted(data.keys())
        cols_expr = ','.join([f"\'{col}\'" for col in cols])

        self.conn.execute(f"INSERT INTO {table_name} ({cols_expr}) VALUES ({val_expr})",
                          tuple([data[c] for c in cols]))
        self.conn.commit()


class DuckDB(AbstractWarehouse):
    def __init__(self):
        self.conn = duckdb.connect()

    types = {
        str: 'text',
        'numeric': 'numeric',
        int: 'int',
        float: 'numeric'
    }

    def load_schema(self, schema: Dict[str, Tuple[dict, list]]):
        for table in schema:
            table_schema = schema[table]
            cols = table_schema[0]
            indexes = set(i[0] for i in table_schema[1] if i[0])

            cols_expr = ', '.join([f'{c} {self.types[cols[c]]}' for c in cols])
            self.conn.execute(f"DROP TABLE IF EXISTS {table}")
            self.conn.execute(f"CREATE TABLE {table} ({cols_expr})")

            for idx in indexes:
                idx_name = "_".join(idx)
                idx_list = ", ".join(idx)
                self.conn.execute(f"create index {table}_{idx_name}_idx on {table} ({idx_list})")
        self.conn.commit()

    def read_data(self, table_name, sort: Dict, filter: Dict, fetchall: bool = False):
        filter_conds = [
            f'({k}=${k} or {k} is null)' if isinstance(v, list) else (f'{k} is null' if v is None else f'{k}=${k}')
            for
            k, v in filter.items()]
        query = f"""SELECT * FROM {table_name} {'WHERE' if filter_conds else ''} """ + ' AND '.join(filter_conds)
        sort_conds = [f'{field_sort} {order_sort}' for field_sort, order_sort in sort.items()]
        if sort_conds:
            query += f""" ORDER BY {','.join(sort_conds)}"""
        args = filter
        if fetchall:
            result = self.conn.execute(query, args).fetchall()
            if result:
                result = [dict(zip(tables_schema[table_name][0].keys(), r)) for r in result]
        else:
            result = self.conn.execute(query, args).fetchone()
            if result:
                result = dict(zip(tables_schema[table_name][0].keys(), result))
        return result

    def add_row(self, data: dict, table_name: str):
        val_expr = ','.join(['?'] * len(data.keys()))
        cols = sorted(data.keys())
        cols_expr = ','.join([f"{col}" for col in cols])

        self.conn.execute(f"INSERT INTO {table_name} ({cols_expr}) VALUES ({val_expr})",
                          tuple([data[c] for c in cols]))
        self.conn.commit()


class DictDB(AbstractWarehouse):
    def __init__(self):
        self.DICT_DB = {}
        self.schema = {}

    def load_schema(self, schema: Dict[str, Tuple[dict, list]]):
        self.schema = schema
        for table in schema:
            table_schema = schema[table]
            indexes = table_schema[1]

            self.DICT_DB[table] = dict()
            for idx in indexes:
                self.DICT_DB[table][self._sort_index(idx)] = dict()

    @staticmethod
    def _sort_filter(search_filter: Dict) -> Tuple:
        return tuple(sorted(search_filter.keys()))

    @staticmethod
    def _sort_index(index: Tuple[Tuple]) -> Tuple:
        return tuple(sorted(index[0])), index[1]

    @staticmethod
    def _multi_key_sort(dict_list: List, sort_fields: Tuple):
        for key, order in reversed(sort_fields):
            dict_list.sort(key=itemgetter(key), reverse=order == 'desc')
        return dict_list

    def filter_sort_data(self, table_name: str, filter: Dict, sort: Tuple):
        filter_fields = self._sort_filter(filter)
        if not sort:
            return
        filter_result = self.DICT_DB[table_name].get((filter_fields, sort), {}).get(
            tuple(filter.values()), [])
        if filter_result is not None:
            sorted_result = self._multi_key_sort(filter_result, sort)
            self.DICT_DB[table_name].get((filter_fields, sort), {})[
                tuple(filter.values())] = sorted_result

    def read_data(self, table_name, sort: Dict = {}, filter: Dict = {}, fetchall: bool = False):
        if not self.DICT_DB.get(table_name):
            raise ValueError('No such table')
        sorted_filter_fields = self._sort_filter(filter)
        filter_index = (sorted_filter_fields,
                        tuple([(sort_keys, sort_values) for sort_keys, sort_values in sort.items()]))
        if not self.DICT_DB[table_name].get(filter_index):
            raise ValueError('No such index')

        result = self.DICT_DB[table_name][filter_index].get(tuple([filter[k] for k in sorted_filter_fields]), [])

        if fetchall or not result:
            return result
        else:
            return result[0]

    def add_row(self, data: Dict, table_name: str):
        cols, indx = self.schema[table_name]

        for idx in indx:
            sort_idx = self._sort_index(idx)
            self.DICT_DB[table_name][sort_idx].setdefault(tuple([data[k] for k in sort_idx[0]]), []).append({k: data[k] for k in data if k not in sort_idx[0]})

            filter = {k: data[k] for k in sort_idx[0]}
            self.filter_sort_data(table_name=table_name, filter=filter, sort=idx[1])


class WarehouseManager(AbstractWarehouse):
    def __init__(self, warehouse_type: str, schema: dict):
        self.schema = schema
        self.warehouse_type = warehouse_type
        self.warehouse = self._get_warehouse()

    def _get_warehouse(self):
        if self.warehouse_type == WarehouseType.sqlite3:
            return SQLite3()
        elif self.warehouse_type == WarehouseType.polars:
            return Polars()
        elif self.warehouse_type == WarehouseType.duckdb:
            return DuckDB()
        elif self.warehouse_type == WarehouseType.dictdb:
            return DictDB()
        else:
            raise ValueError("Warehouse not found!")

    def load_schema(self):
        self.warehouse.load_schema(schema=self.schema)

    def read_data(self, table_name: str, filter: Dict, sort: Dict = {}, fetchall: bool = False):
        data = self.warehouse.read_data(table_name, sort, filter, fetchall)
        return data

    def add_row(self, table_name: str, data: dict):
        self.warehouse.add_row(data, table_name)

    def add_selective_data(self, table_name: str, records_count: int):
        data = self._get_random_data(table_name)
        for i in range(records_count):
            self.warehouse.add_row(data, table_name)

    def add_random_data(self, table_name: str, records_count: int):
        for i in range(records_count):
            data = self._get_random_data(table_name)
            self.warehouse.add_row(data, table_name)

    symbol_list = [chr(i) for i in range(0x30, 0x39)]
    code_length = 10

    def _get_random_value_by_type(self, data_type):
        if data_type == str:
            return ''.join([choice(self.symbol_list) for i in range(self.code_length)])
        elif data_type == int:
            return randint(0, 255)
        elif data_type == float:
            return round(uniform(0, 100), 2)

    def _get_random_data(self, table_name: str) -> Dict:
        table_schema = self._get_table_schema(table_name)
        data = dict()
        for col in table_schema:
            data[col] = self._get_random_value_by_type(table_schema[col])

        return data

    def _get_table_schema(self, table_name: str) -> Dict:
        return self.schema[table_name][0]




In [22]:

def wh_read_test(wh_manager: WarehouseManager, read_operations_count: int = 1000, records_count: int = 10000,
                 load_selective_data: bool = False, test_quantity: int = 2) -> TotalTestResult:
    total_time = 0
    total_memory = 0

    if load_selective_data:
        wh_manager.add_selective_data(records_count=records_count, table_name=name)
    else:
        wh_manager.add_random_data(records_count=records_count, table_name=name)

    for _ in range(test_quantity):
        start_time = time.time()
        tracemalloc.start()
        for _ in range(read_operations_count):
            data = wh_manager.read_data(table_name=name, sort=sort_params, filter=filter_params, fetchall=False)
        end_time = time.time()
        elapsed_time = end_time - start_time
        elapsed_memory = tracemalloc.get_traced_memory()[1]
        tracemalloc.stop()
        total_time += elapsed_time
        total_memory += elapsed_memory
    avg_time = total_time / test_quantity
    avg_memory = total_memory / test_quantity
    return TotalTestResult(storage_name=wh_manager.warehouse_type, used_filter=filter_params,
                           avg_time=avg_time, avg_memory=avg_memory,
                           load_selective_data=load_selective_data, read_operations_count=read_operations_count,
                           records_count=records_count)


name = 'table_1'
tables_schema = {'table_1': (
    {
        'field_1': int,
        'field_2': str,
        'field_3': float,
    },
    [
        (('field_1', 'field_2'), (('field_3', 'desc'),)),
        (('field_1',), (('field_3', 'desc'),)),
        (('field_1',), ()),
        (('field_2',), (('field_3', 'desc'),)),
        ((), (('field_3', 'desc'),)),
        ((), ()),
        (('field_1', 'field_2', 'field_3'), ()),
    ]
), }


In [23]:
wh_types = [WarehouseType.sqlite3, WarehouseType.duckdb, WarehouseType.polars, WarehouseType.dictdb]
test_quantity = 2
read_operations_count = 10000
records_count = 1000

In [24]:
# NOT USE SORT (not selective data)
filter_params = {'field_1': 2}
sort_params = {}
load_selective_data = False

statistics = []

for wh_type in wh_types:
    wh_manager = WarehouseManager(warehouse_type=wh_type, schema=tables_schema)
    wh_manager.load_schema()

    current = wh_read_test(wh_manager=wh_manager, read_operations_count=read_operations_count,
                           records_count=records_count, load_selective_data=load_selective_data)

    statistics.append(current)

df = pl.DataFrame(statistics).sort('avg_time')
print(dict(load_selective_data=load_selective_data, read_operations_count=read_operations_count,
                           records_count=records_count, filter_params=filter_params))
print(df)

{'load_selective_data': False, 'read_operations_count': 10000, 'records_count': 1000, 'filter_params': {'field_1': 2}}
shape: (4, 3)
┌──────────────┬──────────┬────────────┐
│ storage_name ┆ avg_time ┆ avg_memory │
│ ---          ┆ ---      ┆ ---        │
│ str          ┆ f64      ┆ f64        │
╞══════════════╪══════════╪════════════╡
│ dictdb       ┆ 0.044815 ┆ 1263.0     │
│ sqlite3      ┆ 0.148322 ┆ 18586.5    │
│ polars       ┆ 1.828073 ┆ 4045.5     │
│ duckdb       ┆ 3.601976 ┆ 1898.5     │
└──────────────┴──────────┴────────────┘


In [25]:
# NOT USE SORT (selective data)
filter_params = {'field_1': 2}
sort_params = {}
load_selective_data = True

statistics = []

for wh_type in wh_types:
    wh_manager = WarehouseManager(warehouse_type=wh_type, schema=tables_schema)
    wh_manager.load_schema()

    current = wh_read_test(wh_manager=wh_manager, read_operations_count=read_operations_count,
                           records_count=records_count, load_selective_data=load_selective_data)

    statistics.append(current)

df = pl.DataFrame(statistics).sort('avg_time')
print(dict(load_selective_data=load_selective_data, read_operations_count=read_operations_count,
                           records_count=records_count, filter_params=filter_params))
print(df)

{'load_selective_data': True, 'read_operations_count': 10000, 'records_count': 1000, 'filter_params': {'field_1': 2}}
shape: (4, 3)
┌──────────────┬──────────┬────────────┐
│ storage_name ┆ avg_time ┆ avg_memory │
│ ---          ┆ ---      ┆ ---        │
│ str          ┆ f64      ┆ f64        │
╞══════════════╪══════════╪════════════╡
│ dictdb       ┆ 0.046454 ┆ 436.0      │
│ sqlite3      ┆ 0.098925 ┆ 17256.5    │
│ polars       ┆ 1.242679 ┆ 2684.0     │
│ duckdb       ┆ 3.402912 ┆ 626.0      │
└──────────────┴──────────┴────────────┘


In [26]:
# USE SORT (not selective data)
filter_params = {'field_1': 2}
sort_params = {'field_3': 'desc'}
load_selective_data = False

statistics = []

for wh_type in wh_types:
    wh_manager = WarehouseManager(warehouse_type=wh_type, schema=tables_schema)
    wh_manager.load_schema()

    current = wh_read_test(wh_manager=wh_manager, read_operations_count=read_operations_count,
                           records_count=records_count, load_selective_data=load_selective_data)

    statistics.append(current)

df = pl.DataFrame(statistics).sort('avg_time')
print(dict(load_selective_data=load_selective_data, read_operations_count=read_operations_count,
                           records_count=records_count, filter_params=filter_params))
print(df)

{'load_selective_data': False, 'read_operations_count': 10000, 'records_count': 1000, 'filter_params': {'field_1': 2}}
shape: (4, 3)
┌──────────────┬──────────┬────────────┐
│ storage_name ┆ avg_time ┆ avg_memory │
│ ---          ┆ ---      ┆ ---        │
│ str          ┆ f64      ┆ f64        │
╞══════════════╪══════════╪════════════╡
│ dictdb       ┆ 0.049994 ┆ 436.0      │
│ sqlite3      ┆ 0.170734 ┆ 17670.5    │
│ polars       ┆ 3.244541 ┆ 3559.0     │
│ duckdb       ┆ 5.392783 ┆ 1425.5     │
└──────────────┴──────────┴────────────┘


In [27]:
# USE SORT (selective data)
filter_params = {'field_1': 2}
sort_params = {'field_3': 'desc'}
load_selective_data = True

statistics = []

for wh_type in wh_types:
    wh_manager = WarehouseManager(warehouse_type=wh_type, schema=tables_schema)
    wh_manager.load_schema()

    current = wh_read_test(wh_manager=wh_manager, read_operations_count=read_operations_count,
                           records_count=records_count, load_selective_data=load_selective_data)

    statistics.append(current)

df = pl.DataFrame(statistics).sort('avg_time')
print(dict(load_selective_data=load_selective_data, read_operations_count=read_operations_count,
                           records_count=records_count, filter_params=filter_params))
print(df)

{'load_selective_data': True, 'read_operations_count': 10000, 'records_count': 1000, 'filter_params': {'field_1': 2}}
shape: (4, 3)
┌──────────────┬──────────┬────────────┐
│ storage_name ┆ avg_time ┆ avg_memory │
│ ---          ┆ ---      ┆ ---        │
│ str          ┆ f64      ┆ f64        │
╞══════════════╪══════════╪════════════╡
│ dictdb       ┆ 0.049949 ┆ 436.0      │
│ sqlite3      ┆ 0.117103 ┆ 17488.5    │
│ polars       ┆ 1.674742 ┆ 2902.0     │
│ duckdb       ┆ 4.363182 ┆ 679.0      │
└──────────────┴──────────┴────────────┘
