Skip to content

Commit

Permalink
Merge pull request #831 from projectcalico/smc-fix-status-tight-loop
Browse files: browse the repository at this point in the history
Fix tight loop if watched etcd key does not exist.
  • Loading branch information
Shaun Crampton committed Oct 2, 2015
2 parents 9951512 + 865f0ae commit 592cff8
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 39 deletions.
18 changes: 15 additions & 3 deletions calico/etcdutils.py
Expand Up @@ -3,7 +3,6 @@
import logging
import re
import etcd
from etcd import EtcdConnectionFailed
from socket import timeout as SocketTimeout
import time

Expand Down Expand Up @@ -168,6 +167,7 @@ def loop(self):
# so the stack trace would just add log spam.
_log.error("Unexpected IO or etcd error, triggering "
"resync with etcd: %r.", e)
time.sleep(1) # Prevent tight loop due to unexpected error.
_log.info("%s.loop() stopped due to self.stop == True", self)

def register_path(self, *args, **kwargs):
Expand Down Expand Up @@ -207,7 +207,19 @@ def load_initial_dump(self):
:return: The etcd response object.
"""
initial_dump = self.client.read(self.key_to_poll, recursive=True)
initial_dump = None
while not initial_dump:
try:
initial_dump = self.client.read(self.key_to_poll,
recursive=True)
except etcd.EtcdKeyNotFound:
# Avoid tight-loop if the whole directory doesn't exist yet.
if self._stopped:
_log.info("Stopped: aborting load of initial dump.")
raise
_log.info("Waiting for etcd directory '%s' to exist...",
self.key_to_poll)
time.sleep(1)

# The etcd_index is the high-water-mark for the snapshot, record that
# we want to poll starting at the next index.
Expand Down Expand Up @@ -243,7 +255,7 @@ def wait_for_etcd_event(self):
timeout=Timeout(connect=10,
read=90))
_log.debug("etcd response: %r", response)
except EtcdConnectionFailed as e:
except etcd.EtcdConnectionFailed as e:
if isinstance(e.cause, (ReadTimeoutError, SocketTimeout)):
# This is expected when we're doing a poll and nothing
# happened. socket timeout doesn't seem to be caught by
Expand Down
16 changes: 15 additions & 1 deletion calico/felix/test/__init__.py
@@ -1,5 +1,19 @@
# Copyright (c) Metaswitch Networks 2015. All rights reserved.
# -*- coding: utf-8 -*-
# Copyright 2015 Metaswitch Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from calico.test import lib
import logging

_log = logging.getLogger(__name__)
Expand Down
4 changes: 2 additions & 2 deletions calico/felix/test/base.py
Expand Up @@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json

import logging
import sys
import gevent
import gc

import gevent

if sys.version_info < (2, 7):
import unittest2 as unittest
else:
Expand Down
2 changes: 1 addition & 1 deletion calico/felix/test/test_fetcd.py
Expand Up @@ -138,7 +138,7 @@ def setUp(self):
self.watcher = _FelixEtcdWatcher(self.m_config, self.m_hosts_ipset)
self.m_splitter = Mock(spec=UpdateSplitter)
self.watcher.splitter = self.m_splitter
self.client = Mock(spec=etcd.Client)
self.client = Mock()
self.watcher.client = self.client

@patch("gevent.sleep", autospec=True)
Expand Down
16 changes: 16 additions & 0 deletions calico/openstack/test/__init__.py
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Metaswitch Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from calico.test import lib
32 changes: 5 additions & 27 deletions calico/openstack/test/test_plugin_etcd.py
Expand Up @@ -19,41 +19,19 @@
Unit test for the Calico/OpenStack Plugin using etcd transport.
"""
import copy
import eventlet
import json
import mock
import unittest

import calico.openstack.test.lib as lib
import eventlet
import mock

import calico.test.lib as lib
import calico.openstack.mech_calico as mech_calico
import calico.openstack.t_etcd as t_etcd

from calico import common
from calico.datamodel_v1 import FELIX_STATUS_DIR


class EtcdException(Exception):
    """Root of the stub etcd exception hierarchy used by these tests."""


class EtcdKeyNotFound(EtcdException):
    """Stub error raised by the fake etcd delete when the key is absent."""


class EtcdClusterIdChanged(EtcdException):
    """Stub exposed on the etcd stand-in as ``EtcdClusterIdChanged``."""


class EtcdEventIndexCleared(EtcdException):
    """Stub exposed on the etcd stand-in as ``EtcdEventIndexCleared``."""


# Expose the stub exception classes as attributes of lib.m_etcd
# (presumably the mocked etcd module -- confirm in lib) so code under test
# can reference them by their usual etcd names.
lib.m_etcd.EtcdException = EtcdException
lib.m_etcd.EtcdKeyNotFound = EtcdKeyNotFound
lib.m_etcd.EtcdClusterIdChanged = EtcdClusterIdChanged
lib.m_etcd.EtcdEventIndexCleared = EtcdEventIndexCleared


