Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed May 18, 2024
1 parent f9baf38 commit 70d6737
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 157 deletions.
3 changes: 3 additions & 0 deletions opteryx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
dotenv = None # type:ignore

_env_path = Path(".") / ".env"

# we do a separate check for debug mode here so we don't load the config
# module just yet
OPTERYX_DEBUG = os.environ.get("OPTERYX_DEBUG") is not None

# deepcode ignore PythonSameEvalBinaryExpressiontrue: false +ve, values can be different
Expand Down
17 changes: 14 additions & 3 deletions opteryx/compiled/structures/memory_pool.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,25 @@ cdef class MemoryPool:
public long commits, failed_commits, reads, read_locks, l1_compaction, l2_compaction, releases
object lock

def __cinit__(self, int size, str name="Memory Pool"):
def __cinit__(self, long size, str name="Memory Pool"):
if size <= 0:
raise ValueError("MemoryPool size must be a positive integer")

self.size = size
self.pool = <unsigned char*>malloc(size * sizeof(unsigned char))
attempt_size = size

while attempt_size > 0:
self.pool = <unsigned char*>malloc(attempt_size * sizeof(unsigned char))
if self.pool:
break
attempt_size >>= 1 # Bit shift to halve the size and try again

if not self.pool:
raise MemoryError("Failed to allocate memory pool")

self.size = attempt_size
self.name = name
self.free_segments = [MemorySegment(0, size)]
self.free_segments = [MemorySegment(0, self.size)]
self.used_segments = {}
self.lock = Lock()

Expand Down Expand Up @@ -151,6 +161,7 @@ cdef class MemoryPool:

memcpy(self.pool + segment.start, PyBytes_AsString(data), len_data)
self.used_segments[ref_id] = MemorySegment(segment.start, len_data)
self.commits += 1
return ref_id

def read(self, long ref_id, int zero_copy = 1):
Expand Down
49 changes: 40 additions & 9 deletions opteryx/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,36 @@
from os import environ
from pathlib import Path

import psutil

_config_values: dict = {}


OPTERYX_DEBUG = environ.get("OPTERYX_DEBUG") is not None


def memory_allocation_calculation(allocation) -> int:
    """
    Determine the memory allocation size in bytes.

    If the allocation is a fraction strictly between 0 and 1, it is treated as
    a proportion of the total system memory. If it is 1 or greater, it is
    treated as an absolute number of bytes (the surrounding config reads these
    values as "either bytes or fraction of system memory").

    Parameters:
        allocation (float|int):
            Either a fraction of system memory (0 < allocation < 1) or an
            absolute size in bytes (allocation >= 1).

    Returns:
        int: Memory size in bytes to be allocated.

    Raises:
        ValueError: If the allocation is not a positive number.
    """
    if 0 < allocation < 1:  # fractional: a share of total system memory
        # psutil is only needed on this path, so query it lazily
        total_memory = psutil.virtual_memory().total  # reported in bytes
        return int(total_memory * allocation)
    if allocation >= 1:  # absolute: already expressed in bytes
        return int(allocation)
    raise ValueError("Invalid memory allocation value. Must be a positive number.")


def parse_yaml(yaml_str):
## Based on an algorithm from ChatGPT

Expand Down Expand Up @@ -104,25 +130,30 @@ def get(key, default=None):
# These are 'protected' properties which cannot be overridden by a single query

# GCP project ID - for Google Cloud Data
GCP_PROJECT_ID: str = get("GCP_PROJECT_ID")
GCP_PROJECT_ID: str = get("GCP_PROJECT_ID")
# The maximum number of evictions by a single query
MAX_CACHE_EVICTIONS_PER_QUERY: int = int(get("MAX_CACHE_EVICTIONS_PER_QUERY", 32))
# Maximum size for items saved to the buffer cache
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 1024 * 1024))
# The local buffer pool size in bytes
MAX_LOCAL_BUFFER_CAPACITY: int = int(get("MAX_LOCAL_BUFFER_CAPACITY", 256 * 1024 * 1024))
MAX_CACHEABLE_ITEM_SIZE: int = int(get("MAX_CACHEABLE_ITEM_SIZE", 2 * 1024 * 1024))
# The local buffer pool size in either bytes or fraction of system memory
MAX_LOCAL_BUFFER_CAPACITY: int = memory_allocation_calculation(float(get("MAX_LOCAL_BUFFER_CAPACITY", 0.2)))
# The read buffer pool size in either bytes or fraction of system memory
MAX_READ_BUFFER_CAPACITY: int = memory_allocation_calculation(float(get("MAX_READ_BUFFER_CAPACITY", 0.1)))
# don't try to raise the priority of the server process
DISABLE_HIGH_PRIORITY: bool = bool(get("DISABLE_HIGH_PRIORITY", False))
# don't output resource (memory) utilization information
ENABLE_RESOURCE_LOGGING: bool = bool(get("ENABLE_RESOURCE_LOGGING", False))
# size of morsels to push between steps
MORSEL_SIZE: int = int(get("MORSEL_SIZE", 64 * 1024 * 1024))

