Skip to content

Commit

Permalink
Merge pull request #7975 from rtibbles/force-utf8-for-job-storage
Browse files Browse the repository at this point in the history
Run iceqube tests on postgres.
  • Loading branch information
bjester committed Apr 27, 2021
2 parents 88aa6ec + 6c3d7dc commit 2e41b45
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
)
from kolibri.core.content.utils.sqlalchemybridge import get_default_db_string
from kolibri.core.content.utils.sqlalchemybridge import prepare_base
from kolibri.core.content.utils.sqlalchemybridge import SharingPool
from kolibri.core.content.utils.sqlalchemybridge import SQLALCHEMY_CLASSES_PATH_TEMPLATE
from kolibri.core.utils.sqlalchemy_utils import SharingPool

DATA_PATH_TEMPLATE = os.path.join(
os.path.dirname(__file__), "../../fixtures/{name}_content_data.json"
Expand Down
2 changes: 1 addition & 1 deletion kolibri/core/content/test/sqlalchemytesting.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from sqlalchemy import create_engine

from kolibri.core.content.utils.sqlalchemybridge import get_default_db_string
from kolibri.core.content.utils.sqlalchemybridge import SharingPool
from kolibri.core.utils.sqlalchemy_utils import SharingPool


def django_connection_engine():
Expand Down
40 changes: 0 additions & 40 deletions kolibri/core/content/utils/sqlalchemybridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,46 +44,6 @@ class ClassNotFoundError(Exception):
pass


# get_conn and SharingPool code modified from:
# http://nathansnoggin.blogspot.com/2013/11/integrating-sqlalchemy-into-django.html


def get_conn(self):
    """
    Custom connection factory: borrow Django's "default" database
    connection so SQLAlchemy and Django share the same underlying
    DB-API connection.
    """
    from django.db import connections

    return connections["default"].connection


class SharingPool(NullPool):
    """
    custom connection pool that doesn't close connections, and uses our
    custom connection factory
    """

    def __init__(self, get_connection, **kwargs):
        # Disable reset-on-return: the underlying connection is owned and
        # managed by Django, so resetting it here would interfere with
        # Django's own transaction handling.
        kwargs["reset_on_return"] = False
        # NOTE(review): the ``get_connection`` argument is deliberately
        # ignored — the module-level ``get_conn`` factory (which returns
        # Django's "default" connection) is always used instead.
        super(SharingPool, self).__init__(get_conn, **kwargs)

    def status(self):
        # Human-readable identifier for this pool (SQLAlchemy Pool API).
        return "Sharing Pool"

    def _do_return_conn(self, conn):
        # No-op on check-in: never close or reset the shared Django
        # connection.
        pass

    def _do_get(self):
        # Every checkout creates a fresh connection record, which wraps
        # the shared connection produced by the ``get_conn`` factory.
        return self._create_connection()

    def _close_connection(self, connection):
        # Never close — the underlying connection belongs to Django.
        pass

    def dispose(self):
        # Disposal is a no-op for the same reason as _close_connection:
        # Django is responsible for tearing down its own connections.
        pass


def sqlite_connection_string(db_path):
# Call normpath to ensure that Windows paths are properly formatted
return "sqlite:///{db_path}".format(db_path=os.path.normpath(db_path))
Expand Down
4 changes: 3 additions & 1 deletion kolibri/core/tasks/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,9 @@ def channeldiffstats(self, request):


class FacilityTasksViewSet(BaseViewSet):
queues = [facility_queue]
@property
def queues(self):
return [facility_queue]

def default_permission_classes(self):
permission_classes = super(FacilityTasksViewSet, self).permission_classes
Expand Down
8 changes: 8 additions & 0 deletions kolibri/core/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from kolibri.core.tasks.queue import Queue
from kolibri.core.tasks.scheduler import Scheduler
from kolibri.core.tasks.worker import Worker
from kolibri.core.utils.sqlalchemy_utils import SharingPool
from kolibri.utils import conf


Expand All @@ -25,6 +26,8 @@ def __create_engine():
path=os.path.join(conf.KOLIBRI_HOME, "job_storage.sqlite3")
),
connect_args={"check_same_thread": False},
# Use NullPool for SQLite as we use a completely separate database
# file, so no need to share anything with Django.
poolclass=NullPool,
)

Expand All @@ -43,6 +46,11 @@ def __create_engine():
else "",
),
pool_pre_ping=True,
client_encoding="utf8",
# Use our SharingPool for Postgres, so as to ensure that
# we share underlying database connections with Django
# this results in cleaner shutdown and clean up of connections
poolclass=SharingPool,
)


