Skip to content
Permalink
Browse files

update documentation and exception

  • Loading branch information...
sdpython committed Jan 27, 2016
1 parent ef51b2e commit aa725de8e7dd3ee3e428a2bffefc19d429f654d9
Showing with 69 additions and 5 deletions.
  1. +67 −3 src/pyensae/remote/azure_connection.py
  2. +2 −2 src/pyensae/remote/azure_exception.py
@@ -653,7 +653,7 @@ def url_webHCatUrl(self, cmd):
"""
if self.hadoop_name is None:
raise AttributeError(
"no hadoop server was given to the constructor")
"no hadoop server was given to the constructor for cmd: {0}".format(cmd))
webHCatUrl = 'https://' + self.hadoop_name + \
'.azurehdinsight.net/templeton/v1/' + cmd
return webHCatUrl
@@ -872,17 +872,71 @@ def pig_submit(self,
data=params)

if r.status_code != 200:
raise AzureException("unable to submit job: " + pig_file, r)
raise AzureException(
"unable to submit job: {0}\n---\nWITH PARAMS\n---\n{1}".format(pig_file, params), r)
return r.json()

def job_queue(self, showall=False):
def job_queue(self, showall=False, fields=None):
"""
returns the list of jobs
It uses the API `Job Information — GET queue/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Jobs>`_.
@param showall if True, show all your jobs (not only yours)
@param fields to add fields in the requests
@return list of jobs
@example(List job queue)
Most of the time, a job remains stuck in the job queue because
it is full. Here is a code to check that is the case on
a Azure cluster. It should be executed from a notebook.
Connection ::
blobstorage = "..."
blobpassword = "..."
hadoop_server = "..."
hadoop_password = "..."
username = "..."
import pyensae
client, bs = %hd_open
Job queue ::
res = client.job_queue()
res.reverse() # last submitted jobs first
Displays the first 20 jobs::
for i, r in enumerate(res[:20]):
st = client.job_status(r["id"])
print(i, r, st["status"]["state"],datetime.fromtimestamp(float(st["status"]["startTime"])/1000), st["status"]["jobName"])
print(st["userargs"].get("file", None), st["profile"].get("jobName", None))
It gives::
0 {'detail': None, 'id': 'job_1451961118663_3126'} PREP 2016-01-26 21:57:28.756000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
1 {'detail': None, 'id': 'job_1451961118663_3125'} PREP 2016-01-26 21:57:28.517999 TempletonControllerJob
wasb://..../scripts/pig/pre_processing.pig TempletonControllerJob
2 {'detail': None, 'id': 'job_1451961118663_3124'} PREP 2016-01-26 21:50:32.742000 TempletonControllerJob
wasb://..../scripts/pig/titi.pig TempletonControllerJob
3 {'detail': None, 'id': 'job_1451961118663_3123'} RUNNING 2016-01-26 21:46:57.219000 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
4 {'detail': None, 'id': 'job_1451961118663_3122'} SUCCEEDED 2016-01-26 21:40:34.687999 PigLatin:pre_processing.pig
None PigLatin:pre_processing.pig
5 {'detail': None, 'id': 'job_1451961118663_3121'} RUNNING 2016-01-26 21:41:29.657000 TempletonControllerJob
wasb://..../scripts/pig/Algo_LDA2.pig TempletonControllerJob
6 {'detail': None, 'id': 'job_1451961118663_3120'} SUCCEEDED 2016-01-26 21:40:06.859999 TempletonControllerJob
wasb://..../scripts/pig/alg1.pig TempletonControllerJob
To kill a job::
client.job_kill("id")
@endexample
"""
if self.hadoop_user_name is None:
raise AttributeError(
@@ -896,6 +950,10 @@ def job_queue(self, showall=False):
params = {"user.name": self.hadoop_user_name}
if showall:
params["showall"] = "true"
if fields:
if fields != "*":
raise ValueError("fields can only be *")
params["fields"] = fields

r = requests.get(webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key),
@@ -914,6 +972,12 @@ def job_status(self, jobid):
@param jobid jobid
@return json
You can extract the *startTime* by doing::
from datetime import datetime
st = client.job_status(<job_id>)
datetime.fromtimestamp(float(st["status"]["startTime"])/1000)
"""
if self.hadoop_user_name is None:
raise AttributeError(
@@ -27,7 +27,7 @@ def __init__(self, message, ret):
except Exception as e:
js = str(e) + "\n" + str(ret)

self.ret = (code, js)
self.ret = (code, js, ret)
else:
self.ret = (None, None)

@@ -36,5 +36,5 @@ def __str__(self):
usual
"""
s = Exception.__str__(self)
f = "STATUS: {0}, JSON: {1}\n{2}".format(self.ret[0], self.ret[1], s)
f = "STATUS: {0}, JSON: {1}\n{2}\nREQUEST:\n{3}".format(self.ret[0], self.ret[1], s, self.ret[2])
return f

0 comments on commit aa725de

Please sign in to comment.
You can’t perform that action at this time.