Skip to content

Commit

Permalink
Merge d1a89ab into 25c29b6
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-bates authored Jan 14, 2020
2 parents 25c29b6 + d1a89ab commit bfd4492
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 21 deletions.
3 changes: 1 addition & 2 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,12 @@ def test_uri_parsing(self):
self.assertEqual(result_uri.hostname, '123.45.67.89')
self.assertEqual(result_uri.port, 1234)
self.assertEqual(result_uri.is_https, False)

result_uri = base.Uri('https://test-domain.com:1234')
self.assertEqual(result_uri.scheme, 'https')
self.assertEqual(result_uri.hostname, 'test-domain.com')
self.assertEqual(result_uri.port, 1234)
self.assertEqual(result_uri.is_https, True)


def get_client(self):
client = base.BaseYarnAPI()
Expand Down
6 changes: 5 additions & 1 deletion yarn_api_client/application_master.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import logging
from .base import BaseYarnAPI
from .hadoop_conf import get_webproxy_endpoint


log = logging.getLogger(__name__)


class ApplicationMaster(BaseYarnAPI):
"""
The MapReduce Application Master REST API's allow the user to get status
Expand All @@ -24,7 +29,6 @@ class ApplicationMaster(BaseYarnAPI):
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
if not service_endpoint:
self.logger.debug('Get configuration from hadoop conf dir')
service_endpoint = get_webproxy_endpoint(timeout, auth, verify)

super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify)
Expand Down
31 changes: 21 additions & 10 deletions yarn_api_client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@
from __future__ import unicode_literals

import logging
import os
import requests

from datetime import datetime

from .errors import APIError, ConfigurationError

try:
from urlparse import urlparse, urlunparse
except ImportError:
from urllib.parse import urlparse, urlunparse

# Bump logging level of connectionpool to WARNING - this is just too verbose otherwise.
# The level can be overridden with the URLLIB_LOG_LEVEL environment variable; when the
# variable is unset, logging.WARNING is used.
# NOTE(review): the environment value is handed to setLevel() unchanged, so it presumably
# has to be a level name that setLevel() accepts (e.g. 'DEBUG') - confirm.
logging.getLogger('urllib3.connectionpool').setLevel(os.environ.get('URLLIB_LOG_LEVEL', logging.WARNING))

# Module-level logger for this module.
log = logging.getLogger(__name__)


class Response(object):
"""
Expand Down Expand Up @@ -49,7 +57,6 @@ def to_url(self, api_path=None):


class BaseYarnAPI(object):
__logger = None
response_class = Response

def __init__(self, service_endpoint=None, timeout=None, auth=None, verify=True):
Expand All @@ -72,8 +79,6 @@ def request(self, api_path, method='GET', **kwargs):
self._validate_configuration()
api_endpoint = self.service_uri.to_url(api_path)

self.logger.info('API Endpoint {}'.format(api_endpoint))

if method == 'GET':
headers = {}
else:
Expand All @@ -82,20 +87,26 @@ def request(self, api_path, method='GET', **kwargs):
if 'headers' in kwargs and kwargs['headers']:
headers.update(kwargs['headers'])

begin = datetime.now()
response = self.session.request(method=method, url=api_endpoint, headers=headers, timeout=self.timeout, **kwargs)
end = datetime.now()
log.debug(
"'{method}' request against endpoint '{endpoint}' took {duration} ms".format(
method=method,
endpoint=api_endpoint,
duration=round((end-begin).total_seconds()*1000,3)
)
)

if response.status_code in (200, 202):
return self.response_class(response)
else:
msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text)
msg = "Response finished with status: {status}. Details: {msg}".format(
status=response.status_code,
msg=response.text
)
raise APIError(msg)

def construct_parameters(self, arguments):
    """Build a parameter dict from ``(key, value)`` pairs, dropping ``None`` values.

    :param arguments: iterable of ``(key, value)`` pairs.
    :returns: dict holding only the pairs whose value is not ``None``.
    """
    # Only ``None`` is filtered out; other falsy values (0, '', False) are kept.
    return {key: value for key, value in arguments if value is not None}

# Read-only property that returns a logger, creating it on first access and
# reusing the stored instance on later accesses.
@property
def logger(self):
# Nothing stored yet: create the logger, named after self.__module__.
if self.__logger is None:
self.__logger = logging.getLogger(self.__module__)
return self.__logger
15 changes: 12 additions & 3 deletions yarn_api_client/hadoop_conf.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# -*- coding: utf-8 -*-
import logging
import os
import xml.etree.ElementTree as ET
import requests

log = logging.getLogger(__name__)

CONF_DIR = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf')


Expand All @@ -14,7 +17,8 @@ def _get_rm_ids(hadoop_conf_path):


def _get_maximum_container_memory(hadoop_conf_path):
    """Return the ``yarn.nodemanager.resource.memory-mb`` setting as an int.

    :param hadoop_conf_path: directory that contains ``yarn-site.xml``.
    :returns: the property value converted with ``int()``.
    """
    # Parse the file once; the value is converted straight to int, so a missing
    # or non-numeric property surfaces as the exception raised by int().
    yarn_site = os.path.join(hadoop_conf_path, 'yarn-site.xml')
    return int(parse(yarn_site, 'yarn.nodemanager.resource.memory-mb'))


