Skip to content
Permalink
Browse files

add function pig_submit to remote/remote_connection.py, modify magic_…

…remote, possibility to use parameters
  • Loading branch information...
sdpython committed Nov 24, 2014
1 parent 5e528cf commit d635f466b89e3366b2367cda4bb0f10770c387ee
Showing with 120 additions and 37 deletions.
  1. +25 −22 _unittests/ut_remote/test_cloudera.py
  2. +3 −14 src/pyensae/remote/magic_remote.py
  3. +92 −1 src/pyensae/remote/remote_connection.py
@@ -45,21 +45,21 @@ def tearDown(self):
if self.client is not None:
self.client.close()

def test_ls(self):
def _test_ls(self):
fLOG (__file__, self._testMethodName, OutputPrint = __name__ == "__main__")
if self.client is None: return
df = self.client.ls(".")
fLOG(df)
assert isinstance(df, pandas.DataFrame)

def test_hdfs_dfs_ls(self):
def _test_hdfs_dfs_ls(self):
fLOG (__file__, self._testMethodName, OutputPrint = __name__ == "__main__")
if self.client is None: return
df = self.client.dfs_ls(".")
fLOG(df)
assert isinstance(df, pandas.DataFrame)

def test_upload_download(self) :
def _test_upload_download(self) :
fLOG (__file__, self._testMethodName, OutputPrint = __name__ == "__main__")
if self.client is None: return
data = os.path.join(os.path.abspath(os.path.split(__file__)[0]), "data")
@@ -124,23 +124,25 @@ def test_script_pig(self) :
pyfile = os.path.join(fold, "pystream.py")
with open(pyfile,"w", encoding="utf8") as f : f.write(pyth)

tosend = """[{'address': "52 RUE D'ENGHIEN / ANGLE RUE DU FAUBOURG POISSONIERE - 75010 PARIS", 'collect_date': datetime.datetime(2014, 11, 11, 22, 1, 18, 331070), 'lng': 2.348395236282807, 'contract_name': 'Paris', 'name': '10042 - POISSONNIÈRE - ENGHIEN', 'banking': 0, 'lat': 48.87242006305313, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 32, 'last_update': datetime.datetime(2014, 11, 11, 21, 59, 5), 'number': 10042, 'available_bike_stands': 1, 'bike_stands': 33}]"""
tosend = """[{'address': "52 RUE D'ENGHIEN / ANGLE RUE DU FAUBOURG POISSONIERE - 75010 PARIS", 'collect_date': datetime.datetime(2014, 11, 11, 22, 1, 18, 331070), 'lng': 2.348395236282807, 'contract_name': 'Paris', 'name': '10042 - POISSONNIÈRE - ENGHIEN', 'banking': 0, 'lat': 48.87242006305313, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 32, 'last_update': datetime.datetime(2014, 11, 11, 21, 59, 5), 'number': 10042, 'available_bike_stands': 1, 'bike_stands': 33},{'address': "52 RUE D'ENGHIEN / ANGLE RUE DU FAUBOURG POISSONIERE - 75010 PARIS", 'collect_date': datetime.datetime(2014, 11, 11, 22, 1, 18, 331070), 'lng': 2.348395236282807, 'contract_name': 'Paris', 'name': '10042 - POISSONNIÈRE - ENGHIEN', 'banking': 0, 'lat': 48.87242006305313, 'bonus': 0, 'status': 'OPEN', 'available_bikes': 32, 'last_update': datetime.datetime(2014, 11, 11, 21, 59, 5), 'number': 10042, 'available_bike_stands': 1, 'bike_stands': 33}]"""

cmd = sys.executable.replace("pythonw", "python") + " " + pyfile + " name"
out,err = run_cmd(cmd, wait=True, sin=tosend, communicate=True, timeout=3, shell=False)
fLOG("OUT\n",out)
fLOG("ERR\n",err)
assert len(out) > 0

out = out.strip("\n\r ")
spl = out.split("\n")
if len(spl) != 2:
raise Exception("len:{2}\nOUT:\n{0}\nERR:\n{1}".format(out,err,len(out)))

# PIG script

