Skip to content
This repository has been archived by the owner on Feb 16, 2023. It is now read-only.

config option for setting delay waiting for modified files while using inotify #1572

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ansible/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ paperlessng_tika_gotenberg_endpoint: http://localhost:3000
# Software tweaks
paperlessng_time_zone: Europe/Berlin
paperlessng_consumer_polling: 0
paperlessng_consumer_inotify_wait_modified_delay: 0.1
paperlessng_consumer_delete_duplicates: False
paperlessng_consumer_recursive: False
paperlessng_consumer_subdirs_as_tags: False
Expand Down
2 changes: 2 additions & 0 deletions ansible/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@
line: "PAPERLESS_TIME_ZONE={{ paperlessng_time_zone }}"
- regexp: PAPERLESS_CONSUMER_POLLING
line: "PAPERLESS_CONSUMER_POLLING={{ paperlessng_consumer_polling }}"
- regexp: PAPERLESS_CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY
line: "PAPERLESS_CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY={{ paperlessng_consumer_inotify_wait_modified_delay }}"
- regexp: PAPERLESS_CONSUMER_DELETE_DUPLICATES
line: "PAPERLESS_CONSUMER_DELETE_DUPLICATES={{ paperlessng_consumer_delete_duplicates }}"
- regexp: PAPERLESS_CONSUMER_RECURSIVE
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,13 @@ PAPERLESS_CONSUMER_POLLING=<num>

Defaults to 0, which disables polling and uses filesystem notifications.

PAPERLESS_CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY=<float>
If paperless consumes a file and get a "finished" event with ``inotify``,
the file might get picked up by the scanner again to write additional content.
Specify a wait delay in seconds here to make paperless wait for changes to the file
so that it can continue consuming that same file.

Defaults to 0.1, waiting for 100 milliseconds.

PAPERLESS_CONSUMER_DELETE_DUPLICATES=<bool>
When the consumer detects a duplicate document, it will not touch the
Expand Down
1 change: 1 addition & 0 deletions paperless.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#PAPERLESS_THREADS_PER_WORKER=1
#PAPERLESS_TIME_ZONE=UTC
#PAPERLESS_CONSUMER_POLLING=10
#PAPERLESS_CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY=0.1
#PAPERLESS_CONSUMER_DELETE_DUPLICATES=false
#PAPERLESS_CONSUMER_RECURSIVE=false
#PAPERLESS_CONSUMER_IGNORE_PATTERNS=[".DS_STORE/*", "._*", ".stfolder/*"]
Expand Down
30 changes: 29 additions & 1 deletion src/documents/management/commands/document_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ def _consume_wait_unmodified(file):
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")


def _consume_inotify_wait_unmodified(directory, file):
modified_delay = max(settings.CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY, 0.1)

if _is_ignored(file):
return

logger.debug(
f"Waiting {modified_delay}s for file {file} to remain unmodified")

inotify = INotify()
inotify_flags = flags.MODIFY | flags.OPEN
descriptor = inotify.add_watch(directory, inotify_flags)

try:
for event in inotify.read(timeout=modified_delay*1000):
if file.endswith(event.name):
logger.info(
f"File {file} has been modified. Cancelling consumption.")
return
except KeyboardInterrupt:
pass
finally:
inotify.rm_watch(descriptor)
inotify.close()

_consume(file)


class Handler(FileSystemEventHandler):

def on_created(self, event):
Expand Down Expand Up @@ -204,7 +232,7 @@ def handle_inotify(self, directory, recursive):
else:
path = directory
filepath = os.path.join(path, event.name)
_consume(filepath)
_consume_inotify_wait_unmodified(path, filepath)
except KeyboardInterrupt:
pass

Expand Down
109 changes: 105 additions & 4 deletions src/documents/tests/test_management_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import filecmp
import os
import shutil
from datetime import datetime
from threading import Thread
from time import sleep
from unittest import mock
Expand Down Expand Up @@ -75,10 +76,10 @@ def wait_for_task_mock_call(self, excpeted_call_count=1):
def bogus_task(self, func, filename, **kwargs):
eq = filecmp.cmp(filename, self.sample_file, shallow=False)
if not eq:
print("Consumed an INVALID file.")
print("Consumed an INVALID file.", filename)
raise ConsumerError("Incomplete File READ FAILED")
else:
print("Consumed a perfectly valid file.")
print("Consumed a perfectly valid file.", filename)

def slow_write_file(self, target, incomplete=False):
with open(self.sample_file, 'rb') as f:
Expand All @@ -95,6 +96,24 @@ def slow_write_file(self, target, incomplete=False):
sleep(0.1)
print("file completed.")

def chunk_write_file(self, target, incomplete=False, wait_between_open=0.1):
with open(self.sample_file, 'rb') as f:
pdf_bytes = f.read()

if incomplete:
pdf_bytes = pdf_bytes[:len(pdf_bytes) - 100]

