Skip to content
Permalink
Browse files

move shortcuts, add function to submit hive script

  • Loading branch information...
sdpython committed Oct 14, 2015
1 parent e5b55af commit 0e670ece2e7eb754982dba09cb29777e18867569
@@ -39,7 +39,7 @@
import pyquickhelper

from pyquickhelper import fLOG, run_cmd
from src.pyensae import ASSHClient
from src.pyensae.remote import ASSHClient

thisfold = os.path.abspath(os.path.split(__file__)[0])
thiscomm = os.path.join(thisfold, "..")
@@ -146,6 +146,7 @@ def test_script_pig(self):
os.path.split(__file__)[0]),
"data")

fLOG("AA")
# python script

pyth = """
@@ -167,6 +168,8 @@ def test_script_pig(self):
if not os.path.exists(fold):
os.mkdir(fold)

fLOG("BB")

pyfile = os.path.join(fold, "pystream.py")
with open(pyfile, "w", encoding="utf8") as f:
f.write(pyth)
@@ -187,6 +190,7 @@ def test_script_pig(self):
err,
len(out)))

fLOG("CC")
# PIG script

pig = """
@@ -215,10 +219,21 @@ def test_script_pig(self):
STORE matrice INTO 'unittest2/results.txt' USING PigStorage('\t') ;
""".replace(" ", "")

hive_sql = """
DROP TABLE IF EXISTS bikes20;
CREATE TABLE bikes20 (sjson STRING);
LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
SELECT * FROM bikes20 LIMIT 10;
""".replace("__USERNAME__", self.client.username)
fLOG(hive_sql)
#${hiveconf:UTT}

pigfile = os.path.join(fold, "pystream.pig")
with open(pigfile, "w", encoding="utf8") as f:
f.write(pig)

fLOG("DD upload")

# we upload some files

files = os.listdir(data)
@@ -235,6 +250,8 @@ def test_script_pig(self):
if self.client.dfs_exists("unittest2/results.txt"):
self.client.dfs_rm("unittest2/results.txt", True)

fLOG("FF")

# we test the syntax
out, err = self.client.pig_submit(pigfile, dependencies=[pyfile], check=True,
no_exception=True,
@@ -243,26 +260,49 @@ def test_script_pig(self):
if "pystream.pig syntax OK" not in err:
raise Exception("OUT:\n{0}\nERR:\n{1}".format(out, err))

fLOG("II")

# we submit the job
out, err = self.client.pig_submit(pigfile, dependencies=[pyfile],
stop_on_failure=True, no_exception=True,
redirection=None,
params=dict(UTT="unittest2"))

fLOG("JJ")

if "Total records written : 4" not in err:
raise Exception("OUT:\n{0}\nERR:\n{1}".format(out, err))
raise Exception("PIG OUT:\n{0}\nPIG ERR:\n{1}".format(out, err))

dest = os.path.join(fold, "out_merged.txt")
fLOG("dest=", dest)
if os.path.exists(dest):
os.remove(dest)

fLOG("KK")

self.client.download_cluster("unittest2/results.txt", dest, merge=True)
assert os.path.exists(dest)
with open(dest, "r", encoding="utf8") as f:
content = f.read()
fLOG("-----\n", content)
assert len(content.strip(" \n\r\t")) > 0

fLOG("LL")

# we submit the job
out, err = self.client.hive_submit(hive_sql,
redirection=None,
params=dict(UTT="unittest2"),
fLOG=fLOG)

fLOG("HIVE OUT")
fLOG(out)
fLOG("HIVE ERR")
fLOG(err)
#assert "(0,1.0,32.0,48.8724200631,2.34839523628,10042" in out

fLOG("END")


if __name__ == "__main__":
unittest.main()
@@ -47,20 +47,8 @@ def check(log=False):
"""
return True

from .finance.astock import StockPrices
from .datasource.data_velib import DataVelibCollect
from .datasource.convert import dBase2df, dBase2sqllite
from .file_helper.content_helper import replace_comma_by_point
from .file_helper.decompress_helper import decompress_zip, decompress_targz, decompress_gz
from .file_helper.jython_helper import run_jython, get_jython_jar, is_java_installed, download_java_standalone
from .file_helper.content_helper import file_head, file_tail
from .resources.http_retrieve import download_data
from .sql.database_helper import import_flatfile_into_database
from .sql.sql_interface import InterfaceSQL, InterfaceSQLException
from .sql.database_main import Database
from .remote.ssh_remote_connection import ASSHClient
from .remote.azure_connection import AzureClient
from .graph_helper.graphviz_helper import run_dot


try:
from IPython import get_ipython
@@ -79,9 +67,8 @@ def check(log=False):
from .graph_helper.magic_graph import register_graph_magics
from .notebook_helper.magic_notebook import register_notebook_magics
except Exception as e:
import warnings
warnings.warn(str(e))
raise ImportError("ipython does not seem to be available") from e

ip = get_ipython()
if ip is not None:
# the program is not run from a notebook
@@ -0,0 +1,7 @@
"""
@file
@brief Shortcuts to datasource
"""

from .data_velib import DataVelibCollect
from .convert import dBase2df, dBase2sqllite
@@ -0,0 +1,9 @@
"""
@file
@brief Shortcuts to file_helper
"""

from .content_helper import replace_comma_by_point
from .decompress_helper import decompress_zip, decompress_targz, decompress_gz
from .jython_helper import run_jython, get_jython_jar, is_java_installed, download_java_standalone
from .content_helper import file_head, file_tail
@@ -0,0 +1,6 @@
"""
@file
@brief shortcuts to finance
"""

