diff --git a/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java b/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java index 1543abf99d..3318a89eb6 100644 --- a/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java +++ b/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/CanalKafkaProducer.java @@ -1,7 +1,9 @@ package com.alibaba.otter.canal.kafka.producer; +import java.io.IOException; import java.util.Properties; +import com.google.protobuf.ByteString; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -49,14 +51,25 @@ public void stop() { } } - public void send(Topic topic, Message message) { + public void send(Topic topic, Message message) throws IOException { boolean valid = false; - if (message != null && !message.getEntries().isEmpty()) { - for (CanalEntry.Entry entry : message.getEntries()) { - if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN - && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) { - valid = true; - break; + if (message != null) { + if (message.isRaw() && !message.getRawEntries().isEmpty()) { + for (ByteString byteString : message.getRawEntries()) { + CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString); + if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN + && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) { + valid = true; + break; + } + } + } else if (!message.getEntries().isEmpty()){ + for (CanalEntry.Entry entry : message.getEntries()) { + if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN + && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) { + valid = true; + break; + } } } } diff --git a/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java b/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java index 339e4f8cee..17fca1170b 100644 --- a/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java +++ b/kafka/src/main/java/com/alibaba/otter/canal/kafka/producer/MessageSerializer.java @@ -1,14 +1,13 @@ package com.alibaba.otter.canal.kafka.producer; -import java.util.Map; - +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.CanalPacket; +import com.alibaba.otter.canal.protocol.Message; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import org.springframework.util.CollectionUtils; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.CanalPacket; -import com.alibaba.otter.canal.protocol.Message; +import java.util.Map; /** * Kafka Message类的序列化 @@ -25,12 +24,15 @@ public void configure(Map configs, boolean isKey) { @Override public byte[] serialize(String topic, Message data) { try { - if (data == null) return null; - else { + if (data != null) { CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder(); - if (data.getId() != -1 && !CollectionUtils.isEmpty(data.getEntries())) { - for (CanalEntry.Entry entry : data.getEntries()) { - messageBuilder.addMessages(entry.toByteString()); + if (data.getId() != -1) { + if (data.isRaw() && !CollectionUtils.isEmpty(data.getRawEntries())) { + messageBuilder.addAllMessages(data.getRawEntries()); + } else if (!CollectionUtils.isEmpty(data.getEntries())) { + for (CanalEntry.Entry entry : data.getEntries()) { + messageBuilder.addMessages(entry.toByteString()); + } } } CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder(); @@ -41,6 +43,7 @@ public byte[] serialize(String topic, Message data) { } catch (Exception e) { throw new SerializationException("Error when serializing message to byte[] "); } + return null; } @Override