Permalink
Browse files

finish copy work, examples for group_concat, little code cleanups.

  • Loading branch information...
1 parent 0159320 commit feed9e52a5cc791308529e1c2de919b718d462d7 @bvandiver bvandiver committed Dec 15, 2011
@@ -47,11 +47,12 @@ command - command to execute.
Output columns:
id - passed in
command - passed in
-text - line of output from shell command, one row per line
+text - lines of output (1 row per line), truncated to 64000 characters
The actual execution is accomplished by forking and running: bash -c <command>
Because the command is actually interpreted by the shell, pipes,
-redirects, looping and other shell functionality is available.
+redirects, looping and other shell functionality is available. One
+row per line of output is produced.
Example:
select shell_execute(local_node_name(),'date') over ();
@@ -68,6 +69,7 @@ select shell_execute(node_name, 'tail -10 vertica.log') over (partition by segva
See examples/shell.sql for a number of other example uses.
See examples/diagnostics.sql for a script to collect diagnostics through vsql + shell extension
+See examples/copy-cmd-writing.sql for a script to stage & load files from an ftp server
Notes:
* A badly formed shell command does not error the query; the text
@@ -1,13 +1,11 @@
-- code to generate a segmented table with exactly 1 row per node
-CREATE TABLE :tablename (segval int) segmented by hash(segval) :nodes ksafe;
+CREATE TABLE :tablename (segval int, node_name varchar(100)) segmented by hash(segval) :nodes ksafe;
-- create a temp table to store at least one row per node
CREATE LOCAL TEMP TABLE stage:tablename (a int) segmented by hash(a) :nodes;
-- load table with enough rows so that hash function is highly likely to get at least one on each node
INSERT INTO stage:tablename select int_sequence(1,cnt*10) over () from (select count(*) as cnt from nodes) n;
-- use last_value partitioned by node and group by to get exactly one integer which should hash onto each node
-INSERT /*+direct*/ INTO :tablename select lv from (select name,last_value(a) over (partition by name order by a rows between unbounded preceding and unbounded following) as lv from (select local_node_name() as name,a from stage:tablename) a) b group by lv;
+INSERT INTO :tablename select lv from (select name,last_value(a) over (partition by name order by a rows between unbounded preceding and unbounded following) as lv from (select local_node_name() as name,a from stage:tablename) a) b group by lv;
+UPDATE /*+direct*/ :tablename set node_name = local_node_name();
COMMIT;
DROP TABLE stage:tablename;
-
--- helper
-CREATE VIEW on:tablename as SELECT local_node_name() as node_name, segval FROM :tablename;
@@ -1,11 +1,12 @@
-- code to generate a segmented table with exactly 1 row per node
-CREATE LOCAL TEMP TABLE :tablename (segval int) ON COMMIT PRESERVE ROWS segmented by hash(segval) :nodes;
+CREATE LOCAL TEMP TABLE :tablename (segval int, node_name varchar(100)) ON COMMIT PRESERVE ROWS segmented by hash(segval) :nodes;
-- create a temp table to store at least one row per node
CREATE LOCAL TEMP TABLE stage:tablename (a int) segmented by hash(a) :nodes;
-- load table with enough rows so that hash function is highly likely to get at least one on each node
INSERT INTO stage:tablename select int_sequence(1,cnt*10) over () from (select count(*) as cnt from nodes) n;
-- use last_value partitioned by node and group by to get exactly one integer which should hash onto each node
INSERT /*+direct*/ INTO :tablename select lv from (select name,last_value(a) over (partition by name order by a rows between unbounded preceding and unbounded following) as lv from (select local_node_name() as name,a from stage:tablename) a) b group by lv;
+UPDATE /*+direct*/ :tablename SET node_name = local_node_name();
COMMIT;
DROP TABLE stage:tablename;
@@ -9,7 +9,9 @@ CREATE FUNCTION hostname as language 'C++' name 'HostnameFactory' library ShellL
CREATE TRANSFORM FUNCTION shell_execute as language 'C++' name 'ShellFactory' library ShellLib;
CREATE TRANSFORM FUNCTION int_sequence as language 'C++' name 'IntSequenceFactory' library ShellLib;
-\set tablename allnodes
-\set istemp
+\set tablename onallnodes
\set nodes 'all nodes'
\i ddl/create-dist-table.sql
+
+CREATE TABLE node_segment_reference (segval int, node_name varchar(100)) unsegmented all nodes;
+INSERT /*+direct*/ into node_segment_reference SELECT * from onallnodes;
@@ -1,2 +1 @@
DROP TABLE :tablename;
-DROP VIEW on:tablename;
@@ -1,3 +1,30 @@
-select '''' || fname || ''' ON ' || node_name from (select rank() over (order by node_id) as r, node_name from nodes) nodes, (select rank() over (order by fname) as r, fname from files) files where nodes.r = (files.r % 3) + 1;
+-- script to FTP files from an ftp server to temp locations across the
+-- cluster and execute a multi-node copy statement to load them.
-select name, b.command, b.text from (select name, shell_execute('stat ~/5.0_CATANIA/vertica/SQLTest/' || fname) over (partition by name) from (select local_node_name() as name,fname from files) a) b;
+-- usage:
+-- vsql -v tablename=\'table\' -v copyopts=\'DIRECT\' -v ftpurl=\'ftp.vertica.com\' -v tmpdir=\'/data/tmp\' -t -f examples/copy-cmd-writing.sql | vsql
+
+-- arguments
+
+--\set tablename '\'foo\''
+--\set copyopts '\'DIRECT\''
+--\set ftpurl '\'ftp://ftp.vertica.com/shellext/\''
+--\set tmpdir '\'/tmp/wgettemp/\''
+
+\o /dev/null
+create table files (fname varchar(1000)) segmented by hash(fname) all nodes;
+insert into files select text as fname from (select shell_execute('files','wget -q -O - '||:ftpurl ||' | grep File | cut -d ''"'' -f 2') over ()) a;
+select * from files;
+
+select shell_execute(local_node_name(), 'wget -nc -q -P '||:tmpdir||local_node_name()||' '||fname) over (partition by fname) from files order by fname;
+
+create table staged as select :tmpdir || text as file, id as node_name from (select shell_execute(node_name, 'ls '||:tmpdir||node_name) over (partition by segval) from onallnodes order by id) a;
+
+\o
+-- requires string_extensions
+select 'copy ' || :tablename || ' from ' || srcs || ' ' || :copyopts || ';' from (select group_concat(file || ' on ' || node_name) over () as srcs from staged) files;
+
+\o /dev/null
+drop table files;
+drop table staged;
+\o
@@ -10,8 +10,8 @@ select shell_execute(node_name,'tail -5 vertica.log') over (partition by segval)
--select shell_execute(node_name, 'killall -9 vertica') over (partition by segval) from onallnodes order by id;
-- vstack
--- cannot run vstack directly, as it needs vertica to read its
--- results... but vertica is paused for the vstack!
+-- cannot return vstack results directly, as it needs vertica to read its
+-- results... but vertica is paused for the vstack! So use a temp file.
-- select shell_execute(node_name, 'vstack > /tmp/vstack; cat /tmp/vstack') over (partition by segval) from onallnodes order by id;
@@ -34,10 +34,25 @@ select shell_execute(node_name, 'pwd') over (partition by segval) from onjustes
\i ddl/remove-dist-table.sql
+-- use vertica as a distributed launcher for anything
+-- control concurrency with resource pools!
+
+-- random tests
+
+-- line overflow
+select length(text) from (select shell_execute('id','for x in `seq 1 64001`; do echo -n q; done') over ()) a;
+
+-- no output
+select shell_execute('null','cat /dev/null') over ();
+
+-- nul chars
+select shell_execute('null','echo -e "hello\x00world"') over ();
+
----- int_sequence
select int_sequence(1,5) over ();
select int_sequence(5,1) over ();
select int_sequence(-1,-1) over ();
+
@@ -174,8 +174,6 @@ class Shell : public TransformFunction
{
if (buf[ptr] == '\n')
{
- // ensure null terminated
- //outputline[bufend] = '\0';
// Copy string into results
output_writer.getStringRef(0).copy(idstr);
output_writer.getStringRef(1).copy(cmdstr);
@@ -194,12 +192,10 @@ class Shell : public TransformFunction
// results from last line
if (bufend > 0)
{
- // ensure null terminated
- //outputline[bufend] = '\0';
// Copy string into results
output_writer.getStringRef(0).copy(idstr);
output_writer.getStringRef(1).copy(cmdstr);
- output_writer.getStringRef(2).copy(outputline);
+ output_writer.getStringRef(2).copy(outputline,bufend);
output_writer.next();
bufend = 0; // reset
}
@@ -211,15 +207,17 @@ class Shell : public TransformFunction
output_writer.next();
}
- subproc.terminate(false/*gently!*/);
+ subproc.terminate(true/*gently!*/);
} while (input_reader.next());
}
};
class ShellFactory : public TransformFunctionFactory
{
// Tell Vertica that we take in a row with 1 string, and return a row with 2 strings
- virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
+ virtual void getPrototype(ServerInterface &srvInterface,
+ ColumnTypes &argTypes,
+ ColumnTypes &returnType)
{
argTypes.addVarchar();
argTypes.addVarchar();
@@ -237,7 +235,8 @@ class ShellFactory : public TransformFunctionFactory
{
// Error out if we're called with anything but 1 argument
if (input_types.getColumnCount() != 2)
- vt_report_error(0, "Function only accepts 2 arguments, but %zu provided", input_types.getColumnCount());
+ vt_report_error(0, "Function only accepts 2 arguments, but %zu provided",
+ input_types.getColumnCount());
// first column outputs the id string passed in
int input_len = input_types.getColumnType(0).getStringLength();
@@ -251,8 +250,9 @@ class ShellFactory : public TransformFunctionFactory
// Our output size will never be more than the input size
output_types.addVarchar(input_len, "command");
- // other output is a line of output from the shell command, which is truncated at 65000 characters
- output_types.addVarchar(65000, "text");
+ // other output is a line of output from the shell command, which is
+ // truncated at LINE_MAX characters
+ output_types.addVarchar(LINE_MAX, "text");
}
virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
@@ -21,6 +21,8 @@ WordCount: Counts the number of words that appear in a sentence
GenAnagram: For each input string, it produces many output strings, one for each
permutation of every subset of letters. (doc/Anagramarama.pdf)
+GroupConcat: Concatenates strings in a partition into a comma-separated list.
+
-------------------------------
BUILDING
-------------------------------
@@ -0,0 +1,5 @@
+-- get a list of nodes
+select group_concat(node_name) over () from nodes;
+
+-- nodes with storage for a projection
+select schema_name,projection_name,group_concat(node_name) over (partition by schema_name,projection_name) from (select distinct node_name,schema_name,projection_name from storage_containers) sc order by schema_name, projection_name;
@@ -1,6 +1,7 @@
/* Copyright (c) 2005 - 2011 Vertica, an HP company -*- C++ -*- */
/*
- * Description: User Defined Transform Function: for each partition, output a comma-separated list in a string
+ * Description: User Defined Transform Function: for each partition, output a
+ * comma-separated list in a string
*
* Create Date: Dec 15, 2011
*/
@@ -10,10 +11,12 @@
using namespace Vertica;
using namespace std;
+#define LINE_MAX 64000
/*
* Takes in a sequence of string values and produces a single output tuple with
- * a comma separated list of values.
+ * a comma separated list of values. If the output string would overflow the
+ * maximum line length, stop appending values and include a ", ..."
*/
class GroupConcat : public TransformFunction
@@ -27,6 +30,7 @@ class GroupConcat : public TransformFunction
ostringstream oss;
bool first = true;
+ bool exceeded = false;
do {
const VString &elem = input_reader.getStringRef(0);
@@ -35,11 +39,23 @@ class GroupConcat : public TransformFunction
{
continue;
}
- else
+ else if (!exceeded)
{
- if (!first) oss << ", ";
- first = false;
- oss << elem.str();
+ std::string s = elem.str();
+ size_t curpos = oss.tellp();
+ curpos += s.length() + 2;
+ if (curpos > LINE_MAX)
+ {
+ exceeded = true;
+ if (first) oss << "...";
+ else oss << ", ...";
+ }
+ else
+ {
+ if (!first) oss << ", ";
+ first = false;
+ oss << s;
+ }
}
} while (input_reader.next());
@@ -69,8 +85,8 @@ class GroupConcatFactory : public TransformFunctionFactory
if (input_types.getColumnCount() != 1)
vt_report_error(0, "Function only accepts 1 argument, but %zu provided", input_types.getColumnCount());
- // Our output size will never be more than the input size
- output_types.addVarchar(65000, "summary");
+ // output can be wide. Include extra space for a last ", ..."
+ output_types.addVarchar(LINE_MAX+5, "list");
}
virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)

0 comments on commit feed9e5

Please sign in to comment.