
LIKAFKA-8933: Maintain partitions across non-identity mirror maker #55

Merged 5 commits on Mar 24, 2017

Conversation

WarrenGreen (Contributor)

No description provided.

_nextIndexPerKey.put(key, new AtomicLong(0));
_sensors.addPartitionSensors(partition);
}
_produceExecutor.scheduleWithFixedDelay(new ProduceRunnable(partition, key), _produceDelayMs, _produceDelayMs, TimeUnit.MILLISECONDS);
@smccauliff (Contributor), Mar 9, 2017

Seems like there could be more code reuse if ProduceService had a protected method like createProduceRunnable(int partition). That way this entire method would not need to be duplicated.
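A rough sketch of the refactor being suggested, for illustration only (class and method names here are hypothetical, not from the patch):

// Hypothetical sketch: the base service owns the scheduling loop and exposes a factory hook.
abstract class ProduceServiceSketch {
  protected Runnable createProduceRunnable(int partition) {
    return () -> producePlain(partition);   // default: unkeyed produce to an explicit partition
  }

  void scheduleAll(java.util.concurrent.ScheduledExecutorService executor, int partitionNum, long delayMs) {
    for (int p = 0; p < partitionNum; p++) {
      executor.scheduleWithFixedDelay(createProduceRunnable(p), delayMs, delayMs,
          java.util.concurrent.TimeUnit.MILLISECONDS);
    }
  }

  abstract void producePlain(int partition);
}

// The keyed variant would then override only createProduceRunnable(...) instead of duplicating the whole start-up method.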

HashMap<Integer, String> keyMapping = new HashMap<>();

int keyIndex = 0;
while (keyMapping.size() < partitionNum) {
Contributor

Why not just have the key be the partition number? I don't see the advantage of this.

Contributor Author

We chatted about this a while ago. The issue is that mirror maker does not propagate partition numbers across clusters, so we have to use keys; the destination partition is then assigned as a hash of the key modulo the partition count. If we don't purposefully select our keys, we could end up with a non-uniform assignment across partitions.

Contributor

Sorry I wasn't clear. I guess I'm not sure what the difference is between just setting the key to be the partition number vs. a pseudo-random number. Does this help with covering all the partitions in the destination cluster?

Contributor Author

We aren't just setting it to a random number. We are setting it to a number which maps to a unique partition when hashed. By searching for keys that each hash to a distinct partition, we maintain uniform partition assignment across the pipeline.

import static org.apache.kafka.common.utils.Utils.murmur2;


public class KeyedProduceService extends ProduceService implements Service {
Contributor

This class does not have any documentation. I know there are many Kafka monitor classes that do not have documentation, but really that's not a good idea.

try {
long nextIndex = _nextIndexPerKey.get(_key).get();
String message = Utils.jsonFromFields(_topic, nextIndex, System.currentTimeMillis(), _producerId, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, null, _key, message);
Contributor

Reuse would probably be higher if there were a superclass with a protected method byte[] getKey(). It could return null in the superclass, and the keyed version could return the key bytes.
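A minimal sketch of that hook, for illustration; the suggestion says byte[], but BaseProducerRecord above takes a String key, so a String is shown here, and the class and field names are hypothetical:

// Hypothetical sketch: only key selection differs between the plain and keyed runnables.
class ProduceRunnableSketch {
  protected String getKey(int partition) {
    return null;   // base service: no key, the partition is set explicitly on the record
  }
}

class KeyedProduceRunnableSketch extends ProduceRunnableSketch {
  private final java.util.Map<Integer, String> _partitionToKey;

  KeyedProduceRunnableSketch(java.util.Map<Integer, String> partitionToKey) {
    _partitionToKey = partitionToKey;
  }

  @Override
  protected String getKey(int partition) {
    return _partitionToKey.get(partition);   // keyed service: a key pre-selected to hash to this partition
  }
}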

Contributor Author

We would still have to change how we handle line 105. We need the partition number to be non-null for the metrics, but if we pass the partition number into the BaseProducerRecord, it will override the key.

We could get better code reuse if we changed the logic of ProduceRunnable in the base class to insert null as the partition number when the key is non-null. I considered this but worried that the change wasn't congruent with the base class's purpose.

@smccauliff @lindong28 Do you think the improved code reuse is worth changing the base class in this way?

@@ -191,12 +191,12 @@ public boolean isRunning() {
return _running.get();
}

private class ProduceMetrics {
protected class ProduceMetrics {
Contributor

Probably the visibility of this class does not need to change if the other refactorings are implemented.

}

private void consume() throws Exception {
Map<String, Long> nextIndexes = new HashMap<>();
Contributor

This method in the original consumer could be reused by making this a Map&lt;Object, Long&gt;; not elegant, but probably OK. The other possibility would be to type-parameterize the consumer, but that seems like more work for little gain.

_sensors._consumeError.record();
continue;
}
String key = record.key();
Contributor

Have a protected method like Object sequenceNumberKeyFromRecord(GenericRecord consumed) that returns the key used in the hash map. This can be a String or something for the keyed version and just an Integer for the non-keyed version.
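A sketch of what that hook could look like; ConsumedRecord below is a stand-in for whatever record type consume() iterates over, not a real class in the codebase:

// Hypothetical sketch: the subclass decides what the sequence-number map is keyed by.
interface ConsumedRecord {
  int partition();
  String key();
}

class ConsumeServiceSketch {
  // Non-keyed version: track sequence numbers per partition.
  protected Object sequenceNumberKeyFromRecord(ConsumedRecord record) {
    return record.partition();
  }
}

class KeyedConsumeServiceSketch extends ConsumeServiceSketch {
  // Keyed version: track sequence numbers per message key.
  @Override
  protected Object sequenceNumberKeyFromRecord(ConsumedRecord record) {
    return record.key();
  }
}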

@lindong28 (Contributor) commented Mar 10, 2017

@WarrenGreen Thanks for the patch. I am not sure why we need both KeyedProduceService and ProduceService. Is it possible to just modify ProduceService so that it always produces messages with a key? Similarly, I am not sure why we need a separate KeyedConsumeService.

@WarrenGreen (Contributor Author)

@lindong28 Yeah, I think that will be the best way to avoid code duplication. I'll update today.

@smccauliff (Contributor)

This is what I think is going to happen with how the keys are handled. Can you let me know if I'm wrong about this?

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;

import static org.apache.kafka.common.utils.Utils.murmur2;

public class PartitionMapping {

  private static final int MAX_PARTITIONS = 128;

  public static void main(String[] args) throws Exception {

    for (int srcPartitionCount = 1; srcPartitionCount < MAX_PARTITIONS; srcPartitionCount++) {
      for (int destPartitionCount = 1; destPartitionCount < MAX_PARTITIONS; destPartitionCount++) {
        Map<Integer, String> srcPartitionToKey = buildPartitionToKeyMapping(srcPartitionCount);
        Set<Integer> coveredDestinationPartitions = new HashSet<>(destPartitionCount);
        for (String key : srcPartitionToKey.values()) {
          int destinationPartition = keyToDestinationPartition(key.getBytes(), destPartitionCount);
          coveredDestinationPartitions.add(destinationPartition);
        }

        System.out.println(srcPartitionCount + "|" + destPartitionCount + "|" + coveredDestinationPartitions.size());
      }
    }
  }

  /**
   * From Kafka Monitor, produce service
   */
  private static Map<Integer, String> buildPartitionToKeyMapping(int srcPartitionCount) {
    Map<Integer, String> partitionToKey = new HashMap<>();

    int keyIndex = 0;
    while (partitionToKey.size() < srcPartitionCount) {
      String key = keyIndex + "";
      int partition = getDefaultPartitionForKey(key, srcPartitionCount);
      if (!partitionToKey.containsKey(partition)) {
        partitionToKey.put(partition, key);
      }
      keyIndex++;
    }

    return partitionToKey;
  }

  /**
   * From DefaultPartitioner, kafka.  This is what the mirror maker producer is doing.  No?
   */
  private static int keyToDestinationPartition(byte[] key, int destPartitionCount) {
    return toPositive(Utils.murmur2(key)) % destPartitionCount;
  }

  private static int toPositive(int number) {
      return number & 0x7fffffff;
  }

  /**
   * From kafka monitor, produce service
   */
  private static int getDefaultPartitionForKey(String key, int srcPartitionCount) {
    byte[] keyBytes = key.getBytes();
    return toPositive(murmur2(keyBytes)) % srcPartitionCount;
  }
}


@smccauliff (Contributor)

If the source and destination clusters have the same number of partitions, then this works fine and I withdraw my objections to the way keys are used. Then my only concern is code reuse.

@WarrenGreen (Contributor Author) commented Mar 15, 2017 via email

@@ -70,6 +70,7 @@ public MirrorPipelineMonitor(Map<String, Object> props, String name) throws Exce
String topic,
String name) throws Exception {
Map<String, Object> topicManagementProps = createTopicManagementServiceProps(props, topic);
topicManagementProps.put(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG, "");
Contributor

What is the purpose of this empty zk url?


Contributor

Yeah, I get that it is a required config. But why do we provide an empty string here? Since it is a required config, shouldn't it already be specified in the props that are passed to MirrorPipelineMonitor?

@lindong28 (Contributor), Mar 16, 2017

In general, assigning an empty string to a required config seems weird. After taking a further look at the code, I realize that you have this line because you want to change the PARTITIONS_TO_BROKER_RATIO_THRESHOLD based on its current value in the props and the current number of brokers in each cluster. However, this reveals another problem with MirrorPipelineMonitor, i.e. what if the number of brokers changes after MirrorPipelineMonitor starts running? If SRE doubles the number of brokers in one cluster, TopicManagementService will double the partition number in that cluster only, which breaks MirrorPipelineMonitor, right? It seems that the only choice is to build this logic into TopicManagementService so that it is able to keep the partition number equivalent across multiple clusters.

Contributor Author

So we prematurely create the TopicManagementServiceConfig, as you requested in the previous commit, so that we can retrieve the partitionsToBrokerRatio to determine the number of partitions we want to create. Because we create it prematurely, we don't have access to the zk url yet.

I agree that the broker count needs to be managed dynamically, and there is a TODO to do this. That is not what this commit addresses, though.

@lindong28 (Contributor), Mar 21, 2017

@WarrenGreen The reviewer usually makes suggestions based on the code snippet, and it is possible that a comment itself is wrong. You probably need to address the comments you think are right, and defend the approach where you think a comment is wrong, since you own the code and have spent more time on the patch than anyone else.

I am not sure this can be a TODO. I think the goal is for us to monitor a pipeline of multiple clusters even when the number of live brokers can change. I don't think we have to do everything in this patch. But we have to do that before we are able to monitor the pipeline of multiple clusters in PROD, right?

Contributor Author

I'll address this in another patch before reaching prod.

@@ -131,7 +131,7 @@ public void run() {
}

private void consume() throws Exception {
Map<Integer, Long> nextIndexes = new HashMap<>();
Map<String, Long> nextIndexes = new HashMap<>();
Contributor

Why do we change the map key from the partition number to the record's key? It seems to me that the original code would work without change, no?

Contributor Author

Addressed below

}

public void run() {
try {
long nextIndex = _nextIndexPerPartition.get(_partition).get();
long nextIndex = _nextIndexPerKey.get(_key).get();
Contributor

It is a bit weird that _nextIndexPerKey is indexed by key while _recordsProducedPerPartition is still indexed by partition. In order to keep the change smaller and keep the code style consistent, how about we still use _nextIndexPerPartition? We only need the key to construct the BaseProducerRecord.
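In code, ProduceRunnable.run() would keep its per-partition bookkeeping and use the key only when building the record, roughly as follows (a sketch only; the field names follow the diff context above, and the later discussion in this thread settles on passing both _partition and _key to the record):

// Sketch: index bookkeeping stays per partition; the key is used only for the produced record.
long nextIndex = _nextIndexPerPartition.get(_partition).get();
String message = Utils.jsonFromFields(_topic, nextIndex, System.currentTimeMillis(), _producerId, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, null, _key, message);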

Contributor Author

Yeah, that's a good idea


int keyIndex = 0;
while (keyMapping.size() < partitionNum) {
String key = keyIndex + "";
@lindong28 (Contributor), Mar 16, 2017

Can you use Integer.toString(keyIndex)? Also, it is not very clear to me what the index in keyIndex means. Can we name it e.g. nextInt?

Contributor Author

Sure

}

//TODO: Remove method and convert uses to Util.toPositive when kafka version dependency reaches 0.10.1.10
public static int toPositive(int number) {
Contributor

Can we bump up the kafka version to 0.10.1.10 now to avoid copy&paste code?

Contributor Author

I'll look into it. Previously we were using 10.0.18 and there was a compatibility issue between clients when we attempted to update the kafka version.

return number & 0x7fffffff;
}

private int getDefaultPartitionForKey(String key) {
@lindong28 (Contributor), Mar 16, 2017

We probably need a way for the user to configure the partitioning algorithm. The patch currently assumes that MM will only use the new producer's partitioning algorithm. This is OK for users of vanilla Kafka, but we actually have a hotfix at LinkedIn which changes MM to use the old producer's partitioning algorithm, because the algorithms used by the old and new producers are different and simply changing the algorithm could cause message re-ordering.

We can let the user configure this, similar to how produce.producer.class is configured in ProduceServiceConfig.

Contributor Author

Yeah, I was thinking the same thing, but found that the current design of partitioners would require us to base pipeline monitoring design decisions on partitioner implementation details.

Options would be to:

  1. Base our design on partitioner implementation details
  2. Create key-hashing classes in the monitor that correspond to hashing strategies in Kafka
  3. Create key-hashing classes in Kafka and have both the partitioners and the pipeline monitor import them

Option 3 is the cleanest solution but has the broadest implications. Option 2 may be a good compromise between the options.

Contributor

Let's not rely on changes in open source Kafka. I think we can do 2. Is option 2 similar to how produce.producer.class is configured in ProduceServiceConfig?

Contributor Author

I agree. Let's stick with option 2.

produce.producer.class is most similar to option 3 but didn't require changes in open source kafka. Option 2 is somewhat similar.

String message = Utils.jsonFromFields(_topic, nextIndex, System.currentTimeMillis(), _producerId, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, null, message);
BaseProducerRecord record = new BaseProducerRecord(_topic, null, _key, message);
Contributor

We probably still need to provide the partition to the record because some producer implementations, such as KafkaRestProducer, only accept a partition.

Contributor Author

That should be fine. The partition number will override our key for as long as the partition is propagated through the pipeline, but since they should correspond to each other, there shouldn't be issues.

@lindong28 (Contributor), Mar 21, 2017

But that is based on the assumption that the underlying KMBaseProducer will use the same hashing algorithm to map the key to the partition, right? I am not sure every implementation of KMBaseProducer will do that. For example, what if KafkaRestProducer is extended in the future to allow the user to specify the destination partition but not the message key? It seems easier to just provide both pieces of information and leave it to the actual KMBaseProducer implementation to decide which one to use.

Contributor Author

I think we're saying the same thing. We want both the partition and key to be passed.

Contributor

Here is another reason to include both the partition and the key. If we include only the key, we have to make sure that the producer always uses the same partitioning algorithm that is specified in the config and used by ProduceService. That is necessary but inconvenient. For example, the user has to change this algorithm when switching the producer class between the old producer and the new producer, and things go wrong if they forget to do so. We can avoid this inconvenience if the user only wants to monitor one cluster. We cannot avoid it for a user who wants to monitor a pipeline, and that inconvenience may be justified since the user has to keep the producer's partitioning algorithm in sync with MM's partitioning algorithm anyway.

Contributor

Ah I see. Sure. Let's just keep both.

import static org.apache.kafka.common.utils.Utils.murmur2;


public class DefaultPartitioner implements Partitioner {
@lindong28 (Contributor), Mar 22, 2017

Can you remove the extra space between DefaultPartitioner and implements? Also, we probably need both an old-producer partitioner and a new-producer partitioner, and it may be a good idea to distinguish between them in the class names.

Contributor

Can you reply to the second point in the comment?

Contributor Author

I've named it DefaultPartitioner based on the name used in Kafka Server. The LinkedIn-Kafka-Clients producer also references this partitioner as the "DefaultPartitioner".

Contributor

The point is that we need a partitioner for both the old producer and the new producer, right? Given that, we need better names to distinguish between the two. Currently you only have a partitioner for the old producer. What if an open source user uses the new producer in MM?
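For context, the two keyed-partitioning algorithms differ roughly as follows. The new-producer formula matches the DefaultPartitioner snippet quoted earlier in this thread; the old-producer formula is recalled from the Scala producer's default partitioner and should be treated as an assumption, not something verified in this PR:

// Sketch of the two strategies as KMPartitioner-style implementations (class names are illustrative).
class NewProducerPartitionerSketch {
  // New (Java) producer default: murmur2 over the key bytes, masked positive, modulo partition count.
  public int partition(String key, int numPartitions) {
    return (org.apache.kafka.common.utils.Utils.murmur2(key.getBytes()) & 0x7fffffff) % numPartitions;
  }
}

class OldProducerPartitionerSketch {
  // Old (Scala) producer default (assumed): positive key.hashCode() modulo partition count.
  public int partition(String key, int numPartitions) {
    return (key.hashCode() & 0x7fffffff) % numPartitions;
  }
}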

Contributor Author

Updated naming

@@ -80,6 +81,8 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String producerClass = config.getString(ProduceServiceConfig.PRODUCER_CLASS_CONFIG);

String partitionerClassName = config.getString(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG);
_partitioner = (Partitioner) Class.forName(partitionerClassName).getConstructor().newInstance();
Contributor

I am not sure. But is it possible to just do config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, Partitioner)?
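If that works, a sketch of what it could look like, assuming the config key is declared with ConfigDef.Type.CLASS rather than the Type.STRING shown in the merged diff below, since AbstractConfig.getConfiguredInstance resolves the value as a class (this is an assumption, not verified here):

// Sketch only: declare the partitioner config as a class ...
//   .define(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, ConfigDef.Type.CLASS,
//           NewKMPartitioner.class, ConfigDef.Importance.LOW, PARTITIONER_CLASS_DOC)
// ... and let AbstractConfig instantiate it:
KMPartitioner partitioner =
    config.getConfiguredInstance(ProduceServiceConfig.PARTITIONER_CLASS_CONFIG, KMPartitioner.class);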

Contributor Author

Yeah, that seems to work

*/
package com.linkedin.kmf.partitioner;

public interface Partitioner {
Contributor

Can you name the interface KMPartitioner to avoid class name conflicts with open source classes, similar to KMBaseProducer?

@@ -70,6 +74,11 @@
NewProducer.class.getCanonicalName(),
ConfigDef.Importance.LOW,
PRODUCER_CLASS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
ConfigDef.Type.STRING,
DefaultPartitioner.class.getCanonicalName(),
Contributor

We probably need to decide the default value based on the producer class (e.g. new or old). If the user has explicitly configured both a producer class and a partitioner class that conflict with each other, we should probably log a warning and quit. But it's OK if you don't do it here. I can do it in another patch.
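A hedged sketch of the kind of check being described; the mapping of producer class to partitioner class below is an assumption for illustration, not something in this patch:

// Sketch only: pick a default partitioner that matches the configured producer class,
// and fail fast if the user explicitly configured a conflicting pair.
class ProducerPartitionerCheckSketch {

  static String defaultPartitionerFor(String producerClassName) {
    return producerClassName.contains("OldProducer")   // "OldProducer" class name is assumed
        ? "com.linkedin.kmf.partitioner.OldKMPartitioner"
        : "com.linkedin.kmf.partitioner.NewKMPartitioner";
  }

  static void checkProducerPartitionerPair(String producerClassName, String partitionerClassName) {
    boolean oldProducer = producerClassName.contains("OldProducer");
    boolean oldPartitioner = partitionerClassName.contains("OldKMPartitioner");
    if (oldProducer != oldPartitioner) {
      throw new IllegalArgumentException("Producer class " + producerClassName
          + " conflicts with partitioner class " + partitionerClassName);
    }
  }
}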

import java.util.Arrays;

public class OldByteArrayKMPartitioner implements KMPartitioner {
public int getPartitionForKey(String key, int partitionNum) {
Contributor

Can you explain why we need OldByteArrayKMPartitioner in addition to OldKMPartitioner?

Contributor Author

I don't know if there is demand for it right now, but it was one of the partitioners designed for the OldProducer.

Contributor

I think we don't need it. We have added OldKMPartitioner and NewKMPartitioner because we have use-cases for both. It should be safe to remove this OldByteArrayKMPartitioner and only add it if it is useful.

Contributor Author

sgtm


public interface KMPartitioner {

int getPartitionForKey(String key, int numPartitions);
Contributor

Can we name this method partition(...) since the open source Partitioner uses this method name?

Contributor Author

For sure. Will update after we come to a consensus on the above comments.

@lindong28 lindong28 merged commit ded25fb into linkedin:master Mar 24, 2017
lindong28 pushed a commit that referenced this pull request Mar 24, 2017
…sters (#55)

Currently ProduceService produces message without key to the specific
partition. Messages to the same partition will have incremental sequence number
in the payload to allow ConsumeService to detect message loss and out-of-order
message delivery. However, when we monitor a pipeline of clusters connected by
MirrorMaker, the message to the same partition in the destination cluster may
not have incremental sequence number since MM produces message without key.

This patch addresses the problem by producing message with key that will map to
the given partition using the specified partitioner class. The partitioner
class needs to be the same partitioner used by MM. It is configured as
org.apache.kafka.clients.producer.internals.DefaultPartitioner by default.
@@ -33,6 +34,9 @@
public static final String PRODUCER_CLASS_DOC = "Producer class that will be instantiated as producer in the produce service. "
+ "It can be NewProducer, or full class name of any class that implements the KMBaseProducer interface. ";

public static final String PARTITIONER_CLASS_CONFIG = "produce.partitioner.class";
public static final String PARTITIONER_CLASS_DOC = "KMPartitioner class that corresponds to the partitioner used the target cluster.";
Contributor

I changed this to ... used in the ...

@lindong28 (Contributor)

@WarrenGreen Thanks for the patch. LGTM.

@lindong28 (Contributor)

@WarrenGreen Have you tested this patch before updating it? If not, can you test it using both kafka-monitor-start.sh and single-cluster-monitor.sh in the future before every update?
