Skip to content

Commit

Permalink
[HUDI-5046] Support all the hive sync options for flink sql (apache#6985
Browse files Browse the repository at this point in the history
)
  • Loading branch information
danny0405 authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 7e8c0db commit 365c3fa
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,10 @@ private FlinkOptions() {
// ------------------------------------------------------------------------

public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.key("hive_sync.enabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys("hive_sync.enable")
.withDescription("Asynchronously sync Hive meta to HMS, default false");

public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -93,7 +94,7 @@ public static HiveSyncContext create(Configuration conf, SerializableConfigurati

@VisibleForTesting
public static Properties buildSyncConfig(Configuration conf) {
TypedProperties props = new TypedProperties();
TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf);
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), conf.getString(FlinkOptions.PATH));
props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT));
props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@
package org.apache.hudi.sink.utils;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hive.HiveSyncConfig;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Method;
import java.util.Properties;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test cases for {@link HiveSyncContext}.
*/
public class TestHiveSyncContext {
/**
* Test that the file ids generated by the task can finally shuffled to itself.
* Test partition path fields sync.
*/
@Test
public void testBuildSyncConfig() throws Exception {
void testSyncedPartitions() {
Configuration configuration1 = new Configuration();
Configuration configuration2 = new Configuration();
String hiveSyncPartitionField = "hiveSyncPartitionField";
Expand All @@ -48,15 +49,21 @@ public void testBuildSyncConfig() throws Exception {

configuration2.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField);

Class<?> threadClazz = Class.forName("org.apache.hudi.sink.utils.HiveSyncContext");
Method buildSyncConfigMethod = threadClazz.getDeclaredMethod("buildSyncConfig", Configuration.class);
buildSyncConfigMethod.setAccessible(true);

Properties props1 = HiveSyncContext.buildSyncConfig(configuration1);
Properties props2 = HiveSyncContext.buildSyncConfig(configuration2);

assertEquals(hiveSyncPartitionField, props1.getProperty(META_SYNC_PARTITION_FIELDS.key()));
assertEquals(partitionPathField, props2.getProperty(META_SYNC_PARTITION_FIELDS.key()));
}

/**
* Test an option that has no shortcut key.
*/
@Test
void testOptionWithoutShortcutKey() {
Configuration configuration3 = new Configuration();
configuration3.setBoolean(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), true);
Properties props3 = HiveSyncContext.buildSyncConfig(configuration3);
assertTrue(Boolean.parseBoolean(props3.getProperty(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), "false")));
}
}

0 comments on commit 365c3fa

Please sign in to comment.