Skip to content
This repository has been archived by the owner on Feb 16, 2023. It is now read-only.

Commit

Permalink
tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaswinkler committed Dec 11, 2020
1 parent f5df910 commit a85792e
Showing 1 changed file with 115 additions and 1 deletion.
116 changes: 115 additions & 1 deletion src/documents/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import json
import os
import shutil
import tempfile
from unittest import mock

from django.contrib.auth.models import User
from django.test import client
from pathvalidate import ValidationError
from rest_framework.test import APITestCase
from whoosh.writing import AsyncWriter

from documents import index
from documents import index, bulk_edit
from documents.models import Document, Correspondent, DocumentType, Tag
from documents.tests.utils import DirectoriesMixin

Expand Down Expand Up @@ -515,3 +517,115 @@ def test_get_metadata_no_archive(self):
self.assertFalse(meta['has_archive_version'])
self.assertGreater(len(meta['original_metadata']), 0)
self.assertIsNone(meta['archive_metadata'])


class TestBulkEdit(DirectoriesMixin, APITestCase):

def setUp(self):
super(TestBulkEdit, self).setUp()

user = User.objects.create_superuser(username="temp_admin")
self.client.force_login(user=user)

patcher = mock.patch('documents.bulk_edit.async_task')
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
self.c1 = Correspondent.objects.create(name="c1")
self.c2 = Correspondent.objects.create(name="c2")
self.dt1 = DocumentType.objects.create(name="dt1")
self.dt2 = DocumentType.objects.create(name="dt2")
self.t1 = Tag.objects.create(name="t1")
self.t2 = Tag.objects.create(name="t2")
self.doc1 = Document.objects.create(checksum="A", title="A")
self.doc2 = Document.objects.create(checksum="B", title="B", correspondent=self.c1, document_type=self.dt1)
self.doc3 = Document.objects.create(checksum="C", title="C", correspondent=self.c2, document_type=self.dt2)
self.doc4 = Document.objects.create(checksum="D", title="D")
self.doc5 = Document.objects.create(checksum="E", title="E")
self.doc2.tags.add(self.t1)
self.doc3.tags.add(self.t2)
self.doc4.tags.add(self.t1, self.t2)

def test_set_correspondent(self):
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], self.c2.id)
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc1.id, self.doc2.id])

def test_unset_correspondent(self):
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None)
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc2.id, self.doc3.id])

def test_set_document_type(self):
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], self.dt2.id)
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc1.id, self.doc2.id])

def test_unset_document_type(self):
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None)
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc2.id, self.doc3.id])

def test_add_tag(self):
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
bulk_edit.add_tag([self.doc1.id, self.doc2.id, self.doc3.id, self.doc4.id], self.t1.id)
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc1.id, self.doc3.id])


def test_remove_tag(self):
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id)
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs['document_ids'], [self.doc4.id])

def test_delete(self):
self.assertEqual(Document.objects.count(), 5)
bulk_edit.delete([self.doc1.id, self.doc2.id])
self.assertEqual(Document.objects.count(), 3)
self.assertCountEqual([doc.id for doc in Document.objects.all()], [self.doc3.id, self.doc4.id, self.doc5.id])

def test_api(self):
self.assertEqual(Document.objects.count(), 5)
response = self.client.post("/api/documents/bulk_edit/", json.dumps({
"documents": [self.doc1.id],
"method": "delete",
"parameters": {}
}), content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(Document.objects.count(), 4)

def test_api_invalid_doc(self):
self.assertEqual(Document.objects.count(), 5)
response = self.client.post("/api/documents/bulk_edit/", json.dumps({
"documents": [-235],
"method": "delete",
"parameters": {}
}), content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(Document.objects.count(), 5)

def test_api_invalid_method(self):
self.assertEqual(Document.objects.count(), 5)
response = self.client.post("/api/documents/bulk_edit/", json.dumps({
"documents": [self.doc2.id],
"method": "exterminate",
"parameters": {}
}), content_type='application/json')
self.assertEqual(response.status_code, 400)
self.assertEqual(Document.objects.count(), 5)

0 comments on commit a85792e

Please sign in to comment.