Expand Down
50 changes: 50 additions & 0 deletions kolibri/core/tasks/test/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import tempfile
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from kolibri.utils import conf


@contextmanager
def connection():
    """
    Context manager yielding a SQLAlchemy engine for the configured
    database backend ("sqlite" or "postgres", per Kolibri's options),
    and cleaning it up afterwards.

    Yields:
        sqlalchemy.engine.Engine: engine bound to a throwaway sqlite
        file, or to the configured postgres database.

    Raises:
        Exception: if the configured DATABASE_ENGINE is not recognized.

    Fix: cleanup now runs in ``finally`` blocks, so the engine is
    disposed (and, for sqlite, the temp file descriptor/file released)
    even when the code using the context manager raises.
    """
    database_engine_option = conf.OPTIONS["Database"]["DATABASE_ENGINE"]

    if database_engine_option == "sqlite":
        fd, filepath = tempfile.mkstemp()
        engine = create_engine(
            "sqlite:///{path}".format(path=filepath),
            connect_args={"check_same_thread": False},
            poolclass=NullPool,
        )
        try:
            yield engine
        finally:
            engine.dispose()
            os.close(fd)
            try:
                os.remove(filepath)
            except OSError:
                # Don't fail test because of difficulty cleaning up.
                pass
    elif database_engine_option == "postgres":
        engine = create_engine(
            "postgresql://{user}:{password}@{host}{port}/{name}".format(
                name=conf.OPTIONS["Database"]["DATABASE_NAME"],
                password=conf.OPTIONS["Database"]["DATABASE_PASSWORD"],
                user=conf.OPTIONS["Database"]["DATABASE_USER"],
                host=conf.OPTIONS["Database"]["DATABASE_HOST"],
                port=":" + conf.OPTIONS["Database"]["DATABASE_PORT"]
                if conf.OPTIONS["Database"]["DATABASE_PORT"]
                else "",
            ),
            pool_pre_ping=True,
            client_encoding="utf8",
            poolclass=NullPool,
        )
        try:
            yield engine
        finally:
            engine.dispose()
    else:
        raise Exception(
            "Unknown database engine option: {}".format(database_engine_option)
        )
65 changes: 33 additions & 32 deletions kolibri/core/tasks/test/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from mock import call
from mock import Mock
from mock import patch
from mock import PropertyMock
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import ParseError
from rest_framework.exceptions import PermissionDenied
Expand Down Expand Up @@ -65,60 +64,60 @@ def setUp(self):
self.client.login(username=self.superuser.username, password=DUMMY_PASSWORD)


@patch("kolibri.core.tasks.api.priority_queue")
@patch("kolibri.core.tasks.api.queue")
class TaskAPITestCase(BaseAPITestCase):
def test_task_cancel(self, queue_mock):
def test_task_cancel(self, queue_mock, priority_queue_mock):
queue_mock.fetch_job.return_value = fake_job(state=State.CANCELED)
response = self.client.post(
reverse("kolibri:core:task-canceltask"), {"task_id": "1"}, format="json"
)
self.assertEqual(response.data, {})

def test_task_cancel_no_task(self, queue_mock):
def test_task_cancel_no_task(self, queue_mock, priority_queue_mock):
queue_mock.cancel.side_effect = JobNotFound()
response = self.client.post(
reverse("kolibri:core:task-canceltask"), {"task_id": "1"}, format="json"
)
self.assertEqual(response.status_code, 200)

def test_task_get_no_task(self, queue_mock):
def test_task_get_no_task(self, queue_mock, priority_queue_mock):
queue_mock.fetch_job.side_effect = JobNotFound()
priority_queue_mock.fetch_job.side_effect = JobNotFound()
response = self.client.get(
reverse("kolibri:core:task-detail", kwargs={"pk": "1"}),
{"task_id": "1"},
format="json",
)
self.assertEqual(response.status_code, 404)

