Skip to content

Commit

Permalink
1. Reusing HttpConnection cause ResponseNotReady(see also http://stac…
Browse files Browse the repository at this point in the history
…koverflow.com/questions/3231543/python-httplib-responsenotready), fix this bug

2. Make compatible to resource manager which ha enabled
3. Add cases for new feature
4. Remove run tests in py32 py33 and add py34 py35
5. Bump version to 0.2.4
  • Loading branch information
tianjin.gutj committed Jun 29, 2016
1 parent 9c13c50 commit 648c19e
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ script:
env:
- TOXENV=py26
- TOXENV=py27
- TOXENV=py32
- TOXENV=py33
- TOXENV=py34
- TOXENV=py35
after_success:
coveralls
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
# built documents.
#
# The short X.Y version.
version = '0.2.3'
version = '0.2.4'
# The full version, including alpha/beta/rc tags.
release = '0.2.3'
release = '0.2.4'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def find_version(*file_paths):
'Operating System :: OS Independent',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Topic :: System :: Distributed Computing',
],
)
7 changes: 0 additions & 7 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@


class BaseYarnAPITestCase(TestCase):
def test_http_property_cache(self):
client = self.get_client()
http_conn1 = client.http_conn
http_conn2 = client.http_conn

self.assertIs(http_conn1, http_conn2)

def test_request(self):
client = self.get_client()
with patch('yarn_api_client.base.HTTPConnection') as http_conn_mock:
Expand Down
105 changes: 102 additions & 3 deletions tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
# -*- coding: utf-8 -*-
from tempfile import NamedTemporaryFile

import mock
from mock import patch
from tests import TestCase

from yarn_api_client import hadoop_conf


_http_request_method = ''
_http_getresponse_method = ''

try:
from httplib import HTTPConnection, OK, NOT_FOUND
_http_request_method = 'httplib.HTTPConnection.request'
_http_getresponse_method = 'httplib.HTTPConnection.getresponse'
except ImportError:
from http.client import HTTPConnection, OK, NOT_FOUND
_http_request_method = 'http.client.HTTPConnection.request'
_http_getresponse_method = 'http.client.HTTPConnection.getresponse'

empty_config = '<configuration></configuration>'.encode('latin1')

