
azure command to handle pig script on Azure
sdpython committed Nov 2, 2014
1 parent f1bc0e6 commit ebf23e3
Showing 3 changed files with 487 additions and 101 deletions.
1 change: 1 addition & 0 deletions README.rst
@@ -82,6 +82,7 @@ Versions
* **add:** add magic command and methods to enable a shell from a notebook (a kind of putty) (command ``%open_remote_shell``, ...)
* **new:** function :func:`parse_code <pyensae.languages.antlr_grammar_use.parse_code>` parses a script in R, PIG, SQLite syntax and checks if there is any mistake, it requires `antlr4 <https://pypi.python.org/pypi/antlr4-python3-runtime/>`_
* **new:** new class :class:`AzureClient <pyensae.remote.azure_connection.AzureClient>` to handle some basic needs with Azure
* **add:** add magic command and methods to handle Azure from a notebook
* **0.8 - 2014/10/24**
* **add:** add method :meth:`copy_to <pyensae.sql.database_main.Database.copy_to>` to copy every table from a database to another one
* **fix:** class :class:`Database <pyensae.sql.database_main.Database>` can now handle in memory database
303 changes: 295 additions & 8 deletions src/pyensae/remote/azure_connection.py
@@ -5,16 +5,42 @@
"""

import time, socket, datetime
import requests
import azure.storage

class AzureException(Exception):
"""
exception raised by @see cl AzureClient
"""
def __init__(self, message, code, json):
"""
store more information than a regular exception
@param message error message
@param code error code
@param json information in json format
"""
Exception.__init__(self, message)
self.ret = (code, json)

def __str__(self):
"""
usual
"""
s = Exception.__str__(self)
f = "STATUS: {0}, JSON: {1}\n{2}".format(self.ret[0], self.ret[1], s)
return f
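
# A minimal sketch (not part of the original file) of how callers can inspect
# the extra payload stored by AzureException; ``client`` stands for any
# AzureClient instance:
#
#   try:
#       client.get_status()
#   except AzureException as e:
#       code, js = e.ret    # HTTP status code and decoded JSON body
#       print(e)            # prints "STATUS: ..., JSON: ...\n<message>"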

class AzureClient():
"""
    .. index:: Azure

    A simple class to communicate with `Azure <http://azure.microsoft.com/>`_.
    It requires modules:

    * `azure <https://github.com/Azure/azure-sdk-for-python>`_
    * `requests <http://docs.python-requests.org/en/latest/>`_
Main functionalities related to blob:
* list_containers, create_container, list_blobs, put_blob, put_block_blob_from_bytes
@@ -58,6 +84,10 @@ class AzureClient():
@endcode
@endexample
    Many functions use the WebHCat API.
    The error codes can be found here:
`Error Codes and Responses <https://cwiki.apache.org/confluence/display/Hive/WebHCat+UsingWebHCat#WebHCatUsingWebHCat-ErrorCodesandResponses>`_.
"""

_blob_properties = [
@@ -80,27 +110,48 @@ class AzureClient():
"etag",
]

def __init__(self, blob_name,
blob_key,
hadoop_name = None,
hadoop_key = None,
hadoop_user_name = "admin",
fLOG = None):
"""
constructor
@param blob_name blob storage name
@param blob_key account key for the blob storage
@param hadoop_name hadoop server name (can be None if HDInsight is not used)
@param hadoop_key hadoop key (can be None if HDInsight is not used)
@param fLOG logging function
"""
self.account_name = blob_name
self.account_key = blob_key
self.hadoop_name = hadoop_name
self.hadoop_key = hadoop_key
self.hadoop_user_name = hadoop_user_name
if fLOG is None:
def _log_(*l,**p): return
self.LOG = _log_
else:
self.LOG = fLOG
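
    # A minimal instantiation sketch with placeholder credentials (the names
    # "mystorage" and "mycluster" are hypothetical); the hadoop arguments can
    # be omitted when HDInsight is not used:
    #
    #   cl = AzureClient("mystorage", "storage-key",
    #                    "mycluster", "cluster-password")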

@staticmethod
def mask_string(s):
"""
return empty string or as many ``*`` as the length of the string
"""
        if s is None:
            return ""
        return "*" * len(s)

    def __str__(self):
        """
        usual, credentials are masked
        """
        mes = "AzureClient [blob:({0},{1}), hadoop:({2},{3},{4})]".format(
                    AzureClient.mask_string(self.account_name),
                    AzureClient.mask_string(self.account_key),
                    AzureClient.mask_string(self.hadoop_name),
                    AzureClient.mask_string(self.hadoop_key),
                    AzureClient.mask_string(self.hadoop_user_name))
        return mes

def open_blob_service(self):
"""
@@ -222,6 +273,7 @@ def delete_blob(self, blob_service, container_name, blob_name):

def url_blob(self, blob_service, container, blob_name):
"""
        returns a URL for a blob file name
@param container container
@param blob_name blob_name
@@ -243,3 +295,238 @@ def copy_blob(self, blob_service, container, blob_name, source):
res = blob_service.copy_blob(container, blob_name, url)
return res

def url_webHCatUrl(self, cmd):
"""
        returns a URL to the cluster
@param cmd something like ``pig``, ``status``
@return url
"""
if self.hadoop_name is None:
raise AttributeError("no hadoop server was given to the constructor")
webHCatUrl='https://' + self.hadoop_name + '.azurehdinsight.net/templeton/v1/' + cmd
return webHCatUrl
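
    # For example, with a hypothetical cluster named "mycluster",
    # self.url_webHCatUrl("pig") returns:
    #   'https://mycluster.azurehdinsight.net/templeton/v1/pig'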

def wasb_to_file(self, container_name, blob_file):
"""
return something like ``wasb://demo@myblobstorage.blob...``
@param container_name name of a container
@param blob_file path to a file
        @return                     a URL to the blob file (a PIG script for example)
"""
        return 'wasb://{0}@{1}.blob.core.windows.net/{2}'.format(container_name,
                        self.account_name, blob_file)

def wasb_prefix(self, container_name):
"""
        when using an instruction ``LOAD`` in a PIG script,
        blob file names must be referenced using the wasb syntax.
This method returns the prefix to add.
@return wasb prefix
"""
return self.wasb_to_file(container_name, "")
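
    # A small sketch of what the two helpers above produce, assuming a
    # hypothetical account "myblobstorage" and container "demo":
    #
    #   cl.wasb_to_file("demo", "input.csv")
    #   # -> 'wasb://demo@myblobstorage.blob.core.windows.net/input.csv'
    #   cl.wasb_prefix("demo")
    #   # -> 'wasb://demo@myblobstorage.blob.core.windows.net/'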

def get_status(self):
"""
        return the status of the WebHCat server
@return json
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

webHCatUrl = self.url_webHCatUrl("status")

r = requests.get( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key))
if r.status_code != 200:
raise AzureException("unable to the status of server: " + webHCatUrl, r.status_code, r.json())
return r.json()

def get_version(self):
"""
        return the version of WebHCat
@return json
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

webHCatUrl = self.url_webHCatUrl("version/hive")

r = requests.get( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key))
if r.status_code != 200:
raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json())
return r.json()

def pig_submit(self, container_name, blob_pig, status_dir = None):
"""
        submits a PIG job assuming the script
        was uploaded to the blob storage.
The code comes from `How to use HDInsight from Linux <http://blogs.msdn.com/b/benjguin/archive/2014/02/18/how-to-use-hdinsight-from-linux.aspx>`_
and `start a Pig + Jython job in HDInsight thru WebHCat <http://blogs.msdn.com/b/benjguin/archive/2014/03/21/start-a-pig-jython-job-in-hdinsight-thru-webhcat.aspx>`_.
The API is described at `Pig Job — POST pig <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Pig>`_.
@param container_name name of a container
@param blob_pig path to the job in the blob storage
        @param      status_dir          folder used by Hadoop to store the job's progress; it should contain
                                        your alias to avoid collisions with other users' jobs
@return json
@example(Azure___Submit a job PIG)
        The PIG script must include an instruction ``LOAD``.
        This instruction uses file names defined with the `wasb syntax <http://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-blob-storage/>`_.
        If you place the string ``__CONTAINER__`` before a stream name,
        it will be replaced by the corresponding wasb syntax associated
        to the container name defined by ``container_name``.
        The function will then load your script,
        modify it and save another one by adding the extension
        ``.wasb.pig``.
@code
blobstorage = "****"
blobpassword = "*********************"
hadoop_name = "*********"
hadoop_password = "********"
cl = AzureClient(blobstorage,
blobpassword,
hadoop_name,
hadoop_password)
script = '''
myinput = LOAD '__CONTAINER__<input.csv>'
using PigStorage(',')
AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;
filt = FILTER myinput BY activity == 'walking' ;
STORE filt INTO '__CONTAINER__<output.csv>' USING PigStorage() ;
'''
script = script.replace("__CONTAINER__", cl.wasb_prefix(blobstorage))
with open("script_walking.pig","w") as f :
f.write(script)
bs = cl.open_blob_service()
cl.upload(bs, blobstorage, "testensae/script.pig", "script_walking.pig")
for f in cl.ls(bs, blobstorage, "testensae"):
print(f["name"])
js = cl.pig_submit(blobstorage, "testensae/script.pig", "status/pig/xavierdupre")
print(js)
js = cl.job_status('job_1414863995725_0013')
@endcode
@endexample
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

wasb = self.wasb_to_file(container_name, blob_pig)

params = {'user.name':self.hadoop_user_name,
'file': wasb,
'arg':'-v'}

if status_dir is not None:
params['statusdir'] = self.wasb_to_file(container_name, status_dir)

webHCatUrl = self.url_webHCatUrl("pig")

r = requests.post( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key),
data=params)

if r.status_code != 200:
raise AzureException("unable to submit job: " + blob_pig, r.status_code, r.json())
return r.json()

def job_queue(self, showall = False):
"""
        returns the list of jobs.
        It uses the API `Jobs — GET jobs <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Jobs>`_.
        @param      showall     if True, show all jobs (not only yours)
        @return                 list of jobs
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

webHCatUrl = self.url_webHCatUrl("jobs")

params = { "user.name": self.hadoop_user_name }
if showall:
params["showall"]="true"

r = requests.get( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key),
params=params)

if r.status_code != 200:
raise AzureException("unable to get job queue", r.status_code, r.json())
return r.json()
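
    # A hedged usage sketch: the exact shape of the response depends on the
    # WebHCat version; recent versions return a list of entries such as
    # [{"id": "job_..."}, ...], older ones a plain list of job ids:
    #
    #   for job in cl.job_queue():
    #       print(job["id"] if isinstance(job, dict) else job)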

def job_status(self, jobid):
"""
        return the status of a job,
        see `Job Information — GET jobs/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+Job>`_
        for the outcome
@param jobid jobid
@return json
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

params = { "user.name":self.hadoop_user_name}
webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

r = requests.get( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key),
params=params)
if r.status_code != 200:
raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json())
return r.json()
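
    # A minimal polling sketch (not part of the original API): wait for a
    # submitted job to finish. The "status"/"jobComplete" fields follow the
    # usual WebHCat response format and should be checked against the
    # cluster's actual output:
    #
    #   import time
    #   def wait_job(client, jobid, delay=5):
    #       while True:
    #           js = client.job_status(jobid)
    #           if js.get("status", {}).get("jobComplete", False):
    #               return js
    #           time.sleep(delay)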

def job_kill(self, jobid):
"""
kills a job
see `Delete Job — DELETE queue/:jobid <https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference+DeleteJob>`_
for the outcome
@param jobid jobid
@return json
"""
if self.hadoop_user_name is None:
raise AttributeError("no hadoop user name was given to the constructor")
if self.hadoop_key is None:
raise AttributeError("no hadoop password was given to the constructor")

params = { "user.name":self.hadoop_user_name}
webHCatUrl = self.url_webHCatUrl("jobs/" + jobid)

r = requests.delete( webHCatUrl,
auth=(self.hadoop_user_name, self.hadoop_key),
params=params)
if r.status_code != 200:
raise AzureException("unable to the version of server: " + webHCatUrl, r.status_code, r.json())
return r.json()