def test_tasks_clearable_flag(self, queue_mock):
with patch(
"kolibri.core.tasks.queue.Queue.jobs", new_callable=PropertyMock
) as jobs_mock:
jobs_mock.return_value = [
fake_job(state=state)
for state in [
# not clearable
State.SCHEDULED,
State.QUEUED,
State.RUNNING,
State.CANCELING,
# clearable
State.FAILED,
State.CANCELED,
State.COMPLETED,
]
def test_tasks_clearable_flag(self, queue_mock, priority_queue_mock):
queue_mock.jobs = [
fake_job(state=state)
for state in [
# not clearable
State.SCHEDULED,
State.QUEUED,
State.RUNNING,
State.CANCELING,
# clearable
State.FAILED,
State.CANCELED,
State.COMPLETED,
]
response = self.client.get(reverse("kolibri:core:task-list"))
]
priority_queue_mock.jobs = []
response = self.client.get(reverse("kolibri:core:task-list"))

def assert_clearable(index, expected):
self.assertEqual(response.data[index]["clearable"], expected)
def assert_clearable(index, expected):
self.assertEqual(response.data[index]["clearable"], expected)

for i in [0, 1, 2, 3]:
assert_clearable(i, False)
for i in [4, 5, 6]:
assert_clearable(i, True)
for i in [0, 1, 2, 3]:
assert_clearable(i, False)
for i in [4, 5, 6]:
assert_clearable(i, True)

def test_restart_task(self, queue_mock):
def test_restart_task(self, queue_mock, priority_queue_mock):
queue_mock.restart_job.return_value = 1
queue_mock.fetch_job.return_value = fake_job(state=State.QUEUED, job_id=1)

Expand All @@ -139,6 +138,8 @@ def test_restart_task(self, queue_mock):
self.assertDictEqual(response.data, expected_response)


@patch("kolibri.core.tasks.api.priority_queue")
@patch("kolibri.core.tasks.api.queue")
class TaskAPIPermissionsTestCase(APITestCase):
def setUp(self):
DeviceSettings.objects.create(is_provisioned=True)
Expand All @@ -147,7 +148,7 @@ def setUp(self):
self.facility.add_admin(admin)
self.client.login(username=admin.username, password=DUMMY_PASSWORD)

def test_exportlogs_permissions(self):
def test_exportlogs_permissions(self, queue_mock, priority_queue_mock):
with patch("kolibri.core.tasks.api._job_to_response", return_value={}):
response = self.client.post(
reverse("kolibri:core:task-startexportlogcsv"),
Expand All @@ -156,13 +157,13 @@ def test_exportlogs_permissions(self):
)
self.assertEqual(response.status_code, 200)

def test_list_permissions(self):
def test_list_permissions(self, queue_mock, priority_queue_mock):
with patch("kolibri.core.tasks.api._job_to_response", return_value={}):
response = self.client.get(reverse("kolibri:core:task-list"), format="json")
self.assertEqual(response.status_code, 200)


@patch("kolibri.core.tasks.api.facility_queue", spec=True)
@patch("kolibri.core.tasks.api.facility_queue")
class FacilityTaskAPITestCase(BaseAPITestCase):
def assertJobResponse(self, job_data, response):
id = job_data.get("job_id", fake_job_defaults.get("job_id"))
Expand Down
45 changes: 14 additions & 31 deletions kolibri/core/tasks/test/test_job_running.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import os
import tempfile
import time
import uuid

import pytest
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

from kolibri.core.tasks.compat import Event
from kolibri.core.tasks.exceptions import JobNotRestartable
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
from kolibri.core.tasks.queue import Queue
from kolibri.core.tasks.storage import Storage
from kolibri.core.tasks.test.base import connection
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.utils import import_stringified_func
from kolibri.core.tasks.utils import stringify_func
Expand All @@ -21,37 +18,22 @@

@pytest.fixture
def backend():
fd, filepath = tempfile.mkstemp()
connection = create_engine(
"sqlite:///{path}".format(path=filepath),
connect_args={"check_same_thread": False},
poolclass=NullPool,
)
b = Storage(connection)
yield b
b.clear()
os.close(fd)
os.remove(filepath)
with connection() as c:
b = Storage(c)
b.clear(force=True)
yield b
b.clear(force=True)


@pytest.fixture
def inmem_queue():
fd, filepath = tempfile.mkstemp()
connection = create_engine(
"sqlite:///{path}".format(path=filepath),
connect_args={"check_same_thread": False},
poolclass=NullPool,
)
e = Worker(queues="pytest", connection=connection)
c = Queue(queue="pytest", connection=connection)
c.e = e
yield c
e.shutdown()
os.close(fd)
try:
os.remove(filepath)
except OSError:
pass
with connection() as conn:
e = Worker(queues="pytest", connection=conn)
c = Queue(queue="pytest", connection=conn)
c.e = e
c.storage.clear(force=True)
yield c
e.shutdown()


@pytest.fixture
Expand Down Expand Up @@ -201,6 +183,7 @@ def wait_for_state_change(inmem_queue, job_id, state):
assert time_spent < 5


@pytest.mark.django_db
class TestQueue(object):
def test_enqueues_a_function(self, inmem_queue):
job_id = inmem_queue.enqueue(id, 1)
Expand Down

0 comments on commit 2e41b45

Please sign in to comment.