pig = """
DEFINE pystream `python pystream.py bonus available_bike_stands available_bikes lat lng name status` SHIP ('pystream.py') INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(','));
DEFINE pystream `python pystream.py bonus available_bike_stands available_bikes lat lng name status`
SHIP ('pystream.py')
INPUT(stdin USING PigStreaming(',')) OUTPUT (stdout USING PigStreaming(','));
jspy = LOAD 'unittest2/*.txt' USING PigStorage('\t') AS (arow:chararray);
jspy = LOAD '$UTT/*.txt' USING PigStorage('\t') AS (arow:chararray);
DUMP jspy ;
--DUMP jspy ;
matrice = STREAM jspy THROUGH pystream AS
( bonus:chararray,
@@ -178,20 +180,21 @@ def test_script_pig(self) :
if self.client.dfs_exists("unittest2/results.txt"):
self.client.dfs_rm("unittest2/results.txt", True)

# we upload the scripts
self.client.upload( [ pyfile, pigfile ], ".")

# we test the syntax
out, err = self.client.execute_command("pig -check pystream.pig", no_exception = True)
fLOG("OUT\n",out)
fLOG("ERR\n",err)
assert "pystream.pig syntax OK" in err
out, err = self.client.pig_submit(pigfile, dependencies = [pyfile], check=True,
no_exception = True,
params=dict(UTT="unittest2"))
if "pystream.pig syntax OK" not in err:
raise Exception("OUT:\n{0}\nERR:\n{1}".format(out,err))

# we submit the job
out, err = self.client.execute_command("pig -execute -stop_on_failure -f pystream.pig", no_exception = True)
fLOG("OUT\n",out)
fLOG("ERR\n",err)
assert "Total records written : 4" in err
out, err = self.client.pig_submit(pigfile, dependencies = [pyfile],
stop_on_failure = True, no_exception = True,
redirection = None,
params=dict(UTT="unittest2"))

if "Total records written : 4" not in err:
raise Exception("OUT:\n{0}\nERR:\n{1}".format(out,err))

dest = os.path.join(fold, "out_merged.txt")
fLOG("dest=",dest)
@@ -145,20 +145,10 @@ def jobsubmit(self, line):
redirection = None if len(spl) == 1 else spl[1]
if not os.path.exists(filename):
raise FileNotFoundError(filename)

dest = os.path.split(filename)[-1]

ssh = self.get_connection()
ssh.upload(filename, dest)
for py in pythons:
ssh.upload(py, os.path.split(py)[-1])
slocal = " -x local" if local else ""
out, err = ssh.pig_submit( filename, dependencies = python, redirection = redirection, local = local)

if redirection is None:
cmd = "pig{0} -execute -f ".format(slocal) + dest
else:
cmd = "pig{2} -execute -f {0} 2> {1}.err 1> {1}.out &".format(filename, redirection, slocal)

out, err = ssh.execute_command(cmd, no_exception = True)
if len(err) > 0 and (len(out) == 0 or "ERROR" in err or "FATAL" in err or "Exception" in err):
return HTML("<pre>\n%s\n</pre>" % err)
else:
@@ -217,8 +207,7 @@ def jobsyntax(self, line):

dest = os.path.split(filename)[-1]
ssh = self.get_connection()
ssh.upload(filename, dest)
out, err = ssh.execute_command("pig -check " + dest, no_exception = True)
out, err = ssh.pig_submit(filename, check=True, no_exception=True)
if len(err) > 0 and (len(out) == 0 or "ERROR" in err or "FATAL" in err or "Exception" in err):
return HTML("<pre>\n%s\n</pre>" % err)
else:
@@ -470,4 +470,95 @@ def dfs_rm(self, path, recursive = False):
out, err = self.execute_command(cmd + path, no_exception = True)
if out.startswith("Moved"): return out, err
else:
raise Exception("unable to remove " + path + "\nOUT\n" + out + "\nERR:\n" + err)
raise Exception("unable to remove " + path + "\nOUT\n" + out + "\nERR:\n" + err)

def pig_submit(self, pig_file,
dependencies = None,
params = None,
redirection = "redirection",
local = False,
stop_on_failure = False,
check = False,
no_exception = True) :
"""
submits a PIG script, it first upload the script
to the default folder and submit it
@param pig_file pig script
@param dependencies others files to upload (still in the default folder)
@param params parameters to send to the job
@param redirection string empty or not
@param local local run or not (option `-x local <https://cwiki.apache.org/confluence/display/PIG/PigTutorial>`_) (in that case, redirection will be empty)
@param stop_on_failure if True, add option ``-stop_on_failure`` on the command line
@param check if True, add option ``-check`` (in that case, redirection will be empty)
@param no_exception sent to @see me execute_command
@return out, err from @see me execute_command
If *redirection* is not empty, the job is submitted but
the function returns after the standard output and error were
redirected to ``redirection.out`` and ``redirection.err``.
The first file will contain the results of commands
`DESCRIBE <http://pig.apache.org/docs/r0.14.0/test.html#describe>`_
`DUMP <http://pig.apache.org/docs/r0.14.0/test.html#dump>`_,
`EXPLAIN <http://pig.apache.org/docs/r0.14.0/test.html#explain>`_.
The standard error receives logs and exceptions.
The function executes the command line::
pig -execute -f <filename>
With redirection::
pig -execute -f <filename> 2> redirection.err 1> redirection.out &
.. versionadded:: 1.1
"""
dest = os.path.split(pig_file)[-1]
self.upload(pig_file, dest)
for py in dependencies:
self.upload(py, os.path.split(py)[-1])

slocal = " -x local" if local else ""
sstop_on_failure = " -stop_on_failure" if stop_on_failure else ""
scheck = " -check" if check else ""

if local or check:
redirection = None

if redirection is None:
cmd = "pig{0}{1}{2} -execute -f ".format(slocal, sstop_on_failure, scheck) + dest
else:
cmd = "pig{2}{3}{4} -execute -f {0} 2> {1}.err 1> {1}.out &".format(dest,
redirection, slocal, sstop_on_failure, scheck)

if params is not None:
sparams = ASSHClient.build_command_line_parameters(params)
if len(sparams) > 0 :
cmd += " " + sparams

if isinstance(cmd, list):
raise TypeError("this should not happen:" + str(cmd))

out, err = self.execute_command(cmd, no_exception = no_exception)
return out, err

@staticmethod
def build_command_line_parameters(params):
"""
builds a string for ``pig`` based on the parameters in params
@param params dictionary
@return string
.. versionadded:: 1.1
"""
if params is None :
return ""
res = [ ]
for k,v in sorted(params.items()):
if '"' in v : v = v.replace('"', '\\"')
one = '-param {0}="{1}"'.format(k,v)
res.append(one)
return " ".join(res)

0 comments on commit d635f46

Please sign in to comment.
You can’t perform that action at this time.