# not GA
PROFILE_LOCATION:str = get("PROFILE_LOCATION")
# debug mode
OPTERYX_DEBUG: bool = bool(get("OPTERYX_DEBUG", False))
# number of read loops per data source
CONCURRENT_READS: int = int(get("CONCURRENT_READS", 4))
# query log
QUERY_LOG_LOCATION:str = get("QUERY_LOG_LOCATION", False)
QUERY_LOG_SIZE:int = int(get("QUERY_LOG_SIZE", 100))
# not currently supported
METADATA_SERVER: str = None


# not GA
PROFILE_LOCATION:str = get("PROFILE_LOCATION")
# fmt:on
28 changes: 17 additions & 11 deletions opteryx/connectors/capabilities/cacheable.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from orso.cityhash import CityHash64

from opteryx.config import MAX_CACHE_EVICTIONS_PER_QUERY
from opteryx.config import MAX_CACHEABLE_ITEM_SIZE

__all__ = ("Cacheable", "read_thru_cache")


Expand Down Expand Up @@ -47,7 +50,7 @@ def read_thru_cache(func):
from opteryx.shared import BufferPool

cache_manager = get_cache_manager()
max_evictions = cache_manager.max_evictions_per_query
max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
Expand Down Expand Up @@ -81,17 +84,19 @@ def wrapper(blob_name, statistics, **kwargs):

# Write the result to caches
if max_evictions:
# we set a per-query eviction limit
if len(result) < buffer_pool.max_cacheable_item_size:

if len(result) < buffer_pool.size // 10:
evicted = buffer_pool.set(key, result)
remote_cache.set(key, result)
if evicted:
# if we're evicting items we're putting into the cache
# if we're evicting items we just put in the cache, stop
if evicted in my_keys:
max_evictions = 0
else:
max_evictions -= 1
statistics.cache_evictions += 1

if len(result) < MAX_CACHEABLE_ITEM_SIZE:
remote_cache.set(key, result)
else:
statistics.cache_oversize += 1

Expand Down Expand Up @@ -120,7 +125,7 @@ def async_read_thru_cache(func):
from opteryx.shared import MemoryPool

cache_manager = get_cache_manager()
max_evictions = cache_manager.max_evictions_per_query
max_evictions = MAX_CACHE_EVICTIONS_PER_QUERY
remote_cache = cache_manager.cache_backend
if not remote_cache:
# rather than make decisions - just use a dummy
Expand Down Expand Up @@ -171,18 +176,19 @@ async def wrapper(blob_name: str, statistics, pool: MemoryPool, **kwargs):
if max_evictions:
# we set a per-query eviction limit
buffer = await pool.read(result) # type: ignore
if len(buffer) < buffer_pool.max_cacheable_item_size:

if len(buffer) < buffer_pool.size // 10:
evicted = buffer_pool.set(key, buffer)
remote_cache.set(key, buffer)
if evicted:
# if we're evicting items we're putting into the cache
# stop putting more stuff into the cache, otherwise we're
# just thrashing
# if we're evicting items we just put in the cache, stop
if evicted in my_keys:
max_evictions = 0
else:
max_evictions -= 1
statistics.cache_evictions += 1

if len(buffer) < MAX_CACHEABLE_ITEM_SIZE:
remote_cache.set(key, buffer)
else:
statistics.cache_oversize += 1

Expand Down
2 changes: 1 addition & 1 deletion opteryx/connectors/sql_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from orso.tools import random_string
from orso.types import PYTHON_TO_ORSO_MAP

from opteryx import OPTERYX_DEBUG
from opteryx.config import OPTERYX_DEBUG
from opteryx.connectors.base.base_connector import DEFAULT_MORSEL_SIZE
from opteryx.connectors.base.base_connector import INITIAL_CHUNK_SIZE
from opteryx.connectors.base.base_connector import MIN_CHUNK_SIZE
Expand Down
6 changes: 4 additions & 2 deletions opteryx/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
from opteryx.utils import sql

PROFILE_LOCATION = config.PROFILE_LOCATION
QUERY_LOG_LOCATION = config.QUERY_LOG_LOCATION
QUERY_LOG_SIZE = config.QUERY_LOG_SIZE


ROLLING_LOG = None
if PROFILE_LOCATION:
ROLLING_LOG = RollingLog(PROFILE_LOCATION + ".log")
if QUERY_LOG_LOCATION:
ROLLING_LOG = RollingLog(QUERY_LOG_LOCATION, max_entries=QUERY_LOG_SIZE)


class CursorState(Enum):
Expand Down
59 changes: 13 additions & 46 deletions opteryx/managers/cache/cache_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

from opteryx import config
from opteryx.exceptions import InvalidConfigurationError
from opteryx.managers.kvstores import BaseKeyValueStore

MAX_CACHEABLE_ITEM_SIZE = config.MAX_CACHEABLE_ITEM_SIZE
MAX_CACHE_EVICTIONS_PER_QUERY = config.MAX_CACHE_EVICTIONS_PER_QUERY
MAX_LOCAL_BUFFER_CAPACITY = config.MAX_LOCAL_BUFFER_CAPACITY