from .finance.astock import StockPrices
@@ -0,0 +1,7 @@
"""
@file
@brief Shortcuts to remote
"""

from .ssh_remote_connection import ASSHClient
from .azure_connection import AzureClient
@@ -524,10 +524,31 @@ def dfs_rm(self, path, recursive=False):
"\nERR:\n" +
err)

@staticmethod
def build_command_line_parameters(params, command_name="-param"):
"""
builds a string for ``pig`` based on the parameters in params
@param params dictionary
@param command_name ``-param`` or ``-hiveconf``
@return string
.. versionadded:: 1.1
"""
if params is None:
return ""
res = []
for k, v in sorted(params.items()):
if '"' in v:
v = v.replace('"', '\\"')
one = '{2} {0}="{1}"'.format(k, v, command_name)
res.append(one)
return " ".join(res)

def pig_submit(self, pig_file,
dependencies=None,
params=None,
redirection="redirection",
redirection="redirection.pig",
local=False,
stop_on_failure=False,
check=False,
@@ -560,11 +581,11 @@ def pig_submit(self, pig_file,
The function executes the command line::
pig -execute -f <filename>
pig -execute -f <filename>
With redirection::
pig -execute -f <filename> 2> redirection.err 1> redirection.out &
pig -execute -f <filename> 2> redirection.pig.err 1> redirection.pig.out &
.. versionadded:: 1.1
"""
@@ -596,8 +617,13 @@ def pig_submit(self, pig_file,
dest,
sparams)
else:
cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format(dest,
redirection, slocal, sstop_on_failure, scheck, sparams)
cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format(
dest,
redirection,
slocal,
sstop_on_failure,
scheck,
sparams)

if isinstance(cmd, list):
raise TypeError("this should not happen:" + str(cmd))
@@ -606,22 +632,94 @@ def pig_submit(self, pig_file,
out, err = self.execute_command(cmd, no_exception=no_exception)
return out, err

@staticmethod
def build_command_line_parameters(params):
def hive_submit(self, hive_file_or_query,
params=None,
redirection="redirection.hive",
no_exception=True,
fLOG=noLOG):
"""
builds a string for ``pig`` based on the parameters in params
submits a PIG script, it first upload the script
to the default folder and submit it
@param hive_file_or_query pig script (local)
@param params parameters to send to the job
@param redirection string empty or not
@param no_exception sent to @see me execute_command
@param fLOG logging function
@return out, err from @see me execute_command
If *redirection* is not empty, the job is submitted but
the function returns after the standard output and error were
redirected to ``redirection.hive.out`` and ``redirection.hive.err``.
The function executes the command line::
hive -f <filename>
Or::
hive -e <query>
With redirection::
hive -execute -f <filename> 2> redirection.hive.err 1> redirection.hive.out &
If there is no redirection, the function
waits and return the output.
@param params dictionary
@return string
@example(Submit a HIVE query)
@code
client = ASSHClient()
hive_sql = '''
DROP TABLE IF EXISTS bikes20;
CREATE TABLE bikes20 (sjson STRING);
LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
SELECT * FROM bikes20 LIMIT 10;
'''.replace("__USERNAME__", self.client.username)
out,err = client.hive_submit(hive_sql, redirection=None)
@endcode
@endexample
.. versionadded:: 1.1
"""
if params is None:
return ""
res = []
for k, v in sorted(params.items()):
if '"' in v:
v = v.replace('"', '\\"')
one = '-param {0}="{1}"'.format(k, v)
res.append(one)
return " ".join(res)
if len(hive_file_or_query) < 5000 and os.path.exists(hive_file_or_query):
dest = os.path.split(hive_file_or_query)[-1]
self.upload(hive_file_or_query, dest)
command = "-f"
else:
command = "-e"
dest = hive_file_or_query.replace(
"\n", " ").replace("\r", "").replace("\t", " ")
dest = dest.replace("'", "\\'")
dest = "'{}'".format(dest.strip())

if params is not None:
sparams = ASSHClient.build_command_line_parameters(
params, "-hiveconf")
if len(sparams) > 0:
sparams = " " + sparams
else:
sparams = ""

if redirection is None:
cmd = "hive {0} {1}{2}".format(
command,
dest,
sparams)
else:
cmd = "hive {0} {1}{2} 2> {3}.err 1> {3}.out &".format(
command,
dest,
sparams,
redirection)

if isinstance(cmd, list):
raise TypeError("this should not happen:" + str(cmd))

fLOG("[hive_submit]:", cmd)
out, err = self.execute_command(cmd, no_exception=no_exception)
return out, err
@@ -7,7 +7,6 @@
import importlib
import re
import urllib.request
from ..file_helper.decompress_helper import decompress_zip, decompress_targz, decompress_gz


def remove_empty_line(file):
@@ -99,6 +98,8 @@ def download_data(name,
If it does not work, I suggest to use standard python:
`Download a file from Dropbox with Python <http://www.xavierdupre.fr/blog/2015-01-20_nojs.html>`_.
"""
from ..file_helper.decompress_helper import decompress_zip, decompress_targz, decompress_gz

if glo is None:
glo = globals()
if loc is None:
@@ -0,0 +1,8 @@
"""
@file
@brief Shortcuts to sql
"""

from .database_helper import import_flatfile_into_database
from .sql_interface import InterfaceSQL, InterfaceSQLException
from .database_main import Database

0 comments on commit 0e670ec

Please sign in to comment.
You can’t perform that action at this time.