Skip to content

Commit

Permalink
Merge e4cfb92 into 6602471
Browse files Browse the repository at this point in the history
  • Loading branch information
dimon222 committed Sep 16, 2019
2 parents 6602471 + e4cfb92 commit 42d3eac
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 164 deletions.
20 changes: 10 additions & 10 deletions itests/integration_test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,55 +29,55 @@ def setUpClass(self):

if yarn_endpoint_uri.hostname and yarn_endpoint_uri.port:
self.configured = True
self.resourceManager = ResourceManager(yarn_endpoint_uri.hostname, yarn_endpoint_uri.port)
self.resource_manager = ResourceManager([yarn_endpoint_uri.hostname + ":" + str(yarn_endpoint_uri.port)])

def test_cluster_information(self):
if self.configured:
info = self.resourceManager.cluster_information()
info = self.resource_manager.cluster_information()
pprint(info.data)
self.assertEqual(info.data['clusterInfo']['state'], 'STARTED')

def test_cluster_metrics(self):
if self.configured:
metrics = self.resourceManager.cluster_metrics()
metrics = self.resource_manager.cluster_metrics()
pprint(metrics.data)
self.assertGreater(metrics.data['clusterMetrics']['activeNodes'], 0)
self.assertIsNotNone(metrics.data['clusterMetrics']['totalNodes'])

def test_cluster_scheduler(self):
if self.configured:
scheduler = self.resourceManager.cluster_scheduler()
scheduler = self.resource_manager.cluster_scheduler()
pprint(scheduler.data)
self.assertIsNotNone(scheduler.data['scheduler']['schedulerInfo'])

def test_cluster_applications(self):
if self.configured:
apps = self.resourceManager.cluster_applications()
apps = self.resource_manager.cluster_applications()
pprint(apps.data)
self.assertIsNotNone(apps.data['apps'])

def test_cluster_application_state(self):
if self.configured:
apps = self.resourceManager.cluster_applications()
apps = self.resource_manager.cluster_applications()
appid = apps.data['apps']['app'][0]['id']
print(appid)
response = self.resourceManager.cluster_application_state(appid)
response = self.resource_manager.cluster_application_state(appid)
pprint(response.data)
pprint(response.data['state'])
self.assertIsNotNone(apps.data['apps'])

def test_cluster_application_statistics(self):
if self.configured:
appstats = self.resourceManager.cluster_application_statistics()
appstats = self.resource_manager.cluster_application_statistics()
pprint(appstats.data)
self.assertIsNotNone(appstats.data['appStatInfo'])

def test_cluster_nodes(self):
if self.configured:
nodes = self.resourceManager.cluster_nodes()
nodes = self.resource_manager.cluster_nodes()
pprint(nodes.data)
self.assertIsNotNone(nodes.data['nodes'])

running_nodes = self.resourceManager.cluster_nodes(state='RUNNING', healthy='true')
running_nodes = self.resource_manager.cluster_nodes(state='RUNNING', healthy='true')
pprint(running_nodes.data)
self.assertIsNotNone(nodes.data['nodes'])
4 changes: 0 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ def find_version(*file_paths):
'requests>=2.7,<3.0',
],

extras_require = {
'kerberos': ['requests-kerberos'],
},

