Add "in memory" backend
Closes #6

Still needs unit tests

Update memory_core.py

Add tests for memory core

@shaypal5 these are copied from the pickle tests; the only one failing is test_memory_being_calculated. I could use your help with the implementation of the relevant function because I don't think I understand the threading part.
cthoyt authored and shaypal5 committed Nov 25, 2020
1 parent 000f321 commit 67d425e
Showing 3 changed files with 339 additions and 4 deletions.
6 changes: 2 additions & 4 deletions cachier/core.py
@@ -20,6 +20,7 @@
 
 from .pickle_core import _PickleCore
 from .mongo_core import _MongoCore, RecalculationNeeded
+from .memory_core import _MemoryCore
 
 
 MAX_WORKERS_ENVAR_NAME = 'CACHIER_MAX_WORKERS'
@@ -157,10 +158,7 @@ def cachier(
             raise MissingMongetter('must specify ``mongetter`` when using the mongo core')
         core = _MongoCore(mongetter, stale_after, next_time, wait_for_calc_timeout)
     elif backend == 'memory':
-        raise NotImplementedError(
-            'An in-memory backend has not yet been implemented. '
-            'Please see https://github.com/shaypal5/cachier/issues/6'
-        )
+        core = _MemoryCore(stale_after=stale_after, next_time=next_time)
     elif backend == 'redis':
         raise NotImplementedError(
             'A Redis backend has not yet been implemented. '
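With the memory branch now constructing a _MemoryCore, opting in is just a decorator argument. A minimal usage sketch follows; heavy_computation is a made-up example, but backend='memory' matches the decorators used in the tests below.

from cachier import cachier

@cachier(backend='memory')
def heavy_computation(x, y):
    # the result is cached per argument tuple, in the current process only
    return x ** y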
67 changes: 67 additions & 0 deletions cachier/memory_core.py
@@ -0,0 +1,67 @@
"""A memory-based caching core for cachier."""

from collections import defaultdict
from datetime import datetime

from .base_core import _BaseCore


class _MemoryCore(_BaseCore):
"""The pickle core class for cachier.
Parameters
----------
stale_after : datetime.timedelta, optional
See :class:`_BaseCore` documentation.
next_time : bool, optional
See :class:`_BaseCore` documentation.
"""

def __init__(self, stale_after, next_time):
super().__init__(stale_after=stale_after, next_time=next_time)
self.cache = {}

def get_entry_by_key(self, key, reload=False): # pylint: disable=W0221
return key, self.cache.get(key, None)

def get_entry(self, args, kwds, hash_params):
key = args + tuple(sorted(kwds.items())) if hash_params is None else hash_params(args, kwds)
return self.get_entry_by_key(key)

def set_entry(self, key, func_res):
self.cache[key] = {
'value': func_res,
'time': datetime.now(),
'stale': False,
'being_calculated': False,
}

def mark_entry_being_calculated(self, key):
try:
self.cache[key]['being_calculated'] = True
except KeyError:
self.cache[key] = {
'value': None,
'time': datetime.now(),
'stale': False,
'being_calculated': True,
}

def mark_entry_not_calculated(self, key):
try:
self.cache[key]['being_calculated'] = False
except KeyError:
pass # that's ok, we don't need an entry in that case

def wait_on_entry_calc(self, key):
entry = self.cache[key]
# I don't think waiting is necessary for this one
# if not entry['being_calculated']:
return entry['value']

def clear_cache(self):
self.cache.clear()

def clear_being_calculated(self):
for value in self.cache.values():
value['being_calculated'] = False
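For the threading question raised in the commit message, one possible direction (a rough, untested sketch, not part of this commit) is to attach a threading.Condition to each in-flight entry: wait_on_entry_calc blocks on it, and set_entry notifies it once the value is ready. Every name below, including _MemoryCoreSketch, self._lock, and the 'condition' field, is a hypothetical illustration rather than existing cachier code.

import threading
from datetime import datetime


class _MemoryCoreSketch:
    """Hypothetical variant whose wait_on_entry_calc actually blocks."""

    def __init__(self):
        self.cache = {}
        self._lock = threading.Lock()  # guards access to self.cache

    def mark_entry_being_calculated(self, key):
        with self._lock:
            entry = self.cache.setdefault(key, {
                'value': None,
                'time': datetime.now(),
                'stale': False,
            })
            entry['being_calculated'] = True
            # one Condition per in-flight calculation; waiters block on it
            entry['condition'] = threading.Condition()

    def set_entry(self, key, func_res):
        with self._lock:
            entry = self.cache.setdefault(key, {})
            entry.update({
                'value': func_res,
                'time': datetime.now(),
                'stale': False,
                'being_calculated': False,
            })
            condition = entry.pop('condition', None)
        if condition is not None:
            with condition:
                condition.notify_all()  # wake every thread waiting on this key

    def wait_on_entry_calc(self, key):
        with self._lock:
            entry = self.cache[key]
            condition = entry.get('condition')
        if condition is not None:
            with condition:
                # returns immediately if the calculation already finished
                condition.wait_for(
                    lambda: not self.cache[key]['being_calculated'])
        return self.cache[key]['value']

Because Condition.wait_for checks its predicate before sleeping, a thread that arrives after the calculation has finished never blocks, and a thread that arrives earlier is woken by the notify_all in set_entry.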
270 changes: 270 additions & 0 deletions tests/test_memory_core.py
@@ -0,0 +1,270 @@
"""Test for the in-memory implementation of the Cachier python package."""

import hashlib
import queue
import threading
from datetime import timedelta
from random import random
from time import sleep, time

import pandas as pd

from cachier import cachier


@cachier(backend='memory', next_time=False)
def _takes_5_seconds(arg_1, arg_2):
"""Some function."""
sleep(5)
return 'arg_1:{}, arg_2:{}'.format(arg_1, arg_2)


def test_memory_core():
"""Basic memory core functionality."""
_takes_5_seconds.clear_cache()
_takes_5_seconds('a', 'b')
start = time()
_takes_5_seconds('a', 'b', verbose_cache=True)
end = time()
assert end - start < 1
_takes_5_seconds.clear_cache()


SECONDS_IN_DELTA = 3
DELTA = timedelta(seconds=SECONDS_IN_DELTA)


@cachier(backend='memory', stale_after=DELTA, next_time=False)
def _stale_after_seconds(arg_1, arg_2):
"""Some function."""
return random()


def test_stale_after():
"""Testing the stale_after functionality."""
_stale_after_seconds.clear_cache()
val1 = _stale_after_seconds(1, 2)
val2 = _stale_after_seconds(1, 2)
val3 = _stale_after_seconds(1, 3)
assert val1 == val2
assert val1 != val3
sleep(3)
val4 = _stale_after_seconds(1, 2)
assert val4 != val1
_stale_after_seconds.clear_cache()


@cachier(backend='memory', stale_after=DELTA, next_time=True)
def _stale_after_next_time(arg_1, arg_2):
"""Some function."""
return random()


def test_stale_after_next_time():
"""Testing the stale_after with next_time functionality."""
_stale_after_next_time.clear_cache()
val1 = _stale_after_next_time(1, 2)
val2 = _stale_after_next_time(1, 2)
val3 = _stale_after_next_time(1, 3)
assert val1 == val2
assert val1 != val3
sleep(SECONDS_IN_DELTA + 1)
val4 = _stale_after_next_time(1, 2)
assert val4 == val1
sleep(0.5)
val5 = _stale_after_next_time(1, 2)
assert val5 != val1
_stale_after_next_time.clear_cache()


@cachier(backend='memory')
def _random_num():
return random()


@cachier(backend='memory')
def _random_num_with_arg(a):
# print(a)
return random()


def test_overwrite_cache():
"""Tests that the overwrite feature works correctly."""
_random_num.clear_cache()
int1 = _random_num()
int2 = _random_num()
assert int2 == int1
int3 = _random_num(overwrite_cache=True)
assert int3 != int1
int4 = _random_num()
assert int4 == int3
_random_num.clear_cache()

_random_num_with_arg.clear_cache()
int1 = _random_num_with_arg('a')
int2 = _random_num_with_arg('a')
assert int2 == int1
int3 = _random_num_with_arg('a', overwrite_cache=True)
assert int3 != int1
int4 = _random_num_with_arg('a')
assert int4 == int3
_random_num_with_arg.clear_cache()


def test_ignore_cache():
"""Tests that the ignore_cache feature works correctly."""
_random_num.clear_cache()
int1 = _random_num()
int2 = _random_num()
assert int2 == int1
int3 = _random_num(ignore_cache=True)
assert int3 != int1
int4 = _random_num()
assert int4 != int3
assert int4 == int1
_random_num.clear_cache()

_random_num_with_arg.clear_cache()
int1 = _random_num_with_arg('a')
int2 = _random_num_with_arg('a')
assert int2 == int1
int3 = _random_num_with_arg('a', ignore_cache=True)
assert int3 != int1
int4 = _random_num_with_arg('a')
assert int4 != int3
assert int4 == int1
_random_num_with_arg.clear_cache()


@cachier(backend='memory')
def _takes_time(arg_1, arg_2):
"""Some function."""
sleep(2) # this has to be enough time for check_calculation to run twice
return random() + arg_1 + arg_2


def _calls_takes_time(res_queue):
res = _takes_time(0.13, 0.02)
res_queue.put(res)


def test_memory_being_calculated():
"""Testing memory core handling of being calculated scenarios."""
_takes_time.clear_cache()
res_queue = queue.Queue()
thread1 = threading.Thread(target=_calls_takes_time, kwargs={'res_queue': res_queue})
thread2 = threading.Thread(target=_calls_takes_time, kwargs={'res_queue': res_queue})
thread1.start()
sleep(0.5)
thread2.start()
thread1.join()
thread2.join()
assert res_queue.qsize() == 2
res1 = res_queue.get()
res2 = res_queue.get()
assert res1 == res2


@cachier(backend='memory', stale_after=timedelta(seconds=1), next_time=True)
def _being_calc_next_time(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2


def _calls_being_calc_next_time(res_queue):
res = _being_calc_next_time(0.13, 0.02)
res_queue.put(res)


def test_being_calc_next_time():
"""Testing memory core handling of being calculated scenarios."""
_takes_time.clear_cache()
_being_calc_next_time(0.13, 0.02)
sleep(1.1)
res_queue = queue.Queue()
thread1 = threading.Thread(
target=_calls_being_calc_next_time, kwargs={'res_queue': res_queue})
thread2 = threading.Thread(
target=_calls_being_calc_next_time, kwargs={'res_queue': res_queue})
thread1.start()
sleep(0.5)
thread2.start()
thread1.join()
thread2.join()
assert res_queue.qsize() == 2
res1 = res_queue.get()
res2 = res_queue.get()
assert res1 == res2


@cachier(backend='memory')
def _bad_cache(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2


@cachier(backend='memory')
def _delete_cache(arg_1, arg_2):
"""Some function."""
sleep(1)
return random() + arg_1 + arg_2


def test_clear_being_calculated():
"""Test memory core clear `being calculated` functionality."""
_takes_time.clear_being_calculated()


@cachier(backend='memory', stale_after=timedelta(seconds=1), next_time=True)
def _error_throwing_func(arg1):
if not hasattr(_error_throwing_func, 'count'):
_error_throwing_func.count = 0
_error_throwing_func.count += 1
if _error_throwing_func.count > 1:
raise ValueError("Tiny Rick!")
return 7


def test_error_throwing_func():
# with
res1 = _error_throwing_func(4)
sleep(1.5)
res2 = _error_throwing_func(4)
assert res1 == res2


def test_callable_hash_param():
def _hash_params(args, kwargs):
def _hash(obj):
if isinstance(obj, pd.core.frame.DataFrame):
return hashlib.sha256(pd.util.hash_pandas_object(obj).values.tobytes()).hexdigest()
return obj

k_args = tuple(map(_hash, args))
k_kwargs = tuple(sorted({k: _hash(v) for k, v in kwargs.items()}.items()))
return k_args + k_kwargs

@cachier(backend='memory', hash_params=_hash_params)
def _params_with_dataframe(*args, **kwargs):
"""Some function."""
return random()

_params_with_dataframe.clear_cache()

df_a = pd.DataFrame.from_dict(dict(a=[0], b=[2], c=[3]))
df_b = pd.DataFrame.from_dict(dict(a=[0], b=[2], c=[3]))
value_a = _params_with_dataframe(df_a, 1)
value_b = _params_with_dataframe(df_b, 1)

assert value_a == value_b # same content --> same key

value_a = _params_with_dataframe(1, df=df_a)
value_b = _params_with_dataframe(1, df=df_b)

assert value_a == value_b # same content --> same key


if __name__ == '__main__':
test_memory_being_calculated()
