Skip to content

Commit

Permalink
Handles closed db connections
Browse files Browse the repository at this point in the history
When the authentication middleware was added in pulpcore 3.15, it became the first place
in the content app that made an attempt to use the database. As a result, it is a convenient
place to handle InterfaceError and OperationalError which are raised when the database
connection has been closed. When this occurs, Handler._reset_db_connection() is called to
clean up the database connection and the middleware tries to use the database again.

If the database connection is closed later in the handling of the request by the content app,
the user will still get a 500 error. However, the next request will be handled properly.

This patch also adds a call to Handler._reset_db_connection() inside the heartbeat method.

backports #9276

fixes: #9598
https://pulp.plan.io/issues/9598

(cherry picked from commit 3faa649)
  • Loading branch information
dkliban committed Dec 2, 2021
1 parent 5dc7314 commit 1ddf16a
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGES/9598.bugfix
@@ -0,0 +1,2 @@
Fixed bug where the content app would stop working after a brief loss of connection to the database.
(backported from #9276)
26 changes: 20 additions & 6 deletions pulpcore/content/__init__.py
Expand Up @@ -19,6 +19,10 @@
django.setup()

from django.conf import settings # noqa: E402: module level not at top of file
from django.db.utils import ( # noqa: E402: module level not at top of file
InterfaceError,
OperationalError,
)

from pulpcore.app.apps import pulp_plugin_configs # noqa: E402: module level not at top of file
from pulpcore.app.models import ContentAppStatus # noqa: E402: module level not at top of file
Expand All @@ -41,14 +45,24 @@ async def _heartbeat():
msg = i8ln_msg.format(name=name, interarrival=heartbeat_interval)

while True:
content_app_status, created = await sync_to_async(ContentAppStatus.objects.get_or_create)(
name=name
)

if not created:
await sync_to_async(content_app_status.save_heartbeat)()
try:
content_app_status, created = await sync_to_async(
ContentAppStatus.objects.get_or_create
)(name=name)

log.debug(msg)
if not created:
await sync_to_async(content_app_status.save_heartbeat)()

log.debug(msg)
except (InterfaceError, OperationalError):
await sync_to_async(Handler._reset_db_connection)()
i8ln_msg = _(
"Content App '{name}' failed to write a heartbeat to the database, sleeping for "
"'{interarrival}' seconds."
)
msg = i8ln_msg.format(name=name, interarrival=heartbeat_interval)
log.info(msg)
await asyncio.sleep(heartbeat_interval)


Expand Down
9 changes: 8 additions & 1 deletion pulpcore/content/authentication.py
Expand Up @@ -3,10 +3,13 @@

from asgiref.sync import sync_to_async
from aiohttp.web import middleware
from django.db.utils import InterfaceError, OperationalError
from django.http.request import HttpRequest
from rest_framework.views import APIView
from rest_framework.exceptions import APIException

from .handler import Handler

log = logging.getLogger(__name__)
_ = gettext.gettext

Expand All @@ -20,7 +23,11 @@ async def authenticate(request, handler):
def _authenticate_blocking():
drf_request = fake_view.initialize_request(django_request)
try:
fake_view.perform_authentication(drf_request)
try:
fake_view.perform_authentication(drf_request)
except (InterfaceError, OperationalError):
Handler._reset_db_connection()
fake_view.perform_authentication(drf_request)
except APIException as e:
log.warning(_('"{} {}" "{}": {}').format(request.method, request.path, request.host, e))

Expand Down
5 changes: 0 additions & 5 deletions pulpcore/content/handler.py
Expand Up @@ -121,7 +121,6 @@ async def list_distributions(self, request):
Returns:
:class:`aiohttp.web.HTTPOk`: The response back to the client.
"""
self._reset_db_connection()

def get_base_paths_blocking():
if self.distribution_model is None:
Expand Down Expand Up @@ -200,8 +199,6 @@ async def stream_content(self, request):
:class:`aiohttp.web.StreamResponse` or :class:`aiohttp.web.FileResponse`: The response
back to the client.
"""
self._reset_db_connection()

path = request.match_info["path"]
return await self._match_and_stream(path, request)

Expand Down Expand Up @@ -240,8 +237,6 @@ def _match_distribution(cls, path):
Raises:
PathNotResolved: when not matched.
"""
cls._reset_db_connection()

base_paths = cls._base_paths(path)
if cls.distribution_model is None:
try:
Expand Down
31 changes: 31 additions & 0 deletions pulpcore/tests/functional/api/using_plugin/test_distributions.py
Expand Up @@ -2,6 +2,7 @@
import csv
import hashlib
import os
from time import sleep
import unittest
from urllib.parse import urljoin

Expand All @@ -16,6 +17,7 @@
get_versions,
modify_repo,
sync,
utils as pulp3_utils,
)
from requests.exceptions import HTTPError

Expand Down Expand Up @@ -398,6 +400,35 @@ def test_content_served_immediate_with_range_request_start_value_larger_than_con
)
self.assertEqual(cm.exception.response.status_code, 416)

def test_content_served_after_db_restart(self):
    """
    Verify that the content app keeps serving after PostgreSQL is restarted.

    The PostgreSQL service is restarted, then the test checks that content can
    still be downloaded and that the page at the content app root can still be
    fetched once the database connection has been closed.
    """
    cfg = config.get_config()
    service_manager = cli.ServiceManager(cfg, cfg.hosts[0])
    # The "s6" service manager gets the plain service name; any other manager
    # gets a glob pattern instead.
    db_service = "postgresql" if service_manager._svc_mgr == "s6" else "*postgresql*"
    self.assertTrue(
        service_manager.is_active([db_service]),
        "PostgreSQL service not found or is not active. Can't restart it.",
    )
    service_manager.restart([db_service])
    # Wait for postgres to come back and all services to recover
    sleep(2)
    self.setup_download_test("immediate")
    self.do_test_content_served()
    # Fetch the content app root as well as the content itself.
    content_app_root = "/".join([cfg.get_content_host_base_url(), "pulp/content"])
    pulp3_utils.http_get(content_app_root)

def setup_download_test(self, policy, url=None, publish=True):
# Create a repository
self.repo = self.repo_api.create(gen_repo())
Expand Down
35 changes: 35 additions & 0 deletions pulpcore/tests/unit/content/test_heartbeat.py
@@ -0,0 +1,35 @@
import asyncio
from unittest import skip
from unittest.mock import patch, call

from django.db.utils import InterfaceError, OperationalError
from django.test import TestCase

from pulpcore.content import _heartbeat


class ContentHeartbeatTestCase(TestCase):
    """Unit tests for the content app's ``_heartbeat`` coroutine."""

    @skip("Skipping while resolving https://github.com/rochacbruno/dynaconf/issues/689")
    @patch("pulpcore.app.models.ContentAppStatus.objects.get_or_create")
    @patch("pulpcore.content.handler.Handler._reset_db_connection")
    def test_db_connection_interface_error(self, mock_reset_db, mock_get_or_create):
        """
        Test that if an InterfaceError or OperationalError is raised,
        Handler._reset_db_connection() is called
        """

        class MockException(Exception):
            """Sentinel error used to break out of the endless heartbeat loop."""

        # The first two calls simulate a closed database connection; the third
        # raises the sentinel so that ``_heartbeat`` stops running.
        mock_get_or_create.side_effect = [InterfaceError(), OperationalError(), MockException()]

        # Run on a private event loop: closing the loop returned by
        # ``asyncio.get_event_loop()`` would leave the thread's default loop
        # closed for any code that runs afterwards.
        loop = asyncio.new_event_loop()
        try:
            with self.settings(CONTENT_APP_TTL=1):
                try:
                    loop.run_until_complete(_heartbeat())
                except MockException:
                    pass
        finally:
            # Always release the loop, even if an unexpected error escapes.
            loop.close()

        mock_get_or_create.assert_called()
        # One reset is expected for each of the two simulated connection errors.
        mock_reset_db.assert_has_calls([call(), call()])

0 comments on commit 1ddf16a

Please sign in to comment.