class TestPluginEtcd(lib.Lib, unittest.TestCase):
"""
Tests for the Calico mechanism driver. This covers the mainline
Expand Down Expand Up @@ -135,7 +113,7 @@ def check_etcd_delete(self, key, **kwargs):
try:
del self.etcd_data[key]
except KeyError:
raise EtcdKeyNotFound()
raise lib.EtcdKeyNotFound()
self.recent_deletes.add(key)

def assertEtcdWrites(self, expected):
Expand Down
16 changes: 16 additions & 0 deletions calico/test/__init__.py
@@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Metaswitch Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from calico.test import lib
22 changes: 22 additions & 0 deletions calico/openstack/test/lib.py → calico/test/lib.py
Expand Up @@ -76,6 +76,28 @@
'status': 'ACTIVE'}


class EtcdException(Exception):
    """Root of the stub etcd exception hierarchy shared by the tests."""


class EtcdKeyNotFound(EtcdException):
    """Stub registered on ``m_etcd`` as ``EtcdKeyNotFound``."""


class EtcdClusterIdChanged(EtcdException):
    """Stub registered on ``m_etcd`` as ``EtcdClusterIdChanged``."""


class EtcdEventIndexCleared(EtcdException):
    """Stub registered on ``m_etcd`` as ``EtcdEventIndexCleared``."""


# Attach the stub exception classes to m_etcd (presumably the mocked etcd
# module defined earlier in this file -- confirm) under their usual names.
m_etcd.EtcdException = EtcdException
m_etcd.EtcdKeyNotFound = EtcdKeyNotFound
m_etcd.EtcdClusterIdChanged = EtcdClusterIdChanged
m_etcd.EtcdEventIndexCleared = EtcdEventIndexCleared


# Define a stub class, that we will use as the base class for
# CalicoMechanismDriver.
class DriverBase(object):
Expand Down
36 changes: 31 additions & 5 deletions calico/test/test_etcdutils.py
Expand Up @@ -21,9 +21,9 @@

import logging
import types
from etcd import EtcdResult
from mock import Mock
from calico.etcdutils import PathDispatcher
import etcd
from mock import Mock, patch, call
from calico.etcdutils import PathDispatcher, EtcdWatcher

from calico.felix.test.base import BaseTestCase

Expand Down Expand Up @@ -68,7 +68,7 @@ def assert_handled(self, key, exp_handler=SAME_AS_KEY, **exp_captures):
exp_handler = key
if isinstance(exp_handler, types.StringTypes):
exp_handler = exp_handler.strip("/")
m_response = Mock(spec=EtcdResult)
m_response = Mock(spec=etcd.EtcdResult)
m_response.key = key
m_response.action = self.action
self.dispatcher.handle_event(m_response)
Expand Down Expand Up @@ -122,7 +122,7 @@ def test_non_match(self):
self.assert_handled("/foo", exp_handler=None)

def test_cover_no_match(self):
m_result = Mock(spec=EtcdResult)
m_result = Mock(spec=etcd.EtcdResult)
m_result.key = "/a"
m_result.action = "unknown"
self.dispatcher.handle_event(m_result)
Expand Down Expand Up @@ -165,3 +165,29 @@ class TestDispatcherCaD(_TestPathDispatcherBase):
class TestDispatcherExpire(_TestPathDispatcherBase):
    """Run the shared dispatcher tests with the "expire" action.

    NOTE(review): presumably the base class sends events with ``action``
    and expects the handlers named by ``expected_handlers`` to fire --
    confirm against _TestPathDispatcherBase.
    """

    expected_handlers = "delete"
    action = "expire"


class TestEtcdWatcher(BaseTestCase):
    """Tests for EtcdWatcher, run against a mocked etcd client."""

    def setUp(self):
        """Create a watcher with reconnect() patched out and a Mock client."""
        super(TestEtcdWatcher, self).setUp()
        # reconnect() is patched only while the watcher is constructed; the
        # mock itself is never inspected, so it is not bound to a name.
        with patch("calico.etcdutils.EtcdWatcher.reconnect"):
            self.watcher = EtcdWatcher("foobar:4001", "/calico")
        self.m_client = Mock()
        self.watcher.client = self.m_client

    def test_load_initial_dump(self):
        """load_initial_dump() retries once after EtcdKeyNotFound.

        The first read raises EtcdKeyNotFound and the second returns a
        response, so exactly one 1-second sleep and two identical reads
        are expected, and the next poll index is etcd_index + 1.
        """
        m_response = Mock(spec=etcd.EtcdResult)
        m_response.etcd_index = 10000
        self.m_client.read.side_effect = [
            etcd.EtcdKeyNotFound(),
            m_response,
        ]
        # Patch time.sleep so the retry back-off does not slow the test.
        with patch("time.sleep") as m_sleep:
            self.assertEqual(self.watcher.load_initial_dump(), m_response)

        m_sleep.assert_called_once_with(1)
        # Compare the full call list rather than using assert_has_calls(),
        # which would also pass if extra reads had been issued.
        self.assertEqual(
            self.m_client.read.call_args_list,
            [
                call("/calico", recursive=True),
                call("/calico", recursive=True),
            ],
        )
        self.assertEqual(self.watcher.next_etcd_index, 10001)

0 comments on commit 592cff8

Please sign in to comment.