diff --git a/README.rst b/README.rst index 40de5a4c..5f240e38 100644 --- a/README.rst +++ b/README.rst @@ -82,6 +82,7 @@ Versions * **add:** add magic command and methods to enable a shell from a notebook (a kind of putty) (command ``%open_remove_shell``, ...) * **new:** function :func:`parse_code ` parses a script in R, PIG, SQLite syntax and checks if there is any mistake, it requires `antlr4 `_ * **new:** new class :class:`AzureClient ` to handle some basic needs with Azure + * **add:** add magic command and methods to handle Azure from a notebook * **0.8 - 2014/10/24** * **add:** add method :meth:`copy_to ` to copy every table from a database to another one * **fix:** class :class:`Database ` can now handle in memory database diff --git a/src/pyensae/remote/azure_connection.py b/src/pyensae/remote/azure_connection.py index 06463a9e..d3926e7a 100644 --- a/src/pyensae/remote/azure_connection.py +++ b/src/pyensae/remote/azure_connection.py @@ -5,16 +5,42 @@ """ import time, socket, datetime +import requests import azure.storage +class AzureException(Exception): + """ + exception raised by @see cl AzureClient + """ + def __init__(self, message, code, json): + """ + store more information than a regular exception + + @param message error message + @param code error code + @param json information in json format + """ + Exception.__init__(self, message) + self.ret = (code, json) + + def __str__(self): + """ + usual + """ + s = Exception.__str__(self) + f = "STATUS: {0}, JSON: {1}\n{2}".format(self.ret[0], self.ret[1], s) + return f + class AzureClient(): """ .. index: Azure A simple class to access to communite with `Azure `_. - It requires modules - `azure `_. + It requires modules: + + * `azure `_ + * `requests `_ Main functionalities related to blob: * list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes @@ -58,6 +84,10 @@ class AzureClient(): @endcode @endexample + + Many function uses WebHCat API. 
+ The error code can be found here: + `Error Codes and Responses `_. """ _blob_properties = [ @@ -80,27 +110,48 @@ class AzureClient(): "etag", ] - def __init__(self, account_name, account_key, fLOG = None): + def __init__(self, blob_name, + blob_key, + hadoop_name = None, + hadoop_key = None, + hadoop_user_name = "admin", + fLOG = None): """ constructor - @param account_name account name - @param account_key account key + @param blob_name blob storage name + @param blob_key account key for the blob storage + @param hadoop_name hadoop server name (can be None if HDInsight is not used) + @param hadoop_key hadoop key (can be None if HDInsight is not used) @param fLOG logging function """ - self.account_name = account_name - self.account_key = account_key + self.account_name = blob_name + self.account_key = blob_key + self.hadoop_name = hadoop_name + self.hadoop_key = hadoop_key + self.hadoop_user_name = hadoop_user_name if fLOG is None: def _log_(*l,**p): return self.LOG = _log_ else: self.LOG = fLOG + + @staticmethod + def mask_string(s): + """ + return empty string or as many ``*`` as the length of the string + """ + if s is None : return "" + else: return "*" * len(s) def __str__(self): """ usual """ - return "AzureClient" + mes = "AzureClient [blob:({0},{1}), hadoop:({2},{3},{4})]".format( AzureClient.mask_string(self.account_name), + AzureClient.mask_string(self.account_key), AzureClient.mask_string(self.hadoop_name), + AzureClient.mask_string(self.hadoop_key), AzureClient.mask_string(self.hadoop_user_name)) + return mes def open_blob_service(self): """ @@ -222,6 +273,7 @@ def delete_blob(self, blob_service, container_name, blob_name): def url_blob(self, blob_service, container, blob_name): """ + returns an url for a blob file name @param container container @param blob_name blob_name @@ -243,3 +295,238 @@ def copy_blob(self, blob_service, container, blob_name, source): res = blob_service.copy_blob(container, blob_name, url) return res + def url_webHCatUrl(self, 
cmd): + """ + returns an url to the cluster + + @param cmd something like ``pig``, ``status`` + @return url + """ + if self.hadoop_name is None: + raise AttributeError("no hadoop server was given to the constructor") + webHCatUrl='https://' + self.hadoop_name + '.azurehdinsight.net/templeton/v1/' + cmd + return webHCatUrl + + def wasb_to_file(self, container_name, blob_file): + """ + return something like ``wasb://demo@myblobstorage.blob...`` + + @param container_name name of a container + @param blob_file path to a file + @return return a url to blob file (pig script for example) + """ + return 'wasb://{1}@{0}.blob.core.windows.net/{2}'.format(container_name, + self.account_name, blob_file) + + def wasb_prefix(self, container_name): + """ + when using an instruction ``LOAD`` in a PIG script, + file blob name must be reference using a wasb syntax. + This method returns the prefix to add. + + @return wasb prefix + """ + return self.wasb_to_file(container_name, "") + + def get_status(self): + """ + return the status of the webHCatUrl server + + @return json + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + webHCatUrl = self.url_webHCatUrl("status") + + r = requests.get( webHCatUrl, + auth=(self.hadoop_user_name, self.hadoop_key)) + if r.status_code != 200: + raise AzureException("unable to the status of server: " + webHCatUrl, r.status_code, r.json()) + return r.json() + + def get_version(self): + """ + return the status of the WebHCat version + + @return json + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + webHCatUrl = self.url_webHCatUrl("version/hive") + + r = requests.get( webHCatUrl, + auth=(self.hadoop_user_name, 
self.hadoop_key)) + if r.status_code != 200: + raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json()) + return r.json() + + def pig_submit(self, container_name, blob_pig, status_dir = None): + """ + Submit a PIG job assuming this script + was uploaded to the blob storage + + The code comes from `How to use HDInsight from Linux `_ + and `start a Pig + Jython job in HDInsight thru WebHCat `_. + The API is described at `Pig Job — POST pig `_. + + @param container_name name of a container + @param blob_pig path to the job in the blob storage + @param status_dir folder used by Hadoop to store job's progress, it should contain + your alias if you want to avoid collision with others' jobs + @return json + + @example(Azure___Submit a job PIG) + + The script PIG must include an instruction ``LOAD``. + This instruction uses file names defined with the `wasb syntax `_. + + If you place the string ``__CONTAINER__`` before a stream name, + it will be replaced by the corresponding wasb syntax associated + to the container name defined by ``container_name``. + The function will then load your script, + modify it and save another one whose name is built by adding + ``.wasb.pig``. 
+ + @code + blobstorage = "****" + blobpassword = "*********************" + hadoop_name = "*********" + hadoop_password = "********" + cl = AzureClient(blobstorage, + blobpassword, + hadoop_name, + hadoop_password) + script = ''' + myinput = LOAD '__CONTAINER__' + using PigStorage(',') + AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ; + filt = FILTER myinput BY activity == 'walking' ; + STORE filt INTO '__CONTAINER__' USING PigStorage() ; + ''' + + script = script.replace("__CONTAINER__", cl.wasb_prefix(blobstorage)) + + with open("script_walking.pig","w") as f : + f.write(script) + + bs = cl.open_blob_service() + cl.upload(bs, blobstorage, "testensae/script.pig", "script_walking.pig") + + for f in cl.ls(bs, blobstorage, "testensae"): + print(f["name"]) + + js = cl.pig_submit(blobstorage, "testensae/script.pig", "status/pig/xavierdupre") + print(js) + + js = cl.job_status('job_1414863995725_0013') + @endcode + @endexample + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + wasb = self.wasb_to_file(container_name, blob_pig) + + params = {'user.name':self.hadoop_user_name, + 'file': wasb, + 'arg':'-v'} + + if status_dir is not None: + params['statusdir'] = self.wasb_to_file(container_name, status_dir) + + webHCatUrl = self.url_webHCatUrl("pig") + + r = requests.post( webHCatUrl, + auth=(self.hadoop_user_name, self.hadoop_key), + data=params) + + if r.status_code != 200: + raise AzureException("unable to submit job: " + blob_pig, r.status_code, r.json()) + return r.json() + + def job_queue(self, showall = False): + """ + returns the list of jobs + + It uses the API `Job Information — GET queue/:jobid `_. 
+ + @param showall if True, show all jobs (not only yours) + @return list of jobs + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + webHCatUrl = self.url_webHCatUrl("jobs") + + params = { "user.name": self.hadoop_user_name } + if showall: + params["showall"]="true" + + r = requests.get( webHCatUrl, + auth=(self.hadoop_user_name, self.hadoop_key), + params=params) + + if r.status_code != 200: + raise AzureException("unable to get job queue", r.status_code, r.json()) + return r.json() + + def job_status(self, jobid): + """ + return the status of a job + + see `List Versions — GET version `_ + for the outcome + + @param jobid jobid + @return json + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + params = { "user.name":self.hadoop_user_name} + webHCatUrl = self.url_webHCatUrl("jobs/" + jobid) + + r = requests.get( webHCatUrl, + auth=(self.hadoop_user_name, self.hadoop_key), + params=params) + if r.status_code != 200: + raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json()) + return r.json() + + def job_kill(self, jobid): + """ + kills a job + + see `Delete Job — DELETE queue/:jobid `_ + for the outcome + + @param jobid jobid + @return json + """ + if self.hadoop_user_name is None: + raise AttributeError("no hadoop user name was given to the constructor") + if self.hadoop_key is None: + raise AttributeError("no hadoop password was given to the constructor") + + params = { "user.name":self.hadoop_user_name} + webHCatUrl = self.url_webHCatUrl("jobs/" + jobid) + + r = requests.delete( webHCatUrl, + auth=(self.hadoop_user_name, self.hadoop_key), + params=params) + if r.status_code != 
200: + raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json()) + return r.json() + diff --git a/src/pyensae/remote/magic_azure.py b/src/pyensae/remote/magic_azure.py index d624e4e9..04962fb8 100644 --- a/src/pyensae/remote/magic_azure.py +++ b/src/pyensae/remote/magic_azure.py @@ -9,13 +9,15 @@ from IPython.core.magic import Magics, magics_class, line_magic, cell_magic from IPython.core.magic import line_cell_magic from IPython.core.display import HTML -from .azure_connection import AzureClient +from .azure_connection import AzureClient, AzureException @magics_class class MagicAzure(Magics): """ Defines magic commands to access `blob storage `_ and `HDInsight `_. + + When the container is not specified, it will take the default one. """ def _replace_params(self, cell): @@ -52,6 +54,22 @@ def open_blob(self, line): @see me open_blob """ return self.blob_open(line) + + @line_magic + def azureclient(self,line): + """ + returns the AzureClient object + """ + cl, bs = self.get_blob_connection() + return cl + + @line_magic + def blobservice(self,line): + """ + returns the BlobService object + """ + cl, bs = self.get_blob_connection() + return bs @line_magic def blob_open (self, line): @@ -61,13 +79,13 @@ def blob_open (self, line): spl = line.strip().split() if len(spl) != 3 and len(spl) != 0: print("Usage:") - print(" blob_open ") + print(" blob_open ") print(" blob_open") print("") - print("No parameter means blobstorage, blobalias, blobpassword will be found in the workspace") + print("No parameter means blobstorage, blobpassword will be found in the workspace") else: - if len(spl)==3: - server,username,password = spl + if len(spl)==2: + server,password = spl elif self.shell is not None: server = self.shell.user_ns.get("blobstorage",None) password = self.shell.user_ns.get("blobpassword",None) @@ -87,14 +105,56 @@ def blob_open (self, line): self.shell.user_ns["remote_azure_blob"] = bs return bs - def create_client(self, 
account_name, account_key): + @line_magic + def hd_open (self, line): + """ + open a connection to blob service + """ + spl = line.strip().split() + if len(spl) != 3 and len(spl) != 0: + print("Usage:") + print(" hd_open ") + print(" hd_open") + print("") + print("No parameter means blobstorage, blobpassword, hadoop_server, hadoop_password will be found in the workspace.") + print("HDInsight is able to work with multiple blob storage.") + print("Only one is allowed here.") + else: + if len(spl)==4: + server,password,hadoop_server,hadoop_password = spl + elif self.shell is not None: + server = self.shell.user_ns.get("blobstorage",None) + password = self.shell.user_ns.get("blobpassword",None) + hadoop_server = self.shell.user_ns.get("hadoop_server",None) + hadoop_password = self.shell.user_ns.get("hadoop_password",None) + if server is None : raise KeyError("unable to find blobstorage") + if password is None : raise KeyError("unable to find blobpassword") + if hadoop_server is None : raise KeyError("unable to find hadoop_server") + if hadoop_password is None : raise KeyError("unable to find hadoop_password") + else: + raise Exception("No detected workspace.") + + if self.shell is None: + raise Exception("No detected workspace.") + + if "remote_azure_blob" in self.shell.user_ns: + raise Exception("a connection is still open, close it first") + + cl = self.create_client(server, password, hadoop_server, hadoop_password) + bs = cl.open_blob_service() + self.shell.user_ns["remote_azure_blob"] = bs + return bs + + def create_client(self, account_name, account_key, hadoop_server = None, hadoop_password = None): """ Create a @see cl AzureClient and stores in the workspace. 
@param account_name login @param account_key password + @param hadoop_server hadoop server + @param hadoop_password hadoop password """ - cl = AzureClient(account_name, account_key) + cl = AzureClient(account_name, account_key, hadoop_server, hadoop_password) self.shell.user_ns["remote_azure_client"] = cl return cl @@ -132,6 +192,31 @@ def ls_blob(self, line): """ return self.blob_ls(line) + def _interpret_path(self, line, cl, bs, empty_is_value = False): + """ + Interpret a path + + @param line line + @param cl @see cl AzureClient + @param bs blob service + @param empty_is_value if True, do not raise an exception + @return container, remotepath + """ + line = line.strip() + if line.startswith("/"): + container = cl.account_name + line = line.lstrip("/") + remotepath = line + else: + spl = line.split("/") + container = spl[0] + remotepath = None if len(spl)==1 else "/".join(spl[1:]) + + if not empty_is_value and len(remotepath)==0: + raise FileNotFoundError("path should not be empty: " + line) + + return container, remotepath + @line_magic def blob_ls(self, line): """ @@ -139,16 +224,12 @@ def blob_ls(self, line): """ if line is None or len(line.strip()) == 0: print("Usage:") - print(" blob_ls container/path") + print(" blob_ls ") + print("or") + print(" blob_ls ") else : - line = line.strip() - if line.startswith("/"): - raise FileNotFoundError("path cannot start with '/': " + path) - - spl = line.split("/") - container = spl[0] - remotepath = None if len(spl)==1 else "/".join(spl[1:]) cl, bs = self.get_blob_connection() + container, remotepath = self._interpret_path(line, cl, bs, True) l = cl.ls(bs, container, remotepath) return pandas.DataFrame(l) @@ -175,22 +256,15 @@ def blob_up(self, line): if len(spl) != 2 : print("Usage:") print(" blob_up ") - print("") - print("no space allowed in file names") + print("or") + print(" blob_up ") else : localfile,remotepath = spl if not os.path.exists(localfile) : raise FileNotFoundError(localfile) - - if 
remotepath.startswith("/"): - raise FileNotFoundError("remotepath cannot start with '/': " + remotepath) - - spl = remotepath.split("/") - if len(spl) <= 1 : - raise FileNotFoundError("the path is too short: " + remotepath) - container = spl[0] - remotepath = "/".join(spl[1:]) + cl, bs = self.get_blob_connection() + container,remotepath = self._interpret_path(remotepath, cl, bs) cl.upload(bs, container, remotepath, localfile) return remotepath @@ -215,24 +289,16 @@ def blob_down(self, line): spl = line.strip().split() if len(spl) != 2 : print("Usage:") - print(" blob_down ") - print("") - print("no space allowed in file names") + print(" blob_down ") + print("or") + print(" blob_down ") else : remotepath,localfile = spl if os.path.exists(localfile) : raise Exception("file {0} cannot be overwritten".format(localfile)) - if remotepath.startswith("/"): - raise FileNotFoundError("remotepath cannot start with '/': " + remotepath) - - spl = remotepath.split("/") - if len(spl) == 1 : - raise FileNotFoundError("the path is too short: " + remotepath) - container = spl[0] - remotepath = "/".join(spl[1:]) - cl, bs = self.get_blob_connection() + container,remotepath = self._interpret_path(remotepath, cl, bs) cl.download(bs, container, remotepath, localfile) return localfile @@ -243,22 +309,12 @@ def blob_delete(self, line): """ if line is None or len(line.strip()) == 0: print("Usage:") - print(" blob_delete ") - print("") - print("no space allowed in file names") + print(" blob_delete ") + print("or") + print(" blob_delete ") else : - remotepath = line.strip() - - if remotepath.startswith("/"): - raise FileNotFoundError("remotepath cannot start with '/': " + remotepath) - - spl = remotepath.split("/") - if len(spl) == 1 : - raise FileNotFoundError("the path is too short: " + remotepath) - container = spl[0] - remotepath = "/".join(spl[1:]) - cl, bs = self.get_blob_connection() + container, remotepath = self._interpret_path(line, cl, bs) cl.delete_blob(bs, container, 
remotepath) return True @@ -270,67 +326,109 @@ def blob_copy(self, line): spl = line.strip().split() if len(spl) != 2 : print("Usage:") - print(" blob_copy ") - print("") - print("no space allowed in file names") + print(" blob_copy ") + print("or") + print(" blob_copy ") else : src,dest = spl - - if src.startswith("/"): - raise FileNotFoundError("remotepath cannot start with '/': " + src) - if dest.startswith("/"): - raise FileNotFoundError("remotepath cannot start with '/': " + dest) - - spl = src.split("/") - if len(spl) == 1 : - raise FileNotFoundError("the path is too short: " + src) - container = spl[0] - src = "/".join(spl[1:]) - - spl = dest.split("/") - if len(spl) == 1 : - raise FileNotFoundError("the path is too short: " + dest) - if container != spl[0]: - raise Exception("the two containers should be the same: {0} != {1}".format(container, spl[0])) - dest = "/".join(spl[1:]) - cl, bs = self.get_blob_connection() + container,src = self._interpret_path(src, cl, bs) + container_,dest = self._interpret_path(dest, cl, bs) + if container != container_: + raise AzureException("containers should be the same: {0} != {1}".format(container, container_)) cl.copy_blob(bs, container, dest, src) return True + + @line_magic + def hd_queue(self, line): + """ + defines ``%hd_queue`` + """ + showall = line in ["showall", "1", 1, "True", True, "true"] + cl, bs = self.get_blob_connection() + return cl.job_queue(showall = showall) + + @line_magic + def hd_job_status(self, line): + """ + defines ``%hd_job_status`` + """ + line = line.strip() + if len(line) == 0 : + print("Usage:") + print(" hd_job_status ") + else: + jobid = line + cl, bs = self.get_blob_connection() + return cl.job_status(jobid) + + @line_magic + def hd_job_kill(self, line): + """ + defines ``%hd_job_kill`` + """ + line = line.strip() + if len(line) == 0 : + print("Usage:") + print(" hd_job_kill ") + else: + jobid = line + cl, bs = self.get_blob_connection() + return cl.job_kill(jobid) + + @line_magic + def 
hd_wasb_prefix(self, line): + """ + defines ``%hd_wasb_prefix`` + """ + line = line.strip() + cl, bs = self.get_blob_connection() + return cl.wasb_to_file(cl.account_name) - @line_magic - def azurepigsubmit(self, line): + @cell_magic + def PIG_azure(self, line, cell = None): """ - defines command ``%azurepigsubmit`` + defines command ``%%PIG_azure`` """ - if line in [None, ""]: + if line in [None, ""] : print("Usage:") - print(" %azurepigsubmit ") + print(" %%PIG_azure ") print("") - print("The file is local.") + print("The command store the content of the cell as a local file.") + print("It also replaces __CONTAINER__ as a right prefix for streams.") else: filename = line.strip() - spl = filename.split() - filename = spl[0] - raise NotImplementedError() - + script = cell.replace("\r","") + cl, bs = self.get_blob_connection() + prefix = cl.wasb_prefix(cl.account_name) + script = script.replace("__CONTAINER__", prefix + "/") + with open(filename, "w", encoding="utf8") as f : + f.write(script) + @line_magic - def azurejobsyntax(self, line): + def hd_pig_submit(self, line): """ - defines command ``%azurejobsyntax`` + defines command ``%hd_pig_submit`` """ if line in [None, ""]: print("Usage:") - print(" %azurejobsyntax ") + print(" %hd_pig_submit ") print("") + print("The file is local.") else: - filename = line.strip() - if not os.path.exists(filename): - raise FileNotFoundError(filename) + line = line.strip() + if not os.path.exists(line): + raise FileNotFoundError(line) - dest = os.path.split(filename)[-1] - raise NotImplementedError() - + username = self.shell.user_ns["username"] if "username" in self.shell.user_ns else os.environ.get("USERNAME","nouser") + remote = "scripts/pig/{0}/{1}".format(username, os.path.split(line)[-1]) + sd = "scripts/run/{0}".format(username) + + cl, bs = self.get_blob_connection() + cl.upload(bs, cl.account_name, remote, line) + r = cl.pig_submit(cl.account_name,remote,status_dir=sd) + return r + def register_azure_magics(): """