Expand Down Expand Up @@ -46,17 +50,19 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
def check_is_active_rm(url, timeout=30, auth=None, verify=True):
    """Report whether the RM at ``url`` answers ``GET <url>/cluster`` with HTTP 200.

    This is a best-effort probe: a failing request or a non-200 status is
    logged as a warning and reported as ``False`` rather than raised.

    :param url: base URL of the RM; ``"/cluster"`` is appended to it.
    :param timeout: forwarded to ``requests.get`` as ``timeout``.
    :param auth: forwarded to ``requests.get`` as ``auth``.
    :param verify: forwarded to ``requests.get`` as ``verify``.
    :returns: ``True`` when the response status is 200, otherwise ``False``.
    """
    try:
        response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify)
    except Exception as e:
        # Deliberately broad: any failure to reach this RM just means
        # "not active", so the caller can carry on.
        log.warning(
            "Exception encountered accessing RM '{url}': '{err}', continuing...".format(url=url, err=e)
        )
        return False

    if response.status_code != 200:
        log.warning(
            "Failed to access RM '{url}' - HTTP Code '{status}', continuing...".format(
                url=url, status=response.status_code
            )
        )
        return False
    return True


def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):
log.info('Getting resource manager endpoint from config: {config_path}'.format(config_path=os.path.join(CONF_DIR, 'yarn-site.xml')))
hadoop_conf_path = CONF_DIR
rm_ids = _get_rm_ids(hadoop_conf_path)
if rm_ids:
Expand All @@ -72,18 +78,21 @@ def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):

def get_jobhistory_endpoint():
    """Look up the job history web address in ``mapred-site.xml``.

    The file is read from ``CONF_DIR``.

    :returns: whatever ``parse`` yields for the
        ``mapreduce.jobhistory.webapp.address`` property.
    """
    mapred_site = os.path.join(CONF_DIR, 'mapred-site.xml')
    log.info('Getting jobhistory endpoint from config: {config_path}'.format(config_path=mapred_site))
    return parse(mapred_site, 'mapreduce.jobhistory.webapp.address')


def get_nodemanager_endpoint():
    """Look up the node manager web address in ``yarn-site.xml``.

    The file is read from ``CONF_DIR``.

    :returns: whatever ``parse`` yields for the
        ``yarn.nodemanager.webapp.address`` property.
    """
    yarn_site = os.path.join(CONF_DIR, 'yarn-site.xml')
    log.info('Getting nodemanager endpoint from config: {config_path}'.format(config_path=yarn_site))
    return parse(yarn_site, 'yarn.nodemanager.webapp.address')


def get_webproxy_endpoint(timeout=30, auth=None, verify=True):
    """Look up the web proxy address, falling back to the resource manager.

    Reads ``yarn.web-proxy.address`` from ``yarn-site.xml`` under ``CONF_DIR``.

    :param timeout: forwarded to ``get_resource_manager_endpoint`` on fallback.
    :param auth: forwarded to ``get_resource_manager_endpoint`` on fallback.
    :param verify: forwarded to ``get_resource_manager_endpoint`` on fallback.
    :returns: the configured proxy address when it is truthy, otherwise the
        result of ``get_resource_manager_endpoint``.
    """
    yarn_site = os.path.join(CONF_DIR, 'yarn-site.xml')
    log.info('Getting webproxy endpoint from config: {config_path}'.format(config_path=yarn_site))
    proxy_address = parse(yarn_site, 'yarn.web-proxy.address')
    if proxy_address:
        return proxy_address
    # No usable proxy address configured: use the resource manager endpoint.
    return get_resource_manager_endpoint(timeout, auth, verify)
Expand Down
5 changes: 4 additions & 1 deletion yarn_api_client/history_server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import logging
from .base import BaseYarnAPI
from .constants import JobStateInternal
from .errors import IllegalArgumentError
from .hadoop_conf import get_jobhistory_endpoint

log = logging.getLogger(__name__)


class HistoryServer(BaseYarnAPI):
"""
Expand All @@ -24,7 +28,6 @@ class HistoryServer(BaseYarnAPI):
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
if not service_endpoint:
self.logger.debug('Get information from hadoop conf dir')
service_endpoint = get_jobhistory_endpoint()

super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify)
Expand Down
3 changes: 1 addition & 2 deletions yarn_api_client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import argparse
import logging
from pprint import pprint
import sys

from .constants import (YarnApplicationState, FinalApplicationStatus,
ApplicationState, JobStateInternal)
from . import ResourceManager, NodeManager, HistoryServer, ApplicationMaster

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
log = logging.getLogger(__name__)


def get_parser():
Expand Down
4 changes: 3 additions & 1 deletion yarn_api_client/node_manager.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
import logging
from .base import BaseYarnAPI
from .constants import ApplicationState
from .errors import IllegalArgumentError
from .hadoop_conf import get_nodemanager_endpoint

log = logging.getLogger(__name__)

LEGAL_APPLICATION_STATES = {s for s, _ in ApplicationState}


Expand Down Expand Up @@ -35,7 +38,6 @@ class NodeManager(BaseYarnAPI):
"""
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
if not service_endpoint:
self.logger.debug('Get configuration from hadoop conf dir')
service_endpoint = get_nodemanager_endpoint()

super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify)
Expand Down
3 changes: 2 additions & 1 deletion yarn_api_client/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import logging
from .base import BaseYarnAPI
from .constants import YarnApplicationState, FinalApplicationStatus
from .errors import IllegalArgumentError
from .hadoop_conf import get_resource_manager_endpoint, check_is_active_rm, CONF_DIR, _get_maximum_container_memory
from collections import deque

log = logging.getLogger(__name__)
LEGAL_STATES = {s for s, _ in YarnApplicationState}
LEGAL_FINAL_STATUSES = {s for s, _ in FinalApplicationStatus}

Expand Down Expand Up @@ -73,7 +75,6 @@ class ResourceManager(BaseYarnAPI):
def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True):
active_service_endpoint = None
if not service_endpoints:
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
active_service_endpoint = get_resource_manager_endpoint(timeout, auth, verify)
else:
for endpoint in service_endpoints:
Expand Down

0 comments on commit bfd4492

Please sign in to comment.