Skip to content
This repository has been archived by the owner on Dec 5, 2018. It is now read-only.

Commit

Permalink
Merge pull request #95 from mozilla/fix-92-open-connections
Browse files Browse the repository at this point in the history
Fix #92 __heartbeat__ opens but never closes connections
  • Loading branch information
jaredhirsch committed Apr 14, 2016
2 parents 952f269 + 54e44f9 commit 187b950
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 57 deletions.
1 change: 0 additions & 1 deletion recommendation/celeryconfig.py

This file was deleted.

13 changes: 2 additions & 11 deletions recommendation/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,10 @@
YAHOO_OAUTH_KEY = env.get('YAHOO_OAUTH_KEY', '')
YAHOO_OAUTH_SECRET = env.get('YAHOO_OAUTH_SECRET', '')


RECOMMENDATION_SERVICES = env.get('RECOMMENDATION_SERVICES')
if DEBUG and RECOMMENDATION_SERVICES:
REDIS_HOST = RECOMMENDATION_SERVICES
REDIS_PORT = 6379
REDIS_DB = 0
CELERY_BROKER_URL = 'redis://%s:6379/0' % RECOMMENDATION_SERVICES
MEMCACHED_HOST = '%s:11211' % RECOMMENDATION_SERVICES
else:
REDIS_HOST = env.get('RECOMMENDATION_REDIS_HOST', 'redis')
REDIS_PORT = int(env.get('RECOMMENDATION_REDIS_PORT', 6379))
REDIS_DB = int(env.get('RECOMMENDATION_REDIS_DB', 0))
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://redis:6379/0')
MEMCACHED_HOST = env.get('MEMCACHED_HOST', 'memcached:11211')

REDIS_TIMEOUT = env.get('RECOMMENDATION_REDIS_TIMEOUT', 10)
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://%s:%d/%d' %
(REDIS_HOST, REDIS_PORT, REDIS_DB))
2 changes: 0 additions & 2 deletions recommendation/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from celery import Celery
from flask import Flask

from recommendation import celeryconfig
from recommendation import conf
from recommendation.cors import cors_headers
from recommendation.mozlog.formatter import MozLogFormatter
Expand Down Expand Up @@ -52,7 +51,6 @@ def create_queue(app=None):
app = app or create_app()

queue = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
queue.config_from_object(celeryconfig)
queue.conf.update(app.config)
TaskBase = queue.Task

Expand Down
49 changes: 28 additions & 21 deletions recommendation/views/status.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from os import path

import kombu
import redis
from celery.app.control import Control
from flask import Blueprint, jsonify, send_file
from redis.exceptions import ConnectionError as RedisConnectionError

from recommendation import conf
from recommendation.memcached import memcached
from recommendation.views.static import STATIC_DIR

Expand Down Expand Up @@ -36,32 +34,32 @@ def memcached_status():
raise ServiceDown()


def redis_status():
def redis_status(app):
"""
Raises ServiceDown if the Redis server used as a celery broker is down.
Since our application should not have access to the Redis server, we test
this by instantiating a Celery Control and attempting to ping it.
"""
cxn = kombu.Connection(conf.CELERY_BROKER_URL)
try:
cxn.connect()
except redis.exceptions.ConnectionError:
Control(app=app).ping(timeout=1)
except RedisConnectionError:
raise ServiceDown()
else:
cxn.close()


def celery_status():
def celery_status(app):
"""
Raises ServiceDown if there are no Celery workers available.
Raises ServiceDown if any Celery worker servers are down, if any clusters
have no workers, or if any workers are down.
"""
from recommendation.factory import create_queue
control = Control(app=create_queue())
try:
if not control.ping():
clusters = Control(app=app).ping(timeout=1)
if not clusters:
raise ServiceDown()
for cluster in clusters:
if not cluster:
raise ServiceDown()

# Redis connection errors will be handled by `redis_status`.
except redis.exceptions.ConnectionError:
pass
for host, status in cluster.items():
if 'ok' not in status or status['ok'] != 'pong':
raise ServiceDown()