entry_points = {
'console_scripts': [
'yarn_client = yarn_api_client.main:main',
Expand Down
6 changes: 3 additions & 3 deletions tests/test_application_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ class AppMasterTestCase(TestCase):
def setUp(self):
self.app = ApplicationMaster('localhost')

@patch('yarn_api_client.application_master.get_webproxy_host_port')
@patch('yarn_api_client.application_master.get_webproxy_endpoint')
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = (None, None)
get_config_mock.return_value = None
ApplicationMaster()
get_config_mock.assert_called_with()
get_config_mock.assert_called_with(30)

def test_application_information(self, request_mock):
self.app.application_information('app_100500')
Expand Down
11 changes: 6 additions & 5 deletions tests/test_base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
try:
from httplib import OK # NOQA
from urlparse import urlparse
except ImportError:
from http.client import OK # NOQA
from urllib.parse import urlparse

import json
import requests_mock
Expand Down Expand Up @@ -52,16 +54,15 @@ def test_http_configuration(self):
requests_get_mock.get('/ololo', text=json.dumps(BaseYarnAPITestCase.success_response()))

client = self.get_client()
client.address = None
client.port = 80
client.service_uri = None

with self.assertRaises(ConfigurationError):
client.request('/ololo')

def get_client(self):
client = base.BaseYarnAPI()
client.address = 'example.com'
client.port = 80
client.service_uri = base.Uri('example.com:80')
client.timeout = 0
client.kerberos_enabled = False
client.auth = None
client.verify = True
return client
71 changes: 35 additions & 36 deletions tests/test_hadoop_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,42 @@ def test_parse(self):
value = hadoop_conf.parse(f.name, key)
self.assertEqual(None, value)

def test_get_resource_host_port(self):
def test_get_resource_endpoint(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
with patch('yarn_api_client.hadoop_conf._get_rm_ids') as get_rm_ids_mock:
parse_mock.return_value = 'example.com:8022'
get_rm_ids_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
endpoint = hadoop_conf.get_resource_manager_endpoint()

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
self.assertIsNone(host_port)
endpoint = hadoop_conf.get_resource_manager_endpoint()
self.assertIsNone(endpoint)

@mock.patch('yarn_api_client.hadoop_conf._get_rm_ids')
@mock.patch('yarn_api_client.hadoop_conf.parse')
@mock.patch('yarn_api_client.hadoop_conf.check_is_active_rm')
def test_get_resource_host_port_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
def test_get_resource_endpoint_with_ha(self, check_is_active_rm_mock, parse_mock, get_rm_ids_mock):
get_rm_ids_mock.return_value = ['rm1', 'rm2']
parse_mock.return_value = 'example.com:8022'
check_is_active_rm_mock.return_value = True
host_port = hadoop_conf.get_resource_manager_host_port()
endpoint = hadoop_conf.get_resource_manager_endpoint()

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.resourcemanager.webapp.address.rm1')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_resource_manager_host_port()
self.assertIsNone(host_port)
endpoint = hadoop_conf.get_resource_manager_endpoint()
self.assertIsNone(endpoint)

def test_get_rm_ids(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
Expand Down Expand Up @@ -116,83 +116,82 @@ def getheader(self, header_key, default_return):

http_conn_request_mock.return_value = None
http_getresponse_mock.return_value = ResponseMock(OK, {})
self.assertTrue(hadoop_conf.check_is_active_rm('example2', '8022'))
self.assertTrue(hadoop_conf.check_is_active_rm('example2:8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(OK, {'Refresh': "testing"})
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))
http_getresponse_mock.reset_mock()
http_getresponse_mock.return_value = ResponseMock(NOT_FOUND, {'Refresh': "testing"})
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))
http_conn_request_mock.side_effect = Exception('error')
http_conn_request_mock.reset_mock()
http_conn_request_mock.return_value = None
self.assertFalse(hadoop_conf.check_is_active_rm('example2', '8022'))
pass
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))

def test_get_resource_manager(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'example.com:8022'

host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)
endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address')