class CacheManager:
"""
Expand All @@ -16,54 +23,14 @@ class CacheManager:
Parameters:
cache_backend: Union[BaseKeyValueStore, None]
The cache storage to use.
max_cacheable_item_size: int
The maximum size a single item in the cache can occupy.
max_evictions_per_query: int
The number of items to evict from cache per query.
max_local_buffer_capacity: int
The maximum number of items to store in the BufferPool.
"""

def __init__(
self,
cache_backend: Union[BaseKeyValueStore, None] = None,
max_cacheable_item_size: Union[int, None] = MAX_CACHEABLE_ITEM_SIZE,
max_evictions_per_query: Union[int, None] = MAX_CACHE_EVICTIONS_PER_QUERY,
max_local_buffer_capacity: int = MAX_LOCAL_BUFFER_CAPACITY,
):
def __init__(self, cache_backend: Union[BaseKeyValueStore, None] = None):
if cache_backend is not None and not isinstance(cache_backend, BaseKeyValueStore):
raise InvalidConfigurationError(
config_item="cache_backend",
provided_value=str(type(cache_backend)),
valid_value_description="Instance of BaseKeyValueStore",
)

if max_cacheable_item_size is not None and (
not isinstance(max_cacheable_item_size, int) or max_cacheable_item_size <= 0
):
raise InvalidConfigurationError(
config_item="max_cacheable_item_size",
provided_value=str(max_cacheable_item_size),
valid_value_description="A number greater than zero",
)

if max_evictions_per_query is not None and (
not isinstance(max_evictions_per_query, int) or max_evictions_per_query <= 0
):
raise InvalidConfigurationError(
config_item="max_evictions_per_query",
provided_value=str(max_evictions_per_query),
valid_value_description="A number greater than zero",
)

if not isinstance(max_local_buffer_capacity, int) or max_local_buffer_capacity <= 0:
raise InvalidConfigurationError(
config_item="max_local_buffer_capacity",
provided_value=str(max_local_buffer_capacity),
valid_value_description="A number greater than zero",
)

self.cache_backend = cache_backend
self.max_cacheable_item_size = max_cacheable_item_size
self.max_evictions_per_query = max_evictions_per_query
self.max_local_buffer_capacity = max_local_buffer_capacity
9 changes: 4 additions & 5 deletions opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
import pyarrow.parquet
from orso.schema import RelationSchema

from opteryx import config
from opteryx.operators.base_plan_node import BasePlanDataObject
from opteryx.operators.read_node import ReaderNode
from opteryx.shared import AsyncMemoryPool
from opteryx.shared import MemoryPool
from opteryx.utils.file_decoders import get_decoder

CONCURRENT_READS = 4
MAX_BUFFER_SIZE_MB = 512
CONCURRENT_READS = config.CONCURRENT_READS
MAX_READ_BUFFER_CAPACITY = config.MAX_READ_BUFFER_CAPACITY


def normalize_morsel(schema: RelationSchema, morsel: pyarrow.Table) -> pyarrow.Table:
Expand Down Expand Up @@ -106,9 +107,7 @@ class AsyncReaderNode(ReaderNode):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pool = MemoryPool(
MAX_BUFFER_SIZE_MB * 1024 * 1024, f"ReadBuffer <{self.parameters['alias']}>"
)
self.pool = MemoryPool(MAX_READ_BUFFER_CAPACITY, f"ReadBuffer <{self.parameters['alias']}>")

self.do = AsyncReaderDataObject()

Expand Down
1 change: 1 addition & 0 deletions opteryx/operators/base_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, properties: QueryProperties, **parameters):
self.statistics = QueryStatistics(properties.qid)
self.execution_time = 0
self.identity = random_string()
self.do = None

def to_json(self) -> dict: # pragma: no cover

Expand Down
14 changes: 4 additions & 10 deletions opteryx/shared/buffer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""
from typing import Optional

from opteryx.config import MAX_LOCAL_BUFFER_CAPACITY
from opteryx.shared import MemoryPool
from opteryx.utils.lru_2 import LRU2

Expand All @@ -34,19 +35,12 @@ class _BufferPool:
eviction.
"""

slots = "_lru", "_cache_backend", "_max_cacheable_item_size", "_memory_pool"
slots = "_lru", "_memory_pool", "size"

def __init__(self):
# Import here to avoid circular imports
from opteryx import get_cache_manager

cache_manager = get_cache_manager()

self.max_cacheable_item_size = cache_manager.max_cacheable_item_size
self._lru = LRU2()
self._memory_pool = MemoryPool(
name="BufferPool", size=cache_manager.max_local_buffer_capacity
)
self._memory_pool = MemoryPool(name="BufferPool", size=MAX_LOCAL_BUFFER_CAPACITY)
self.size = self._memory_pool.size

def get(self, key: bytes, zero_copy: bool = True) -> Optional[bytes]:
"""
Expand Down

0 comments on commit 70d6737

Please sign in to comment.