Skip to content

Commit

Permalink
Merge pull request #6 from mapr/hue-3.7.0-mapr-fixing
Browse files Browse the repository at this point in the history
Hue 3.7.0 mapr fixing
  • Loading branch information
ntarasenko committed Mar 20, 2015
2 parents 126be80 + 3d056ad commit eddf817
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 34 deletions.
45 changes: 33 additions & 12 deletions apps/jobbrowser/src/jobbrowser/models.py
Expand Up @@ -20,13 +20,16 @@
import lxml.html
import re
import urllib2
import urllib2_kerberos
from desktop.lib.maprsasl import MaprSasl

from urlparse import urlparse, urlunparse

from desktop.lib.view_util import format_duration_in_millis
from desktop.lib import i18n
from hadoop import job_tracker
from hadoop import confparse
from hadoop import cluster
from hadoop.api.jobtracker.ttypes import JobNotFoundException

import hadoop.api.jobtracker.ttypes as ttypes
Expand Down Expand Up @@ -451,29 +454,47 @@ def get_task_log(self):
&start=<offset> : specify the start offset of the log section, when using a filter.
&end=<offset> : specify the end offset of the log section, when using a filter.
"""

log_filters = ('stdout', 'stderr', 'syslog')
logs = [self.get_specific_log(filter) for filter in log_filters]
return logs

def get_specific_log(self, log_filter):
tracker = self.get_tracker()
url = urlunparse(('http',
mapred_cluster = cluster.get_cluster_conf_for_job_submission()
scheme = 'https' if mapred_cluster.SECURITY_ENABLED.get() else 'http'

url = urlunparse((scheme,
'%s:%s' % (tracker.host, tracker.httpPort),
'tasklog',
None,
'attemptid=%s' % (self.attemptId,),
'attemptid=%s&filter=%s&plaintext=true' % (self.attemptId, log_filter),
None))
LOGGER.info('Retrieving %s' % (url,))
try:
data = urllib2.urlopen(url)
data = self.get_url_data(url)
except urllib2.URLError:
raise urllib2.URLError(_("Cannot retrieve logs from TaskTracker %(id)s.") % {'id': self.taskTrackerId})

et = lxml.html.parse(data)
log_sections = et.findall('body/pre')
logs = [section.text or '' for section in log_sections]
if len(logs) < 3:
LOGGER.warn('Error parsing task attempt log for %s at "%s". Found %d (not 3) log sections' %
(self.attemptId, url, len(log_sections)))
err = _("Hue encountered an error while retrieving logs from '%s'.") % (url,)
logs += [err] * (3 - len(logs))
return logs
return data

def get_url_data(self, url):
LOGGER.info('Retrieving %s' % (url,))

if self.task.jt.client.conf.mechanism in ['GSSAPI']:
handlers = urllib2_kerberos.HTTPKerberosAuthHandler()
headers = {}
elif self.task.jt.client.conf.mechanism in ['MAPR-SECURITY']:
mapr = MaprSasl()
handlers = urllib2.HTTPSHandler()
headers = {"Authorization" : 'MAPR-Negotiate ' + mapr.get_init_response()}
else:
return urllib2.urlopen(url).read()

opener = urllib2.build_opener(handlers)
request = urllib2.Request(url, headers=headers)

return opener.open(request).read()

class Tracker(object):

Expand Down
6 changes: 3 additions & 3 deletions desktop/conf.dist/hue.ini
Expand Up @@ -609,7 +609,7 @@
# Default port is 14000 for HttpFs.
webhdfs_url=http://localhost:14000/webhdfs/v1

# Change this if your HDFS cluster is Kerberos-secured
# Change this if your HDFS cluster is secured
## security_enabled=false

# Default umask for file and directory creation, specified in an octal value.
Expand All @@ -629,7 +629,7 @@
# Whether to submit jobs to this cluster
submit_to=False

# Change this if your YARN cluster is Kerberos-secured
# Change this if your YARN cluster is secured
## security_enabled=false

# Security mechanism of authentication none/GSSAPI/MAPR-SECURITY
Expand Down Expand Up @@ -678,7 +678,7 @@
# Whether to submit jobs to this cluster
submit_to=True

# Change this if your MapReduce cluster is Kerberos-secured
# Change this if your MapReduce cluster is secured
## security_enabled=false

# Security mechanism of authentication none/GSSAPI/MAPR-SECURITY
Expand Down
6 changes: 3 additions & 3 deletions desktop/conf/pseudo-distributed.ini.tmpl
Expand Up @@ -600,7 +600,7 @@
# Default port is 14000 for HttpFs.
webhdfs_url=http://localhost:14000/webhdfs/v1

# Change this if your HDFS cluster is Kerberos-secured
# Change this if your HDFS cluster is secured
## security_enabled=false

# Default umask for file and directory creation, specified in an octal value.
Expand All @@ -620,7 +620,7 @@
# Whether to submit jobs to this cluster
submit_to=False

# Change this if your YARN cluster is Kerberos-secured
# Change this if your YARN cluster is secured
## security_enabled=false

# URL of the ResourceManager API
Expand Down Expand Up @@ -666,7 +666,7 @@
# Whether to submit jobs to this cluster
submit_to=True

# Change this if your MapReduce cluster is Kerberos-secured
# Change this if your MapReduce cluster is secured
## security_enabled=false

# HA support by specifying multiple clusters
Expand Down
Expand Up @@ -177,11 +177,25 @@ public Server(UserGroupInformation serverUgi) throws TTransportException {
}

public TTransportFactory createTransportFactory(Configuration conf) throws Exception {
Map<String, String> saslProps = SASL_PROPS;;
RpcAuthMethod authMethod = RpcAuthRegistry.getAuthMethod(realUgi.getAuthenticationMethod());
String mechanism = authMethod.getMechanismName();
String protocol = authMethod.getProtocol();
String serverId = authMethod.getServerId();
Map<String, String> saslProps = SASL_PROPS;
RpcAuthMethod authMethod;
String mechanism;
String protocol = null;
String serverId = null;

try {
Class rpcAuthRegistryClass = HadoopThriftAuthBridge.class.getClassLoader()
.loadClass("org.apache.hadoop.security.rpcauth.RpcAuthRegistry");
Method getAuthMethod = rpcAuthRegistryClass.getMethod("getAuthMethod", UserGroupInformation.AuthenticationMethod.class);
authMethod = (RpcAuthMethod) getAuthMethod.invoke(null, realUgi.getAuthenticationMethod());
mechanism = authMethod.getMechanismName();
protocol = authMethod.getProtocol();
serverId = authMethod.getServerId();
} catch (Exception e) {
LOG.warn("Error with class org.apache.hadoop.security.rpcauth.RpcAuthRegistry" + e);
authMethod = realUgi.getRpcAuthMethodList().get(0);
mechanism = authMethod.getMechanismName();
}
if (AuthMethod.KERBEROS.getMechanismName().equals(mechanism)) {
String kerberosName = realUgi.getUserName();
final String names[] = SaslRpcServer.splitKerberosName(kerberosName);
Expand Down
2 changes: 1 addition & 1 deletion desktop/libs/hadoop/src/hadoop/cluster.py
Expand Up @@ -217,7 +217,7 @@ def get_next_ha_yarncluster():
for name in conf.YARN_CLUSTERS.keys():
config = conf.YARN_CLUSTERS[name]
if config.SUBMIT_TO.get():
rm = ResourceManagerApi(config.RESOURCE_MANAGER_API_URL.get(), config.SECURITY_ENABLED.get())
rm = ResourceManagerApi(config.RESOURCE_MANAGER_API_URL.get(), config.SECURITY_ENABLED.get(), config.SSL_CERT_CA_VERIFY.get(), config.MECHANISM.get())
if has_ha:
try:
cluster_info = rm.cluster()
Expand Down
11 changes: 10 additions & 1 deletion desktop/libs/hadoop/src/hadoop/yarn/clients.py
Expand Up @@ -22,6 +22,7 @@
import heapq

from desktop.lib.rest.http_client import HttpClient
from desktop.lib.maprsasl import HttpMaprAuth

from hadoop import cluster

Expand Down Expand Up @@ -53,8 +54,16 @@ def get_log_client(log_link):
if client_tuple is None:
client = HttpClient(base_url, logger=LOG)
yarn_cluster = cluster.get_cluster_conf_for_job_submission()
client.set_verify(yarn_cluster.SSL_CERT_CA_VERIFY.get())

if yarn_cluster.SECURITY_ENABLED.get():
client.set_kerberos_auth()
auth_clients = {'MAPR-SECURITY': HttpMaprAuth}
mechanism = yarn_cluster.MECHANISM.get()
if mechanism in auth_clients:
client._session.auth = auth_clients[mechanism]()
else:
client.set_kerberos_auth()

else:
_log_client_heap.remove(client_tuple)
client = client_tuple[1]
Expand Down
11 changes: 8 additions & 3 deletions desktop/libs/hadoop/src/hadoop/yarn/history_server_api.py
Expand Up @@ -21,6 +21,7 @@

from desktop.lib.rest.http_client import HttpClient
from desktop.lib.rest.resource import Resource
from desktop.lib.maprsasl import HttpMaprAuth
from hadoop import cluster


Expand All @@ -41,22 +42,26 @@ def get_history_server_api():
try:
if _api_cache is None:
yarn_cluster = cluster.get_cluster_conf_for_job_submission()
_api_cache = HistoryServerApi(yarn_cluster.HISTORY_SERVER_API_URL.get(), yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get())
_api_cache = HistoryServerApi(yarn_cluster.HISTORY_SERVER_API_URL.get(), yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get(), yarn_cluster.MECHANISM.get())
finally:
_api_cache_lock.release()
return _api_cache


class HistoryServerApi(object):

def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False):
def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False, mechanism='none'):
self._url = posixpath.join(oozie_url, 'ws/%s/history' % _API_VERSION)
self._client = HttpClient(self._url, logger=LOG)
self._root = Resource(self._client)
self._security_enabled = security_enabled

if self._security_enabled:
self._client.set_kerberos_auth()
auth_clients = {'MAPR-SECURITY': HttpMaprAuth}
if mechanism in auth_clients:
self._client._session.auth = auth_clients[mechanism]()
else:
self._client.set_kerberos_auth()

self._client.set_verify(ssl_cert_ca_verify)

Expand Down
11 changes: 8 additions & 3 deletions desktop/libs/hadoop/src/hadoop/yarn/mapreduce_api.py
Expand Up @@ -21,6 +21,7 @@

from desktop.lib.rest.http_client import HttpClient
from desktop.lib.rest.resource import Resource
from desktop.lib.maprsasl import HttpMaprAuth
from hadoop import cluster
from hadoop.yarn.resource_manager_api import get_resource_manager

Expand All @@ -42,22 +43,26 @@ def get_mapreduce_api():
try:
if _api_cache is None:
yarn_cluster = cluster.get_cluster_conf_for_job_submission()
_api_cache = MapreduceApi(yarn_cluster.PROXY_API_URL.get(), yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get())
_api_cache = MapreduceApi(yarn_cluster.PROXY_API_URL.get(), yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get(), yarn_cluster.MECHANISM.get())
finally:
_api_cache_lock.release()
return _api_cache


class MapreduceApi(object):

def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False):
def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False, mechanism='none'):
self._url = posixpath.join(oozie_url, 'proxy')
self._client = HttpClient(self._url, logger=LOG)
self._root = Resource(self._client)
self._security_enabled = security_enabled

if self._security_enabled:
self._client.set_kerberos_auth()
auth_clients = {'MAPR-SECURITY': HttpMaprAuth}
if mechanism in auth_clients:
self._client._session.auth = auth_clients[mechanism]()
else:
self._client.set_kerberos_auth()

self._client.set_verify(ssl_cert_ca_verify)

Expand Down
11 changes: 8 additions & 3 deletions desktop/libs/hadoop/src/hadoop/yarn/node_manager_api.py
Expand Up @@ -20,6 +20,7 @@

from desktop.lib.rest.http_client import HttpClient
from desktop.lib.rest.resource import Resource
from desktop.lib.maprsasl import HttpMaprAuth

from hadoop import cluster

Expand All @@ -34,18 +35,22 @@

def get_resource_manager_api(api_url):
yarn_cluster = cluster.get_cluster_conf_for_job_submission()
return ResourceManagerApi(api_url, yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get())
return ResourceManagerApi(api_url, yarn_cluster.SECURITY_ENABLED.get(), yarn_cluster.SSL_CERT_CA_VERIFY.get(), yarn_cluster.MECHANISM.get())


class ResourceManagerApi(object):
def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False):
def __init__(self, oozie_url, security_enabled=False, ssl_cert_ca_verify=False, mechanism='none'):
self._url = posixpath.join(oozie_url, 'ws', _API_VERSION)
self._client = HttpClient(self._url, logger=LOG)
self._root = Resource(self._client)
self._security_enabled = security_enabled

if self._security_enabled:
self._client.set_kerberos_auth()
auth_clients = {'MAPR-SECURITY': HttpMaprAuth}
if mechanism in auth_clients:
self._client._session.auth = auth_clients[mechanism]()
else:
self._client.set_kerberos_auth()

self._client.set_verify(ssl_cert_ca_verify)

Expand Down

0 comments on commit eddf817

Please sign in to comment.