host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual(('example.com:8022'), endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml', 'yarn.resourcemanager.webapp.address.rm1')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
self.assertIsNone(host_port)
endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
self.assertIsNone(endpoint)

def test_get_jobhistory_host_port(self):
def test_get_jobhistory_endpoint(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'example.com:8022'

host_port = hadoop_conf.get_jobhistory_host_port()
endpoint = hadoop_conf.get_jobhistory_endpoint()

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/mapred-site.xml',
'mapreduce.jobhistory.webapp.address')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_jobhistory_host_port()
self.assertIsNone(host_port)
endpoint = hadoop_conf.get_jobhistory_endpoint()
self.assertIsNone(endpoint)

def test_get_nodemanager_host_port(self):
def test_get_nodemanager_endpoint(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'example.com:8022'

host_port = hadoop_conf.get_nodemanager_host_port()
endpoint = hadoop_conf.get_nodemanager_endpoint()

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.nodemanager.webapp.address')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_nodemanager_host_port()
self.assertIsNone(host_port)
endpoint = hadoop_conf.get_nodemanager_endpoint()
self.assertIsNone(endpoint)

def test_get_webproxy_host_port(self):
def test_get_webproxy_endpoint(self):
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
parse_mock.return_value = 'example.com:8022'

host_port = hadoop_conf.get_webproxy_host_port()
endpoint = hadoop_conf.get_webproxy_endpoint()

self.assertEqual(('example.com', '8022'), host_port)
self.assertEqual('example.com:8022', endpoint)
parse_mock.assert_called_with('/etc/hadoop/conf/yarn-site.xml',
'yarn.web-proxy.address')

parse_mock.reset_mock()
parse_mock.return_value = None

host_port = hadoop_conf.get_webproxy_host_port()
self.assertIsNone(host_port)
endpoint = hadoop_conf.get_webproxy_endpoint()
self.assertIsNone(endpoint)
4 changes: 2 additions & 2 deletions tests/test_history_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class HistoryServerTestCase(TestCase):
def setUp(self):
self.hs = HistoryServer('localhost')

@patch('yarn_api_client.history_server.get_jobhistory_host_port')
@patch('yarn_api_client.history_server.get_jobhistory_endpoint')
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = (None, None)
get_config_mock.return_value = None
HistoryServer()
get_config_mock.assert_called_with()

Expand Down
12 changes: 7 additions & 5 deletions tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

@patch('yarn_api_client.resource_manager.ResourceManager.request')
class ResourceManagerTestCase(TestCase):
def setUp(self):
self.rm = ResourceManager('localhost')
@patch('yarn_api_client.resource_manager.check_is_active_rm')
def setUp(self, check_is_active_rm_mock):
check_is_active_rm_mock.return_value = True
self.rm = ResourceManager(['localhost'])

@patch('yarn_api_client.resource_manager.get_resource_manager_host_port')
@patch('yarn_api_client.resource_manager.get_resource_manager_endpoint')
def test__init__(self, get_config_mock, request_mock):
get_config_mock.return_value = (None, None)
get_config_mock.return_value = "localhost"
ResourceManager()
get_config_mock.assert_called_with()
get_config_mock.assert_called_with(30)

def test_cluster_information(self, request_mock):
self.rm.cluster_information()
Expand Down
18 changes: 10 additions & 8 deletions yarn_api_client/application_master.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from .base import BaseYarnAPI
from .hadoop_conf import get_webproxy_host_port
from .hadoop_conf import get_webproxy_endpoint


class ApplicationMaster(BaseYarnAPI):
Expand All @@ -15,17 +15,19 @@ class ApplicationMaster(BaseYarnAPI):
If `address` argument is `None` client will try to extract `address` and
`port` from Hadoop configuration files.
:param str address: Proxy HTTP address
:param int port: Proxy HTTP port
:param str service_endpoint: ApplicationMaster HTTP(S) address
:param int timeout: API connection timeout in seconds
:param boolean kerberos_enabled: Flag identifying is Kerberos Security has been enabled for YARN
:param AuthBase auth: Auth to use for requests
:param boolean verify: Either a boolean, in which case it controls whether
we verify the server's TLS certificate, or a string, in which case it must
be a path to a CA bundle to use. Defaults to ``True``
"""
def __init__(self, address=None, port=8088, timeout=30, kerberos_enabled=False):
if address is None:
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
if not service_endpoint:
self.logger.debug('Get configuration from hadoop conf dir')
address, port = get_webproxy_host_port()
service_endpoint = get_webproxy_endpoint(timeout)

super(ApplicationMaster, self).__init__(address, port, timeout, kerberos_enabled)
super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify)

def application_information(self, application_id):
"""
Expand Down
Loading

0 comments on commit 42d3eac

Please sign in to comment.