Skip to content
This repository has been archived by the owner on Dec 13, 2019. It is now read-only.

Commit

Permalink
python: Add IP addresses to serverset entries
Browse files Browse the repository at this point in the history
ServerSet entries contain hostnames that require a DNS lookup before
use. When hostnames map to a unique, fixed IP address, this additional
lookup is unnecessary and can put strain on DNS infrastructure in large
deployments.

This change adds 2 optional fields to server set endpoints: `inet` and
`inet6` respectively for a human readable representation of an IPv4 and
of an IPv6 address.

Hostnames are still mandatory, for backward compatibility with clients
that expect those fields. Port is untouched.
  • Loading branch information
atollena committed May 13, 2016
1 parent 941c7b4 commit a85b7ad
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 40 deletions.
56 changes: 49 additions & 7 deletions src/python/twitter/common/zookeeper/serverset/endpoint.py
@@ -1,4 +1,5 @@
import json
import socket
from thrift.TSerialization import deserialize as thrift_deserialize

from gen.twitter.thrift.endpoint.ttypes import (
Expand All @@ -16,25 +17,59 @@
class Endpoint(object):
@classmethod
def unpack_thrift(cls, blob):
return cls(blob.host, blob.port)
return cls(blob.host, blob.port, blob.inet, blob.inet6)

@classmethod
def from_dict(cls, value):
inet = value.get('inet')
inet6 = value.get('inet6')
return Endpoint(value['host'], value['port'], inet, inet6)

@classmethod
def to_dict(cls, endpoint):
return {
d = {
'host': endpoint.host,
'port': endpoint.port
}
if endpoint.inet is not None:
d['inet'] = endpoint.inet
if endpoint.inet6 is not None:
d['inet6'] = endpoint.inet6
return d

def __init__(self, host, port, inet=None, inet6=None):
"""host -- the hostname of the device where this endpoint can be found
port -- the port number the device serves this endpoint on
inet -- a human-readable representation of the IPv4 address of the device
where this endpoint can be found.
inet6 -- a human-readable representation of the IPv6 address of the device
where this endpoint can be found.
`inet` and/or `inet6` can be used when present to avoid a DNS lookup.
def __init__(self, host, port):
"""
if not isinstance(host, Compatibility.string):
raise ValueError('Expected host to be a string!')
if not isinstance(port, int):
raise ValueError('Expected port to be an integer!')
if inet is not None:
try:
socket.inet_pton(socket.AF_INET, inet)
except socket.error:
raise ValueError('Expected "%s" to be a string containing a valid IPv4 address!' % inet)
if inet6 is not None:
try:
socket.inet_pton(socket.AF_INET6, inet6)
except socket.error:
raise ValueError('Expected "%s" to be a string containing a valid IPv6 address!' % inet6)

self._host = host
self._port = port
self._inet = inet
self._inet6 = inet6

def __key(self):
return (self.host, self.port)
return (self.host, self.port, self._inet, self._inet6)

def __eq__(self, other):
return isinstance(other, self.__class__) and self.__key() == other.__key()
Expand All @@ -50,6 +85,14 @@ def host(self):
def port(self):
return self._port

@property
def inet(self):
return self._inet

@property
def inet6(self):
return self._inet6

def __str__(self):
return '%s:%s' % (self.host, self.port)

Expand Down Expand Up @@ -123,12 +166,12 @@ def unpack_json(cls, blob, member_id=None):
for key in ('status', 'serviceEndpoint', 'additionalEndpoints'):
if key not in blob:
raise ValueError('Expected to find %s in ServiceInstance JSON!' % key)
additional_endpoints = dict((name, Endpoint(value['host'], value['port']))
additional_endpoints = dict((name, Endpoint.from_dict(value))
for name, value in blob['additionalEndpoints'].items())
shard = cls.__check_int(blob.get('shard'))
member_id = cls.__check_int(member_id)
return cls(
service_endpoint=Endpoint(blob['serviceEndpoint']['host'], blob['serviceEndpoint']['port']),
service_endpoint=Endpoint.from_dict(blob['serviceEndpoint']),
additional_endpoints=additional_endpoints,
status=Status.from_string(blob['status']),
shard=shard,
Expand Down Expand Up @@ -242,7 +285,6 @@ def __eq__(self, other):
def __hash__(self):
return hash(self.__key())


def __str__(self):
return 'ServiceInstance(%s, %s, %saddl: %s, status: %s)' % (
self.service_endpoint,
Expand Down
12 changes: 12 additions & 0 deletions src/thrift/com/twitter/thrift/endpoint.thrift
Expand Up @@ -76,6 +76,18 @@ struct Endpoint {
* The TCP port the endpoint listens on.
*/
2: i32 port

/*
* The human readable representation of a IPv4 address of the endpoint. May
* be used instead of a DNS lookup for host.
*/
3: optional string inet

/*
* The IPv6 address of the endpoint. May be used instead of a DNS lookup for
* host.
*/
4: optional string inet6
}

/*
Expand Down
4 changes: 2 additions & 2 deletions tests/python/twitter/common/zookeeper/serverset/BUILD
@@ -1,7 +1,7 @@
python_test_suite(
name = 'all',
dependencies = [
':endpoint',
':test_endpoint',
':test_kazoo_serverset',
':test_serverset_unit',
],
Expand All @@ -25,7 +25,7 @@ python_tests(
)

python_tests(
name = 'endpoint',
name = 'test_endpoint',
sources = ['test_endpoint.py'],
dependencies = [
'src/python/twitter/common/zookeeper/serverset:serverset_base'
Expand Down
145 changes: 117 additions & 28 deletions tests/python/twitter/common/zookeeper/serverset/test_endpoint.py
Expand Up @@ -14,47 +14,72 @@
# limitations under the License.
# ==================================================================================================

import pytest
from twitter.common.zookeeper.serverset.endpoint import Endpoint, ServiceInstance, Status


def _service_instance(vals):
json = '''{
"additionalEndpoints": {
"aurora": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
},
"health": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
}
},
"serviceEndpoint": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
},
"shard": %d,
"member_id": %d,
"status": "ALIVE"
}''' % vals
def test_endpoint_constructor():
# Check that those do not throw
Endpoint('host', 8340)
Endpoint('host', 8340, '1.2.3.4')
Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')
Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')

return ServiceInstance.unpack(json)
with pytest.raises(ValueError):
Endpoint('host', 8340, 'not an IP')
with pytest.raises(ValueError):
Endpoint('host', 8340, None, 'not an IPv6')


def test_endpoint_equality():
assert Endpoint('host', 8340) == Endpoint('host', 8340)
assert Endpoint('host', 8340, '1.2.3.4') == Endpoint('host', 8340, '1.2.3.4')
assert (Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')
== Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff'))
assert (Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')
== Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff'))


def test_endpoint_hash_equality():
assert Endpoint('host', 8340).__hash__() == Endpoint('host', 8340).__hash__()
assert Endpoint('host', 8340, '1.2.3.4').__hash__() == Endpoint('host', 8340, '1.2.3.4').__hash__()
assert (Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__()
== Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__())
assert (Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__()
== Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__())


def test_endpoint_inequality():
assert Endpoint('host', 8340) != Endpoint('xhost', 8341)
assert Endpoint('host', 8340) != Endpoint('xhost', 8340)
assert Endpoint('host', 8340) != Endpoint('host', 8341)
assert (Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')
!= Endpoint('host', 8340, '5.6.7.8', '2001:db8:5678:ffff:ffff:ffff:ffff:ffff'))
assert (Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff')
!= Endpoint('host', 8340, None, '2001:db8:5678:ffff:ffff:ffff:ffff:ffff'))


def test_endpoint_hash_inequality():
assert Endpoint('host', 8340).__hash__() != Endpoint('xhost', 8341).__hash__()
assert Endpoint('host', 8340).__hash__() != Endpoint('host', 8341).__hash__()
assert (Endpoint('host', 8340, '1.2.3.4', '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__()
!= Endpoint('host', 8340, '5.6.7.8', '2001:db8:5678:ffff:ffff:ffff:ffff:ffff').__hash__())
assert (Endpoint('host', 8340, None, '2001:db8:1234:ffff:ffff:ffff:ffff:ffff').__hash__()
!= Endpoint('host', 8340, None, '2001:db8:5678:ffff:ffff:ffff:ffff:ffff').__hash__())


def test_endpoint_from_dict():
expected = {
Endpoint('smfd-akb-12-sr1', 31181): {'host': 'smfd-akb-12-sr1', 'port': 31181},
Endpoint('smfd-akb-12-sr1', 31181, '1.2.3.4'): {'host': 'smfd-akb-12-sr1', 'port': 31181, 'inet': '1.2.3.4'},
Endpoint('smfd-akb-12-sr1', 31181, '1.2.3.4', '2001:db8:5678:ffff:ffff:ffff:ffff:ffff'):
{'host': 'smfd-akb-12-sr1', 'port': 31181, 'inet': '1.2.3.4', 'inet6':
'2001:db8:5678:ffff:ffff:ffff:ffff:ffff'},
Endpoint('smfd-akb-12-sr1', 31181, None, '2001:db8:5678:ffff:ffff:ffff:ffff:ffff'):
{'host': 'smfd-akb-12-sr1', 'port': 31181, 'inet6': '2001:db8:5678:ffff:ffff:ffff:ffff:ffff'}
}
for (endpoint, dic) in expected.items():
assert Endpoint.to_dict(endpoint) == dic
assert Endpoint.from_dict(dic) == endpoint


def test_status_equality():
Expand All @@ -73,23 +98,87 @@ def test_status_hash_inequality():
assert Status.from_string('DEAD').__hash__() != Status.from_string('STARTING').__hash__()


def _service_instance(vals):
json = '''{
"additionalEndpoints": {
"aurora": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
},
"health": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
}
},
"serviceEndpoint": {
"host": "smfd-akb-%d-sr1.devel.twitter.com",
"port": 31181
},
"shard": %d,
"status": "ALIVE"
}''' % vals

return ServiceInstance.unpack(json)


def test_service_instance_equality():
vals = (1, 2, 3, 4, 5)
vals = (1, 2, 3, 4)
assert _service_instance(vals) == _service_instance(vals)


def test_service_instance_hash_equality():
vals = (1, 2, 3, 4, 5)
vals = (1, 2, 3, 4)
assert _service_instance(vals).__hash__() == _service_instance(vals).__hash__()


def test_service_instance_inequality():
vals = (1, 2, 3, 4, 5)
vals2 = (6, 7, 8, 9, 10)
vals = (1, 2, 3, 4)
vals2 = (5, 6, 7, 8)
assert _service_instance(vals) != _service_instance(vals2)


def test_service_instance_hash_inequality():
vals = (1, 2, 3, 4, 5)
vals2 = (6, 7, 8, 9, 10)
vals = (1, 2, 3, 4)
vals2 = (5, 6, 7, 8)
assert _service_instance(vals).__hash__() != _service_instance(vals2).__hash__()


def test_service_instance_to_json():
json = """{
"additionalEndpoints": {
"aurora": {
"host": "hostname",
"inet6": "2001:db8:1234:ffff:ffff:ffff:ffff:ffff",
"port": 22
},
"health": {
"host": "hostname",
"inet": "1.2.3.4",
"port": 23
},
"http": {
"host": "hostname",
"inet": "1.2.3.4",
"inet6": "2001:db8:1234:ffff:ffff:ffff:ffff:ffff",
"port": 23
}
},
"serviceEndpoint": {
"host": "hostname",
"port": 24
},
"shard": 1,
"status": "ALIVE"
}"""
service_instance = ServiceInstance(
Endpoint("hostname", 24),
{"aurora": Endpoint("hostname", 22, "1.2.3.4"),
"health": Endpoint("hostname", 23, None, "2001:db8:1234:ffff:ffff:ffff:ffff:ffff"),
"http": Endpoint("hostname", 23, "1.2.3.4", "2001:db8:1234:ffff:ffff:ffff:ffff:ffff"),
},
'ALIVE',
1
)

assert ServiceInstance.unpack(json) == service_instance
assert ServiceInstance.unpack(ServiceInstance.pack(service_instance)) == service_instance
Expand Up @@ -29,16 +29,20 @@
"additionalEndpoints": {
"aurora": {
"host": "smfd-aki-15-sr1.devel.twitter.com",
"port": 31510
"port": 31510,
"inet": "1.2.3.4"
},
"health": {
"host": "smfd-aki-15-sr1.devel.twitter.com",
"port": 31510
"port": 31511,
"inet6": "2001:db8:1234:ffff:ffff:ffff:ffff:ffff"
}
},
"serviceEndpoint": {
"host": "smfd-aki-15-sr1.devel.twitter.com",
"port": 31510
"port": 31512,
"inet": "1.2.3.4",
"inet6": "2001:db8:1234:ffff:ffff:ffff:ffff:ffff"
},
"shard": 0,
"member_id": 0,
Expand Down

0 comments on commit a85b7ad

Please sign in to comment.