DB-9843 External Tables: Own directory partition parsing (#3822)
* DB-9843 External Tables: Own directory partition parsing

fixing issues:
- DB-9843: don't copy files for schema inference, and don't scan every file for the schema -> overall faster
- DB-10568: a partition column declared as VARCHAR whose value is 1 was inferred as INT and could not be read (see the sketch after these commit notes)

* DB-9843 add unit tests

* DB-9843 address review

* DB-9843 fix mem

* DB-9843 fix rebase issues, add unit test getCsvOptions

* DB-9843 fix spotbugs

* DB-9843 align with 2.8 code

* DB-9843 parameter DateFormat to avoid hdp/cdp differences

* DB-9843 fix findbugs-exclude-filter.xml

* DB-9843 fix testStringIntPartitionParsing, move license

* DB-9843 address code review
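
To make the DB-10568 fix concrete: below is a minimal, hypothetical sketch of parsing a Hive-style partition directory segment ("col=value") against the table's declared schema rather than letting Spark infer a type from the value, so a VARCHAR partition whose value happens to be 1 stays a string. Class and method names are illustrative only and are not the ones used in SplicePartitioningUtils; only the StructType/DataTypes calls are standard Spark API.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Illustrative sketch only: parse a Hive-style partition directory segment
 * using the declared table schema, so a VARCHAR partition column keeps "1"
 * as a string instead of having it inferred as INT.
 */
public class PartitionValueSketch {

    static Object parsePartitionValue(String dirSegment, StructType tableSchema) {
        int eq = dirSegment.indexOf('=');
        String column = dirSegment.substring(0, eq);
        String raw = dirSegment.substring(eq + 1);

        // the declared type wins over value-based inference
        StructField field = tableSchema.apply(column);
        if (field.dataType().equals(DataTypes.IntegerType)) {
            return Integer.valueOf(raw);
        } else if (field.dataType().equals(DataTypes.LongType)) {
            return Long.valueOf(raw);
        }
        // VARCHAR (StringType) and everything else: keep the raw string
        return raw;
    }

    public static void main(String[] args) {
        StructType schema = new StructType()
                .add("region", DataTypes.StringType)   // VARCHAR partition column
                .add("year",   DataTypes.IntegerType);

        System.out.println(parsePartitionValue("region=1", schema).getClass().getSimpleName()); // String
        System.out.println(parsePartitionValue("year=2020", schema).getClass().getSimpleName()); // Integer
    }
}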
martinrupp committed Nov 23, 2020
1 parent 063ce02 commit 4f49e56
Showing 23 changed files with 1,491 additions and 655 deletions.
4 changes: 4 additions & 0 deletions findbugs-exclude-filter.xml
@@ -128,5 +128,9 @@
<Class name="com.splicemachine.spark2.splicemachine.package$"/>
<Bug pattern="NM_METHOD_NAMING_CONVENTION"/>
</Match>
<!-- SplicePartitioningUtils is external code <Class name="~SplicePartitioningUtils"/> -->
<Match>
<Class name="~com\.splicemachine\.spark\.splicemachine\.SplicePartitioningUtils.*"/>
</Match>

</FindBugsFilter>

Large diffs are not rendered by default.

@@ -19,9 +19,7 @@
import com.splicemachine.db.iapi.sql.ResultColumnDescriptor;
import com.splicemachine.db.iapi.sql.execute.ExecRow;
import com.splicemachine.db.iapi.types.DataValueDescriptor;
import com.splicemachine.db.iapi.types.SQLLongint;
import com.splicemachine.db.impl.sql.compile.ExplainNode;
import com.splicemachine.db.impl.sql.execute.ValueRow;
import com.splicemachine.derby.iapi.sql.execute.SpliceOperation;
import com.splicemachine.derby.impl.SpliceSpark;
import com.splicemachine.derby.impl.sql.execute.operations.DMLWriteOperation;
@@ -31,18 +29,7 @@
import com.splicemachine.derby.impl.sql.execute.operations.export.ExportOperation;
import com.splicemachine.derby.impl.sql.execute.operations.framework.SpliceGenericAggregator;
import com.splicemachine.derby.impl.sql.execute.operations.window.WindowContext;
import com.splicemachine.derby.stream.function.AbstractSpliceFunction;
import com.splicemachine.derby.stream.function.CountWriteFunction;
import com.splicemachine.derby.stream.function.ExportFunction;
import com.splicemachine.derby.stream.function.LocatedRowToRowFunction;
import com.splicemachine.derby.stream.function.LocatedRowToRowAvroFunction;
import com.splicemachine.derby.stream.function.RowToLocatedRowFunction;
import com.splicemachine.derby.stream.function.SpliceFlatMapFunction;
import com.splicemachine.derby.stream.function.SpliceFunction;
import com.splicemachine.derby.stream.function.SpliceFunction2;
import com.splicemachine.derby.stream.function.SplicePairFunction;
import com.splicemachine.derby.stream.function.SplicePredicateFunction;
import com.splicemachine.derby.stream.function.TakeFunction;
import com.splicemachine.derby.stream.function.*;
import com.splicemachine.derby.stream.iapi.*;
import com.splicemachine.derby.stream.output.BulkDeleteDataSetWriterBuilder;
import com.splicemachine.derby.stream.output.BulkInsertDataSetWriterBuilder;
@@ -51,7 +38,6 @@
import com.splicemachine.derby.stream.output.InsertDataSetWriterBuilder;
import com.splicemachine.derby.stream.output.UpdateDataSetWriterBuilder;
import com.splicemachine.derby.stream.output.*;
import com.splicemachine.derby.stream.utils.ExternalTableUtils;
import com.splicemachine.spark.splicemachine.ShuffleUtils;
import com.splicemachine.system.CsvOptions;
import com.splicemachine.utils.ByteDataInput;
@@ -70,10 +56,8 @@
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -789,43 +773,17 @@ public DataSet<ExecRow> writeAvroFile(DataSetProcessor dsp,
String compression,
OperationContext context) throws StandardException
{
compression = SparkDataSet.getAvroCompression(compression);

StructType dataSchema = null;
StructType tableSchema = generateTableSchema(context);

// what is this? why is this so different from parquet/orc ?
// actually very close to NativeSparkDataSet.writeFile
dataSchema = ExternalTableUtils.getDataSchema(dsp, tableSchema, partitionBy, location, "a");

StructType dataSchema = SparkExternalTableUtil.getDataSchemaAvro(dsp, tableSchema, partitionBy, location);
if (dataSchema == null)
dataSchema = tableSchema;

Dataset<Row> insertDF = SpliceSpark.getSession().createDataFrame(
rdd.map(new SparkSpliceFunctionWrapper<>(new CountWriteFunction(context))).map(new LocatedRowToRowAvroFunction()),
rdd.map(new SparkSpliceFunctionWrapper<>(new EmptyFunction())).map(new LocatedRowToRowAvroFunction()),
dataSchema);


// We duplicate the code in NativeSparkDataset.writeAvroFile here to avoid calling ExternalTableUtils.getDataSchema() twice
List<String> partitionByCols = new ArrayList();
for (int i = 0; i < partitionBy.length; i++) {
partitionByCols.add(dataSchema.fields()[partitionBy[i]].name());
}
if (partitionBy.length > 0) {
List<Column> repartitionCols = new ArrayList();
for (int i = 0; i < partitionBy.length; i++) {
repartitionCols.add(new Column(dataSchema.fields()[partitionBy[i]].name()));
}
insertDF = insertDF.repartition(scala.collection.JavaConversions.asScalaBuffer(repartitionCols).toList());
}
if (compression.equals("none")) {
compression = "uncompressed";
}
insertDF.write().option(SPARK_COMPRESSION_OPTION,compression).partitionBy(partitionByCols.toArray(new String[partitionByCols.size()]))
.mode(SaveMode.Append).format("com.databricks.spark.avro").save(location);
ValueRow valueRow=new ValueRow(1);
valueRow.setColumn(1,new SQLLongint(context.getRecordsWritten()));
return new SparkDataSet<>(SpliceSpark.getContext().parallelize(Collections.singletonList(valueRow), 1));
return new NativeSparkDataSet<>(insertDF, context)
.writeAvroFile(dataSchema, partitionBy, location, compression, context);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -838,31 +796,13 @@ public NativeSparkDataSet getNativeSparkDataSet( OperationContext context) throw
return new NativeSparkDataSet<>(insertDF, context);
}

static String getParquetCompression(String compression )
{
// parquet in spark supports: lz4, gzip, lzo, snappy, none, zstd.
if( compression.equals("zlib") )
compression = "gzip";
return compression;
}

public static String getAvroCompression(String compression) {
// avro supports uncompressed, snappy, deflate, bzip2 and xz
if (compression.equals("none"))
compression = "uncompressed";
else if( compression.equals("zlib"))
compression = "deflate";
return compression;
}

@Override
public DataSet<ExecRow> writeParquetFile(DataSetProcessor dsp,
int[] partitionBy,
String location,
String compression,
OperationContext context) throws StandardException {

compression = getParquetCompression( compression );
return getNativeSparkDataSet( context )
.writeParquetFile(dsp, partitionBy, location, compression, context);
}
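As a side reference, the compression-name normalization removed from SparkDataSet above (zlib -> gzip for Parquet; none -> uncompressed and zlib -> deflate for Avro) can be sketched as one standalone helper. The helper name and its placement are assumptions; the mappings themselves are copied from the removed getParquetCompression/getAvroCompression code.

/**
 * Sketch of the compression-name mapping from the removed SparkDataSet helpers.
 * Method name and placement are hypothetical; the mappings mirror the removed
 * getParquetCompression/getAvroCompression code in the diff above.
 */
final class CompressionNames {

    static String normalize(String compression, String format) {
        if ("parquet".equals(format)) {
            // Spark's Parquet writer supports lz4, gzip, lzo, snappy, none, zstd;
            // the SQL-level name "zlib" is written as gzip.
            return "zlib".equals(compression) ? "gzip" : compression;
        }
        if ("avro".equals(format)) {
            // Avro supports uncompressed, snappy, deflate, bzip2 and xz.
            if ("none".equals(compression)) return "uncompressed";
            if ("zlib".equals(compression)) return "deflate";
            return compression;
        }
        return compression;
    }

    public static void main(String[] args) {
        System.out.println(normalize("zlib", "parquet")); // gzip
        System.out.println(normalize("none", "avro"));    // uncompressed
        System.out.println(normalize("zlib", "avro"));    // deflate
    }
}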