@status.route('/__heartbeat__')
Expand All @@ -72,18 +70,27 @@ def heartbeat():
empty body.
"""
celery, memcached, redis = True, True, True

# create an shared celery app for testing
from recommendation.factory import create_queue
app = create_queue()

try:
celery_status()
celery_status(app)
except ServiceDown:
celery = False
try:
memcached_status()
except ServiceDown:
memcached = False
try:
redis_status()
redis_status(app)
except ServiceDown:
redis = False

# close any opened connections to broker
app.close()

return jsonify({
'celery': celery,
'memcached': memcached,
Expand Down
63 changes: 41 additions & 22 deletions recommendation/views/tests/test_status.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
import json
from os import path

import redis
from mock import patch
from nose.tools import eq_, ok_
from redis.exceptions import ConnectionError as RedisError

from recommendation.factory import create_queue
from recommendation.views.static import STATIC_DIR
from recommendation.views.status import (celery_status, memcached_status,
redis_status, ServiceDown)
from recommendation.tests.util import AppTestCase


CELERY_WORKER_OK = {'ok': 'pong'}
CELERY_PING_NO_CLUSTERS = []
CELERY_PING_OK = [{
'host1': CELERY_WORKER_OK,
'host2': CELERY_WORKER_OK
}]
MEMCACHED_WORKER_BAD = {'not ok': 'not pong'}
MEMCACHED_WORKER_OK = {'ok': 'pong'}

MEMCACHED_CLUSTER_BAD = {
'host1': MEMCACHED_WORKER_OK,
'host2': MEMCACHED_WORKER_BAD
}
MEMCACHED_CLUSTER_NO_WORKERS = {}
MEMCACHED_CLUSTER_OK = {
'host1': MEMCACHED_WORKER_OK,
'host2': MEMCACHED_WORKER_OK
}

MEMCACHED_PING_BAD = [MEMCACHED_CLUSTER_OK, MEMCACHED_CLUSTER_BAD]
MEMCACHED_PING_NO_CLUSTERS = []
MEMCACHED_PING_NO_WORKERS = [MEMCACHED_CLUSTER_OK,
MEMCACHED_CLUSTER_NO_WORKERS]
MEMCACHED_PING_OK = [MEMCACHED_CLUSTER_OK, MEMCACHED_CLUSTER_OK]


class TestStatusViews(AppTestCase):
Expand Down Expand Up @@ -89,31 +102,37 @@ def test_memcached_status_fail(self, mock_set):
with self.assertRaises(ServiceDown):
memcached_status()

@patch('recommendation.views.status.kombu.Connection.connect')
def test_redis_status_pass(self, mock_connect):
redis_status()
@patch('recommendation.views.status.Control.ping')
def test_redis_status_pass(self, mock_ping):
redis_status(create_queue())
self.assert_(True)

@patch('recommendation.views.status.kombu.Connection.connect')
def test_redis_status_fail(self, mock_connect):
mock_connect.side_effect = redis.exceptions.ConnectionError
@patch('recommendation.views.status.Control.ping')
def test_redis_status_fail(self, mock_ping):
mock_ping.side_effect = RedisError
with self.assertRaises(ServiceDown):
redis_status()
redis_status(create_queue())

@patch('recommendation.views.status.Control.ping')
def test_celery_status_pass(self, mock_ping):
mock_ping.return_value = CELERY_PING_OK
celery_status()
mock_ping.return_value = MEMCACHED_PING_OK
celery_status(create_queue())
self.assert_(True)

@patch('recommendation.views.status.Control.ping')
def test_celery_status_no_workers(self, mock_ping):
mock_ping.return_value = MEMCACHED_PING_NO_WORKERS
with self.assertRaises(ServiceDown):
celery_status(create_queue())

@patch('recommendation.views.status.Control.ping')
def test_celery_status_no_clusters(self, mock_ping):
mock_ping.return_value = CELERY_PING_NO_CLUSTERS
mock_ping.return_value = MEMCACHED_PING_NO_CLUSTERS
with self.assertRaises(ServiceDown):
celery_status()
celery_status(create_queue())

@patch('recommendation.views.status.Control.ping')
def test_celery_status_broker_down(self, mock_ping):
mock_ping.side_effect = redis.exceptions.ConnectionError
celery_status()
self.assert_(True)
def test_celery_status_workers_down(self, mock_ping):
mock_ping.return_value = MEMCACHED_PING_BAD
with self.assertRaises(ServiceDown):
celery_status(create_queue())

0 comments on commit 187b950

Please sign in to comment.