Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract run with process lock logic into func. Use it to re-index content #710

Merged
merged 5 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 13 additions & 21 deletions src/khoj/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,30 +307,22 @@ def configure_middleware(app):
app.add_middleware(SessionMiddleware, secret_key=os.environ.get("KHOJ_DJANGO_SECRET_KEY", "!secret"))


# NOTE(review): this span is a rendered diff hunk. Lines of the previous, self-locking
# update_content_index and of its lock-free replacement are interleaved without +/- markers
# or indentation, so several statements appear twice and the span is not runnable as shown.
@schedule.repeat(schedule.every(22).to(25).hours)
def update_content_index():
try:
if ProcessLockAdapters.is_process_locked(ProcessLock.Operation.UPDATE_EMBEDDINGS):
logger.info("🔒 Skipping update content index due to lock")
return
ProcessLockAdapters.set_process_lock(
ProcessLock.Operation.UPDATE_EMBEDDINGS, max_duration_in_seconds=60 * 60 * 2
)

with timer("📬 Updating content index via Scheduler"):
for user in get_all_users():
all_files = collect_files(user=user)
success = configure_content(all_files, user=user)
all_files = collect_files(user=None)
success = configure_content(all_files, user=None)
# NOTE(review): `success` is reassigned by each configure_content call above, so this check
# appears to see only the result of the last call (user=None) — confirm that is intended.
if not success:
raise RuntimeError("Failed to update content index")
for user in get_all_users():
all_files = collect_files(user=user)
success = configure_content(all_files, user=user)
all_files = collect_files(user=None)
success = configure_content(all_files, user=None)
if not success:
raise RuntimeError("Failed to update content index")
logger.info("📪 Content index updated via Scheduler")

logger.info("📪 Content index updated via Scheduler")

ProcessLockAdapters.remove_process_lock(ProcessLock.Operation.UPDATE_EMBEDDINGS)
except Exception as e:
logger.error(f"🚨 Error updating content index via Scheduler: {e}", exc_info=True)
@schedule.repeat(schedule.every(22).to(25).hours)
def update_content_index_regularly():
    """Scheduled job (every 22 to 25 hours): run update_content_index under the UPDATE_EMBEDDINGS lock."""
    # The lock is requested for at most two hours.
    lock_duration_in_seconds = 60 * 60 * 2
    ProcessLockAdapters.run_with_lock(
        func=update_content_index,
        operation=ProcessLock.Operation.UPDATE_EMBEDDINGS,
        max_duration_in_seconds=lock_duration_in_seconds,
    )


def configure_search_types():
Expand Down
30 changes: 28 additions & 2 deletions src/khoj/database/adapters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from datetime import date, datetime, timedelta, timezone
from enum import Enum
from typing import List, Optional, Type
from typing import Callable, List, Optional, Type

from asgiref.sync import sync_to_async
from django.contrib.sessions.backends.db import SessionStore
Expand Down Expand Up @@ -46,7 +46,7 @@
from khoj.search_filter.word_filter import WordFilter
from khoj.utils import state
from khoj.utils.config import OfflineChatProcessorModel
from khoj.utils.helpers import generate_random_name, is_none_or_empty
from khoj.utils.helpers import generate_random_name, is_none_or_empty, timer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -421,13 +421,39 @@ def is_process_locked(process_name: str):
tz=timezone.utc
):
process_lock.delete()
logger.info(f"🔓 Deleted stale {process_name} process lock on timeout")
return False
return True

@staticmethod
def remove_process_lock(process_name: str):
    """Delete all ProcessLock rows whose name equals `process_name`; return the delete call's result."""
    locks_with_name = ProcessLock.objects.filter(name=process_name)
    return locks_with_name.delete()

@staticmethod
def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600):
    """Run `func` while holding the `operation` process lock.

    Does nothing (beyond logging) when the lock is already taken. Otherwise the lock is
    set, `func` is called with no arguments, and the lock is removed again whether or not
    `func` raised. Exceptions are logged and never propagated to the caller.

    Args:
        func: Zero-argument callable to execute under the lock.
        operation: Name of the process lock to take.
        max_duration_in_seconds: Duration passed to set_process_lock for this lock.

    Returns:
        None.
    """
    # Exit early if process lock is already taken
    if ProcessLockAdapters.is_process_locked(operation):
        logger.info(f"🔒 Skip executing {func} as {operation} lock is already taken")
        return

    # Set process lock. This happens outside the try/finally below on purpose:
    # remove_process_lock deletes by name, so if setting the lock fails we must not
    # reach the unlock step and delete a lock that this call never took.
    try:
        ProcessLockAdapters.set_process_lock(operation, max_duration_in_seconds)
    except Exception as e:
        logger.error(f"🚨 Error setting {operation} process lock to execute {func}: {e}", exc_info=True)
        return
    logger.info(f"🔐 Locked {operation} to execute {func}")

    success = False
    try:
        # Execute Function
        with timer(f"🔒 Run {func} with {operation} process lock", logger):
            func()
        success = True
    except Exception as e:
        logger.error(f"🚨 Error executing {func} with {operation} process lock: {e}", exc_info=True)
    finally:
        # Remove Process Lock
        ProcessLockAdapters.remove_process_lock(operation)
        logger.info(f"🔓 Unlocked {operation} process after executing {func} {'Succeeded' if success else 'Failed'}")


class ClientApplicationAdapters:
@staticmethod
Expand Down
9 changes: 0 additions & 9 deletions src/khoj/utils/rawconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,8 @@ class NotionContentConfig(ConfigBase):
token: str


