Skip to content
This repository has been archived by the owner on Feb 16, 2023. It is now read-only.

Commit

Permalink
Create tags from sub directories
Browse files Browse the repository at this point in the history
The names of sub directories in the consumer directory will be added as
tags for the document to be consumed.
To enable this, set:
PAPERLESS_CONSUMER_RECURSIVE=1
PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=1

Fixes #50
  • Loading branch information
jayme-github committed Nov 29, 2020
1 parent dddd6f5 commit 6a055d9
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ scikit-learn="~=0.23.2"
whitenoise = "~=5.2.0"
watchdog = "*"
whoosh="~=2.7.4"
inotify-simple = "*"
inotifyrecursive = ">=0.3.4"

[dev-packages]
coveralls = "*"
Expand Down
86 changes: 65 additions & 21 deletions src/documents/management/commands/document_consumer.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,60 @@
import logging
import os
from pathlib import Path
from time import sleep

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver

from documents.models import Tag

try:
from inotify_simple import INotify, flags
from inotifyrecursive import INotify, flags
except ImportError:
INotify = flags = None

logger = logging.getLogger(__name__)


def _consume(file):
def _tags_from_path(filepath):
"""Walk up the directory tree from filepath to CONSUMPTION_DIr
and get or create Tag IDs for every directory.
"""
tag_ids = set()
path_parts = Path(filepath).relative_to(
settings.CONSUMPTION_DIR).parent.parts
for part in path_parts:
tag_ids.add(Tag.objects.get_or_create(
slug=slugify(part),
defaults={"name": part},
)[0].pk)

return tag_ids


def _consume(filepath):
if not os.path.isfile(filepath):
logger.debug(
f"Not consuming file {filepath}: File has moved.")
return

tag_ids = None
try:
if os.path.isfile(file):
async_task("documents.tasks.consume_file",
file,
task_name=os.path.basename(file)[:100])
else:
logger.debug(
f"Not consuming file {file}: File has moved.")
if settings.CONSUMER_SUBDIRS_AS_TAGS:
tag_ids = _tags_from_path(filepath)
except Exception as e:
logger.error(
"Error creating tags from path: {}".format(e))

try:
async_task("documents.tasks.consume_file",
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100])
except Exception as e:
# Catch all so that the consumer won't crash.
# This is also what the test case is listening for to check for
Expand Down Expand Up @@ -94,6 +123,7 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE

if not directory:
raise CommandError(
Expand All @@ -104,24 +134,30 @@ def handle(self, *args, **options):
raise CommandError(
f"Consumption directory {directory} does not exist")

for entry in os.scandir(directory):
_consume(entry.path)
if recursive:
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
_consume(filepath)
else:
for entry in os.scandir(directory):
_consume(entry.path)

if options["oneshot"]:
return

if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory)
self.handle_inotify(directory, recursive)
else:
self.handle_polling(directory)
self.handle_polling(directory, recursive)

logger.debug("Consumer exiting.")

def handle_polling(self, directory):
def handle_polling(self, directory, recursive):
logging.getLogger(__name__).info(
f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=False)
self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start()
try:
while self.observer.is_alive():
Expand All @@ -132,18 +168,26 @@ def handle_polling(self, directory):
self.observer.stop()
self.observer.join()

def handle_inotify(self, directory):
def handle_inotify(self, directory, recursive):
logging.getLogger(__name__).info(
f"Using inotify to watch directory for changes: {directory}")

inotify = INotify()
descriptor = inotify.add_watch(
directory, flags.CLOSE_WRITE | flags.MOVED_TO)
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
if recursive:
descriptor = inotify.add_watch_recursive(directory, inotify_flags)
else:
descriptor = inotify.add_watch(directory, inotify_flags)

try:
while not self.stop_flag:
for event in inotify.read(timeout=1000, read_delay=1000):
file = os.path.join(directory, event.name)
_consume(file)
for event in inotify.read(timeout=1000):
if recursive:
path = inotify.get_path(event.wd)
else:
path = directory
filepath = os.path.join(path, event.name)
_consume(filepath)
except KeyboardInterrupt:
pass

Expand Down
35 changes: 33 additions & 2 deletions src/documents/tests/test_management_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

from django.conf import settings
from django.core.management import call_command, CommandError
from django.test import override_settings, TestCase
from django.test import override_settings, TransactionTestCase

from documents.models import Tag
from documents.consumer import ConsumerError
from documents.management.commands import document_consumer
from documents.tests.utils import DirectoriesMixin
Expand All @@ -33,7 +34,7 @@ def chunked(size, source):
yield source[i:i+size]


class TestConsumer(DirectoriesMixin, TestCase):
class TestConsumer(DirectoriesMixin, TransactionTestCase):

sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")

Expand Down Expand Up @@ -126,6 +127,36 @@ def test_consume_existing_file(self):
def test_consume_existing_file_polling(self):
self.test_consume_existing_file()

@override_settings(CONSUMER_RECURSIVE=1)
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
def test_consume_file_with_path_tags(self):

tag_names = ("existingTag", "Space Tag")
# Create a Tag prior to consuming a file using it in path
tag_ids = [Tag.objects.create(name=tag_names[0]).pk,]

self.t_start()

path = os.path.join(self.dirs.consumption_dir, *tag_names)
os.makedirs(path, exist_ok=True)
f = os.path.join(path, "my_file.pdf")
shutil.copy(self.sample_file, f)

self.wait_for_task_mock_call()

self.task_mock.assert_called_once()

# Add the pk of the Tag created by _consume()
tag_ids.append(Tag.objects.get(name=tag_names[1]).pk)

args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)

# assertCountEqual has a bad name, but test that the first
# sequence contains the same elements as second, regardless of
# their order.
self.assertCountEqual(kwargs["override_tag_ids"], tag_ids)

@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):

Expand Down
9 changes: 9 additions & 0 deletions src/paperless/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,15 @@ def default_threads_per_worker():

CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")

# Consume from subdirectories of CONSUMPTION_DIR as well
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")

# Set the names of subdirectories as tags for consumed files.
# E.g. $CONSUMPTION_DIR/foo/bar/file.pdf will add the tags "foo" and "bar" to
# the consumed file.
# PAPERLESS_CONSUMER_RECURSIVE must be enabled for this to work.
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")

OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")

OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0))
Expand Down

0 comments on commit 6a055d9

Please sign in to comment.