@@ -73,35 +73,36 @@ public void start(Map<String, String> settings) {
     this.session = RedisSessionImpl.create(this.config);
 
     final Set<TopicPartition> assignment = this.context.assignment();
-    final byte[][] partitionKeys = assignment.stream()
-        .map(RedisSinkTask::redisOffsetKey)
-        .map(s -> s.getBytes(Charsets.UTF_8))
-        .toArray(byte[][]::new);
-
-
-    final RedisFuture<List<KeyValue<byte[], byte[]>>> partitionKeyFuture = this.session.asyncCommands().mget(partitionKeys);
-    final List<SinkOffsetState> sinkOffsetStates;
-    try {
-      final List<KeyValue<byte[], byte[]>> partitionKey = partitionKeyFuture.get(this.config.operationTimeoutMs, TimeUnit.MILLISECONDS);
-      sinkOffsetStates = partitionKey.stream()
-          .map(RedisSinkTask::state)
-          .filter(Objects::nonNull)
-          .collect(Collectors.toList());
-    } catch (InterruptedException | ExecutionException | TimeoutException e) {
-      throw new RetriableException(e);
-    }
-    Map<TopicPartition, Long> partitionOffsets = new HashMap<>(assignment.size());
-    for (SinkOffsetState state : sinkOffsetStates) {
-      partitionOffsets.put(state.topicPartition(), state.offset());
-      log.info("Requesting offset {} for {}", state.offset(), state.topicPartition());
-    }
-    for (TopicPartition topicPartition : assignment) {
-      if (!partitionOffsets.containsKey(topicPartition)) {
-        partitionOffsets.put(topicPartition, 0L);
-        log.info("Requesting offset {} for {}", 0L, topicPartition);
+    if (!assignment.isEmpty()) {
+      final byte[][] partitionKeys = assignment.stream()
+          .map(RedisSinkTask::redisOffsetKey)
+          .map(s -> s.getBytes(Charsets.UTF_8))
+          .toArray(byte[][]::new);
+
+      final RedisFuture<List<KeyValue<byte[], byte[]>>> partitionKeyFuture = this.session.asyncCommands().mget(partitionKeys);
+      final List<SinkOffsetState> sinkOffsetStates;
+      try {
+        final List<KeyValue<byte[], byte[]>> partitionKey = partitionKeyFuture.get(this.config.operationTimeoutMs, TimeUnit.MILLISECONDS);
+        sinkOffsetStates = partitionKey.stream()
+            .map(RedisSinkTask::state)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        throw new RetriableException(e);
+      }
+      Map<TopicPartition, Long> partitionOffsets = new HashMap<>(assignment.size());
+      for (SinkOffsetState state : sinkOffsetStates) {
+        partitionOffsets.put(state.topicPartition(), state.offset());
+        log.info("Requesting offset {} for {}", state.offset(), state.topicPartition());
+      }
+      for (TopicPartition topicPartition : assignment) {
+        if (!partitionOffsets.containsKey(topicPartition)) {
+          partitionOffsets.put(topicPartition, 0L);
+          log.info("Requesting offset {} for {}", 0L, topicPartition);
+        }
       }
+      this.context.offset(partitionOffsets);
     }
-    this.context.offset(partitionOffsets);
   }
 
   private byte[] toBytes(String source, Object input) {
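Note on the hunk above: wrapping everything in `if (!assignment.isEmpty())` avoids calling `mget` with a zero-length key array when the task starts with no assigned partitions (an empty key set is typically rejected by the Redis client). For reference, a sketch of the two helpers the stream pipeline uses, written as methods of this class so its existing imports apply. `redisOffsetKey` mirrors the `__kafka.offset.%s.%s` format written back in `put()` below; the body of `state` is an assumption inferred from the Jackson usage elsewhere in this file:

    static String redisOffsetKey(TopicPartition topicPartition) {
      return String.format("__kafka.offset.%s.%s", topicPartition.topic(), topicPartition.partition());
    }

    // Assumed shape: returns null for keys Redis did not have, which the
    // .filter(Objects::nonNull) step above then drops.
    static SinkOffsetState state(KeyValue<byte[], byte[]> input) {
      if (!input.hasValue()) {
        return null;
      }
      try {
        return ObjectMapperFactory.INSTANCE.readValue(input.getValue(), SinkOffsetState.class);
      } catch (IOException e) {
        throw new DataException(e);
      }
    }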
@@ -129,6 +130,14 @@ private byte[] toBytes(String source, Object input) {
     return result;
   }
 
+  static String formatLocation(SinkRecord record) {
+    return String.format(
+        "topic = %s partition = %s offset = %s",
+        record.topic(),
+        record.kafkaPartition(),
+        record.kafkaOffset()
+    );
+  }
 
   @Override
   public void put(Collection<SinkRecord> records) {
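The new `formatLocation` helper is pure string formatting, so its behavior is easy to pin down with a throwaway snippet (hypothetical values; the seven-argument constructor is the standard Connect `SinkRecord` one):

    SinkRecord record = new SinkRecord("events", 3, null, "some-key", null, "some-value", 42L);
    // Prints: topic = events partition = 3 offset = 42
    System.out.println(RedisSinkTask.formatLocation(record));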
@@ -140,10 +149,19 @@ public void put(Collection<SinkRecord> records) {
     TopicPartitionCounter counter = new TopicPartitionCounter();
 
     for (SinkRecord record : records) {
+      log.trace("put() - Processing record " + formatLocation(record));
       if (null == record.key()) {
-        throw new DataException("The key for the record cannot be null.");
+        throw new DataException(
+            "The key for the record cannot be null. " + formatLocation(record)
+        );
       }
       final byte[] key = toBytes("key", record.key());
+      if (null == key || key.length == 0) {
+        throw new DataException(
+            "The key cannot be an empty byte array. " + formatLocation(record)
+        );
+      }
+
       final byte[] value = toBytes("value", record.value());
 
       SinkOperation.Type currentOperationType;
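The second guard matters because a non-null key can still serialize to zero bytes, for example a record keyed by an empty string, which would otherwise produce an unusable Redis key. A minimal illustration (assuming `toBytes` ends up UTF-8 encoding plain string keys, which matches the Charsets.UTF_8 usage elsewhere in this file):

    byte[] key = "".getBytes(Charsets.UTF_8);
    // Passes the original null check...
    assert key != null;
    // ...but is caught by the new length check, now reported with the record's location.
    assert key.length == 0;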
@@ -174,18 +192,20 @@ public void put(Collection<SinkRecord> records) {
     );
 
     final List<SinkOffsetState> offsetData = counter.offsetStates();
-    operation = SinkOperation.create(SinkOperation.Type.SET, this.config, offsetData.size());
-    operations.add(operation);
-    for (SinkOffsetState e : offsetData) {
-      final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
-      final byte[] value;
-      try {
-        value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
-      } catch (JsonProcessingException e1) {
-        throw new DataException(e1);
+    if (!offsetData.isEmpty()) {
+      operation = SinkOperation.create(SinkOperation.Type.SET, this.config, offsetData.size());
+      operations.add(operation);
+      for (SinkOffsetState e : offsetData) {
+        final byte[] key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition()).getBytes(Charsets.UTF_8);
+        final byte[] value;
+        try {
+          value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(e);
+        } catch (JsonProcessingException e1) {
+          throw new DataException(e1);
+        }
+        operation.add(key, value);
+        log.trace("put() - Setting offset: {}", e);
       }
-      operation.add(key, value);
-      log.trace("put() - Setting offset: {}", e);
     }
 
     for (SinkOperation op : operations) {
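With this hunk the offset-tracking SET is only enqueued when the batch actually produced offset state, so an empty `offsetData` no longer creates a zero-entry operation. Each entry round-trips roughly as below; `SinkOffsetState.of(...)` is a hypothetical factory shown only to make the sketch read cleanly, and the JSON field names are assumed from the accessors used above:

    // Hypothetical construction; the real SinkOffsetState API may differ.
    SinkOffsetState state = SinkOffsetState.of("events", 3, 43L);
    byte[] key = String.format("__kafka.offset.%s.%s", state.topic(), state.partition())
        .getBytes(Charsets.UTF_8);
    // Jackson output, roughly: {"topic":"events","partition":3,"offset":43}
    byte[] value = ObjectMapperFactory.INSTANCE.writeValueAsBytes(state);

This is the same key format `start()` reads back with `mget`, which is what makes the restart-time offset restore in the first hunk work.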