# Configuration record for image content; shown here as lines removed by this change.
class ImageContentConfig(ConfigBase):
# Optional list of directory paths; defaults to None.
input_directories: Optional[List[Path]] = None
# Optional list of strings; defaults to None.
# NOTE(review): presumably patterns selecting which files to read — confirm.
input_filter: Optional[List[str]] = None
# Required path (no default).
embeddings_file: Path
# Required flag (no default).
use_xmp_metadata: bool
# Required integer (no default).
batch_size: int


class ContentConfig(ConfigBase):
org: Optional[TextContentConfig] = None
image: Optional[ImageContentConfig] = None
markdown: Optional[TextContentConfig] = None
pdf: Optional[TextContentConfig] = None
plaintext: Optional[TextContentConfig] = None
Expand Down
21 changes: 8 additions & 13 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@
from khoj.utils.config import SearchModels
from khoj.utils.constants import web_directory
from khoj.utils.helpers import resolve_absolute_path
from khoj.utils.rawconfig import (
ContentConfig,
ImageContentConfig,
ImageSearchConfig,
SearchConfig,
)
from khoj.utils.rawconfig import ContentConfig, ImageSearchConfig, SearchConfig
from tests.helpers import (
ChatModelOptionsFactory,
OfflineChatProcessorConversationConfigFactory,
OpenAIProcessorConversationConfigFactory,
ProcessLockFactory,
SubscriptionFactory,
UserConversationProcessorConfigFactory,
UserFactory,
Expand Down Expand Up @@ -211,6 +207,12 @@ def search_models(search_config: SearchConfig):
return search_models


@pytest.fixture
def default_process_lock(db):
    """Create and return a ProcessLock built by ProcessLockFactory with its default values.

    Database access is requested explicitly through the `db` fixture. A `django_db`
    mark applied to a fixture function has no effect, so it is not used here.
    """
    return ProcessLockFactory()


@pytest.fixture
def anyio_backend():
return "asyncio"
Expand All @@ -223,13 +225,6 @@ def content_config(tmp_path_factory, search_models: SearchModels, default_user:

# Generate Image Embeddings from Test Images
content_config = ContentConfig()
content_config.image = ImageContentConfig(
input_filter=None,
input_directories=["tests/data/images"],
embeddings_file=content_dir.joinpath("image_embeddings.pt"),
batch_size=1,
use_xmp_metadata=False,
)

LocalOrgConfig.objects.create(
input_files=None,
Expand Down
8 changes: 8 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
KhojUser,
OfflineChatProcessorConversationConfig,
OpenAIProcessorConversationConfig,
ProcessLock,
SearchModelConfig,
Subscription,
UserConversationConfig,
Expand Down Expand Up @@ -93,3 +94,10 @@ class Meta:
type = "standard"
is_recurring = False
renewal_date = make_aware(datetime.strptime("2100-04-01", "%Y-%m-%d"))


# Test factory that builds ProcessLock model instances.
class ProcessLockFactory(factory.django.DjangoModelFactory):
class Meta:
# Model class this factory instantiates.
model = ProcessLock

# Default lock name used when the caller does not pass `name`.
name = "test_lock"
2 changes: 1 addition & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def test_get_api_config_types(client, sample_org_data, default_user: KhojUser):

# Assert
assert response.status_code == 200
assert response.json() == ["all", "org", "image", "plaintext"]
assert response.json() == ["all", "org", "plaintext"]


# ----------------------------------------------------------------------------------------------------
Expand Down
58 changes: 58 additions & 0 deletions tests/test_db_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import time

import pytest

from khoj.database.adapters import ProcessLockAdapters
from khoj.database.models import ProcessLock
from tests.helpers import ProcessLockFactory


@pytest.mark.django_db(transaction=True)
def test_process_lock(default_process_lock):
    """A lock created by the default fixture is reported as locked."""
    # Arrange
    lock: ProcessLock = default_process_lock

    # Act
    is_locked = ProcessLockAdapters.is_process_locked(lock.name)

    # Assert
    assert is_locked


@pytest.mark.django_db(transaction=True)
def test_expired_process_lock():
    """A lock checked after its max duration has elapsed is reported as not locked."""
    # Arrange
    lock: ProcessLock = ProcessLockFactory(name="test_expired_lock", max_duration_in_seconds=2)

    # Act
    # Wait longer than the lock's 2 second max duration
    time.sleep(3)
    is_locked = ProcessLockAdapters.is_process_locked(lock.name)

    # Assert
    assert not is_locked


@pytest.mark.django_db(transaction=True)
def test_in_progress_lock(default_process_lock):
    """run_with_lock skips the function and leaves the lock in place when it is already taken."""
    # Arrange
    lock: ProcessLock = default_process_lock
    calls = []

    # Act
    # run_with_lock takes the function first and the lock name second
    ProcessLockAdapters.run_with_lock(lambda: calls.append("ran"), lock.name)

    # Assert
    assert calls == []
    assert ProcessLockAdapters.is_process_locked(lock.name)


@pytest.mark.django_db(transaction=True)
def test_run_with_completed():
    """run_with_lock executes the function and releases the lock once it has finished."""
    # Arrange
    calls = []

    # Act
    # run_with_lock takes the function first and the lock name second. It runs the
    # function synchronously and unlocks before returning, so no waiting is needed.
    ProcessLockAdapters.run_with_lock(lambda: calls.append("ran"), "test_run_with")

    # Assert
    assert calls == ["ran"]
    assert not ProcessLockAdapters.is_process_locked("test_run_with")


@pytest.mark.django_db(transaction=True)
def test_nonexistent_lock():
    """A lock name that was never set is reported as not locked."""
    # Act
    is_locked = ProcessLockAdapters.is_process_locked("nonexistent_lock")

    # Assert
    assert not is_locked
15 changes: 0 additions & 15 deletions tests/test_rawconfig.py

This file was deleted.

Loading