Skip to content

Commit

Permalink
Segment compaction for upsert real-time tables (apache#10463)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertzych authored and s0nskar committed Aug 10, 2023
1 parent 7e17094 commit ed25df0
Show file tree
Hide file tree
Showing 19 changed files with 1,531 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,19 +245,19 @@ private void setupHelixSystemProperties() {
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnects
// from ZooKeeper). Setting the flapping time window to a small value prevents this from happening. Helix ignores
// non-positive values, so the default value is set to 1.
System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config
.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
_config.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS,
CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
}

private void setupHelixClusterConstraints() {
String maxStateTransitions = _config
.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
String maxStateTransitions =
_config.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS,
CommonConstants.Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS);
Map<ClusterConstraints.ConstraintAttribute, String> constraintAttributes = new HashMap<>();
constraintAttributes.put(ClusterConstraints.ConstraintAttribute.INSTANCE, ".*");
constraintAttributes
.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, Message.MessageType.STATE_TRANSITION.name());
constraintAttributes.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE,
Message.MessageType.STATE_TRANSITION.name());
ConstraintItem constraintItem = new ConstraintItem(constraintAttributes, maxStateTransitions);

_helixControllerManager.getClusterManagmentTool()
Expand Down Expand Up @@ -371,8 +371,8 @@ private void setUpHelixController() {
private void setUpPinotController() {
// install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
.isNotBlank(tlsDefaults.getTrustStorePath())) {
if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank(
tlsDefaults.getTrustStorePath())) {
LOGGER.info("Installing default SSL context for any client requests");
TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
}
Expand All @@ -392,8 +392,9 @@ private void setUpPinotController() {
_config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES));

LOGGER.info("Initializing Helix participant manager");
_helixParticipantManager = HelixManagerFactory
.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT, _helixZkURL);
_helixParticipantManager =
HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT,
_helixZkURL);

// LeadControllerManager needs to be initialized before registering as Helix participant.
LOGGER.info("Initializing lead controller manager");
Expand Down Expand Up @@ -502,8 +503,7 @@ protected void configure() {
LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);

_controllerMetrics.addCallbackGauge("dataDir.exists",
() -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
File dataDir = new File(_config.getDataDir());
if (dataDir.exists()) {
Expand Down Expand Up @@ -673,7 +673,7 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
_taskManagerStatusCache = getTaskManagerStatusCache();
_taskManager =
new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
_controllerMetrics, _taskManagerStatusCache);
_controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager);
periodicTasks.add(_taskManager);
_retentionManager =
new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
Expand All @@ -693,8 +693,9 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
periodicTasks.add(_segmentStatusChecker);
_realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager,
_controllerMetrics, _executorService);
_realtimeConsumerMonitor =
new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics,
_executorService);
periodicTasks.add(_realtimeConsumerMonitor);
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService, _connectionManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
Expand Down Expand Up @@ -51,15 +53,20 @@ public class ClusterInfoAccessor {
private final ControllerConf _controllerConf;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final Executor _executor;
private final MultiThreadedHttpConnectionManager _connectionManager;

public ClusterInfoAccessor(PinotHelixResourceManager pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager, ControllerConf controllerConf,
ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, Executor executor,
MultiThreadedHttpConnectionManager connectionManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
_controllerConf = controllerConf;
_controllerMetrics = controllerMetrics;
_leadControllerManager = leadControllerManager;
_executor = executor;
_connectionManager = connectionManager;
}

/**
Expand Down Expand Up @@ -94,6 +101,20 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
}

/**
* Returns the shared executor, i.e. the {@link Executor} that was handed to this accessor's constructor.
*
* @return the executor instance stored at construction time, returned as-is
*/
public Executor getExecutor() {
return _executor;
}

/**
* Returns the shared connection manager, i.e. the {@link MultiThreadedHttpConnectionManager} that was handed to
* this accessor's constructor.
*
* @return the connection manager instance stored at construction time, returned as-is
*/
public MultiThreadedHttpConnectionManager getConnectionManager() {
return _connectionManager;
}

/**
* Fetches the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given
* taskType and tableNameWithType
Expand All @@ -114,8 +135,8 @@ public ZNRecord getMinionTaskMetadataZNRecord(String taskType, String tableNameW
*/
@Nullable
public SegmentLineage getSegmentLineage(String tableNameWithType) {
return SegmentLineageAccessHelper
.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
return SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(),
tableNameWithType);
}

/**
Expand All @@ -127,8 +148,8 @@ public SegmentLineage getSegmentLineage(String tableNameWithType) {
* @param expectedVersion The expected version of data to be overwritten. Set to -1 to override version check.
*/
public void setMinionTaskMetadata(BaseTaskMetadata taskMetadata, String taskType, int expectedVersion) {
MinionTaskMetadataUtils
.persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata, expectedVersion);
MinionTaskMetadataUtils.persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata,
expectedVersion);
}

/**
Expand Down Expand Up @@ -175,8 +196,9 @@ public String getDataDir() {
* @return cluster config
*/
public String getClusterConfig(String configName) {
HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
.forCluster(_pinotHelixResourceManager.getHelixClusterName()).build();
HelixConfigScope helixConfigScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_pinotHelixResourceManager.getHelixClusterName()).build();
Map<String, String> configMap =
_pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName));
return configMap != null ? configMap.get(configName) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
Expand Down Expand Up @@ -114,15 +116,16 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache) {
TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor,
MultiThreadedHttpConnectionManager connectionManager) {
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_taskManagerStatusCache = taskManagerStatusCache;
_clusterInfoAccessor =
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
leadControllerManager);
leadControllerManager, executor, connectionManager);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
_skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
_maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
Expand Down Expand Up @@ -561,8 +564,8 @@ private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig>
long successRunTimestamp = System.currentTimeMillis();
for (TableConfig tableConfig : enabledTableConfigs) {
_taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(
successRunTimestamp, errors.toString()));
taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
errors.toString()));
// before the first task schedule, the following gauge metric will be empty
// TODO: find a better way to report task generation information
_controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,23 @@ public static class SegmentGenerationAndPushTask {
public static final String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE =
"SegmentGenerationAndPushTask.numConcurrentTasksPerInstance";
}

/**
* Constants for the upsert compaction task: its task type name and the names of the settings documented below.
*/
public static class UpsertCompactionTask {
/**
* The task type name of the upsert compaction task.
*/
public static final String TASK_TYPE = "UpsertCompactionTask";
/**
* The time period to wait before picking segments for this task,
* e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days.
*/
public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
/**
* The maximum percentage of old records allowed in a completed segment,
* e.g. if set to 30, a segment whose percentage of old records surpasses 30 may be compacted.
*/
public static final String INVALID_RECORDS_THRESHOLD_PERCENT = "invalidRecordsThresholdPercent";
/**
* The maximum count of old records allowed in a completed segment,
* e.g. if set to 100k, a segment whose count of old records surpasses 100k may be compacted.
*/
public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount";
}
}

0 comments on commit ed25df0

Please sign in to comment.