[FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()


This closes apache#12485
JingsongLi authored and zhangjun committed Jul 7, 2020
1 parent f492ed5 commit 2e16970
Showing 7 changed files with 132 additions and 11 deletions.
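
The change common to both sinks below is to seed every job with its own random part-file prefix through OutputFileConfig, so that two jobs writing into the same directory can no longer produce the same file name. A minimal, self-contained sketch of that idea, assuming a .gz suffix purely for illustration (the Hive sink derives the real extension from its output format):

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

import java.util.UUID;

public class UniquePartPrefixSketch {
	public static void main(String[] args) {
		// Each sink instance draws its own UUID once, so part files written by two
		// different jobs into the same partition directory carry distinct prefixes.
		OutputFileConfig outputFileConfig = OutputFileConfig.builder()
				.withPartPrefix("part-" + UUID.randomUUID())
				.withPartSuffix(".gz") // assumed suffix, for illustration only
				.build();

		System.out.println(outputFileConfig.getPartPrefix()); // e.g. part-<random uuid>
		System.out.println(outputFileConfig.getPartSuffix()); // .gz
	}
}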
@@ -77,6 +77,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
isCompressed);
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartSuffix(extension).build();
.withPartPrefix("part-" + UUID.randomUUID().toString())
.withPartSuffix(extension == null ? "" : extension)
.build();
if (isBounded) {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HiveRowPartitionComputer(
@@ -45,6 +45,8 @@
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,6 +231,25 @@ public void testWriteNullValues() throws Exception {
}
}

@Test
public void testBatchAppend() {
TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");
try {
tEnv.executeSql("create table append_table (i int, j int)");
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
ArrayList<Row> rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
Assert.assertEquals(Arrays.asList(Row.of(1, 1), Row.of(2, 2)), rows);
} finally {
tEnv.executeSql("drop database db1 cascade");
}
}

@Test(timeout = 120000)
public void testDefaultSerPartStreamingWrite() throws Exception {
testStreamingWrite(true, false, true, this::checkSuccessFiles);
@@ -253,6 +275,34 @@ public void testNonPartStreamingMrWrite() throws Exception {
testStreamingWrite(false, true, false, (p) -> {});
}

@Test(timeout = 120000)
public void testStreamingAppend() throws Exception {
testStreamingWrite(false, false, false, (p) -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());

TableEnvUtil.execInsertSqlAndWaitResult(
tEnv,
"insert into db1.sink_table select 6,'a','b','2020-05-03','12'");

assertBatch("db1.sink_table", Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11",
"6,a,b,2020-05-03,12"));
});
}

private void checkSuccessFiles(String path) {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
@@ -317,6 +367,18 @@ private void testStreamingWrite(
tEnv.sqlQuery("select * from my_table"),
"sink_table");

assertBatch("db1.sink_table", Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11"));

// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -346,6 +408,19 @@ private void testStreamingWrite(
}
}

private void assertBatch(String table, List<String> expected) {
// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
batchTEnv.useCatalog(hiveCatalog.getName());
batchTEnv.executeSql("select * from " + table).collect()
.forEachRemaining(r -> results.add(r.toString()));
results.sort(String::compareTo);
expected.sort(String::compareTo);
Assert.assertEquals(expected, results);
}

private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
@@ -236,6 +236,43 @@ trait FileSystemITCaseBase {
row(19, 3, "x19")
))
}

@Test
def testInsertAppend(): Unit = {
tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test1")

tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test2")

check(
"select y, b, x from partitionedTable where a=3",
Seq(
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19"),
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19")
))
}

@Test
def testInsertOverwrite(): Unit = {
tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test1")

tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test2")

check(
"select y, b, x from partitionedTable where a=3",
Seq(
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19")
))
}
}

object FileSystemITCaseBase {
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSink
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.{Before, Test}

import scala.collection.Seq

@@ -55,4 +55,8 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
sink.getAppendResults.sorted)
}

// Streaming mode does not support overwrite
@Test
override def testInsertOverwrite(): Unit = {}
}
@@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -63,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
@@ -126,6 +128,9 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
partitionKeys.toArray(new String[0]));

EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.build();

if (isBounded) {
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
@@ -137,6 +142,7 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitions);
builder.setTempPath(toStagingPath());
builder.setOutputFileConfig(outputFileConfig);
return dataStream.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
@@ -155,12 +161,14 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
}
return createStreamingSink(
@@ -103,7 +103,7 @@ private void overwrite(Path destDir) throws Exception {
}

/**
* Moves files from srcDir to destDir. Delete files in destDir first when overwrite.
* Moves files from srcDir to destDir.
*/
private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
for (Path srcDir : srcDirs) {
@@ -113,12 +113,7 @@ private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
for (FileStatus srcFile : srcFiles) {
Path srcPath = srcFile.getPath();
Path destPath = new Path(destDir, srcPath.getName());
int count = 1;
while (!fs.rename(srcPath, destPath)) {
String name = srcPath.getName() + "_copy_" + count;
destPath = new Path(destDir, name);
count++;
}
fs.rename(srcPath, destPath);
}
}
}
@@ -97,7 +97,7 @@ public Path createPartitionDir(String... partitions) {
}

private String newFileName() {
return String.format("%s%s-%s-file-%d%s",
return String.format("%s-%s-%s-file-%d%s",
outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
}
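
With the per-job UUID prefix in place, the renamed pattern above composes temp-file names that cannot collide across jobs. A small stand-alone sketch of the resulting name, where cp-1, task-0, and .gz are assumed placeholders rather than the real output of checkpointName/taskName and the configured suffix:

import java.util.UUID;

public class PartFileNameSketch {
	public static void main(String[] args) {
		// Assumed stand-ins; the real values come from checkpointName(checkpointId)
		// and taskName(taskNumber) in the surrounding class.
		String checkpointName = "cp-1";
		String taskName = "task-0";

		String partPrefix = "part-" + UUID.randomUUID(); // per-job prefix set by the sinks
		int nameCounter = 0;
		String partSuffix = ".gz";                       // assumed suffix

		// Same pattern as the updated newFileName() above.
		String fileName = String.format("%s-%s-%s-file-%d%s",
				partPrefix, checkpointName, taskName, nameCounter, partSuffix);
		System.out.println(fileName); // e.g. part-<uuid>-cp-1-task-0-file-0.gz
	}
}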
