Skip to content

Commit

Permalink
kafka producer 适配 row data for performance alibaba#726
Browse files Browse the repository at this point in the history
  • Loading branch information
wingerx committed Jul 31, 2018
1 parent fe12a84 commit 3d5f13f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.alibaba.otter.canal.kafka.producer;

import java.io.IOException;
import java.util.Properties;

import com.google.protobuf.ByteString;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -49,14 +51,25 @@ public void stop() {
}
}

public void send(Topic topic, Message message) {
public void send(Topic topic, Message message) throws IOException {
boolean valid = false;
if (message != null && !message.getEntries().isEmpty()) {
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
&& entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
valid = true;
break;
if (message != null) {
if (message.isRaw() && !message.getRawEntries().isEmpty()) {
for (ByteString byteString : message.getRawEntries()) {
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
&& entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
valid = true;
break;
}
}
} else if (!message.getEntries().isEmpty()){
for (CanalEntry.Entry entry : message.getEntries()) {
if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN
&& entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
valid = true;
break;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package com.alibaba.otter.canal.kafka.producer;

import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalPacket;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalPacket;
import com.alibaba.otter.canal.protocol.Message;
import java.util.Map;

/**
* Kafka Message类的序列化
Expand All @@ -25,12 +24,15 @@ public void configure(Map<String, ?> configs, boolean isKey) {
@Override
public byte[] serialize(String topic, Message data) {
try {
if (data == null) return null;
else {
if (data != null) {
CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) {
for (CanalEntry.Entry entry : data.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
if (data.getId() != -1) {
if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) {
messageBuilder.addAllMessages(data.getRawEntries());
} else if (!CollectionUtils.isEmpty(data.getEntries())) {
for (CanalEntry.Entry entry : data.getEntries()) {
messageBuilder.addMessages(entry.toByteString());
}
}
}
CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
Expand All @@ -41,6 +43,7 @@ public byte[] serialize(String topic, Message data) {
} catch (Exception e) {
throw new SerializationException("Error when serializing message to byte[] ");
}
return null;
}

@Override
Expand Down

0 comments on commit 3d5f13f

Please sign in to comment.