# this will take 1 second each, since the file is about 20k.
for filePart in chunked(10000, pdf_bytes):
with open(target, 'ab') as f:
print(f"{datetime.now().time()}: Start writing file.", target)
for b in chunked(1000, filePart):
f.write(b)
sleep(0.1)
print(f"{datetime.now().time()}: file closed.", target)
sleep(wait_between_open)
print(f"{datetime.now().time()}: file completed.", target)


class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):

Expand All @@ -111,6 +130,20 @@ def test_consume_file(self):
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)

def test_consume_3_files(self):
self.t_start()

shutil.copy(self.sample_file, os.path.join(self.dirs.consumption_dir, "my_file1.pdf"))
shutil.copy(self.sample_file, os.path.join(self.dirs.consumption_dir, "my_file2.pdf"))
shutil.copy(self.sample_file, os.path.join(self.dirs.consumption_dir, "my_file3.pdf"))

self.wait_for_task_mock_call(3)

self.assertEqual(self.task_mock.call_count, 3)

fnames = [os.path.basename(args[1]) for args, _ in self.task_mock.call_args_list]
self.assertCountEqual(fnames, ["my_file1.pdf", "my_file2.pdf", "my_file3.pdf"])

def test_consume_file_invalid_ext(self):
self.t_start()

Expand All @@ -131,6 +164,51 @@ def test_consume_existing_file(self):
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)

@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_chunk_write_pdf(self, error_logger):

self.task_mock.side_effect = self.bogus_task

self.t_start()

fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")

self.chunk_write_file(fname)

self.wait_for_task_mock_call()


self.task_mock.assert_called_once()

args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)

error_logger.assert_not_called()

@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_chunk_write_3_pdf(self, error_logger):

self.task_mock.side_effect = self.bogus_task

self.t_start()

fname1 = os.path.join(self.dirs.consumption_dir, "my_file1.pdf")
fname2 = os.path.join(self.dirs.consumption_dir, "my_file2.pdf")
fname3 = os.path.join(self.dirs.consumption_dir, "my_file3.pdf")

self.chunk_write_file(fname1)
self.chunk_write_file(fname2)
self.chunk_write_file(fname3)

self.wait_for_task_mock_call(3)

self.assertEqual( self.task_mock.call_count, 3)

fnames = [os.path.basename(args[1]) for args, _ in self.task_mock.call_args_list]
self.assertCountEqual(fnames, ["my_file1.pdf", "my_file2.pdf", "my_file3.pdf"])

error_logger.assert_not_called()

@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):

Expand Down Expand Up @@ -239,19 +317,42 @@ def test_is_ignored(self):
f'_is_ignored("{file_path}") != {expected_ignored}')


@override_settings(CONSUMER_POLLING=1, CONSUMER_POLLING_DELAY=1, CONSUMER_POLLING_RETRY_COUNT=20)
@override_settings(CONSUMER_POLLING=1, CONSUMER_POLLING_DELAY=2, CONSUMER_POLLING_RETRY_COUNT=20)
class TestConsumerPolling(TestConsumer):
# just do all the tests with polling
pass

@override_settings(CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY=2)
class TestConsumerInotifyWait(TestConsumer):

@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_chunk_write_pdf_delay_write_3s(self, error_logger):
"""Write delay is higher than wait time, errors should be raised when trying to consume!"""
self.task_mock.side_effect = self.bogus_task

self.t_start()

fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")

self.chunk_write_file(fname, False, 3)

self.wait_for_task_mock_call(3)

self.assertEqual(self.task_mock.call_count, 3)

# tried 3 times to consume, but ran into error first two times. Last one is valid one.
# TestConsumer throws errors, but production one might not -> undefinable documents appear in paperless,
# though they can be valid documents, just not complete.
self.assertEqual(error_logger.call_count, 2)


@override_settings(CONSUMER_RECURSIVE=True)
class TestConsumerRecursive(TestConsumer):
# just do all the tests with recursive
pass


@override_settings(CONSUMER_RECURSIVE=True, CONSUMER_POLLING=1, CONSUMER_POLLING_DELAY=1, CONSUMER_POLLING_RETRY_COUNT=20)
@override_settings(CONSUMER_RECURSIVE=True, CONSUMER_POLLING=1, CONSUMER_POLLING_DELAY=2, CONSUMER_POLLING_RETRY_COUNT=20)
class TestConsumerRecursivePolling(TestConsumer):
# just do all the tests with polling and recursive
pass
Expand Down
3 changes: 3 additions & 0 deletions src/paperless/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,9 @@ def default_threads_per_worker(task_workers):
os.getenv("PAPERLESS_CONSUMER_POLLING_RETRY_COUNT", 5)
)

# no delay for consuming on filechange
CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY = float(os.getenv("PAPERLESS_CONSUMER_INOTIFY_WAIT_MODIFIED_DELAY", 0.1))

CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")

CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
Expand Down