controller static id and disable controller rebalance by default #202
yangy0000 commented Feb 10, 2019 (edited)
- controller uses a static id instead of hostname
- disable controller auto rebalance
- fix bug in delete pipeline
- fix typo in adminresource
- update some opt names in the manager admin resource to align with the controller admin resource
if (znRecord == null) {
  znRecord = new ZNRecord(resourcePath);
}
znRecord.setSimpleField(_instanceId, _hostname);
Can we name the top level something like "Controller id to hostname map"? We might need this map for workers as well, and we might use propertyStore for other purposes in the future.
Thanks. I left some questions.
@@ -0,0 +1,20 @@
/*
 * Copyright (C) 2015-2017 Uber Technologies, Inc. (streaming-data@uber.com)
Can we update this to 2019?
Sure
Looks like the license check will fail after I update 2017 to 2019. I will open a separate PR later to update all the license headers and the license check.
You can update https://github.com/uber/uReplicator/blob/master/pom.xml#L57 and then run mvn license:format to update all of them (in another PR).
uReplicator-Common/src/main/java/com/uber/stream/kafka/mirrormaker/common/utils/Constants.java (outdated, resolved)
@@ -294,14 +301,14 @@ private void validateInstanceToTopicPartitionsMap(
   }
   }
   if (mismatchTopicPartition.isEmpty()) {
-    LOGGER.info("Validate OK: InstanceName: {}, route: {}, #topics: {}, #partitions: {}, #workers: {}, worker: {}", instanceName, routeSet,
-      topicPartitions.size() - 1, partitionCount, instanceMap.get(instanceName).getWorkerSet().size(), instanceMap.get(instanceName).getWorkerSet());
+    LOGGER.info("Validate OK: InstanceName: {}, route: {}, #topics: {}, #partitions: {}, #workers: {}, worker: {}", hostname, routeSet,
Do you still want to keep InstanceName in the log message, or change it to hostname as well?
It might be useful to log both id and hostname so we can find the corresponding host in the deployment system even if the hostname is gone.
updated
- for (String instanceName : instanceToTopicPartitionsMap.keySet()) {
-   Set<TopicPartition> topicPartitions = instanceToTopicPartitionsMap.get(instanceName);
+ for (String instanceId : instanceToTopicPartitionsMap.keySet()) {
+   String hostname = instanceIdAndNameMap.containsKey(instanceId) ? instanceIdAndNameMap.get(instanceId) : instanceId;
I don't fully understand your intention here. Is this always a hostname, or just an instance name that may be equal to the hostname?
The hostname should always be present after this change. During rollout, however, if we deploy the manager before the controller, the hostname won't be found until the controller change is rolled out, so I put a placeholder here to be safe.
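The placeholder behaviour described above, combined with the earlier request to log both id and hostname, can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class `InstanceLabel`, its methods, and the sample host name are hypothetical, and only the fall-back-to-id idea comes from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of uReplicator.
final class InstanceLabel {
  private InstanceLabel() {}

  // Returns the hostname registered for instanceId, or the id itself when no
  // mapping exists yet (for example, the manager was deployed before the controller).
  static String resolveHostname(Map<String, String> idToHostname, String instanceId) {
    return idToHostname.getOrDefault(instanceId, instanceId);
  }

  // Builds a log label that carries both the id and the resolved hostname.
  static String describe(Map<String, String> idToHostname, String instanceId) {
    return "id=" + instanceId + ", hostname=" + resolveHostname(idToHostname, instanceId);
  }

  public static void main(String[] args) {
    Map<String, String> idToHostname = new HashMap<>();
    idToHostname.put("0", "controller-host-a"); // made-up sample mapping
    System.out.println(describe(idToHostname, "0"));
    System.out.println(describe(idToHostname, "1")); // no mapping: falls back to the id
  }
}
```

`getOrDefault` behaves the same as the `containsKey(...) ? get(...) : instanceId` expression in the diff, including the case where a key is mapped to null.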
@@ -325,31 +332,31 @@ private void validateInstanceToTopicPartitionsMap(
   if (topicOnlyInManager.size() > 1 || (topicOnlyInManager.size() == 1 && !topicOnlyInManager.iterator().next().startsWith(SEPARATOR))) {
     validateWrongCount++;
-    LOGGER.error("Validate WRONG: InstanceName: {}, route: {}, topic only in manager: {}", instanceName, routeSet, topicOnlyInManager);
+    LOGGER.error("Validate WRONG: InstanceName: {}, route: {}, topic only in manager: {}", hostname, routeSet, topicOnlyInManager);
ditto
   }
   if (!controllerTopics.isEmpty()) {
     validateWrongCount++;
-    LOGGER.error("Validate WRONG: InstanceName: {}, route: {}, topic only in controller: {}", instanceName, routeSet, controllerTopics);
+    LOGGER.error("Validate WRONG: InstanceName: {}, route: {}, topic only in controller: {}", hostname, routeSet, controllerTopics);
ditto
updated
instanceToReplace.add(instanceName);
// Check if any worker in route is down
boolean routeWorkerDown = false;
if (_enableRebalance || forceBalance) {
nit: it would reduce the nesting level (and be easier to read) if you wrote something like
if (!(_enableRebalance || forceBalance)) return;
Autoscaling and autobalancing share this method, so we can't return early when auto balance is disabled.
I'm going to extract the controller auto balancing and worker auto balancing logic into separate functions, which should help readability.
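One way the proposed extraction could look, as a rough sketch: once each step lives in its own method, the guard clause from the nit only skips that step rather than everything after it. The class `ScheduledChecks`, the `enableAutoScaling` flag, and the recorded action strings are all hypothetical and stand in for the real logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the periodic check; names are illustrative only.
final class ScheduledChecks {
  private final boolean enableRebalance;   // plays the role of _enableRebalance
  private final boolean enableAutoScaling; // assumed flag for the autoscaling step
  private final List<String> actions = new ArrayList<>();

  ScheduledChecks(boolean enableRebalance, boolean enableAutoScaling) {
    this.enableRebalance = enableRebalance;
    this.enableAutoScaling = enableAutoScaling;
  }

  // Runs both steps; each one decides for itself whether to do anything.
  List<String> run(boolean forceBalance) {
    actions.clear();
    rebalanceIfEnabled(forceBalance);
    scaleIfEnabled();
    return new ArrayList<>(actions);
  }

  private void rebalanceIfEnabled(boolean forceBalance) {
    if (!(enableRebalance || forceBalance)) {
      return; // guard clause: skips rebalancing only
    }
    actions.add("rebalance");
  }

  private void scaleIfEnabled() {
    if (!enableAutoScaling) {
      return; // guard clause: skips autoscaling only
    }
    actions.add("autoscale");
  }
}
```

With rebalancing disabled and not forced, `run(false)` still reaches the autoscaling step, which is the behaviour the inline `return` would have broken.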
PriorityQueue<InstanceTopicPartitionHolder> itphSet = _pipelineToInstanceMap.get(pipeline);
for (InstanceTopicPartitionHolder itph : itphSet) {
  if (itph.getTotalNumPartitions() != 0) {
    throw new UnsupportedOperationException("Delete non-empty pipeline is not allowed, serving number of partitions: "
Can you explain why deleting a non-empty pipeline is disallowed? When can this happen, and what action should be taken when it does?
This action might be triggered manually for pipeline cleanup. Without this check we might accidentally delete a non-empty pipeline, and the topics in that pipeline would then no longer be replicated.
got it. thanks
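The safety check discussed in this thread can be illustrated with a small self-contained sketch. It assumes the per-instance partition counts are already available as a list of integers. `PipelineGuard`, `checkDeletable`, and the exact message wording are hypothetical, since the original message is cut off in the diff.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical guard; the real check iterates InstanceTopicPartitionHolder objects.
final class PipelineGuard {
  private PipelineGuard() {}

  // Throws if any instance in the pipeline still serves partitions, so a manual
  // cleanup cannot silently stop replication for topics in that pipeline.
  static void checkDeletable(String pipeline, List<Integer> partitionCounts) {
    for (int count : partitionCounts) {
      if (count != 0) {
        throw new UnsupportedOperationException(
            "Delete non-empty pipeline is not allowed, pipeline: " + pipeline
                + ", serving number of partitions: " + count);
      }
    }
  }

  public static void main(String[] args) {
    checkDeletable("example-pipeline", Arrays.asList(0, 0)); // empty pipeline: allowed
    try {
      checkDeletable("example-pipeline", Arrays.asList(0, 3));
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```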
@@ -88,12 +88,12 @@ public Representation post(Representation entity) {
   _helixMirrorMakerManager.disableAutoScaling();
   LOGGER.info("Disabled autoscaling!");
   responseJson.put("opt", "disable_autoscaling");
-  responseJson.put("auto_scaling", _helixMirrorMakerManager.isAutoBalancingEnabled());
+  responseJson.put("auto_scaling", _helixMirrorMakerManager.isAutoScalingEnabled());
   } else if ("enable_autoscaling".equalsIgnoreCase(opt)) {
   _helixMirrorMakerManager.enableAutoScaling();
   LOGGER.info("Enabled autobalancing!");
   responseJson.put("opt", "enable_autobalancing");
should this be enable_autoscaling?
@@ -88,12 +88,12 @@ public Representation post(Representation entity) {
   _helixMirrorMakerManager.disableAutoScaling();
   LOGGER.info("Disabled autoscaling!");
   responseJson.put("opt", "disable_autoscaling");
-  responseJson.put("auto_scaling", _helixMirrorMakerManager.isAutoBalancingEnabled());
+  responseJson.put("auto_scaling", _helixMirrorMakerManager.isAutoScalingEnabled());
   } else if ("enable_autoscaling".equalsIgnoreCase(opt)) {
   _helixMirrorMakerManager.enableAutoScaling();
   LOGGER.info("Enabled autobalancing!");
update logger as well
updated
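The copy-paste mismatch flagged in these two comments (an autoscaling branch reporting "autobalancing") can be avoided by deriving the echoed opt from the request rather than retyping it in each branch. The following is only a sketch of that idea. `AutoScalingOpt` and its in-memory flag are hypothetical and replace the real manager calls.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical handler; the boolean field stands in for the manager's autoscaling state.
final class AutoScalingOpt {
  private boolean autoScalingEnabled;

  // Applies enable_autoscaling / disable_autoscaling and echoes the opt that was
  // actually handled, so the response cannot drift from the branch taken.
  Map<String, Object> handle(String opt) {
    Map<String, Object> response = new LinkedHashMap<>();
    if ("enable_autoscaling".equalsIgnoreCase(opt)) {
      autoScalingEnabled = true;
    } else if ("disable_autoscaling".equalsIgnoreCase(opt)) {
      autoScalingEnabled = false;
    } else {
      response.put("error", "unsupported opt: " + opt);
      return response;
    }
    response.put("opt", opt.toLowerCase(Locale.ROOT));
    response.put("auto_scaling", autoScalingEnabled);
    return response;
  }
}
```

The response keys follow the diff shown above; the final commit list mentions a later rename of the status key, which this sketch does not attempt to reproduce.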
  znRecord = new ZNRecord(resourcePath);
}
znRecord.setSimpleField(_instanceId, _hostname);
propertyStore.set(resourcePath, znRecord, AccessOption.PERSISTENT);
Looks like each controller sets the whole znRecord instead of doing an incremental update, so there might be a race condition between different controllers.
Also, is there any place that might use the id->hostname mapping after a controller is down? Currently we don't remove the mapping when a controller goes down.
good point
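The race raised above comes from a read-modify-write cycle in which the last writer overwrites the whole record. A common remedy is to apply each change as an update against the current value and retry on conflict. The sketch below shows that pattern with an in-memory `AtomicReference` standing in for the shared record. It does not use the Helix property store API, and `IdHostnameRegistry` and its methods are hypothetical. It also includes the cleanup step asked about in the second comment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a shared id -> hostname record.
final class IdHostnameRegistry {
  // Each stored value is an immutable snapshot of the whole mapping.
  private final AtomicReference<Map<String, String>> record =
      new AtomicReference<>(Collections.<String, String>emptyMap());

  // Adds one mapping; retries if another writer replaced the snapshot in between,
  // so concurrent registrations do not overwrite each other.
  void register(String instanceId, String hostname) {
    while (true) {
      Map<String, String> current = record.get();
      Map<String, String> updated = new HashMap<>(current);
      updated.put(instanceId, hostname);
      if (record.compareAndSet(current, Collections.unmodifiableMap(updated))) {
        return;
      }
    }
  }

  // Removes a mapping, for example when a controller goes away.
  void unregister(String instanceId) {
    while (true) {
      Map<String, String> current = record.get();
      Map<String, String> updated = new HashMap<>(current);
      updated.remove(instanceId);
      if (record.compareAndSet(current, Collections.unmodifiableMap(updated))) {
        return;
      }
    }
  }

  Map<String, String> snapshot() {
    return record.get();
  }
}
```

Per the final commit list, the PR ended up dropping the property-store mapping in favour of setting the hostname in InstanceConfig, so this pattern is relevant only if a shared record is reintroduced.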
…et hostname in InstanceConfig is working
// Check if any worker in route is down
boolean routeWorkerDown = false;
if (_enableRebalance || forceBalance) {
  for (String instanceName : instanceToTopicPartitionsMap.keySet()) {
instanceId for consistency.
* controller static id and disable controller rebalance by default
* update via comment
* 1. update via comment 2. remove host name from property store since set hostname in InstanceConfig is working
* update auto_balancing to autobalancing_status to align with get status response
* fix ut failure
* update parameter name for consistency