Skip to content

Commit

Permalink
fix: get partitioned doc from semi-partitioned index
Browse files Browse the repository at this point in the history
- there was an issue when partitioned and non-partitioned indexes exist side by side, e.g. input-requests-log and input-requests-log-2023-01-01
- I tackled this issue by searching across all indexes with the Search API, using an exact term query on the internal ID; the response time will be slower, but it should be sufficient
  • Loading branch information
Tomáš Daniel committed Nov 27, 2023
1 parent 4e6efef commit 26090a2
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 8 deletions.
3 changes: 2 additions & 1 deletion example/apps/test_security/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .celery_log import CeleryLogTestCase
from .command_log import CommandLogTestCase
from .commands import CommandTestCase
from .input_request_log import InputRequestLogTestCase
from .output_request_log import OutputRequestLogTestCase
from .commands import CommandTestCase
from .partitioned_log import PartitionedLogTestCase
from .utils import UtilsTestCase
8 changes: 5 additions & 3 deletions example/apps/test_security/tests/base.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import json

from io import StringIO

from contextlib import contextmanager

from django.contrib.auth.models import User

from germanium.decorators import data_provider
from germanium.tools import assert_equal, assert_false, assert_is_not_none

from security.backends.elasticsearch.models import PartitionedLog
from security.management import call_command


Expand All @@ -19,6 +17,10 @@ class BaseTestCaseMixin:

databases = ['default', 'security']

def __init__(self, *args, **kwargs):
    """Initialise the test case and enable instant refresh for partitioned logs.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser.
    """
    super().__init__(*args, **kwargs)
    # Class-level flag: while it is truthy, PartitionedLog.save() adds
    # ``refresh`` to the keyword arguments it passes on. It is set on the
    # class itself and is not reset here.
    PartitionedLog.INSTANT_REFRESH = True

@data_provider
def create_user(self, username='test', email='test@localhost'):
    """Create and return a staff superuser whose password is ``'test'``.

    Uses the public ``create_superuser`` manager method instead of the
    private ``_create_user`` helper; it sets ``is_staff`` and
    ``is_superuser`` to ``True`` itself.

    :param username: username of the new user.
    :param email: e-mail address of the new user.
    :return: the created ``User`` instance.
    """
    return User.objects.create_superuser(username, email, 'test')
Expand Down
43 changes: 43 additions & 0 deletions example/apps/test_security/tests/partitioned_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import time

from django.test import override_settings

from germanium.test_cases.client import ClientTestCase
from security.backends.elasticsearch.tests import store_elasticsearch_log
from security.backends.elasticsearch.models import Log, PartitionedLog
from security.backends.testing import capture_security_logs
from .base import BaseTestCaseMixin


class TestNonPartitionedLog(Log):
    """Log document bound to the single, non-partitioned index ``test-log``."""

    class Index:
        name = "test-log"


class TestPartitionedLog(PartitionedLog):
    """Partitioned log document declared with the index pattern ``test-log*``.

    The pattern shares its prefix with the index used by
    ``TestNonPartitionedLog``, which is what the semi-partitioned test
    relies on.
    """

    class Index:
        name = "test-log*"


@override_settings(SECURITY_BACKEND_WRITERS={})
class PartitionedLogTestCase(BaseTestCaseMixin, ClientTestCase):
    """Tests document lookup when plain and partitioned indexes coexist."""

    @store_elasticsearch_log()
    def test_save_and_retrieve_from_semi_partitioned_index(self):
        """Save one document per index flavour and fetch both via the partitioned model."""
        with capture_security_logs():
            # Document written through the non-partitioned model.
            TestNonPartitionedLog.init()
            plain_doc = TestNonPartitionedLog(slug="test")
            plain_doc.save()
            time.sleep(1)  # pause so the freshly saved document can be indexed

            # Document written through the partitioned model.
            TestPartitionedLog.init()
            dated_doc = TestPartitionedLog(slug="test_2")
            dated_doc.save()

            time.sleep(1)

            # Documents saved before and after partitioning must both be
            # retrievable through the partitioned model.
            for saved_doc in (plain_doc, dated_doc):
                retrieved_doc = TestPartitionedLog.get(id=saved_doc.pk)
                assert retrieved_doc
14 changes: 10 additions & 4 deletions security/backends/elasticsearch/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def is_partitioned(cls):
class PartitionedLog(Log):

DAY_FORMAT = "%Y-%m-%d"
INSTANT_REFRESH = False # WARNING: OVERRIDE THIS ONLY FOR TESTS

def save(self, **kwargs):
# assign now if no timestamp given
Expand All @@ -132,13 +133,18 @@ def save(self, **kwargs):

# override the index to go to the proper timeslot
kwargs['index'] = self._format_index_name(self.start)

# convenience for tests, to get rid of refresh latency
if self.INSTANT_REFRESH:
kwargs['refresh'] = self.INSTANT_REFRESH

return super().save(**kwargs)

@classmethod
def get(cls, id, *_, **__):
    """Return the document with the given internal ID, or ``None``.

    The lookup runs a ``term`` query on ``_id`` through ``cls.search()``
    rather than fetching from the index named after the current date, so
    a document can be found whether or not it lives in today's partition.

    :param id: internal document ID to look up (the name is kept for
        caller compatibility even though it shadows the builtin).
    :param _: extra positional arguments, accepted and ignored.
    :param __: extra keyword arguments, accepted and ignored.
    :return: the first matching hit, or ``None`` when nothing matches.
    """
    hits = cls.search().query("term", _id=id).execute().hits
    return hits[0] if hits else None

@classmethod
def _format_index_name(cls, dt):
Expand Down

0 comments on commit 26090a2

Please sign in to comment.