yarn_site_xml = """\
Expand Down Expand Up @@ -37,19 +51,104 @@ def test_parse(self):
self.assertEqual(None, value)

def test_get_resource_host_port(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock:
parse_mock.return_value = 'example.com:8022'
get_rm_ids_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()

self.assertEqual(('example.com', '8022'), host_port)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
self.assertIsNone(host_port)

@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
@mock.patch('yarn_api_client.hadoop_conf.parse')
@mock.patch('yarn_api_client.hadoop_conf._check_is_active_rm')
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
get_rm_ids_mock.return_value = ['rm1', 'rm2']
parse_mock.return_value = 'example.com:8022'
check_is_active_rm_mock.return_value = True
host_port = hadoop_conf.get_resource_manager_host_port()

self.assertEqual(('example.com', '8022'), host_port)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address.rm1')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
self.assertIsNone(host_port)

def test_get_rm_ids(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'rm1,rm2'
rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
self.assertEqual(['rm1', 'rm2'], rm_list)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.ha.rm-ids')

parse_mock.reset_mock()
parse_mock.return_value = None

rm_list = hadoop_conf._get_rm_ids(hadoop_conf.CONF_DIR)
self.assertIsNone(rm_list)

@mock.patch(_http_request_method)
@mock.patch(_http_getresponse_method)
def test_check_is_active_rm(self, http_getresponse_mock, http_conn_request_mock):
class ResponseMock():
def __init__(self, status, header_dict):
self.status = status
self.header_dict = header_dict

def getheader(self, header_key, default_return):
if header_key in self.header_dict:
return self.header_dict[header_key]
else:
return default_return

http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf._check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh':"testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh':"testing"})
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
http_conn_request_mock.side_effect = Exception('error')
http_conn_request_mock.reset_mock()
http_conn_request_mock.return_value = None
self.assertFalse(hadoop_conf._check_is_active_rm('example2', '8022'))
pass

def test_get_resource_manager(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'example.com:8022'

host_port = hadoop_conf.get_resource_manager_host_port()
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)

self.assertEqual(('example.com', '8022'), host_port)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address')

host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')

self.assertEqual(('example.com', '8022'), host_port)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address')
'yarn.resourcemanager.webapp.address.rm1')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
self.assertIsNone(host_port)

def test_get_jobhistory_host_port(self):
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = py26,py27,py32,py33
envlist = py26,py27,py34,py35

[testenv]
deps =
Expand Down
2 changes: 1 addition & 1 deletion yarn_api_client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
__version__ = '0.2.3'
__version__ = '0.2.4'
__all__ = ['ApplicationMaster', 'HistoryServer', 'NodeManager',
'ResourceManager']

Expand Down
23 changes: 9 additions & 14 deletions yarn_api_client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ def request(self, api_path, **query_args):
path = api_path

self.logger.info('Request http://%s:%s%s', self.address, self.port, path)
self.http_conn.request('GET', path)

response = self.http_conn.getresponse()

http_conn = self.http_conn
http_conn.request('GET', path)
response = http_conn.getresponse()

if response.status == OK:
return self.response_class(response)
Expand All @@ -44,19 +45,13 @@ def construct_parameters(self, arguments):
params = dict((key, value) for key, value in arguments if value is not None)
return params


__http_conn = None
@property
def http_conn(self):
if self.__http_conn is None:
if self.address is None:
raise ConfigurationError('API address is not set')
elif self.port is None:
raise ConfigurationError('API port is not set')
self.__http_conn = HTTPConnection(self.address, self.port,
timeout=self.timeout)

return self.__http_conn
if self.address is None:
raise ConfigurationError('API address is not set')
elif self.port is None:
raise ConfigurationError('API port is not set')
return HTTPConnection(self.address, self.port, timeout=self.timeout)

__logger = None
@property
Expand Down
55 changes: 49 additions & 6 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,64 @@
# -*- coding: utf-8 -*-
import os
import xml.etree.ElementTree as ET
try:
from httplib import HTTPConnection, OK
except ImportError:
from http.client import HTTPConnection, OK

CONF_DIR = '/etc/hadoop/conf'


def get_resource_manager_host_port():
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
def _get_rm_ids(hadoop_conf_path):
rm_ids = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), 'yarn.resourcemanager.ha.rm-ids')
if rm_ids is not None:
rm_ids = rm_ids.split(',')
return rm_ids


def _get_resource_manager(hadoop_conf_path, rm_id = None):
prop_name = 'yarn.resourcemanager.webapp.address'
value = parse(config_path, prop_name)
if value is not None:
host, _, port = value.partition(':')
return host, port
if rm_id is not None:
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), '%s.%s' % (prop_name, rm_id))
else:
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
if rm_webapp_address is not None:
[host, port] = rm_webapp_address.split(':')
return (host, port)
else:
return None


def _check_is_active_rm(rm_web_host, rm_web_port):
conn = HTTPConnection(rm_web_host, rm_web_port)
try:
conn.request('GET', '/cluster')
except:
return False
response = conn.getresponse()
if response.status != OK:
return False
else:
if response.getheader('Refresh', None) is not None:
return False
return True


def get_resource_manager_host_port():
hadoop_conf_path = CONF_DIR
rm_ids = _get_rm_ids(hadoop_conf_path)
if rm_ids is not None:
for rm_id in rm_ids:
ret = _get_resource_manager(hadoop_conf_path, rm_id)
if ret is not None:
(host, port) = ret
if _check_is_active_rm(host, port):
return host, port
return None
else:
return _get_resource_manager(hadoop_conf_path, None)


def get_jobhistory_host_port():
config_path = os.path.join(CONF_DIR, 'mapred-site.xml')
prop_name = 'mapreduce.jobhistory.webapp.address'
Expand Down

0 comments on commit 648c19e

Please sign in to comment.