Initial commit

1 parent c15edbd commit 91b6b0c82f6aaa1d31c4e3933aa882f97a95dd1e @alamb committed Jun 14, 2012
Showing with 9,375 additions and 4 deletions.
  1. +27 −0 LICENSE.txt
  2. +27 −0 LICENSE.txt~
  3. +0 −4 README.md
  4. +32 −0 README.txt
  5. +19 −0 VERTICA-DEVELOPER-README
  6. +118 −0 customize_config.pl
  7. BIN license/CorporateCLA.pdf
  8. BIN license/PersonalCLA.pdf
  9. +130 −0 makefile
  10. +62 −0 src/build.xml
  11. +133 −0 src/hadoop-connector/build.xml
  12. +61 −0 src/hadoop-connector/com/vertica/hadoop/Relation.java
  13. +561 −0 src/hadoop-connector/com/vertica/hadoop/VerticaConfiguration.java
  14. +293 −0 src/hadoop-connector/com/vertica/hadoop/VerticaInputFormat.java
  15. +210 −0 src/hadoop-connector/com/vertica/hadoop/VerticaInputSplit.java
  16. +208 −0 src/hadoop-connector/com/vertica/hadoop/VerticaOutputFormat.java
  17. +740 −0 src/hadoop-connector/com/vertica/hadoop/VerticaRecord.java
  18. +92 −0 src/hadoop-connector/com/vertica/hadoop/VerticaRecordReader.java
  19. +108 −0 src/hadoop-connector/com/vertica/hadoop/VerticaRecordWriter.java
  20. +125 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaInputSplit.java
  21. +29 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaStreamingInput.java
  22. +39 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaStreamingOutput.java
  23. +93 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaStreamingRecordReader.java
  24. +126 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaStreamingRecordWriter.java
  25. +136 −0 src/hadoop-connector/com/vertica/hadoop/deprecated/VerticaUtil.java
  26. +87 −0 src/hadoop-example/build.xml
  27. +140 −0 src/hadoop-example/com/vertica/hadoop/VerticaExample.java
  28. +151 −0 src/hadoop-example/com/vertica/hadoop/hdfs2vertica.java
  29. +28 −0 src/makefile
  30. +94 −0 src/pig-connector/build.xml
  31. +317 −0 src/pig-connector/com/vertica/pig/VerticaLoader.java
  32. +143 −0 src/pig-connector/com/vertica/pig/VerticaStorer.java
  33. +99 −0 src/squeal/build.xml
  34. +94 −0 src/squeal/conf/verticafuncs.properties
  35. +33 −0 src/squeal/makefile
  36. +129 −0 src/squeal/src/com/vertica/squeal/Deparser.java
  37. +68 −0 src/squeal/src/com/vertica/squeal/InputScript.java
  38. +971 −0 src/squeal/src/com/vertica/squeal/LOTranslator.java
  39. +57 −0 src/squeal/src/com/vertica/squeal/NameUtil.java
  40. +87 −0 src/squeal/src/com/vertica/squeal/SchemaUtil.java
  41. +46 −0 src/squeal/src/com/vertica/squeal/Script.java
  42. +90 −0 src/squeal/src/com/vertica/squeal/ScriptRunner.java
  43. +653 −0 src/squeal/src/com/vertica/squeal/Squeal.java
  44. +245 −0 src/squeal/src/com/vertica/squeal/SquealParser.java
  45. +3 −0 testing/data/chars.dat
  46. +3 −0 testing/data/chars2.dat
  47. +3 −0 testing/data/chars3.dat
  48. +15 −0 testing/data/datasource
  49. +8 −0 testing/data/ints.dat
  50. +5 −0 testing/hdfslib/glob_test/region.tbl
  51. +5 −0 testing/hdfslib/glob_test/region1.tbl
  52. +5 −0 testing/hdfslib/glob_test/region11.tbl
  53. +5 −0 testing/hdfslib/glob_test/region12.tbl
  54. +5 −0 testing/hdfslib/glob_test/region2.tbl
  55. +5 −0 testing/hdfslib/glob_test/region31.tbl
  56. +5 −0 testing/hdfslib/glob_test/region_nopermissions.tbl
  57. +30 −0 testing/hdfslib/tpch_mini/customer.tbl
  58. +99 −0 testing/hdfslib/tpch_mini/lineitem.tbl
  59. +25 −0 testing/hdfslib/tpch_mini/nation.tbl
  60. +55 −0 testing/hdfslib/tpch_mini/orders.tbl
  61. +35 −0 testing/hdfslib/tpch_mini/part.tbl
  62. +45 −0 testing/hdfslib/tpch_mini/partsupp.tbl
  63. +5 −0 testing/hdfslib/tpch_mini/region.tbl
  64. +20 −0 testing/hdfslib/tpch_mini/supplier.tbl
  65. +110 −0 testing/makefile
  66. +26 −0 testing/pig-connector/Apache_10/expected/bugs.out
  67. +6 −0 testing/pig-connector/Apache_10/expected/no_permissions.out
  68. +35 −0 testing/pig-connector/Apache_10/expected/passwords.out
  69. +3 −0 testing/pig-connector/Apache_10/expected/passwords_bad_load.out
  70. +2 −0 testing/pig-connector/Apache_10/expected/passwords_bad_store.out
  71. +40 −0 testing/pig-connector/Apache_10/expected/smoke-test-load.out
  72. +26 −0 testing/pig-connector/Apache_10/expected/smoke-test-store.out
  73. +2 −0 testing/pig-connector/Apache_10/expected/teardown.out
  74. +6 −0 testing/pig-connector/Apache_10/test_driver.skip
  75. +35 −0 testing/pig-connector/CDH2/expected/bugs.out
  76. +4 −0 testing/pig-connector/CDH2/expected/no_permissions.out
  77. +40 −0 testing/pig-connector/CDH2/expected/passwords.out
  78. +3 −0 testing/pig-connector/CDH2/expected/passwords_bad_load.out
  79. +3 −0 testing/pig-connector/CDH2/expected/passwords_bad_store.out
  80. +47 −0 testing/pig-connector/CDH2/expected/smoke-test-load.out
  81. +33 −0 testing/pig-connector/CDH2/expected/smoke-test-store.out
  82. +3 −0 testing/pig-connector/CDH2/expected/teardown.out
  83. +6 −0 testing/pig-connector/CDH2/test_driver.skip
  84. +75 −0 testing/pig-connector/CDH4-Yarn/expected/bugs.out
  85. +41 −0 testing/pig-connector/CDH4-Yarn/expected/no_permissions.out
  86. +78 −0 testing/pig-connector/CDH4-Yarn/expected/passwords.out
  87. +5 −0 testing/pig-connector/CDH4-Yarn/expected/passwords_bad_load.out
  88. +4 −0 testing/pig-connector/CDH4-Yarn/expected/passwords_bad_store.out
  89. +84 −0 testing/pig-connector/CDH4-Yarn/expected/smoke-test-load.out
  90. +73 −0 testing/pig-connector/CDH4-Yarn/expected/smoke-test-store.out
  91. +6 −0 testing/pig-connector/CDH4-Yarn/expected/teardown.out
  92. +6 −0 testing/pig-connector/CDH4-Yarn/test_driver.skip
  93. +75 −0 testing/pig-connector/CDH4/expected/bugs.out
  94. +41 −0 testing/pig-connector/CDH4/expected/no_permissions.out
  95. +78 −0 testing/pig-connector/CDH4/expected/passwords.out
  96. +5 −0 testing/pig-connector/CDH4/expected/passwords_bad_load.out
  97. +4 −0 testing/pig-connector/CDH4/expected/passwords_bad_store.out
  98. +84 −0 testing/pig-connector/CDH4/expected/smoke-test-load.out
  99. +73 −0 testing/pig-connector/CDH4/expected/smoke-test-store.out
  100. +6 −0 testing/pig-connector/CDH4/expected/teardown.out
  101. +6 −0 testing/pig-connector/CDH4/test_driver.skip
  102. +26 −0 testing/pig-connector/HortonWorks/expected/bugs.out
  103. +5 −0 testing/pig-connector/HortonWorks/expected/no_permissions.out
  104. +37 −0 testing/pig-connector/HortonWorks/expected/passwords.out
  105. +3 −0 testing/pig-connector/HortonWorks/expected/passwords_bad_load.out
  106. +2 −0 testing/pig-connector/HortonWorks/expected/passwords_bad_store.out
  107. +40 −0 testing/pig-connector/HortonWorks/expected/smoke-test-load.out
  108. +26 −0 testing/pig-connector/HortonWorks/expected/smoke-test-store.out
  109. +2 −0 testing/pig-connector/HortonWorks/expected/teardown.out
  110. +6 −0 testing/pig-connector/HortonWorks/test_driver.skip
  111. +96 −0 testing/pig-connector/makefile
  112. +8 −0 testing/pig-connector/pigtest_smoke.in
  113. +37 −0 testing/pig-connector/scripts/bugs.pig
  114. +8 −0 testing/pig-connector/scripts/ints.dat
  115. +5 −0 testing/pig-connector/scripts/no_permissions.pig
  116. +16 −0 testing/pig-connector/scripts/passwords.pig
  117. +4 −0 testing/pig-connector/scripts/passwords_bad_load.pig
  118. +5 −0 testing/pig-connector/scripts/passwords_bad_store.pig
  119. +21 −0 testing/pig-connector/scripts/smoke-test-load.pig
  120. +33 −0 testing/pig-connector/scripts/smoke-test-store.pig
  121. +2 −0 testing/pig-connector/scripts/smoke-test.dat
  122. +2 −0 testing/pig-connector/scripts/teardown.pig
  123. +9 −0 testing/pig-connector/test_driver.skip
  124. +26 −0 testing/squeal/expected/complex-group.out
  125. +25 −0 testing/squeal/expected/joins.out
  126. +35 −0 testing/squeal/expected/simple.out
  127. +97 −0 testing/squeal/makefile
  128. +1 −0 testing/squeal/scripts/bar.log
  129. +30 −0 testing/squeal/scripts/complex-group.pig
  130. +10 −0 testing/squeal/scripts/foo.log
  131. +9 −0 testing/squeal/scripts/group.pig
  132. +10 −0 testing/squeal/scripts/joins.pig
  133. +7 −0 testing/squeal/scripts/mygroup.pig
  134. +4 −0 testing/squeal/scripts/proj.pig
  135. +12 −0 testing/squeal/scripts/simple.pig
  136. +3 −0 testing/squeal/scripts/split.pig
  137. +6 −0 testing/squeal/scripts/url.log
  138. +198 −0 testing/squeal/squeal
  139. +3 −0 testing/squeal/squeal_smoke.in
  140. +5 −0 testing/squeal/test_driver.skip
  141. +9 −0 testing/streaming/hdfs2v_mapper.py
  142. +11 −0 testing/streaming/hdfs2vertica
  143. +4 −0 testing/streaming/mapper.py
  144. +8 −0 testing/streaming/reducer.py
  145. +13 −0 testing/streaming/vertica2vertica
Sorry, we could not display the entire diff because it was too big.
27 LICENSE.txt
@@ -0,0 +1,27 @@
+Portions of this software Copyright (c) 2011-2012 by Vertica, an HP
+Company. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+- Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
27 LICENSE.txt~
@@ -0,0 +1,27 @@
+Portions of this software Copyright (c) 2011 by Vertica, an HP
+Company. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+- Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
4 README.md
@@ -1,4 +0,0 @@
-Vertica-Hadoop-Connector
-========================
-
-Vertica Hadoop Connector
32 README.txt
@@ -0,0 +1,32 @@
+****************************
+* Vertica Analytic Database
+*
+* Apache Hadoop & Pig Connector
+*
+* Copyright 2012 Vertica Systems, an HP company
+****************************
+
+This directory contains jars, examples, doc & source code for
+Apache Hadoop & Pig connectors to the Vertica Analytic Database.
+
+IMPORTANT: If you wish to contribute anything to this repository, in
+order for us to accept your pull request you MUST sign and send a copy
+of the appropriate Contributor License Agreement to Vertica
+(github-owners@vertica.com):
+
+license/PersonalCLA.pdf: If you are contributing for yourself
+license/CorporateCLA.pdf: If you are contributing on behalf of your company
+
+Directory structure:
+ ./data Data files to be used with the examples.
+ ./doc Connector Documentation (generated with Javadocs)
+ ./src Source code
+ hadoop-vertica.jar Hadoop to Vertica Connector
+ pig-vertica.jar Pig to Vertica Connector
+ hadoop-vertica-example.jar Example for using the Hadoop Connector
+
+TODO:
+ helpful commands for compiling the code.
+ helpful commands for running examples.
+ Examples for pig connector.
+ Examples for Hadoop streaming.
19 VERTICA-DEVELOPER-README
@@ -0,0 +1,19 @@
+The current directory structure is chosen so that everything specific
+to the Vertica testing environment lives in:
+1. "." - the hadoop root directory
+2. "testing" - everything required to run a test in the SF environment.
+
+"src" directory contains all the source java files.
+There is ONE directory per JAR file (as suggested by JAVA standards).
+1. hadoop-connector
+2. hadoop-example
+3. pig-connector
+4. squeal
+
+Within each of these directories there can be:
+1. src - the actual source code.
+2. test - unit tests, if any.
+3. other supporting stuff as needed.
+
+These directories should not contain any Vertica Testing environment
+specific stuff. Hopefully someday this will be in a public repository.
118 customize_config.pl
@@ -0,0 +1,118 @@
+############################
+# Usage: binary
+#   path to config directory
+#   JAVA_HOME
+#   port (ports are assigned starting from this port)
+#   test directory
+#   number of slave nodes
+#   path to the Vertica JDBC jar
+#   path to the Hadoop/Pig connector jars
+############################
+
+$configdir = $ARGV[0];
+$java = $ARGV[1];
+$port = $ARGV[2];
+$testdir = $ARGV[3];
+$numslaves = $ARGV[4];
+$verticajar = $ARGV[5];
+$hadoop_vertica_jars = $ARGV[6];
+$hname = `hostname`;
+chomp $hname;
+
+open (HADOOP_ENV, "<$configdir/hadoop-env.sh") or die "Could not open $configdir/hadoop-env.sh";
+@henv = <HADOOP_ENV>;
+close(HADOOP_ENV);
+
+for ($count = 0; $count <= $#henv; $count++) {
+ if ($henv[$count] =~ /export JAVA_HOME/) {
+ $henv[$count] = "export JAVA_HOME=$java\n";
+ }
+ if ($henv[$count] =~ /export HADOOP_LOG_DIR/) {
+ $henv[$count] = "export HADOOP_LOG_DIR=$testdir/logs\n";
+ }
+ if ($henv[$count] =~ /export HADOOP_PID_DIR/) {
+ $henv[$count] = "export HADOOP_PID_DIR=$testdir/\n";
+ }
+ if ($henv[$count] =~ /export HADOOP_CLASSPATH/) {
+ $henv[$count] = "export HADOOP_CLASSPATH=$verticajar:$hadoop_vertica_jars\n";
+ }
+ if ($henv[$count] =~ /export HADOOP_TASKTRACKER_OPTS/) {
+ $henv[$count] = "export HADOOP_TASKTRACKET_OPT='-classpath $hadoop_vertica_jars'\n";
+ }
+}
+
+open (HADOOP_ENV, ">$configdir/hadoop-env.sh") or die "Could not open $configdir/hadoop-env.sh";
+print HADOOP_ENV join("", @henv);
+close(HADOOP_ENV);
+
+open (LOGPROPS, "<$configdir/log4j.properties") or die "Could not open $configdir/log4j.properties";
+@logprops = <LOGPROPS>;
+close(LOGPROPS);
+
+@newlogprops = ();
+for ($count = 0; $count <= $#logprops; $count++) {
+ if ($logprops[$count] =~ /^hadoop.log.dir/) {
+ push(@newlogprops, "hadoop.log.dir=$testdir" . "/logs" . "\n");
+ } elsif ($logprops[$count] =~ /Custom Logging levels/) {
+ push(@newlogprops, $logprops[$count]);
+ push(@newlogprops, "log4j.logger.com.vertica.hadoop=INFO" . "\n");
+ } else {
+ push(@newlogprops, $logprops[$count]);
+ }
+}
+
+open (LOGPROPS, ">$configdir/log4j.properties") or die "Could not open $configdir/log4j.properties";
+print LOGPROPS join("", @newlogprops);
+close(LOGPROPS);
+
+open (CORESITE, ">$configdir/core-site.xml") or die "Could not open $configdir/core-site.xml";
+print CORESITE <<CORESITEEOF;
+<configuration>
+ <property>
+ <name>fs.default.name</name>
+ <value>hdfs://$hname:$port</value>
+ </property>
+</configuration>
+CORESITEEOF
+
+close(CORESITE);
+$port++;
+
+open (HDFSSITE, ">$configdir/hdfs-site.xml") or die "Could not open $configdir/hdfs-site.xml";
+print HDFSSITE <<HDFSSITEEOF;
+<configuration>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>$testdir</value>
+ </property>
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+</configuration>
+HDFSSITEEOF
+
+close(HDFSSITE);
+
+open (MAPREDSITE, ">$configdir/mapred-site.xml") or die "Could not open $configdir/mapred-site.xml";
+print MAPREDSITE <<MAPREDSITEEOF;
+<configuration>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>$hname:$port</value>
+ </property>
+</configuration>
+MAPREDSITEEOF
+
+close(MAPREDSITE);
+
+open (SLAVES, ">$configdir/slaves") or die "Could not open $configdir/slaves";
+for ($count = 0; $count < $numslaves; $count++) {
+ print SLAVES "$hname\n";
+}
+close(SLAVES);
+
+open (MASTER, ">$configdir/masters") or die "Could not open $configdir/masters";
+print MASTER "$hname\n";
+close(MASTER);
+
+exit(0);
BIN license/CorporateCLA.pdf
Binary file not shown.
BIN license/PersonalCLA.pdf
Binary file not shown.
130 makefile
@@ -0,0 +1,130 @@
+#+++++
+#
+# Description:
+#
+# Build the Vertica Hadoop System:
+#-----
+# $Source$
+###
+# Load the standard make definitions
+###
+ifndef VERTICA_SOURCE
+ export VERTICA_SOURCE := $(shell perl \
+ -e '$$_="$(PWD)"; do {print,exit if -d "$$_/make"} while s:/[^/]*$$::')
+endif
+ifndef JDBC_SOURCE
+ export JDBC_SOURCE := $(VERTICA_SOURCE)/client/JDBC/vjdbc
+endif
+SOURCE := $(VERTICA_SOURCE)
+include $(VERTICA_SOURCE)/make/include.mk
+
+ifndef HADOOP_SOURCE
+  $(error HADOOP_SOURCE is required. It should point to a checkout of SVNROOT/binary/Hadoop)
+endif
+
+export HADOOP_DISTRO := $(HADOOP_SOURCE)/$(DISTRO_NAME)
+
+# location for test output
+export PG_TESTOUT ?= $(TARGET)/Test/
+
+export HADOOP_HOME_WARN_SUPPRESS := 1
+export JAR_DIR := $(TARGET)/hadoop
+export HADOOP_CLASSPATH := $(JDBC_SOURCE)/../jars/vertica.jar:$(JAR_DIR)/hadoop-vertica.jar
+
+export HADOOP_START_CMD := hadoop-start
+export HADOOP_STOP_CMD := hadoop-stop
+
+ifdef USE_YARN
+ export HADOOP_START_CMD := yarn-start
+ export HADOOP_STOP_CMD := yarn-stop
+endif
+
+copy_docs: doc
+ ln -s $(JAR_DIR)/doc $(DOC_LOC)
+
+hadoop-src:
+ $(MAKE) -C $(HADOOP_SOURCE) hadoop-src
+
+jar: hadoop-src
+ make -C src $@
+
+doc:
+ make -C src $@
+
+clean:
+ make -C src $@
+ @rm -f $(JAR_DIR)/hadoop-pig-vertica.zip
+
+pkg-src:
+ rsync -az src $(JAR_DIR) --exclude=\*.svn* --exclude=makefile
+ rsync -az testing/data $(JAR_DIR) --exclude=\*.svn*
+ rsync -az README.txt $(JAR_DIR) --exclude=\*.svn*
+
+package: jar doc pkg-src
+ cd $(JAR_DIR); zip -rq hadoop-pig-vertica.zip hadoop-vertica.jar pig-vertica.jar hadoop-vertica-example.jar doc/ src/ data/ README.txt
+
+$(HADOOP_TESTBASE):
+ $(MKDIR) $(HADOOP_TESTBASE)
+
+# This target calls SummarizeTestFailures, which looks for diffs and core files
+test_checkresults:
+ @perl $(MAKE_DIR)/bin/SummarizeTestFailures.pl $(TARGET) $(PG_TESTOUT)
+
+start_clusters:
+ @$(MAKE) -C $(HADOOP_SOURCE) $(HADOOP_START_CMD) save_hadoop_info
+ @$(MAKE) -C $(VERTICA_SOURCE)/SQLTest test_setup_4node save_vertica_info
+
+stop_clusters:
+ @$(MAKE) -C $(HADOOP_SOURCE) $(HADOOP_STOP_CMD)
+ @$(MAKE) -C $(VERTICA_SOURCE)/SQLTest cluster_shutdown
+
+# pg_regress parameters
+export DRIVER := $(TARGET)/pig_driver.sh
+PIG_LIB := $(PIG_HOME)/lib
+
+export TMPDIR := $(PG_TESTOUT)/tmp
+export DIFF := BSdiff
+
+$(TMPDIR):
+ $(MKDIR) -p $(TMPDIR)
+
+connector_test_only:
+ @$(MAKE) start_clusters
+ @$(MAKE) -C testing connector_test
+ @$(MAKE) stop_clusters
+ @(cd $(VERTICA_SOURCE); $(MAKE) -C SQLTest test_checkresults)
+
+hdfs_test_only:
+ @$(MAKE) start_clusters
+ $(MAKE) -C testing hdfs_test
+ @$(MAKE) stop_clusters
+ @(cd $(VERTICA_SOURCE); $(MAKE) -C SQLTest test_checkresults)
+
+checkin_test:
+ @$(MAKE) start_clusters
+ @$(MAKE) -C testing hdfs_test
+ @$(MAKE) -C testing connector_test
+ @$(MAKE) stop_clusters
+ @(cd $(VERTICA_SOURCE); $(MAKE) -C SQLTest test_checkresults)
+
+cdh3_test_all:
+ DISTRO_NAME=CDH2 $(MAKE) connector_test_only
+
+hw_test_all:
+ DISTRO_NAME=HortonWorks $(MAKE) connector_test_only
+
+apache_test_all:
+ DISTRO_NAME=Apache_10 $(MAKE) clean jar
+ DISTRO_NAME=Apache_10 $(MAKE) checkin_test
+
+cdh4_test_all:
+ DISTRO_NAME=CDH4 $(MAKE) clean jar
+ DISTRO_NAME=CDH4 $(MAKE) checkin_test
+
+yarn_test_all:
+ DISTRO_NAME=CDH4-Yarn $(MAKE) clean jar
+ DISTRO_NAME=CDH4-Yarn USE_YARN=true $(MAKE) hdfs_test_only
+
+test_all_distro: yarn_test_all apache_test_all cdh3_test_all hw_test_all cdh4_test_all
+
+.PHONY: streaming hadoop-example hadoop-connector pig-connector squeal hadoop-src checkin_test checkin_test_all test_checkresults
62 src/build.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project basedir="." default="jar" name="hadoop-vertica">
+ <description>
+ Apache Hadoop/Pig Vertica Project contains
+ source code for the following JARs:
+ * Apache Hadoop to Vertica Connector.
+ * Apache Pig to Vertica Connector.
+ * Example to use the Hadoop/Vertica connector.
+ </description>
+
+ <property environment="env"/>
+ <condition property="ispig07">
+ <contains string="${pig.version}" substring="0.7"/>
+ </condition>
+ <target name="executeRecursively">
+ <ant dir="hadoop-connector" target="${action}"/>
+ <ant dir="hadoop-example" target="${action}"/>
+ <ant dir="pig-connector" target="${action}"/>
+ <antcall target="executeOnSqueal">
+ <param name="action" value="${action}"/>
+ </antcall>
+ </target>
+
+ <target name="executeOnSqueal" if="ispig07">
+ <ant dir="squeal" target="${action}"/>
+ </target>
+
+ <target name="doc"
+ description="Generate Javadocs for ALL modules">
+ <antcall target="executeRecursively">
+ <param name="action" value="doc"/>
+ </antcall>
+ </target>
+
+ <target name="jar"
+ description="Compile JARs for ALL modules">
+ <antcall target="executeRecursively">
+ <param name="action" value="jar"/>
+ </antcall>
+ </target>
+ <target name="clean">
+ <antcall target="executeRecursively">
+ <param name="action" value="clean"/>
+ </antcall>
+ </target>
+
+ <target name="hadoop-connector"
+ description="Compile JAR for Hadoop/Vertica Connector">
+ <ant dir="hadoop-connector" target="jar"/>
+ </target>
+ <target name="hadoop-example" depends="hadoop-connector"
+ description="Compile JAR for an example that uses Hadoop/Vertica Connector">
+ <ant dir="hadoop-example" target="jar"/>
+ </target>
+ <target name="pig-connector"
+ description="Compile JAR for Pig/Vertica Connector">
+ <ant dir="pig-connector" target="jar"/>
+ </target>
+ <target name="squeal" if="ispig07">
+ <ant dir="squeal" target="jar"/>
+ </target>
+</project>
133 src/hadoop-connector/build.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project basedir="." default="jar" name="hadoop-vertica">
+ <property environment="env"/>
+ <!--Required parameters. You have to specify these on the ANT command line -->
+  <!-- JUnit jars should be in the lib path; specify them with -lib, for example -->
+ <property name="vertica.jar" value = "" />
+ <property name="hadoop.dir" value="" />
+ <property name="doc.dir" value="" />
+ <property name="dist" value=""/>
+
+ <property name="jar.name" value="hadoop-vertica" />
+ <property name="debuglevel" value="source,lines,vars"/>
+ <property name="javac.source" value="1.6" />
+ <property name="javac.target" value="1.6" />
+ <property name="version" value="1.4.1" />
+
+ <property name="src.dir" location="${basedir}" />
+ <property name="test.dir" location="${basedir}/test" />
+
+ <property name="build.dir" location="${basedir}/build" />
+ <property name="build.classes" location="${build.dir}/hadoop-connector/classes"/>
+ <property name="build.test.classes" location="${build.dir}/hadoop-connector/tests" />
+ <property name="junit.output.dir" value="${build.dir}/hadoop-connector/junit"/>
+
+ <!-- Lesson: You can add location of jar files here -->
+ <path id="compile.classpath">
+ <pathelement location="${vertica.jar}"/>
+ <pathelement location="${build.classes}"/>
+ <fileset dir="${hadoop.dir}">
+ <include name="**/*.jar"/>
+ <exclude name="junit-3.8.1.jar"/>
+ </fileset>
+ </path>
+
+ <!-- Classpath for unit tests (superset of compile.classpath) -->
+ <path id="test.classpath">
+ <fileset dir="${hadoop.dir}">
+ <include name="*.jar"/>
+ <exclude name="junit-3.8.1.jar"/>
+ </fileset>
+ <pathelement location="${dist}/${jar.name}.jar" />
+ <pathelement location="${build.test.classes}" />
+ </path>
+
+ <target name="init">
+ <available property="junit" classname="junit.framework.Test"/>
+ <available property="junit.task" classname="org.apache.tools.ant.taskdefs.optional.junit.JUnitTask"/>
+
+ <condition property="notJava6">
+ <not><equals arg1="${java.specification.version}" arg2="1.6"/></not>
+ </condition>
+
+ <fail if="notJava6" message="JDK 1.6 is required." />
+ </target>
+
+ <target name="clean">
+ <delete dir="${build.dir}/hadoop-connector"/>
+ <delete file="${dist}/${jar.name}.jar"/>
+ <delete dir="${doc.dir}/hadoop-connector"/>
+ </target>
+
+ <target depends="clean" name="cleanall"/>
+
+ <target depends="init" name="compile">
+ <mkdir dir="${build.classes}"/>
+ <echo message="${ant.project.name}: ${ant.file}"/>
+ <javac debug="true" debuglevel="${debuglevel}" destdir="${build.classes}"
+ source="${javac.source}" target="${javac.target}">
+ <src path="${src.dir}"/>
+ <classpath refid="compile.classpath"/>
+ </javac>
+ </target>
+
+ <target name="compile-test"
+ depends="jar"
+ description="Compile test classes">
+ <mkdir dir="${build.test.classes}" />
+ <javac srcdir="${test.dir}" includes="**/*.java" destdir="${build.test.classes}"
+ debug="${javac.debug}">
+ <classpath refid="test.classpath"/>
+ </javac>
+ </target>
+
+ <target name="doc">
+ <javadoc destdir="${doc.dir}/hadoop-connector" breakiterator="yes">
+ <classpath refid="compile.classpath"/>
+ <fileset dir="${src.dir}">
+ <include name="**/*.java"/>
+ </fileset>
+ </javadoc>
+ </target>
+
+ <target name="jar" depends="compile" description="Create main jar">
+ <mkdir dir="${dist}"/>
+ <tstamp>
+ <format property="builtat" pattern="yyyy-MM-dd HH:mm:ss Z" timezone="America/New_York"/>
+ </tstamp>
+ <exec executable="svnversion" outputproperty="svnversion"/>
+ <exec executable="uname" outputproperty="buildsystem"><arg value="-a"/></exec>
+ <jar jarfile="${dist}/${jar.name}.jar" basedir="${build.classes}">
+ <manifest>
+ <attribute name="Built-By" value="${user.name}"/>
+ <attribute name="Implementation-Vendor" value="Vertica Systems, Inc."/>
+ <attribute name="Implementation-Title" value="Apache Hadoop - Vertica Connector"/>
+ <attribute name="Implementation-Version" value="${version}"/>
+ <attribute name="SVN-Revision" value="${svnversion}"/>
+ <attribute name="Build-System" value="${buildsystem}"/>
+ <attribute name="Build-Time" value="${builtat}"/>
+ </manifest>
+ </jar>
+ </target>
+
+ <target name="run-test" depends="jar, compile-test">
+ <mkdir dir="${junit.output.dir}"/>
+ <junit fork="yes" printsummary="withOutAndErr" errorProperty="test.failed" failureProperty="test.failed"
+ haltonfailure="no">
+ <syspropertyset>
+ <propertyref name="mapred.vertica.hostnames"/>
+ <propertyref name="mapred.vertica.port"/>
+ <propertyref name="mapred.vertica.database"/>
+ <propertyref name="mapred.vertica.username"/>
+ <propertyref name="mapred.vertica.password"/>
+ </syspropertyset>
+ <formatter type="xml"/>
+ <batchtest todir="${junit.output.dir}">
+ <fileset dir="${test.dir}"
+ includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <classpath refid="test.classpath"/>
+ </junit>
+ <fail if="test.failed">Tests Failed!</fail>
+ </target>
+</project>
61 src/hadoop-connector/com/vertica/hadoop/Relation.java
@@ -0,0 +1,61 @@
+package com.vertica.hadoop;
+
+public class Relation {
+ private String table = null;
+ private String schema = null;
+ private String database = null;
+ private boolean defSchema = false;
+
+ public Relation(String name) {
+ if (name == null) return;
+
+ String[] splut = name.split("\\.");
+
+ if (splut.length == 3) {
+ database = splut[0];
+ schema = splut[1];
+ table = splut[2];
+ } else if (splut.length == 2) {
+ schema = splut[0];
+ table = splut[1];
+ } else if (splut.length == 1) {
+ defSchema = true;
+ schema = "public";
+ table = splut[0];
+ }
+ }
+
+ public boolean isNull() {
+ return table == null;
+ }
+
+ public boolean isDefaultSchema() {
+ return defSchema;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public StringBuilder getQualifiedName() {
+ StringBuilder sb = new StringBuilder();
+ if (database != null) {
+ sb.append(database);
+ sb.append('.');
+ }
+
+ sb.append(schema);
+ sb.append('.');
+ sb.append(table);
+
+ return sb;
+ }
+}
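
For reference, a minimal usage sketch of the Relation class above (not part of the commit; the relation names are made up for illustration):

import com.vertica.hadoop.Relation;

public class RelationSketch {
    public static void main(String[] args) {
        // Three-part names are split into database.schema.table.
        Relation full = new Relation("mydb.store.orders");
        System.out.println(full.getDatabase());       // mydb
        System.out.println(full.getSchema());         // store
        System.out.println(full.getTable());          // orders
        System.out.println(full.getQualifiedName());  // mydb.store.orders

        // A bare table name falls back to the public schema.
        Relation bare = new Relation("orders");
        System.out.println(bare.isDefaultSchema());   // true
        System.out.println(bare.getQualifiedName());  // public.orders
    }
}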
561 src/hadoop-connector/com/vertica/hadoop/VerticaConfiguration.java
@@ -0,0 +1,561 @@
+
+package com.vertica.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A container for configuration property names for jobs with Vertica
+ * input/output.
+ *
+ * The job can be configured using the static methods in this class,
+ * {@link VerticaInputFormat}, and {@link VerticaOutputFormat}. Alternatively,
+ * the properties can be set in the configuration with proper values.
+ *
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ *      String, String, String)
+ * @see VerticaConfiguration#configureVertica(Configuration, String[], String,
+ *      String, String, String, String, String[], String, String, String, String)
+ * @see VerticaInputFormat#setInput(Job, String)
+ * @see VerticaInputFormat#setInput(Job, String, Collection<List<Object>>)
+ * @see VerticaInputFormat#setInput(Job, String, String)
+ * @see VerticaInputFormat#setInput(Job, String, String...)
+ * @see VerticaOutputFormat#setOutput(Job, String)
+ * @see VerticaOutputFormat#setOutput(Job, String, Collection<VerticaTable>)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean)
+ * @see VerticaOutputFormat#setOutput(Job, String, boolean, String...)
+ */
+public class VerticaConfiguration {
+ /** JDBC debug logging level default */
+ public static final String DEBUG_PROP_DEFAULT = "0";
+
+ /** Class name for Vertica JDBC Driver */
+ public static final String VERTICA_DRIVER_CLASS = "com.vertica.jdbc.Driver";
+ public static final String VERTICA_DRIVER_CLASS_41 = "com.vertica.Driver";
+
+  /** Host names to connect to; one is selected at random */
+ public static final String HOSTNAMES_PROP = "mapred.vertica.hostnames";
+
+ /** Name of database to connect to */
+ public static final String DATABASE_PROP = "mapred.vertica.database";
+
+ /** User name for Vertica */
+ public static final String USERNAME_PROP = "mapred.vertica.username";
+
+ /** Password for Vertica */
+ public static final String PASSWORD_PROP = "mapred.vertica.password";
+
+ /** Port for Vertica */
+ public static final String PORT_PROP = "mapred.vertica.port";
+
+ /** JDBC debug logging level */
+ public static final String DEBUG_PROP = "mapred.vertica.debug";
+
+  /** Batch size for JDBC batch inserts */
+ public static final String BATCH_SIZE_PROP = "mapred.vertica.batchsize";
+
+  /** Host names for the output connection; one is selected at random */
+  public static final String OUTPUT_HOSTNAMES_PROP = "mapred.vertica.hostnames.output";
+
+  /** Name of the database for the output connection */
+  public static final String OUTPUT_DATABASE_PROP = "mapred.vertica.database.output";
+
+  /** User name for the output connection */
+  public static final String OUTPUT_USERNAME_PROP = "mapred.vertica.username.output";
+
+  /** Password for the output connection */
+  public static final String OUTPUT_PASSWORD_PROP = "mapred.vertica.password.output";
+
+  /** Port for the output connection */
+  public static final String OUTPUT_PORT_PROP = "mapred.vertica.port.output";
+
+ /** Query to run for input */
+ public static final String QUERY_PROP = "mapred.vertica.input.query";
+
+ /** Query to run to retrieve parameters */
+ public static final String QUERY_PARAM_PROP = "mapred.vertica.input.query.paramquery";
+
+ /** Static parameters for query */
+ public static final String QUERY_PARAMS_PROP = "mapred.vertica.input.query.params";
+
+ /** Optional input delimiter for streaming */
+ public static final String INPUT_DELIMITER_PROP = "mapred.vertica.input.delimiter";
+
+ /** Optional input terminator for streaming */
+ public static final String INPUT_TERMINATOR_PROP = "mapred.vertica.input.terminator";
+
+ /** Output table name */
+ public static final String OUTPUT_TABLE_NAME_PROP = "mapred.vertica.output.table.name";
+
+ /** Definition of output table types */
+ public static final String OUTPUT_TABLE_DEF_PROP = "mapred.vertica.output.table.def";
+
+ /** Whether to drop tables */
+ public static final String OUTPUT_TABLE_DROP = "mapred.vertica.output.table.drop";
+
+ /** Optional output format delimiter */
+ public static final String OUTPUT_DELIMITER_PROP = "mapred.vertica.output.delimiter";
+
+ /** Optional output format terminator */
+ public static final String OUTPUT_TERMINATOR_PROP = "mapred.vertica.output.terminator";
+
+ /**
+ * Override the sleep timer for optimize to poll when new projections have
+ * refreshed
+ */
+ public static final String OPTIMIZE_POLL_TIMER_PROP = "mapred.vertica.optimize.poll";
+
+ /**
+ * Property for speculative execution of MAP tasks
+ */
+ public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
+
+ /**
+   * Property for speculative execution of REDUCE tasks
+ */
+ public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
+
+ /**
+   * Sets the Vertica database connection information in the {@link
+   * Configuration}
+ *
+ * @param conf
+ * the configuration
+ * @param hostnames
+ * one or more hosts in the Vertica cluster
+ * @param database
+ * the name of the Vertica database
+ * @param username
+ * Vertica database username
+ * @param password
+ * Vertica database password
+ * @param port
+ * Vertica database port
+ */
+ public static void configureVertica(Configuration conf, String[] hostnames,
+ String database, String port, String username, String password) {
+ conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
+ conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
+
+ conf.setStrings(HOSTNAMES_PROP, hostnames);
+ conf.set(DATABASE_PROP, database);
+ conf.set(USERNAME_PROP, username);
+ conf.set(PASSWORD_PROP, password);
+ conf.set(PORT_PROP, port);
+ }
+
+ /**
+   * Sets the Vertica database connection information in the {@link
+   * Configuration}
+ *
+ * @param conf
+ * the configuration
+ * @param hostnames
+ * one or more hosts in the Vertica cluster
+ * @param database
+ * the name of the Vertica database
+ * @param username
+ * Vertica database username
+ * @param password
+ * Vertica database password
+ * @param port
+ * Vertica database port
+ * @param debug
+ * JDBC debug logging level
+ *
+ */
+  public static void configureVertica(Configuration conf, String[] hostnames,
+      String database, String port, String username, String password, String debug) {
+    // delegate to the base overload, then record the JDBC debug level
+    configureVertica(conf, hostnames, database, port, username, password);
+    conf.set(DEBUG_PROP, debug);
+ }
+
+ /**
+   * Sets the Vertica database connection information in the {@link
+   * Configuration}
+ *
+ * @param conf
+ * the configuration
+ * @param hostnames
+ * one or more hosts in the source Cluster
+ * @param database
+ * the name of the source Vertica database
+ * @param username
+ * for the source Vertica database
+ * @param password
+ * for the source Vertica database
+ * @param port
+ * for the source Vertica database
+ * @param debug
+ * JDBC debug logging level
+ * @param output_hostnames
+ * one or more hosts in the output Cluster
+   * @param output_database
+   *          the name of the output Vertica database
+ * @param output_username
+ * for the target Vertica database
+ * @param output_password
+ * for the target Vertica database
+ * @param output_port
+ * for the target Vertica database
+ */
+ public static void configureVertica(Configuration conf, String[] hostnames,
+ String database, String port, String username, String password, String debug,
+ String[] output_hostnames, String output_database, String output_port,
+ String output_username, String output_password) {
+ configureVertica(conf, hostnames, database, port, username, password, debug);
+ conf.setStrings(OUTPUT_HOSTNAMES_PROP, output_hostnames);
+ conf.set(OUTPUT_DATABASE_PROP, output_database);
+ conf.set(OUTPUT_PORT_PROP, output_port);
+ conf.set(OUTPUT_USERNAME_PROP, output_username);
+ conf.set(OUTPUT_PASSWORD_PROP, output_password);
+ }
+
+ private Configuration conf;
+
+ // default record terminator for writing output to Vertica
+ public static final String RECORD_TERMINATOR = "\u0008";
+
+ // default delimiter for writing output to Vertica
+ public static final String DELIMITER = "\u0007";
+
+  // default optimize poll timeout
+ public static final int OPTIMIZE_POLL_TIMER = 1000;
+
+ public static final int defaultBatchSize = 10000;
+
+ public VerticaConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Returns a connection to a random host in the Vertica cluster
+ *
+ * @param output
+ * true if the connection is for writing
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws SQLException
+ */
+ public Connection getConnection(boolean output) throws IOException,
+ ClassNotFoundException, SQLException {
+ try {
+ Class.forName(VERTICA_DRIVER_CLASS);
+ } catch (ClassNotFoundException e) {
+ try {
+ Class.forName(VERTICA_DRIVER_CLASS_41);
+ } catch (ClassNotFoundException e2) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ String[] hosts = conf.getStrings(HOSTNAMES_PROP);
+ String user = conf.get(USERNAME_PROP);
+ String pass = conf.get(PASSWORD_PROP);
+ String database = conf.get(DATABASE_PROP);
+ String port = conf.get(PORT_PROP);
+ String debug = conf.get(DEBUG_PROP);
+
+
+    // fall back to the default debug level when the property is unset or empty
+    if ((debug == null) || debug.isEmpty()) {
+      debug = DEBUG_PROP_DEFAULT;
+    }
+
+ if (output) {
+ hosts = conf.getStrings(OUTPUT_HOSTNAMES_PROP, hosts);
+ user = conf.get(OUTPUT_USERNAME_PROP, user);
+ pass = conf.get(OUTPUT_PASSWORD_PROP, pass);
+ database = conf.get(OUTPUT_DATABASE_PROP, database);
+ port = conf.get(OUTPUT_PORT_PROP, port);
+ }
+
+    if (hosts == null)
+      throw new IOException("Vertica requires a hostname defined by "
+          + HOSTNAMES_PROP);
+    if (hosts.length == 0)
+      throw new IOException("Vertica requires a hostname defined by "
+          + HOSTNAMES_PROP);
+    if (database == null)
+      throw new IOException("Vertica requires a database name defined by "
+          + DATABASE_PROP);
+
+ Random r = new Random();
+ if (user == null)
+ throw new IOException("Vertica requires a username defined by "
+ + USERNAME_PROP);
+
+ if (port == null)
+ throw new IOException("Vertica requires a port defined by "
+ + PORT_PROP);
+ return DriverManager.getConnection("jdbc:vertica://"
+ + hosts[r.nextInt(hosts.length)] + ":" + port + "/" + database,
+ user, pass);
+ }
+
+ public String getInputQuery() {
+ return conf.get(QUERY_PROP);
+ }
+
+ /**
+   * Set the query to run; its results are given to the mappers.
+ *
+ * @param inputQuery
+ */
+ public void setInputQuery(String inputQuery) {
+ inputQuery = inputQuery.trim();
+ if (inputQuery.endsWith(";")) {
+ inputQuery = inputQuery.substring(0, inputQuery.length() - 1);
+ }
+ conf.set(QUERY_PROP, inputQuery);
+ }
+
+ /**
+ * Return the query used to retrieve parameters for the input query (if set)
+ *
+ * @return Returns the query for input parameters
+ */
+ public String getParamsQuery() {
+ return conf.get(QUERY_PARAM_PROP);
+ }
+
+ /**
+ * Query used to retrieve parameters for the input query. The result set must
+   * match the input query parameters precisely.
+ *
+ * @param segment_params_query
+ */
+ public void setParamsQuery(String segment_params_query) {
+ conf.set(QUERY_PARAM_PROP, segment_params_query);
+ }
+
+ /**
+ * Return static input parameters if set
+ *
+ * @return Collection of list of objects representing input parameters
+ * @throws IOException
+ */
+ public Collection<List<Object>> getInputParameters() throws IOException {
+ Collection<List<Object>> values = null;
+ String[] query_params = conf.getStrings(QUERY_PARAMS_PROP);
+ if (query_params != null) {
+ values = new ArrayList<List<Object>>();
+ for (String str_params : query_params) {
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(StringUtils.hexStringToByte(str_params), str_params.length());
+ int sz = in.readInt();
+ ArrayList<Object> params = new ArrayList<Object>();
+ for (int count = 0; count < sz; count++) {
+ int type = in.readInt();
+ params.add(VerticaRecord.readField(type, in));
+ }
+ values.add(params);
+ }
+ }
+ return values;
+ }
+
+ /**
+ * Sets a collection of lists. Each list is passed to an input split and used
+ * as arguments to the input query.
+ *
+   * @param segment_params
+ * @throws IOException
+ */
+ public void setInputParams(Collection<List<Object>> segment_params)
+ throws IOException
+ {
+ String[] values = new String[segment_params.size()];
+ int i = 0;
+ for (List<Object> params : segment_params) {
+ DataOutputBuffer out = new DataOutputBuffer();
+ out.writeInt(params.size());
+ for (Object obj : params)
+ {
+ int type = VerticaRecord.getType(obj);
+ out.writeInt(type);
+ VerticaRecord.write(obj, type, out);
+ }
+ values[i++] = StringUtils.byteToHexString(out.getData());
+ }
+ conf.setStrings(QUERY_PARAMS_PROP, values);
+ }
+
+ /**
+ * For streaming return the delimiter to separate values to the mapper
+ *
+ * @return Returns delimiter used to format streaming input data
+ */
+ public String getInputDelimiter() {
+ return conf.get(INPUT_DELIMITER_PROP, DELIMITER);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+ * For streaming set the delimiter to separate values to the mapper
+ */
+ @Deprecated
+ public void setInputDelimiter(String delimiter) {
+ conf.set(INPUT_DELIMITER_PROP, delimiter);
+ }
+
+ /**
+ * For streaming return the record terminator to separate values to the mapper
+ *
+ * @return Returns recorder terminator for input data
+ */
+ public String getInputRecordTerminator() {
+ return conf.get(INPUT_TERMINATOR_PROP, RECORD_TERMINATOR);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+ * For streaming set the record terminator to separate values to the mapper
+ */
+ @Deprecated
+ public void setInputRecordTerminator(String terminator) {
+ conf.set(INPUT_TERMINATOR_PROP, terminator);
+ }
+
+ /**
+ * Get the table that is the target of output
+ *
+ * @return Returns table name for output
+ */
+ public String getOutputTableName() {
+ return conf.get(OUTPUT_TABLE_NAME_PROP);
+ }
+
+ /**
+ * Set table that is being loaded as output
+ *
+ * @param tableName
+ */
+ public void setOutputTableName(String tableName) {
+ conf.set(OUTPUT_TABLE_NAME_PROP, tableName);
+ }
+
+ /**
+ * Return definition of columns for output table
+ *
+ * @return Returns table definition for output table
+ */
+ public String[] getOutputTableDef() {
+ return conf.getStrings(OUTPUT_TABLE_DEF_PROP);
+ }
+
+ /**
+ * Set the definition of a table for output if it needs to be created
+ *
+   * @param args column definitions for the output table
+ */
+ public void setOutputTableDef(String... args) {
+ if(args == null || args.length == 0 || Arrays.asList(args).contains(null)) return;
+ conf.setStrings(OUTPUT_TABLE_DEF_PROP, args);
+ }
+
+ /**
+ * Return the batch size for batch insert
+ *
+ * @return Returns the batch size for batch insert
+ */
+ public long getBatchSize() {
+ return conf.getLong(BATCH_SIZE_PROP, defaultBatchSize);
+ }
+
+ /**
+   * Return whether the output table is dropped before loading
+ *
+ * @return Returns true if output table should be dropped before loading
+ */
+ public boolean getDropTable() {
+ return conf.getBoolean(OUTPUT_TABLE_DROP, false);
+ }
+
+ /**
+   * Set whether to drop the output table before loading
+ *
+ * @param drop_table
+ */
+ public void setDropTable(boolean drop_table) {
+ conf.setBoolean(OUTPUT_TABLE_DROP, drop_table);
+ }
+
+ /**
+ * For streaming return the delimiter used by the reducer
+ *
+ * @return Returns delimiter to use for output data
+ */
+ public String getOutputDelimiter() {
+ return conf.get(OUTPUT_DELIMITER_PROP, DELIMITER);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+ * For streaming set the delimiter used by the reducer
+ *
+ * @param delimiter
+ */
+ @Deprecated
+ public void setOutputDelimiter(String delimiter) {
+ conf.set(OUTPUT_DELIMITER_PROP, delimiter);
+ }
+
+ /**
+ * For streaming return the record terminator used by the reducer
+ *
+ * @return Returns the record terminator for output data
+ */
+ public String getOutputRecordTerminator() {
+ return conf.get(OUTPUT_TERMINATOR_PROP, RECORD_TERMINATOR);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+ * For streaming set the record terminator used by the reducer
+ *
+ * @param terminator
+ */
+ @Deprecated
+ public void setOutputRecordTerminator(String terminator) {
+ conf.set(OUTPUT_TERMINATOR_PROP, terminator);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+ * Returns poll timer for optimize loop
+ *
+ * @return Returns poll timer for optimize loop
+ */
+ @Deprecated
+ public Long getOptimizePollTimeout() {
+ return conf.getLong(OPTIMIZE_POLL_TIMER_PROP, OPTIMIZE_POLL_TIMER);
+ }
+
+ /**
+ * @deprecated As of release 1.5, this function is not called from the Java API.
+   * Set the timeout for the optimize poll loop
+ *
+ * @param timeout
+ */
+ @Deprecated
+ public void setOptimizePollTimeout(Long timeout) {
+ conf.setLong(OPTIMIZE_POLL_TIMER_PROP, timeout);
+ }
+}
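
To make the configuration surface above concrete, here is a hedged job-setup sketch (not part of the commit). The host names, port, database, and credentials are placeholders, and only the input side is shown since VerticaOutputFormat appears here only in the @see tags:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.vertica.hadoop.VerticaConfiguration;
import com.vertica.hadoop.VerticaInputFormat;

public class VerticaJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder cluster details; configureVertica also turns off
        // speculative execution for map and reduce tasks.
        VerticaConfiguration.configureVertica(conf,
                new String[] { "vertica-host01", "vertica-host02" },
                "exampledb", "5433", "dbadmin", "secret");

        Job job = new Job(conf, "vertica-input-sketch");
        // Rows returned by this query are handed to the mappers as VerticaRecords.
        VerticaInputFormat.setInput(job, "SELECT id, total FROM public.orders");

        // ... set mapper, reducer, and output classes as usual, then:
        // job.waitForCompletion(true);
    }
}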
293 src/hadoop-connector/com/vertica/hadoop/VerticaInputFormat.java
@@ -0,0 +1,293 @@
+package com.vertica.hadoop;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Input formatter that returns the results of a query executed against Vertica.
+ * The key is a record number within the result set of each mapper. The value is
+ * a VerticaRecord, which uses a similar interface to JDBC ResultSets for
+ * returning values.
+ *
+ */
+public class VerticaInputFormat extends InputFormat<LongWritable, VerticaRecord> {
+ private static final Log LOG = LogFactory.getLog("com.vertica.hadoop");
+ private String inputQuery = null;
+ private String params = null;
+
+ public VerticaInputFormat() {}
+ /**
+ * Set a parameterized input query for a job and the query that returns the
+ * parameters.
+ *
+ * @param query
+ * SQL query that has parameters specified by question marks ("?")
+   * @param params
+   *          SQL query that returns parameters for the input query, or
+   *          the literal parameters to substitute
+ */
+
+ public VerticaInputFormat(String query, String params) {
+ inputQuery = query;
+ this.params = params;
+ }
+
+ /**
+ * Set the input query for a job
+ *
+ * @param job
+ * @param inputQuery
+ * query to run against Vertica
+ */
+ public static void setInput(Job job, String inputQuery) {
+ job.setInputFormatClass(VerticaInputFormat.class);
+ VerticaConfiguration config =
+ new VerticaConfiguration(job.getConfiguration());
+ config.setInputQuery(inputQuery);
+ }
+
+ /**
+ * Set a parameterized input query for a job and the query that returns the
+ * parameters.
+ *
+ * @param job
+ * @param inputQuery
+ * SQL query that has parameters specified by question marks ("?")
+ * @param segmentParamsQuery
+ * SQL query that returns parameters for the input query
+ */
+
+ public static void setInput(Job job, String inputQuery,
+ String segmentParamsQuery) {
+ job.setInputFormatClass(VerticaInputFormat.class);
+ VerticaConfiguration config =
+ new VerticaConfiguration(job.getConfiguration());
+ config.setInputQuery(inputQuery);
+ config.setParamsQuery(segmentParamsQuery);
+ }
+
+ /**
+ * Set the input query and any number of comma delimited literal list of
+ * parameters
+ *
+ * @param job
+ * @param inputQuery
+ * SQL query that has parameters specified by question marks ("?")
+   * @param segmentParams
+   *          any number of comma-delimited strings with literal parameters to
+   *          substitute in the input query
+ */
+
+ @SuppressWarnings("serial")
+ public static void setInput(Job job, String inputQuery,
+ String... segmentParams) throws IOException {
+ // transform each param set into array
+ DateFormat datefmt = DateFormat.getDateInstance();
+ Collection<List<Object>> params = new HashSet<List<Object>>() {};
+ for (String strParams : segmentParams) {
+ List<Object> param = new ArrayList<Object>();
+
+ for (String strParam : strParams.split(",")) {
+ strParam = strParam.trim();
+ if (strParam.charAt(0) == '\''
+ && strParam.charAt(strParam.length() - 1) == '\'')
+ param.add(strParam.substring(1, strParam.length() - 1));
+ else {
+ try {
+ param.add(datefmt.parse(strParam));
+ } catch (ParseException e1) {
+ try {
+ param.add(Integer.parseInt(strParam));
+ } catch (NumberFormatException e2) {
+ throw new IOException("Error parsing argument " + strParam);
+ }
+ }
+ }
+ }
+
+ params.add(param);
+ }
+
+ setInput(job, inputQuery, params);
+ }
+
+ /**
+ * Set the input query and a collection of parameter lists
+ *
+ * @param job
+   * @param inputQuery
+   *          SQL query that has parameters specified by question marks ("?")
+   * @param segmentParams
+   *          collection of ordered lists to substitute into the input query
+ * @throws IOException
+ */
+
+  public static void setInput(Job job, String inputQuery,
+      Collection<List<Object>> segmentParams) throws IOException {
+    job.setInputFormatClass(VerticaInputFormat.class);
+    VerticaConfiguration config = new VerticaConfiguration(job.getConfiguration());
+    config.setInputQuery(inputQuery);
+ config.setInputParams(segmentParams);
+ }
+
+
+ /** {@inheritDoc} */
+ public RecordReader<LongWritable, VerticaRecord> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ try {
+ return new VerticaRecordReader((VerticaInputSplit) split,
+ context.getConfiguration());
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ long numSplits = conf.getInt("mapreduce.job.maps", 1);
+ LOG.debug("creating splits up to " + numSplits);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+
+ int i = 0;
+
+ // This is the fancy part of mapping inputs...here's how we figure out
+ // splits
+ // get the params query or the params
+ VerticaConfiguration config = new VerticaConfiguration(conf);
+
+ if (inputQuery == null)
+ inputQuery = config.getInputQuery();
+
+ if (inputQuery == null)
+ throw new IOException("Vertica input requires query defined by "
+ + VerticaConfiguration.QUERY_PROP);
+
+ if (params == null)
+ params = config.getParamsQuery();
+
+ Collection<List<Object>> paramCollection = config.getInputParameters();
+
+ if (params != null && params.startsWith("select")) {
+ LOG.debug("creating splits using paramsQuery :" + params);
+ Connection conn = null;
+ Statement stmt = null;
+
+ try {
+ conn = config.getConnection(false);
+ stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(params);
+ ResultSetMetaData rsmd = rs.getMetaData();
+
+ while (rs.next()) {
+ List<Object> segmentParams = new ArrayList<Object>();
+ for (int j = 1; j <= rsmd.getColumnCount(); j++) {
+ segmentParams.add(rs.getObject(j));
+ }
+ splits.add(new VerticaInputSplit(inputQuery, segmentParams));
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ if (stmt != null) stmt.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ } else if (params != null) {
+ LOG.debug("creating splits using " + params + " params");
+ for (String strParam : params.split(",")) {
+ strParam = strParam.trim();
+ if (strParam.charAt(0) == '\''
+ && strParam.charAt(strParam.length() - 1) == '\'')
+ strParam = strParam.substring(1, strParam.length() - 1);
+ List<Object> segmentParams = new ArrayList<Object>();
+ segmentParams.add(strParam);
+ splits.add(new VerticaInputSplit(inputQuery, segmentParams));
+ }
+ } else if (paramCollection != null) {
+ LOG.debug("creating splits using " + paramCollection.size() + " params");
+ for (List<Object> segmentParams : paramCollection) {
+        // one split per parameter list; if numSplits exceeds the number of
+        // params, limit/offset could be layered on top (not done yet)
+ splits.add(new VerticaInputSplit(inputQuery, segmentParams));
+ }
+ } else {
+ LOG.debug("creating splits using limit and offset");
+ Connection conn = null;
+ Statement stmt = null;
+
+ long count = 0;
+ long start = 0;
+ long end = 0;
+
+ // TODO: limit needs order by unique key
+ // TODO: what if there are more parameters than numsplits?
+ // prep a count(*) wrapper query and then populate the bind params for each
+ String countQuery = "SELECT COUNT(*) FROM (\n" + inputQuery + "\n) count";
+
+ try {
+ conn = config.getConnection(false);
+ stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(countQuery);
+ rs.next();
+
+ count = rs.getLong(1);
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ try {
+ if (stmt != null) stmt.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ long splitSize = count / numSplits;
+ end = splitSize;
+
+ LOG.debug("creating " + numSplits + " splits for " + count + " records");
+
+ for (i = 1; i < numSplits; i++) {
+ splits.add(new VerticaInputSplit(inputQuery, start, end));
+ LOG.debug("Split(" + i + "), start:" + start + ", end:" + end);
+ start += splitSize;
+ end += splitSize;
+ count -= splitSize;
+ }
+
+      if (count > 0) {
+        // remainder split covers rows from start to start + count
+        splits.add(new VerticaInputSplit(inputQuery, start, start + count));
+      }
+ }
+
+ LOG.debug("returning " + splits.size() + " final splits");
+ return splits;
+ }
+}
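
The setInput overloads above determine how getSplits partitions the work: a parameter query yields one split per result row, literal parameter strings yield one split each, and with no parameters the input query is wrapped in a COUNT(*) and sliced with LIMIT/OFFSET. A hedged sketch of the parameterized forms (not part of the commit; the query, table, and region values are illustrative, and the connection properties normally set via configureVertica are omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import com.vertica.hadoop.VerticaInputFormat;

public class VerticaSplitSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "vertica-split-sketch");

        // Literal parameters: each comma-delimited string is bound to the "?",
        // so this call produces two input splits (east and west).
        VerticaInputFormat.setInput(job,
                "SELECT * FROM public.orders WHERE region = ?",
                "'east'", "'west'");

        // Alternatively, let Vertica supply the parameters: one split is created
        // per row of the second query. Note that getSplits recognizes a parameter
        // query by its lowercase "select" prefix.
        // VerticaInputFormat.setInput(job,
        //         "SELECT * FROM public.orders WHERE region = ?",
        //         "select distinct region from public.orders");
    }
}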
210 src/hadoop-connector/com/vertica/hadoop/VerticaInputSplit.java
@@ -0,0 +1,210 @@
+package com.vertica.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Input split class for reading data from Vertica
+ *
+ */
+public class VerticaInputSplit extends InputSplit implements Writable {
+ private static final Log LOG = LogFactory.getLog("com.vertica.hadoop");
+
+ PreparedStatement stmt = null;
+ Connection connection = null;
+ VerticaConfiguration vtconfig = null;
+ String inputQuery = null;
+ List<Object> segmentParams = null;
+ long start = 0;
+ long end = 0;
+
+  /** {@inheritDoc} */
+ public VerticaInputSplit() {
+ LOG.trace("Input split default constructor");
+ }
+
+ /**
+ * Set the input query and a list of parameters to substitute when evaluating
+ * the query
+ *
+ * @param inputQuery
+ * SQL query to run
+ * @param segmentParams
+ * list of parameters to substitute into the query
+ */
+ public VerticaInputSplit(String inputQuery, List<Object> segmentParams) {
+ if (LOG.isDebugEnabled())
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Input split with query -");
+ sb.append(inputQuery);
+ sb.append("-, Parameters: ");
+
+      boolean addComma = false;
+      for (Object param : segmentParams) {
+        if (addComma)
+          sb.append(",");
+        sb.append(param.toString());
+        addComma = true;
+      }
+ LOG.debug(sb.toString());
+ }
+
+ this.inputQuery = inputQuery;
+ this.segmentParams = segmentParams;
+ }
+
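+  /**
+   * Set the input query and the logical start and end record numbers; the split
+   * reads its records by wrapping the query with LIMIT and OFFSET
+   *
+   * @param inputQuery
+   *          SQL query to run
+   * @param start
+   *          the logical starting record number
+   * @param end
+   *          the logical ending record number
+   */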
+ public VerticaInputSplit(String inputQuery, long start, long end) {
+ LOG.debug("Input split with query -"+inputQuery+"-, start row: "
+ + start + " and end row: " + end);
+ this.inputQuery = inputQuery;
+ this.start = start;
+ this.end = end;
+ }
+
+  /** {@inheritDoc} */
+ public void configure(Configuration conf) throws Exception {
+ LOG.trace("Input split configured");
+ vtconfig = new VerticaConfiguration(conf);
+ connection = vtconfig.getConnection(false);
+ connection.setAutoCommit(true);
+ connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+ }
+
+ /**
+   * Return the parameters used for the input query
+   *
+   * @return the list of query parameters, or null for a limit/offset split
+ */
+ public List<Object> getSegmentParams() {
+ return segmentParams;
+ }
+
+ /**
+   * Run the query that produces the input records for the mapper
+   *
+   * @return a ResultSet over this split's records
+ * @throws Exception
+ */
+ public ResultSet executeQuery() throws Exception {
+ LOG.trace("Input split execute query");
+
+ if (connection == null)
+ throw new Exception("Cannot execute query with no connection");
+
+ if (segmentParams != null) {
+ LOG.debug("Query:" + inputQuery + ". No. of params = " + segmentParams.size());
+ stmt = connection.prepareStatement(inputQuery);
+ int i = 1;
+ for (Object param : segmentParams)
+ {
+ stmt.setObject(i++, param);
+ LOG.debug("With param :" + param.toString());
+ }
+ }
+
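+    // limit/offset splits (start/end set via the three-argument constructor)
+    // wrap the query instead; getSplits() creates either parameterized splits
+    // or limit/offset splits, never both for the same split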
+ long length = getLength();
+ if (length != 0)
+ {
+ String query = "SELECT * FROM ( " + inputQuery
+ + " ) limited LIMIT " + length + " OFFSET " + start;
+ LOG.debug("Query:" + query);
+ stmt = connection.prepareStatement(query);
+ }
+
+ LOG.debug("Executing query");
+ ResultSet rs = stmt.executeQuery();
+ return rs;
+ }
+
+  /** {@inheritDoc} */
+ public void close() throws SQLException {
+ stmt.close();
+ }
+
+ /**
+ * @return The index of the first row to select
+ */
+ public long getStart() {
+ return start;
+ }
+
+ /**
+ * @return The index of the last row to select
+ */
+ public long getEnd() {
+ return end;
+ }
+
+ /**
+ * @return The total row count in this split
+ */
+ public long getLength() throws IOException {
+    // TODO: figure out how to return the length when there is no start and end
+ return end - start;
+ }
+
+ /** {@inheritDoc} */
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
+
+  /** {@inheritDoc} */
+ public Configuration getConfiguration() {
+ return vtconfig.getConfiguration();
+ }
+
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ inputQuery = Text.readString(in);
+ segmentParams = null;
+ long paramCount = in.readLong();
+ LOG.debug("Reading " + paramCount + " parameters");
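+    // a single type code precedes the values and applies to every parameter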
+ if (paramCount > 0) {
+ int type = in.readInt();
+ segmentParams = new ArrayList<Object>();
+ for (int i = 0; i < paramCount; i++) {
+ segmentParams.add(VerticaRecord.readField(type, in));
+ }
+ }
+ start = in.readLong();
+ end = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, inputQuery);
+ if (segmentParams != null && segmentParams.size() > 0) {
+ LOG.debug("Writing out " + segmentParams.size() + " parameters");
+ out.writeLong(segmentParams.size());
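+      // all parameters are assumed to share the type of the first entry;
+      // one type code is written, followed by each value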
+ int type = VerticaRecord.getType(segmentParams.get(0));
+ out.writeInt(type);
+ for (Object o : segmentParams)
+ VerticaRecord.write(o, type, out);
+ } else {
+ LOG.debug("Writing out no parameters");
+ out.writeLong(0);
+ }
+
+ out.writeLong(start);
+ out.writeLong(end);
+ }
+}
208 src/hadoop-connector/com/vertica/hadoop/VerticaOutputFormat.java
@@ -0,0 +1,208 @@
+package com.vertica.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+import java.sql.DatabaseMetaData;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Output formatter for loading data to Vertica
+ *
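+ * A typical job wiring might look like the following sketch (the table name
+ * and column definitions are illustrative, not taken from this project):
+ * <pre>{@code
+ *   job.setOutputFormatClass(VerticaOutputFormat.class);
+ *   job.setOutputKeyClass(Text.class);
+ *   job.setOutputValueClass(VerticaRecord.class);
+ *   VerticaOutputFormat.setOutput(job, "mytable", true, "a int", "b varchar(20)");
+ * }</pre>
+ *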
+ */
+public class VerticaOutputFormat extends OutputFormat<Text, VerticaRecord> {
+ /**
+ * Set the output table
+ *
+   * @param job
+ * @param tableName
+ */
+ public static void setOutput(Job job, String tableName) {
+ setOutput(job, tableName, false);
+ }
+
+ /**
+ * Set the output table and whether to truncate it before loading
+ *
+ * @param job
+ * @param tableName
+ * @param truncateTable
+ */
+ public static void setOutput(Job job, String tableName, boolean truncateTable) {
+ setOutput(job, tableName, truncateTable, (String[])null);
+ }
+
+ /**
+   * Set the output table, whether to truncate it before loading if it already
+   * exists, and the column definitions used to create the table if it does not exist
+ *
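+   * For example (sketch; the table and column names are illustrative):
+   * <pre>{@code
+   *   VerticaOutputFormat.setOutput(job, "mytable", true, "foo int", "bar varchar(10)");
+   * }</pre>
+   *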
+ * @param job
+ * @param tableName
+ * @param truncateTable
+ * @param tableDef
+ * list of column definitions such as "foo int", "bar varchar(10)"
+ */
+ public static void setOutput(Job job, String tableName,
+ boolean truncateTable, String... tableDef) {
+ VerticaConfiguration vtconfig = new VerticaConfiguration(
+ job.getConfiguration());
+ vtconfig.setOutputTableName(tableName);
+ vtconfig.setOutputTableDef(tableDef);
+    // note: despite the name, this flag triggers TRUNCATE TABLE, not DROP TABLE
+ vtconfig.setDropTable(truncateTable);
+ }
+
+ /** {@inheritDoc} */
+ public void checkOutputSpecs(JobContext context) throws IOException {
+ checkOutputSpecs(new VerticaConfiguration(context.getConfiguration()));
+ }
+
+ public static void checkOutputSpecs(VerticaConfiguration vtconfig) throws IOException {
+ Relation vTable = new Relation(vtconfig.getOutputTableName());
+ if (vTable.isNull())
+ throw new IOException("Vertica output requires a table name defined by "
+ + VerticaConfiguration.OUTPUT_TABLE_NAME_PROP);
+ String[] def = vtconfig.getOutputTableDef();
+ boolean dropTable = vtconfig.getDropTable();
+
+ Statement stmt = null;
+ try {
+ Connection conn = vtconfig.getConnection(true);
+ DatabaseMetaData dbmd = conn.getMetaData();
+ ResultSet rs = dbmd.getTables(null, vTable.getSchema(), vTable.getTable(), null);
+ boolean tableExists = rs.next();
+
+ stmt = conn.createStatement();
+
+ if (tableExists && dropTable) {
+        stmt.execute("TRUNCATE TABLE " + vTable.getQualifiedName().toString());
+ }
+
+ // create table if it doesn't exist
+ if (!tableExists) {
+ if (def == null)
+ throw new RuntimeException("Table " + vTable.getQualifiedName().toString()
+ + " does not exist and no table definition provided");
+ if (!vTable.isDefaultSchema()) {
+ stmt.execute("CREATE SCHEMA IF NOT EXISTS " + vTable.getSchema());
+ }
+ StringBuffer tabledef = new StringBuffer("CREATE TABLE ").append(
+ vTable.getQualifiedName().toString()).append(" (");
+ for (String column : def)
+ tabledef.append(column).append(",");
+ tabledef.replace(tabledef.length() - 1, tabledef.length(), ")");
+ stmt.execute(tabledef.toString());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (stmt != null)
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ public RecordWriter<Text, VerticaRecord> getRecordWriter(
+ TaskAttemptContext context) throws IOException {
+
+ VerticaConfiguration config = new VerticaConfiguration(
+ context.getConfiguration());
+
+ String name = context.getJobName();
+ String table = config.getOutputTableName();
+ try {
+ Connection conn = config.getConnection(true);
+ return new VerticaRecordWriter(conn, table, config.getBatchSize());
+ } catch (Exception e) {
+      throw new IOException(e);
+ }
+ }
+
+ /**
+ * Optionally called at the end of a job to optimize any newly created and
+ * loaded tables. Useful for new tables with more than 100k records.
+ *
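+   * For example (sketch; assumes the MapReduce job has already completed):
+   * <pre>{@code
+   *   if (job.waitForCompletion(true)) {
+   *     VerticaOutputFormat.optimize(job.getConfiguration());
+   *   }
+   * }</pre>
+   *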
+ * @param conf
+ * @throws Exception
+ */
+ public static void optimize(Configuration conf) throws Exception {
+ VerticaConfiguration vtconfig = new VerticaConfiguration(conf);
+ Connection conn = vtconfig.getConnection(true);
+
+ // TODO: consider more tables and skip tables with non-temp projections
+ Relation vTable = new Relation(vtconfig.getOutputTableName());
+ Statement stmt = conn.createStatement();
+ ResultSet rs = null;
+ HashSet<String> tablesWithTemp = new HashSet<String>();
+
+ //for now just add the single output table
+ tablesWithTemp.add(vTable.getQualifiedName().toString());
+
+ // map from table name to set of projection names
+ HashMap<String, Collection<String>> tableProj = new HashMap<String, Collection<String>>();
+ rs = stmt.executeQuery("select projection_schema, anchor_table_name, projection_name from projections;");
+ while(rs.next()) {
+ String ptable = rs.getString(1) + "." + rs.getString(2);
+ if(!tableProj.containsKey(ptable)) {
+ tableProj.put(ptable, new HashSet<String>());
+ }
+
+ tableProj.get(ptable).add(rs.getString(3));
+ }
+
+ for(String table : tablesWithTemp) {
+ if(!tableProj.containsKey(table)) {
+ throw new RuntimeException("Cannot optimize table with no data: " + table);
+ }
+ }
+
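+    // Invoke the Database Designer: create a workspace and design named after
+    // the connection hash, add the output table, populate and deploy the
+    // design, then drop the temporary deployment, design and workspace.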
+    String designName = Integer.toString(conn.hashCode());
+ stmt.execute("select dbd_create_workspace('" + designName + "')");
+ stmt.execute("select dbd_create_design('" + designName + "', '"
+ + designName + "')");
+ stmt.execute("select dbd_add_design_tables('" + designName + "', '"
+ + vTable.getQualifiedName().toString() + "')");
+ stmt.execute("select dbd_populate_design('" + designName + "', '"
+ + designName + "')");
+
+ //Execute
+ stmt.execute("select dbd_create_deployment('" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_add_deployment_design('" + designName + "', '" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_add_deployment_drop('" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_execute_deployment('" + designName + "', '" + designName + "')");
+
+ //Cleanup
+ stmt.execute("select dbd_drop_deployment('" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_remove_design('" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_drop_design('" + designName + "', '" + designName + "')");
+ stmt.execute("select dbd_drop_workspace('" + designName + "')");
+ }
+
+  /** {@inheritDoc} */
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+ context);
+ }
+}
740 src/hadoop-connector/com/vertica/hadoop/VerticaRecord.java
@@ -0,0 +1,740 @@
+package com.vertica.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.text.ParseException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Vector;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
+//import com.vertica.jdbc.VerticaDayTimeInterval;
+//import com.vertica.jdbc.VerticaYearMonthInterval;
+
+/**
+ * Encapsulates a record read from or to be written to Vertica.
+ *
+ * A record consists of a list of columns in a specified order.
+ * The list of columns is automatically determined by:
+ * <ol>
+ * <li>ResultSet of <i>select</i> query</li>
+ * <li>Columns in the table</li>
+ * </ol>
+ *
+ * The record also contains many helper functions to serialize/deserialize data
+ * into various formats.
+ *
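+ * For output jobs, a record matching the configured output table can be
+ * obtained in a reducer with, for example (sketch):
+ * <pre>{@code
+ *   VerticaRecord record = new VerticaRecord(context.getConfiguration());
+ * }</pre>
+ *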
+ */
+public class VerticaRecord implements Writable {
+ private static final Log LOG = LogFactory.getLog("com.vertica.hadoop");
+ /**
+ * No. of columns in a record.
+ */
+ int columns = 0;
+
+ /**
+ * An ordered list of names of each column.
+ * There is an entry for each column. However, there is
+ * no guarantee that all columns have a name.
+ */
+ Vector<String> names = null;
+
+ /**
+ * An ordered list of types of each column
+ */
+ Vector<Integer> types = null;
+ /**
+ * An ordered list of values of each column
+ */
+ Vector<Object> values = null;
+ /**
+ * A map to easily get the position of a column from the name.
+ */
+ HashMap<String, Integer> nameMap = null;
+
+ DateFormat sqlfmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ /**
+   * The default constructor should NOT be invoked by users.
+   * It is available mainly for the Hadoop framework.
+ */
+ public VerticaRecord() {
+ names = new Vector<String>();
+ types = new Vector<Integer>();
+ values = new Vector<Object>();
+ nameMap = new HashMap<String, Integer>();
+ }
+
+ /**
+ * Constructor used when data is transferred to Vertica.
+   * All values in <code>conf</code> are validated.
+ *
+ * @param conf Provides all necessary information for setting up the record.
+ *
+   * @throws SQLException if the JDBC driver or Vertica encounters an error.
+ */
+
+ public VerticaRecord(Configuration conf)
+ throws ClassNotFoundException, SQLException, IOException {
+ VerticaConfiguration config = new VerticaConfiguration(conf);
+ String outTable = config.getOutputTableName();
+ Connection conn = config.getConnection(true);
+
+ DatabaseMetaData dbmd = conn.getMetaData();
+ Relation vTable = new Relation(outTable);