Comparing changes

base fork: joey/sqoop
base: 310c6e055a
head fork: joey/sqoop
compare: cf44d6fa3a
  • 9 commits
  • 27 files changed
  • 0 commit comments
  • 1 contributor
Commits on Sep 16, 2011
Arvind Prabhakar SQOOP-319. Support for replacing Hive delimiters.
(Joey Echeverria via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1161382 13f79535-47bb-0310-9956-ffa450edef68
f5d7c78
Arvind Prabhakar SQOOP-321. Support date/time for incremental append imports.
(Bilung Lee via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1161404 13f79535-47bb-0310-9956-ffa450edef68
5f4ac75
Arvind Prabhakar SQOOP-326. Upgrade Avro dependency version.
(Alejandro Abdelnur via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1163453 13f79535-47bb-0310-9956-ffa450edef68
1ee6566
Arvind Prabhakar SQOOP-330. Descriptive error message when column name cannot be determined.

(Jarek Jarcec Cecho via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1165969 13f79535-47bb-0310-9956-ffa450edef68
8648b34
Arvind Prabhakar SQOOP-323. Support for NVARCHAR datatype.
(Jarek Jarcec Cecho via Bilung Lee)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1165981 13f79535-47bb-0310-9956-ffa450edef68
70acf09
Arvind Prabhakar SQOOP-327. Mixed update/insert export for Oracle.
(Bilung Lee via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1166930 13f79535-47bb-0310-9956-ffa450edef68
265e1b7
Arvind Prabhakar SQOOP-332. Cannot use --as-avrodatafile with --query.
(Joseph Boyd via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1170977 13f79535-47bb-0310-9956-ffa450edef68
6982e4d
Arvind Prabhakar SQOOP-336. Avro import does not support varbinary types.
(Tom White via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1170979 13f79535-47bb-0310-9956-ffa450edef68
9c61c35
Arvind Prabhakar SQOOP-325. Sqoop doesn't build on IntelliJ.
(Alex Newman via Arvind Prabhakar)


git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1171193 13f79535-47bb-0310-9956-ffa450edef68
cf44d6f
Showing with 815 additions and 141 deletions.
  1. +2 −0  ivy.xml
  2. +2 −2 ivy/libraries.properties
  3. +8 −1 pom.xml
  4. +6 −0 src/docs/man/sqoop-export.txt
  5. +10 −0 src/docs/user/export.txt
  6. +2 −0  src/docs/user/hive-args.txt
  7. +5 −2 src/docs/user/hive.txt
  8. +47 −0 src/java/com/cloudera/sqoop/SqoopOptions.java
  9. +6 −0 src/java/com/cloudera/sqoop/hive/HiveTypes.java
  10. +13 −1 src/java/com/cloudera/sqoop/lib/FieldFormatter.java
  11. +49 −0 src/java/com/cloudera/sqoop/manager/ConnManager.java
  12. +57 −0 src/java/com/cloudera/sqoop/manager/OracleManager.java
  13. +6 −0 src/java/com/cloudera/sqoop/manager/SqlManager.java
  14. +89 −0 src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
  15. +116 −0 src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
  16. +3 −3 src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
  17. +7 −3 src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
  18. +22 −6 src/java/com/cloudera/sqoop/orm/ClassWriter.java
  19. +20 −0 src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
  20. +34 −23 src/java/com/cloudera/sqoop/tool/ExportTool.java
  21. +48 −17 src/java/com/cloudera/sqoop/tool/ImportTool.java
  22. +43 −63 src/test/com/cloudera/sqoop/TestAvroImport.java
  23. +79 −18 src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
  24. +52 −2 src/test/com/cloudera/sqoop/TestIncrementalImport.java
  25. +73 −0 src/test/com/cloudera/sqoop/hive/TestHiveImport.java
  26. +14 −0 src/test/com/cloudera/sqoop/manager/OracleExportTest.java
  27. +2 −0  testdata/hive/scripts/fieldWithNewlineReplacementImport.q
2  ivy.xml
@@ -123,5 +123,7 @@ under the License.
<exclude org="log4j" module="log4j"/>
</dependency>
+ <exclude org="org.apache.hadoop" module="avro"/>
+
</dependencies>
</ivy-module>
4 ivy/libraries.properties
@@ -18,7 +18,7 @@
# This properties file lists the versions of the various artifacts we use.
# It drives ivy and the generation of a maven POM
-avro.version=1.5.2
+avro.version=1.5.3
checkstyle.version=5.0
@@ -40,7 +40,7 @@ ivy.version=2.0.0-rc2
junit.version=4.5
-log4j.version=1.2.15
+log4j.version=1.2.16
mvn.version=2.0.10
9 pom.xml
@@ -85,6 +85,8 @@ limitations under the License.
<hadoopVersion>0.20.2-cdh3u1</hadoopVersion>
<hbaseVersion>0.90.3-cdh3u1</hbaseVersion>
+ <log4j.version>1.2.16</log4j.version>
+
</properties>
<dependencies>
@@ -250,7 +252,12 @@ limitations under the License.
<version>1.5.8</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
6 src/docs/man/sqoop-export.txt
@@ -43,6 +43,12 @@ Export control options
--update-key (col-name)::
Anchor column to use for updates
+--update-mode (mode)::
+ Specify how updates are performed when new rows are found with non-matching keys
+ in database. By default, "mode" is +updateonly+, in which case new rows are
+ silently ignored. Alternatively, "mode" can be +allowinsert+, in which case
+ new rows are inserted instead.
+
--input-null-string::
The string to be interpreted as null for string columns
10 src/docs/user/export.txt
@@ -52,6 +52,12 @@ Argument Description
parallel
+\--table <table-name>+ Table to populate
+\--update-key <col-name>+ Anchor column to use for updates
++\--update-mode <mode>+ Specify how updates are performed\
+ when new rows are found with\
+ non-matching keys in database.
+ Legal values for +mode+ include\
+ +updateonly+ (default) and\
+ +allowinsert+.
+\--input-null-string <null-string>+ The string to be interpreted as\
null for string columns
+\--input-null-non-string <null-string>+ The string to be interpreted as\
@@ -169,6 +175,10 @@ Likewise, if the column specified with +\--update-key+ does not
uniquely identify rows and multiple rows are updated by a single
statement, this condition is also undetected.
+Depending on the target database, you may also specify the +\--update-mode+
+argument with +allowinsert+ mode if you want to update rows if they exist
+in the database already or insert rows if they do not exist yet.
+
include::input-args.txt[]
include::output-args.txt[]
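
For illustration, the new mode could be exercised with an argument list like the one below, mirroring the argument style used by the tests later in this change. Only --update-key and --update-mode come from the documentation above; the connect string, table, and export directory are placeholders.

    // Hypothetical Java args array (not part of the patch); only the two
    // update-related flags are defined by this change.
    String[] exportArgs = {
        "--connect", "jdbc:oracle:thin:@//dbhost:1521/orcl", // placeholder URL
        "--table", "EMPLOYEES",                              // placeholder table
        "--export-dir", "/user/example/employees",           // placeholder HDFS dir
        "--update-key", "ID",
        "--update-mode", "allowinsert"  // update matching rows, insert the rest
    };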
2  src/docs/user/hive-args.txt
@@ -32,6 +32,8 @@ Argument Description
to Hive.
+\--hive-drop-import-delims+ Drops '\n', '\r', and '\01' from string\
fields when importing to Hive.
++\--hive-delims-replacement+ Replace '\n', '\r', and '\01' from string\
+ fields with user defined string when importing to Hive.
+\--hive-partition-key+ Name of a hive field to partition are \
sharded on
+\--hive-partition-value <v>+ String-value that serves as partition key\
7 src/docs/user/hive.txt
@@ -58,8 +58,11 @@ rows contain string fields that have Hive's default row delimiters
(+\n+ and +\r+ characters) or column delimiters (+\01+ characters)
present in them. You can use the +\--hive-drop-import-delims+ option
to drop those characters on import to give Hive-compatible text data.
-This option should only be used if you use Hive's default delimiters
-and should not be used if different delimiters are specified.
+Alternatively, you can use the +\--hive-delims-replacement+ option
+to replace those characters with a user-defined string on import to give
+Hive-compatible text data. These options should only be used if you use
+Hive's default delimiters and should not be used if different delimiters
+are specified.
Sqoop will pass the field and record delimiters through to Hive. If you do
not set any delimiters and do use +\--hive-import+, the field delimiter will
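
To make the difference between the two options concrete, here is the behavior for the sample value imported by the new TestHiveImport case later in this change, assuming a replacement string of a single space:

    // Field value as stored in the database; '\n' is Hive's default
    // record delimiter.
    String raw = "test with\nnew lines\n";
    // --hive-drop-import-delims yields:      "test withnew lines"
    // --hive-delims-replacement ' ' yields:  "test with new lines "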
47 src/java/com/cloudera/sqoop/SqoopOptions.java
@@ -152,6 +152,8 @@ public String toString() {
private boolean failIfHiveTableExists;
@StoredAsProperty("hive.table.name") private String hiveTableName;
@StoredAsProperty("hive.drop.delims") private boolean hiveDropDelims;
+ @StoredAsProperty("hive.delims.replacement")
+ private String hiveDelimsReplacement;
@StoredAsProperty("hive.partition.key") private String hivePartitionKey;
@StoredAsProperty("hive.partition.value") private String hivePartitionValue;
@@ -192,6 +194,22 @@ public String toString() {
// Column to use for the WHERE clause in an UPDATE-based export.
@StoredAsProperty("export.update.col") private String updateKeyCol;
+ /**
+ * Update mode option specifies how updates are performed when
+ * new rows are found with non-matching keys in database.
+ * It supports two modes:
+ * <ul>
+ * <li>UpdateOnly: This is the default. New rows are silently ignored.</li>
+ * <li>AllowInsert: New rows are inserted into the database.</li>
+ * </ul>
+ */
+ public enum UpdateMode {
+ UpdateOnly,
+ AllowInsert
+ }
+
+ @StoredAsProperty("export.new.update") private UpdateMode updateMode;
+
private DelimiterSet inputDelimiters; // codegen.input.delimiters.
private DelimiterSet outputDelimiters; // codegen.output.delimiters.
private boolean areDelimsManuallySet;
@@ -795,6 +813,8 @@ private void initDefaults(Configuration baseConfiguration) {
this.dbOutColumns = null;
this.incrementalMode = IncrementalMode.None;
+
+ this.updateMode = UpdateMode.UpdateOnly;
}
/**
@@ -1101,6 +1121,18 @@ public void setHiveDropDelims(boolean dropHiveDelims) {
}
/**
+ * @return the user-specified option to specify the replacement string
+ * for hive delimeters
+ */
+ public String getHiveDelimsReplacement() {
+ return hiveDelimsReplacement;
+ }
+
+ public void setHiveDelimsReplacement(String replacement) {
+ this.hiveDelimsReplacement = replacement;
+ }
+
+ /**
* @return the user-specified option to specify sqoop's behavior during
* target table creation if the table exists.
*/
@@ -1572,6 +1604,21 @@ public String getUpdateKeyCol() {
}
/**
+ * Set "UpdateOnly" to silently ignore new rows during update export.
+ * Set "AllowInsert" to insert new rows during update export.
+ */
+ public void setUpdateMode(UpdateMode mode) {
+ this.updateMode = mode;
+ }
+
+ /**
+ * @return how to handle new rows found in update export.
+ */
+ public UpdateMode getUpdateMode() {
+ return updateMode;
+ }
+
+ /**
* @return an ordered list of column names. The code generator should
* generate the DBWritable.write(PreparedStatement) method with columns
* exporting in this order, if it is non-null.
6 src/java/com/cloudera/sqoop/hive/HiveTypes.java
@@ -46,6 +46,12 @@ public static String toHiveType(int sqlType) {
return "STRING";
} else if (sqlType == Types.LONGVARCHAR) {
return "STRING";
+ } else if (sqlType == Types.NVARCHAR) {
+ return "STRING";
+ } else if (sqlType == Types.NCHAR) {
+ return "STRING";
+ } else if (sqlType == Types.LONGNVARCHAR) {
+ return "STRING";
} else if (sqlType == Types.NUMERIC) {
// Per suggestion on hive-user, this is converted to DOUBLE for now.
return "DOUBLE";
14 src/java/com/cloudera/sqoop/lib/FieldFormatter.java
@@ -32,8 +32,20 @@ private FieldFormatter() { }
* @return
*/
public static String hiveStringDropDelims(String str,
+ DelimiterSet delimiters) {
+ return hiveStringReplaceDelims(str, "", delimiters);
+ }
+
+ /**
+ * replace hive delimiters with a user-defined string passed to the
+ * --hive-delims-replacement option.
+ * @param str
+ * @param delimiters
+ * @return
+ */
+ public static String hiveStringReplaceDelims(String str, String replacement,
DelimiterSet delimiters) {
- String droppedDelims = str.replaceAll("\\n|\\r|\01", "");
+ String droppedDelims = str.replaceAll("\\n|\\r|\01", replacement);
return escapeAndEnclose(droppedDelims, delimiters);
}
49 src/java/com/cloudera/sqoop/manager/ConnManager.java
@@ -23,11 +23,14 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@@ -274,6 +277,44 @@ public void updateTable(ExportJobContext context)
}
/**
+ * Export data stored in HDFS into a table in a database.
+ * This may update or insert rows into the target table depending on
+ * whether rows already exist in the target table or not.
+ */
+ public void upsertTable(ExportJobContext context)
+ throws IOException, ExportException {
+ throw new ExportException("Mixed update/insert is not supported"
+ + " against the target database yet");
+ }
+
+ /**
+ * Configure database output column ordering explicitly for code generator.
+ * The code generator should generate the DBWritable.write(PreparedStatement)
+ * method with columns exporting in this order.
+ */
+ public void configureDbOutputColumns(SqoopOptions options) {
+ // We're in update mode. We need to explicitly set the database output
+ // column ordering in the codeGenerator. The UpdateKeyCol must come
+ // last, because the UPDATE-based OutputFormat will generate the SET
+ // clause followed by the WHERE clause, and the SqoopRecord needs to
+ // serialize to this layout.
+ String updateKeyCol = options.getUpdateKeyCol();
+ String [] allColNames = getColumnNames(options.getTableName());
+ List<String> dbOutCols = new ArrayList<String>();
+ String upperCaseKeyCol = updateKeyCol.toUpperCase();
+ for (String col : allColNames) {
+ if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+ dbOutCols.add(col); // add non-key columns to the output order list.
+ }
+ }
+
+ // Then add the update key column last.
+ dbOutCols.add(updateKeyCol);
+ options.setDbOutputColumns(dbOutCols.toArray(
+ new String[dbOutCols.size()]));
+ }
+
+ /**
* If a method of this ConnManager has returned a ResultSet to you,
* you are responsible for calling release() after you close the
* ResultSet object, to free internal resources. ConnManager
@@ -302,6 +343,14 @@ public String timestampToQueryString(Timestamp ts) {
}
/**
+ * Given a date/time, return the quoted string that can
+ * be inserted into a SQL statement, representing that date/time.
+ */
+ public String datetimeToQueryString(String datetime, int columnType) {
+ return "'" + datetime + "'";
+ }
+
+ /**
* This method allows the ConnManager to override the creation of an
* input-bounds query that is used to create splits when running import
* based on free-form query. Any non-null return value is used, whereas a null
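
A worked example of the ordering produced by the new configureDbOutputColumns() in plain update mode, for a hypothetical table FOO(ID, MSG, BAR) exported with --update-key ID (the UPDATE statement shape is paraphrased from the comment above, not copied from the patch):

    String[] allColNames = { "ID", "MSG", "BAR" };  // hypothetical table FOO
    // configureDbOutputColumns() moves the key column to the end:
    //   dbOutCols == { "MSG", "BAR", "ID" }
    // which lines up with an UPDATE-based statement of the form:
    //   UPDATE FOO SET MSG = ?, BAR = ? WHERE ID = ?
    //
    // The base datetimeToQueryString() simply single-quotes the value:
    //   datetimeToQueryString("2011-09-16 10:00:00.0", Types.TIMESTAMP)
    //     -> "'2011-09-16 10:00:00.0'"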
57 src/java/com/cloudera/sqoop/manager/OracleManager.java
@@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -37,8 +38,11 @@
import org.apache.commons.logging.LogFactory;
import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
+import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
import com.cloudera.sqoop.util.ExportException;
import com.cloudera.sqoop.util.ImportException;
@@ -382,6 +386,46 @@ public void exportTable(ExportJobContext context)
}
@Override
+ /**
+ * {@inheritDoc}
+ */
+ public void upsertTable(ExportJobContext context)
+ throws IOException, ExportException {
+ context.setConnManager(this);
+ JdbcUpsertExportJob exportJob =
+ new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+ exportJob.runExport();
+ }
+
+ @Override
+ /**
+ * {@inheritDoc}
+ */
+ public void configureDbOutputColumns(SqoopOptions options) {
+ if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+ super.configureDbOutputColumns(options);
+ } else {
+ // We're in upsert mode. We need to explicitly set
+ // the database output column ordering in the codeGenerator.
+ String updateKeyCol = options.getUpdateKeyCol();
+ String [] allColNames = getColumnNames(options.getTableName());
+ List<String> dbOutCols = new ArrayList<String>();
+ dbOutCols.add(updateKeyCol);
+ String upperCaseKeyCol = updateKeyCol.toUpperCase();
+ for (String col : allColNames) {
+ if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+ dbOutCols.add(col); // add update columns to the output order list.
+ }
+ }
+ for (String col : allColNames) {
+ dbOutCols.add(col); // add insert columns to the output order list.
+ }
+ options.setDbOutputColumns(dbOutCols.toArray(
+ new String[dbOutCols.size()]));
+ }
+ }
+
+ @Override
public ResultSet readTable(String tableName, String[] columns)
throws SQLException {
if (columns == null) {
@@ -535,6 +579,19 @@ public String timestampToQueryString(Timestamp ts) {
}
@Override
+ public String datetimeToQueryString(String datetime, int columnType) {
+ if (columnType == Types.TIMESTAMP) {
+ return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
+ } else if (columnType == Types.DATE) {
+ return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
+ } else {
+ String msg = "Column type is neither timestamp nor date!";
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
+ }
+
+ @Override
public boolean supportsStagingForExport() {
return true;
}
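
Continuing the hypothetical FOO(ID, MSG, BAR) example with --update-key ID, but now with --update-mode allowinsert against Oracle, the overrides above produce the following (a sketch reconstructed from the code, not output captured from a run):

    // Key column for the MERGE ON clause, non-key columns for the UPDATE SET
    // clause, then every column for the INSERT VALUES clause:
    String[] upsertColumnOrder = {
        "ID", "MSG", "BAR", "ID", "MSG", "BAR"
    };
    // Oracle-specific date/time quoting added above:
    //   datetimeToQueryString("2011-09-16 10:00:00.0", Types.TIMESTAMP)
    //     -> "TO_TIMESTAMP('2011-09-16 10:00:00.0', 'YYYY-MM-DD HH24:MI:SS.FF')"
    //   datetimeToQueryString("2011-09-16", Types.DATE)
    //     -> "TO_DATE('2011-09-16', 'YYYY-MM-DD HH24:MI:SS')"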
6 src/java/com/cloudera/sqoop/manager/SqlManager.java
@@ -496,6 +496,12 @@ public String toJavaType(int sqlType) {
return "String";
} else if (sqlType == Types.LONGVARCHAR) {
return "String";
+ } else if (sqlType == Types.NVARCHAR) {
+ return "String";
+ } else if (sqlType == Types.NCHAR) {
+ return "String";
+ } else if (sqlType == Types.LONGNVARCHAR) {
+ return "String";
} else if (sqlType == Types.NUMERIC) {
return "java.math.BigDecimal";
} else if (sqlType == Types.DECIMAL) {
89 src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+
+/**
+ * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
+ */
+public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
+
+ public static final Log LOG = LogFactory.getLog(
+ JdbcUpsertExportJob.class.getName());
+
+ public JdbcUpsertExportJob(final ExportJobContext context,
+ final Class<? extends OutputFormat> outputFormatClass)
+ throws IOException {
+ super(context, null, null, outputFormatClass);
+ }
+
+ @Override
+ protected void configureOutputFormat(Job job, String tableName,
+ String tableClassName) throws IOException {
+
+ ConnManager mgr = context.getConnManager();
+ try {
+ String username = options.getUsername();
+ if (null == username || username.length() == 0) {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString());
+ } else {
+ DBConfiguration.configureDB(job.getConfiguration(),
+ mgr.getDriverClass(),
+ options.getConnectString(),
+ username, options.getPassword());
+ }
+
+ String [] colNames = options.getColumns();
+ if (null == colNames) {
+ colNames = mgr.getColumnNames(tableName);
+ }
+ if (null == colNames) {
+ throw new IOException(
+ "Export column names could not be determined for " + tableName);
+ }
+ DBOutputFormat.setOutput(job, tableName, colNames);
+
+ String updateKeyCol = options.getUpdateKeyCol();
+ if (null == updateKeyCol) {
+ throw new IOException("Update key column not set in export job");
+ }
+
+ job.setOutputFormatClass(getOutputFormatClass());
+ job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+ job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException("Could not load OutputFormat", cnfe);
+ }
+ }
+}
+
116 src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Update an existing table with new value if the table already
+ * contains the row, or insert the data into the table if the table
+ * does not contain the row yet.
+ */
+public class OracleUpsertOutputFormat<K extends SqoopRecord, V>
+ extends UpdateOutputFormat<K, V> {
+
+ private static final Log LOG =
+ LogFactory.getLog(OracleUpsertOutputFormat.class);
+
+ @Override
+ /** {@inheritDoc} */
+ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+ throws IOException {
+ try {
+ return new OracleUpsertRecordWriter(context);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * RecordWriter to write the output to UPDATE/INSERT statements.
+ */
+ public class OracleUpsertRecordWriter extends UpdateRecordWriter {
+
+ public OracleUpsertRecordWriter(TaskAttemptContext context)
+ throws ClassNotFoundException, SQLException {
+ super(context);
+ }
+
+ /**
+ * @return an UPDATE/INSERT statement that modifies/inserts a row
+ * depending on whether the row already exist in the table or not.
+ */
+ protected String getUpdateStatement() {
+ boolean first;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("MERGE INTO ");
+ sb.append(tableName);
+ sb.append(" USING dual ON ( ");
+ sb.append(updateCol);
+ sb.append(" = ? )");
+
+ sb.append(" WHEN MATCHED THEN UPDATE SET ");
+ first = true;
+ for (String col : columnNames) {
+ if (!col.equals(updateCol)) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(col);
+ sb.append(" = ?");
+ }
+ }
+
+ sb.append(" WHEN NOT MATCHED THEN INSERT ( ");
+ first = true;
+ for (String col : columnNames) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append(col);
+ }
+ sb.append(" ) VALUES ( ");
+ first = true;
+ for (String col : columnNames) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
+ sb.append("?");
+ }
+ sb.append(" )");
+
+ return sb.toString();
+ }
+ }
+}
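
For the same hypothetical FOO(ID, MSG, BAR) with update column ID, getUpdateStatement() above assembles a statement of roughly this shape (reconstructed from the loop logic above, not copied from the patch):

    String mergeSql =
        "MERGE INTO FOO USING dual ON ( ID = ? )"
        + " WHEN MATCHED THEN UPDATE SET MSG = ?, BAR = ?"
        + " WHEN NOT MATCHED THEN INSERT ( ID, MSG, BAR ) VALUES ( ?, ?, ? )";
    // The six bind parameters line up with the upsert column order that
    // OracleManager.configureDbOutputColumns() sets for allowinsert mode.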
6 src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
@@ -86,9 +86,9 @@ public void checkOutputSpecs(JobContext context)
*/
public class UpdateRecordWriter extends AsyncSqlRecordWriter<K, V> {
- private String tableName;
- private String [] columnNames; // The columns to update.
- private String updateCol; // The column containing the fixed key.
+ protected String tableName;
+ protected String [] columnNames; // The columns to update.
+ protected String updateCol; // The column containing the fixed key.
public UpdateRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
10 src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
@@ -63,10 +63,13 @@ public Schema generate() throws IOException {
field.addProp("sqlType", Integer.toString(sqlType));
fields.add(field);
}
- String doc = "Sqoop import of " + tableName;
- Schema schema = Schema.createRecord(tableName, doc, null, false);
+
+ String avroTableName = (tableName == null ? "QueryResult" : tableName);
+
+ String doc = "Sqoop import of " + avroTableName;
+ Schema schema = Schema.createRecord(avroTableName, doc, null, false);
schema.setFields(fields);
- schema.addProp("tableName", tableName);
+ schema.addProp("tableName", avroTableName);
return schema;
}
@@ -98,6 +101,7 @@ private Type toAvroType(int sqlType) {
case Types.TIMESTAMP:
return Type.LONG;
case Types.BINARY:
+ case Types.VARBINARY:
return Type.BYTES;
default:
throw new IllegalArgumentException("Cannot convert SQL type "
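
Sketched effect of the two AvroSchemaGenerator changes for a hypothetical free-form query import (so tableName is null) over columns ID INTEGER and DATA VARBINARY:

    // Not from the patch; the nullable-union layout follows what the updated
    // TestAvroImport below asserts for other column types.
    //   record name : "QueryResult"            (fallback when no table name)
    //   doc         : "Sqoop import of QueryResult"
    //   ID          : union { int, null }
    //   DATA        : union { bytes, null }    (new VARBINARY mapping)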
28 src/java/com/cloudera/sqoop/orm/ClassWriter.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.io.BytesWritable;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.SqlManager;
import com.cloudera.sqoop.lib.BigDecimalSerializer;
import com.cloudera.sqoop.lib.BooleanParser;
import com.cloudera.sqoop.lib.DelimiterSet;
@@ -826,10 +825,18 @@ private void generateToString(Map<String, Integer> columnTypes,
}
if (javaType.equals("String") && options.doHiveDropDelims()) {
- sb.append(" // special case for strings hive, dropping delimiters "
- + "\\n,\\r,\\01 from strings\n");
- sb.append(" __sb.append(FieldFormatter.hiveStringDropDelims("
- + stringExpr + ", delimiters));\n");
+ sb.append(" // special case for strings hive, dropping"
+ + "delimiters \\n,\\r,\\01 from strings\n");
+ sb.append(" __sb.append(FieldFormatter.hiveStringDropDelims("
+ + stringExpr + ", delimiters));\n");
+ } else if (javaType.equals("String")
+ && options.getHiveDelimsReplacement() != null) {
+ sb.append(" // special case for strings hive, replacing "
+ + "delimiters \\n,\\r,\\01 with '"
+ + options.getHiveDelimsReplacement() + "' from strings\n");
+ sb.append(" __sb.append(FieldFormatter.hiveStringReplaceDelims("
+ + stringExpr + ", \"" + options.getHiveDelimsReplacement() + "\", "
+ + "delimiters));\n");
} else {
sb.append(" __sb.append(FieldFormatter.escapeAndEnclose("
+ stringExpr + ", delimiters));\n");
@@ -1033,8 +1040,17 @@ public void generate() throws IOException {
String [] cleanedColNames = cleanColNames(colNames);
Set<String> uniqColNames = new HashSet<String>();
for (int i = 0; i < colNames.length; i++) {
- // Guarantee uniq col identifier
String identifier = cleanedColNames[i];
+
+ // Name can't be blank
+ if(identifier.isEmpty()) {
+ throw new IllegalArgumentException("We found column without column "
+ + "name. Please verify that you've entered all column names "
+ + "in your query if using free form query import (consider "
+ + "adding clause AS if you're using column transformation)");
+ }
+
+ // Guarantee uniq col identifier
if (uniqColNames.contains(identifier)) {
throw new IllegalArgumentException("Duplicate Column identifier "
+ "specified: '" + identifier + "'");
20 src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
@@ -98,6 +98,8 @@
public static final String HIVE_TABLE_ARG = "hive-table";
public static final String HIVE_OVERWRITE_ARG = "hive-overwrite";
public static final String HIVE_DROP_DELIMS_ARG = "hive-drop-import-delims";
+ public static final String HIVE_DELIMS_REPLACEMENT_ARG =
+ "hive-delims-replacement";
public static final String HIVE_PARTITION_KEY_ARG = "hive-partition-key";
public static final String HIVE_PARTITION_VALUE_ARG = "hive-partition-value";
public static final String CREATE_HIVE_TABLE_ARG =
@@ -136,6 +138,7 @@
public static final String VERBOSE_ARG = "verbose";
public static final String HELP_ARG = "help";
public static final String UPDATE_KEY_ARG = "update-key";
+ public static final String UPDATE_MODE_ARG = "update-mode";
// Arguments for incremental imports.
public static final String INCREMENT_TYPE_ARG = "incremental";
@@ -426,6 +429,12 @@ protected RelatedOptions getHiveOptions(boolean explicitHiveImport) {
+ "(\\n\\r) from imported string fields")
.withLongOpt(HIVE_DROP_DELIMS_ARG)
.create());
+ hiveOpts.addOption(OptionBuilder
+ .hasArg()
+ .withDescription("Replace Hive record \\0x01 and row delimiters "
+ + "(\\n\\r) from imported string fields with user-defined string")
+ .withLongOpt(HIVE_DELIMS_REPLACEMENT_ARG)
+ .create());
hiveOpts.addOption(OptionBuilder.withArgName("partition-key")
.hasArg()
.withDescription("Sets the partition key to use when importing to hive")
@@ -729,6 +738,11 @@ protected void applyHiveOptions(CommandLine in, SqoopOptions out)
out.setHiveDropDelims(true);
}
+ if (in.hasOption(HIVE_DELIMS_REPLACEMENT_ARG)) {
+ out.setHiveDelimsReplacement(
+ in.getOptionValue(HIVE_DELIMS_REPLACEMENT_ARG));
+ }
+
if (in.hasOption(HIVE_PARTITION_KEY_ARG)) {
out.setHivePartitionKey(in.getOptionValue(HIVE_PARTITION_KEY_ARG));
}
@@ -894,6 +908,12 @@ protected void validateHiveOptions(SqoopOptions options)
throws InvalidOptionsException {
// Empty; this method is present to maintain API consistency, and
// is reserved for future constraints on Hive options.
+ if (options.getHiveDelimsReplacement() != null
+ && options.doHiveDropDelims()) {
+ throw new InvalidOptionsException("The " + HIVE_DROP_DELIMS_ARG
+ + " option conflicts with the " + HIVE_DELIMS_REPLACEMENT_ARG
+ + " option." + HELP_STR);
+ }
}
protected void validateHBaseOptions(SqoopOptions options)
57 src/java/com/cloudera/sqoop/tool/ExportTool.java
@@ -19,7 +19,6 @@
package com.cloudera.sqoop.tool;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
@@ -30,6 +29,7 @@
import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
+import com.cloudera.sqoop.SqoopOptions.UpdateMode;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.manager.ExportJobContext;
@@ -66,8 +66,13 @@ private void exportTable(SqoopOptions options, String tableName)
ExportJobContext context = new ExportJobContext(tableName, jarFile,
options);
if (options.getUpdateKeyCol() != null) {
- // UPDATE-based export.
- manager.updateTable(context);
+ if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
+ // UPDATE-based export.
+ manager.updateTable(context);
+ } else {
+ // Mixed update/insert export
+ manager.upsertTable(context);
+ }
} else {
// INSERT-based export.
manager.exportTable(context);
@@ -84,26 +89,8 @@ public int run(SqoopOptions options) {
codeGenerator.setManager(manager);
- String updateKeyCol = options.getUpdateKeyCol();
- if (updateKeyCol != null) {
- // We're in update mode. We need to explicitly set the database output
- // column ordering in the codeGenerator. The UpdateKeyCol must come
- // last, because the UPDATE-based OutputFormat will generate the SET
- // clause followed by the WHERE clause, and the SqoopRecord needs to
- // serialize to this layout.
- String [] allColNames = manager.getColumnNames(options.getTableName());
- List<String> dbOutCols = new ArrayList<String>();
- String upperCaseKeyCol = updateKeyCol.toUpperCase();
- for (String col : allColNames) {
- if (!upperCaseKeyCol.equals(col.toUpperCase())) {
- dbOutCols.add(col); // add non-key columns to the output order list.
- }
- }
-
- // Then add the update key column last.
- dbOutCols.add(updateKeyCol);
- options.setDbOutputColumns(dbOutCols.toArray(
- new String[dbOutCols.size()]));
+ if (options.getUpdateKeyCol() != null) {
+ manager.configureDbOutputColumns(options);
}
try {
@@ -174,6 +161,13 @@ protected RelatedOptions getExportOptions() {
+ "to be executed in batch mode")
.withLongOpt(BATCH_ARG)
.create());
+ exportOpts.addOption(OptionBuilder
+ .withArgName("mode")
+ .hasArg()
+ .withDescription("Specifies how updates are performed when "
+ + "new rows are found with non-matching keys in database")
+ .withLongOpt(UPDATE_MODE_ARG)
+ .create());
return exportOpts;
}
@@ -257,6 +251,7 @@ public void applyOptions(CommandLine in, SqoopOptions out)
out.setClearStagingTable(true);
}
+ applyNewUpdateOptions(in, out);
applyInputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
applyOutputFormatOptions(in, out);
@@ -335,5 +330,21 @@ public void validateOptions(SqoopOptions options)
validateCommonOptions(options);
validateCodeGenOptions(options);
}
+
+ private void applyNewUpdateOptions(CommandLine in, SqoopOptions out)
+ throws InvalidOptionsException {
+ if (in.hasOption(UPDATE_MODE_ARG)) {
+ String updateTypeStr = in.getOptionValue(UPDATE_MODE_ARG);
+ if ("updateonly".equals(updateTypeStr)) {
+ out.setUpdateMode(UpdateMode.UpdateOnly);
+ } else if ("allowinsert".equals(updateTypeStr)) {
+ out.setUpdateMode(UpdateMode.AllowInsert);
+ } else {
+ throw new InvalidOptionsException("Unknown new update mode: "
+ + updateTypeStr + ". Use 'updateonly' or 'allowinsert'."
+ + HELP_STR);
+ }
+ }
+ }
}
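
Taken together, the ExportTool changes dispatch an export that has --update-key set as follows (summary sketch, not code from the patch):

    // --update-mode omitted or "updateonly" -> UpdateMode.UpdateOnly
    //                                           -> manager.updateTable(context)
    // --update-mode allowinsert             -> UpdateMode.AllowInsert
    //                                           -> manager.upsertTable(context)
    // any other value                       -> InvalidOptionsException
    //
    // In both update modes, manager.configureDbOutputColumns(options) now
    // decides the generated column order, replacing the inlined logic
    // removed from run() above.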
65 src/java/com/cloudera/sqoop/tool/ImportTool.java
@@ -20,13 +20,12 @@
import java.io.IOException;
-import java.math.BigDecimal;
-
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Timestamp;
+import java.sql.Types;
import java.util.List;
import java.util.Map;
@@ -66,6 +65,9 @@
// a single table).
private boolean allTables;
+ // store check column type for incremental option
+ private int checkColumnType;
+
public ImportTool() {
this("import", false);
}
@@ -159,7 +161,7 @@ private void saveIncrementalState(SqoopOptions options)
* Return the max value in the incremental-import test column. This
* value must be numeric.
*/
- private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
+ private Object getMaxColumnId(SqoopOptions options) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("SELECT MAX(");
sb.append(options.getIncrementalTestColumn());
@@ -184,7 +186,17 @@ private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
return null;
}
- return rs.getBigDecimal(1);
+ ResultSetMetaData rsmd = rs.getMetaData();
+ checkColumnType = rsmd.getColumnType(1);
+ if (checkColumnType == Types.TIMESTAMP) {
+ return rs.getTimestamp(1);
+ } else if (checkColumnType == Types.DATE) {
+ return rs.getDate(1);
+ } else if (checkColumnType == Types.TIME) {
+ return rs.getTime(1);
+ } else {
+ return rs.getObject(1);
+ }
} finally {
try {
if (null != rs) {
@@ -205,6 +217,16 @@ private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
}
/**
+ * Determine if a column is date/time.
+ * @return true if column type is TIMESTAMP, DATE, or TIME.
+ */
+ private boolean isDateTimeColumn(int columnType) {
+ return (columnType == Types.TIMESTAMP)
+ || (columnType == Types.DATE)
+ || (columnType == Types.TIME);
+ }
+
+ /**
* Initialize the constraints which set the incremental import range.
* @return false if an import is not necessary, because the dataset has not
* changed.
@@ -224,24 +246,30 @@ private boolean initIncrementalConstraints(SqoopOptions options,
SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
String nextIncrementalValue = null;
+ Object nextVal;
switch (incrementalMode) {
case AppendRows:
try {
- BigDecimal nextVal = getMaxColumnId(options);
- if (null != nextVal) {
- nextIncrementalValue = nextVal.toString();
+ nextVal = getMaxColumnId(options);
+ if (isDateTimeColumn(checkColumnType)) {
+ nextIncrementalValue = (nextVal == null) ? null
+ : manager.datetimeToQueryString(nextVal.toString(),
+ checkColumnType);
+ } else {
+ nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
}
} catch (SQLException sqlE) {
throw new IOException(sqlE);
}
break;
case DateLastModified:
- Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
- if (null == dbTimestamp) {
+ checkColumnType = Types.TIMESTAMP;
+ nextVal = manager.getCurrentDbTimestamp();
+ if (null == nextVal) {
throw new IOException("Could not get current time from database");
}
-
- nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
+ nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
+ checkColumnType);
break;
default:
throw new ImportException("Undefined incremental import type: "
@@ -253,12 +281,13 @@ private boolean initIncrementalConstraints(SqoopOptions options,
StringBuilder sb = new StringBuilder();
String prevEndpoint = options.getIncrementalLastValue();
- if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
- && null != prevEndpoint && !prevEndpoint.contains("\'")) {
- // Incremental imports based on timestamps should be 'quoted' in
+ if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
+ && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
+ // Incremental imports based on date/time should be 'quoted' in
// ANSI SQL. If the user didn't specify single-quotes, put them
// around, here.
- prevEndpoint = "'" + prevEndpoint + "'";
+ prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
+ checkColumnType);
}
String checkColName = manager.escapeColName(
@@ -320,7 +349,8 @@ private boolean initIncrementalConstraints(SqoopOptions options,
if (null == recordOptions) {
recordOptions = options;
}
- recordOptions.setIncrementalLastValue(nextIncrementalValue);
+ recordOptions.setIncrementalLastValue(
+ (nextVal == null) ? null : nextVal.toString());
return true;
}
@@ -839,6 +869,7 @@ public void validateOptions(SqoopOptions options)
validateCodeGenOptions(options);
validateOutputFormatOptions(options);
validateHBaseOptions(options);
+ validateHiveOptions(options);
}
}
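
A brief sketch of the values involved when the incremental check column is a TIMESTAMP (the timestamp itself is made up):

    // getMaxColumnId() now returns a java.sql.Timestamp instead of a
    // BigDecimal, so for a column maximum of 2011-09-16 10:00:00.0:
    //   nextVal.toString() -> "2011-09-16 10:00:00.0"
    //     (the unquoted form saved via setIncrementalLastValue())
    //   manager.datetimeToQueryString(nextVal.toString(), Types.TIMESTAMP)
    //     -> "'2011-09-16 10:00:00.0'" for the generic ConnManager, or
    //        "TO_TIMESTAMP('2011-09-16 10:00:00.0', 'YYYY-MM-DD HH24:MI:SS.FF')"
    //        against Oracle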
106 src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -19,19 +19,20 @@
package com.cloudera.sqoop;
import java.io.IOException;
-import java.sql.Connection;
+import java.nio.ByteBuffer;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -63,7 +64,7 @@
}
args.add("--table");
- args.add(HsqldbTestServer.getTableName());
+ args.add(getTableName());
args.add("--connect");
args.add(HsqldbTestServer.getUrl());
args.add("--warehouse-dir");
@@ -75,13 +76,14 @@
return args.toArray(new String[0]);
}
- // this test just uses the two int table.
- protected String getTableName() {
- return HsqldbTestServer.getTableName();
- }
-
public void testAvroImport() throws IOException {
+ String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
+ "VARCHAR(6)", "VARBINARY(2)", };
+ String [] vals = { "true", "100", "200", "1.0", "2.0",
+ "'s'", "'0102'", };
+ createTableWithColTypes(types, vals);
+
runImport(getOutputArgv(true));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
@@ -89,72 +91,50 @@ public void testAvroImport() throws IOException {
Schema schema = reader.getSchema();
assertEquals(Schema.Type.RECORD, schema.getType());
List<Field> fields = schema.getFields();
- assertEquals(2, fields.size());
-
- assertEquals("INTFIELD1", fields.get(0).name());
- assertEquals(Schema.Type.UNION, fields.get(0).schema().getType());
- assertEquals(Schema.Type.INT,
- fields.get(0).schema().getTypes().get(0).getType());
- assertEquals(Schema.Type.NULL,
- fields.get(0).schema().getTypes().get(1).getType());
-
- assertEquals("INTFIELD2", fields.get(1).name());
- assertEquals(Schema.Type.UNION, fields.get(1).schema().getType());
- assertEquals(Schema.Type.INT,
- fields.get(1).schema().getTypes().get(0).getType());
- assertEquals(Schema.Type.NULL,
- fields.get(1).schema().getTypes().get(1).getType());
+ assertEquals(types.length, fields.size());
+
+ checkField(fields.get(0), "DATA_COL0", Schema.Type.BOOLEAN);
+ checkField(fields.get(1), "DATA_COL1", Schema.Type.INT);
+ checkField(fields.get(2), "DATA_COL2", Schema.Type.LONG);
+ checkField(fields.get(3), "DATA_COL3", Schema.Type.FLOAT);
+ checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
+ checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
+ checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
GenericRecord record1 = reader.next();
- assertEquals(1, record1.get("INTFIELD1"));
- assertEquals(8, record1.get("INTFIELD2"));
+ assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+ assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+ assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+ assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+ assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+ assertEquals("DATA_COL5", new Utf8("s"), record1.get("DATA_COL5"));
+ Object object = record1.get("DATA_COL6");
+ assertTrue(object instanceof ByteBuffer);
+ ByteBuffer b = ((ByteBuffer) object);
+ assertEquals((byte) 1, b.get(0));
+ assertEquals((byte) 2, b.get(1));
+ }
+
+ private void checkField(Field field, String name, Type type) {
+ assertEquals(name, field.name());
+ assertEquals(Schema.Type.UNION, field.schema().getType());
+ assertEquals(type, field.schema().getTypes().get(0).getType());
+ assertEquals(Schema.Type.NULL, field.schema().getTypes().get(1).getType());
}
public void testNullableAvroImport() throws IOException, SQLException {
- addNullRecord(); // Add a pair of NULL values to twointtable.
+ String [] types = { "INT" };
+ String [] vals = { null };
+ createTableWithColTypes(types, vals);
+
runImport(getOutputArgv(true));
Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
DataFileReader<GenericRecord> reader = read(outputFile);
- boolean foundNullRecord = false;
-
- // Iterate thru the records in the output file til we find one that
- // matches (NULL, NULL).
- for (GenericRecord record : reader) {
- LOG.debug("Input record: " + record);
- if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) {
- LOG.debug("Got null record");
- foundNullRecord = true;
- }
- }
- assertTrue(foundNullRecord);
- }
+ GenericRecord record1 = reader.next();
+ assertNull(record1.get("DATA_COL0"));
- /**
- * Add a record to the TWOINTTABLE that contains (NULL, NULL).
- *
- * @throws SQLException if there's a problem doing the INSERT statement.
- */
- private void addNullRecord() throws SQLException {
- Connection connection = null;
- Statement st = null;
- try {
- connection = this.getManager().getConnection();
- st = connection.createStatement();
- st.executeUpdate("INSERT INTO " + getTableName()
- + " VALUES(NULL, NULL)");
-
- connection.commit();
- } finally {
- if (null != st) {
- st.close();
- }
-
- if (null != connection) {
- connection.close();
- }
- }
}
private DataFileReader<GenericRecord> read(Path filename) throws IOException {
97 src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
@@ -44,6 +44,16 @@
public static final Log LOG = LogFactory
.getLog(TestAvroImportExportRoundtrip.class.getName());
+ public void testRoundtripQuery() throws IOException, SQLException {
+ String[] argv = {};
+
+ runImport(getOutputArgvForQuery(true));
+ deleteTableData();
+ runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+
+ checkFirstColumnSum();
+ }
+
public void testRoundtrip() throws IOException, SQLException {
String[] argv = {};
@@ -81,13 +91,79 @@ public void testRoundtrip() throws IOException, SQLException {
/**
* Create the argv to pass to Sqoop.
+ *
+ * @return the argv as an array of strings.
+ */
+ protected String[] getOutputArgvForQuery(boolean includeHadoopFlags) {
+ ArrayList<String> args = new ArrayList<String>();
+
+ if (includeHadoopFlags) {
+ CommonArgs.addHadoopFlags(args);
+ }
+
+ args.add("--query");
+ args.add("select * from " + HsqldbTestServer.getTableName() + " where $CONDITIONS");
+ args.add("--connect");
+ args.add(HsqldbTestServer.getUrl());
+ args.add("--target-dir");
+ args.add(getWarehouseDir() + "/query_result");
+ args.add("--split-by");
+ args.add("INTFIELD1");
+ args.add("--as-avrodatafile");
+
+ return args.toArray(new String[0]);
+ }
+
+ protected String [] getExportArgv(boolean includeHadoopFlags,
+ int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+ ArrayList<String> args = formatAdditionalArgs(additionalArgv);
+
+ args.add("--table");
+ args.add(getTableName());
+ args.add("--export-dir");
+ args.add(getTablePath().toString());
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("-m");
+ args.add("1");
+
+ LOG.debug("args:");
+ for (String a : args) {
+ LOG.debug(" " + a);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ protected String [] getExportArgvForQuery(boolean includeHadoopFlags,
+ int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+ ArrayList<String> args = formatAdditionalArgs(additionalArgv);
+
+ args.add("--table");
+ args.add(getTableName());
+ args.add("--export-dir");
+ args.add(getWarehouseDir() + "/query_result");
+ args.add("--connect");
+ args.add(getConnectString());
+ args.add("-m");
+ args.add("1");
+
+ LOG.debug("args:");
+ for (String a : args) {
+ LOG.debug(" " + a);
+ }
+
+ return args.toArray(new String[0]);
+ }
+
+ /**
+ * Create the argv to pass to Sqoop.
* @param includeHadoopFlags if true, then include -D various.settings=values
* @param rowsPerStmt number of rows to export in a single INSERT statement.
* @param statementsPerTx ## of statements to use in a transaction.
* @return the argv as an array of strings.
*/
- protected String [] getExportArgv(boolean includeHadoopFlags,
- int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
+ protected ArrayList<String> formatAdditionalArgs(String... additionalArgv) {
ArrayList<String> args = new ArrayList<String>();
// Any additional Hadoop flags (-D foo=bar) are prepended.
@@ -120,22 +196,7 @@ public void testRoundtrip() throws IOException, SQLException {
}
}
}
-
- args.add("--table");
- args.add(getTableName());
- args.add("--export-dir");
- args.add(getTablePath().toString());
- args.add("--connect");
- args.add(getConnectString());
- args.add("-m");
- args.add("1");
-
- LOG.debug("args:");
- for (String a : args) {
- LOG.debug(" " + a);
- }
-
- return args.toArray(new String[0]);
+ return args;
}
// this test just uses the two int table.
54 src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -347,6 +347,14 @@ public void runImport(SqoopOptions options, List<String> args) {
*/
private List<String> getArgListForTable(String tableName, boolean commonArgs,
boolean isAppend) {
+ return getArgListForTable(tableName, commonArgs, isAppend, false);
+ }
+
+ /**
+ * Return a list of arguments to import the specified table.
+ */
+ private List<String> getArgListForTable(String tableName, boolean commonArgs,
+ boolean isAppend, boolean appendTimestamp) {
List<String> args = new ArrayList<String>();
if (commonArgs) {
CommonArgs.addHadoopFlags(args);
@@ -360,8 +368,13 @@ public void runImport(SqoopOptions options, List<String> args) {
if (isAppend) {
args.add("--incremental");
args.add("append");
- args.add("--check-column");
- args.add("id");
+ if (!appendTimestamp) {
+ args.add("--check-column");
+ args.add("id");
+ } else {
+ args.add("--check-column");
+ args.add("last_modified");
+ }
} else {
args.add("--incremental");
args.add("lastmodified");
@@ -785,5 +798,42 @@ public void testTimestampBoundary() throws Exception {
runJob(TABLE_NAME);
assertDirOfNumbers(TABLE_NAME, 20);
}
+
+ public void testIncrementalAppendTimestamp() throws Exception {
+ // Run an import, and then insert rows with the last-modified timestamp
+ // set to the exact time when the first import runs. Run a second import
+ // and ensure that we pick up the new data.
+
+ long now = System.currentTimeMillis();
+
+ final String TABLE_NAME = "incrementalAppendTimestamp";
+ Timestamp thePast = new Timestamp(now - 100);
+ createTimestampTable(TABLE_NAME, 10, thePast);
+
+ Timestamp firstJobTime = new Timestamp(now);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(firstJobTime);
+
+ // Configure the job to use the instrumented Hsqldb manager.
+ Configuration conf = newConf();
+ conf.set(ConnFactory.FACTORY_CLASS_NAMES_KEY,
+ InstrumentHsqldbManagerFactory.class.getName());
+
+ List<String> args = getArgListForTable(TABLE_NAME, false, true, true);
+ createJob(TABLE_NAME, args, conf);
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 10);
+
+ // Add some more rows with the timestamp equal to the job run timestamp.
+ insertIdTimestampRows(TABLE_NAME, 10, 20, firstJobTime);
+ assertRowCount(TABLE_NAME, 20);
+
+ // Run a second job with the clock advanced by 100 ms.
+ Timestamp secondJobTime = new Timestamp(now + 100);
+ InstrumentHsqldbManager.setCurrentDbTimestamp(secondJobTime);
+
+ // Import only those rows.
+ runJob(TABLE_NAME);
+ assertDirOfNumbers(TABLE_NAME, 20);
+ }
}
73 src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -32,6 +32,7 @@
import org.junit.Test;
import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
@@ -40,6 +41,7 @@
import com.cloudera.sqoop.tool.CreateHiveTableTool;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.SqoopTool;
+import org.apache.commons.cli.ParseException;
/**
* Test HiveImport capability after an import to HDFS.
@@ -363,6 +365,77 @@ public void testFieldWithHiveDelims() throws IOException,
* Test hive import with row that has new line in it.
*/
@Test
+ public void testFieldWithHiveDelimsReplacement() throws IOException,
+ InterruptedException {
+ final String TABLE_NAME = "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT";
+
+ LOG.info("Doing import of single row into "
+ + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");
+ setCurTableName(TABLE_NAME);
+ setNumCols(3);
+ String[] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+ String[] vals = { "'test with\nnew lines\n'", "42",
+ "'oh no " + '\01' + " field delims " + '\01' + "'", };
+ String[] moreArgs = { "--"+BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " "};
+
+ runImportTest(TABLE_NAME, types, vals,
+ "fieldWithNewlineReplacementImport.q", getArgv(false, moreArgs),
+ new ImportTool());
+
+ LOG.info("Validating data in single row is present in: "
+ + "FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT table");
+
+ // Ideally, we would actually invoke hive code to verify that record with
+ // record and field delimiters have values replaced and that we have the
+ // proper number of hive records. Unfortunately, this is a non-trivial task,
+ // and better dealt with at an integration test level
+ //
+ // Instead, this assumes the path of the generated table and just validate
+ // map job output.
+
+ // Get and read the raw output file
+ String whDir = getWarehouseDir();
+ File p = new File(new File(whDir, TABLE_NAME), "part-m-00000");
+ File f = new File(p.toString());
+ FileReader fr = new FileReader(f);
+ BufferedReader br = new BufferedReader(fr);
+ try {
+ // verify the output
+ assertEquals(br.readLine(), "test with new lines " + '\01' + "42"
+ + '\01' + "oh no field delims ");
+ assertEquals(br.readLine(), null); // should only be one line
+ } catch (IOException ioe) {
+ fail("Unable to read files generated from hive");
+ } finally {
+ br.close();
+ }
+ }
+
+ /**
+ * Test hive drop and replace option validation.
+ */
+ @Test
+ public void testHiveDropAndReplaceOptionValidation() throws ParseException {
+ LOG.info("Testing conflicting Hive delimiter drop/replace options");
+
+ setNumCols(3);
+ String[] moreArgs = { "--"+BaseSqoopTool.HIVE_DELIMS_REPLACEMENT_ARG, " ",
+ "--"+BaseSqoopTool.HIVE_DROP_DELIMS_ARG, };
+
+ ImportTool tool = new ImportTool();
+ try {
+ tool.validateOptions(tool.parseArguments(getArgv(false, moreArgs), null,
+ null, true));
+ fail("Expected InvalidOptionsException");
+ } catch (InvalidOptionsException ex) {
+ /* success */
+ }
+ }
+
+ /**
+ * Test hive import with row that has new line in it.
+ */
+ @Test
public void testImportHiveWithPartitions() throws IOException,
InterruptedException {
final String TABLE_NAME = "PARTITION_HIVE_IMPORT";
14 src/test/com/cloudera/sqoop/manager/OracleExportTest.java
@@ -263,4 +263,18 @@ public void testDatesAndTimes() throws IOException, SQLException {
assertColMinAndMax(forIdx(1), genTime);
}
}
+
+ /** Make sure mixed update/insert export work correctly. */
+ public void testUpsertTextExport() throws IOException, SQLException {
+ final int TOTAL_RECORDS = 10;
+ createTextFile(0, TOTAL_RECORDS, false);
+ createTable();
+ // first time will be insert.
+ runExport(getArgv(true, 10, 10, newStrArray(null,
+ "--update-key", "ID", "--update-mode", "allowinsert")));
+ // second time will be update.
+ runExport(getArgv(true, 10, 10, newStrArray(null,
+ "--update-key", "ID", "--update-mode", "allowinsert")));
+ verifyExport(TOTAL_RECORDS);
+ }
}
2  testdata/hive/scripts/fieldWithNewlineReplacementImport.q
@@ -0,0 +1,2 @@
+CREATE TABLE IF NOT EXISTS `FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE;
+LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT' INTO TABLE `FIELD_WITH_NL_REPLACEMENT_HIVE_IMPORT`;
