Skip to content

Commit

Permalink
Refine contribution #3827
Browse files Browse the repository at this point in the history
* Update year in licence headers
* Update Javadoc
* Add `this` keyword where appropriate
  • Loading branch information
fmbenhassine committed Mar 9, 2021
1 parent c886a60 commit 572a302
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -45,6 +45,12 @@ public void write(List<? extends V> items) throws Exception {
}
flush();
}

/**
* Flush items to the key/value store.
*
* @throws Exception if unable to flush items
*/
protected void flush() throws Exception {}

/**
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,19 +44,20 @@ public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
@Override
protected void writeKeyValue(K key, T value) {
if (this.delete) {
listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
}
else {
listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
}
}

@Override
protected void flush() throws Exception{
kafkaTemplate.flush();
for(ListenableFuture<SendResult<K,T>> future: listenableFutures){
this.kafkaTemplate.flush();
for(ListenableFuture<SendResult<K,T>> future: this.listenableFutures){
future.get();
}
listenableFutures.clear();
this.listenableFutures.clear();
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;

public class KafkaItemWriterTests {

Expand All @@ -49,7 +51,7 @@ public class KafkaItemWriterTests {
public void setUp() throws Exception {
MockitoAnnotations.openMocks(this);
when(this.kafkaTemplate.getDefaultTopic()).thenReturn("defaultTopic");
when(this.kafkaTemplate.sendDefault(any(), any())).thenReturn(future);
when(this.kafkaTemplate.sendDefault(any(), any())).thenReturn(this.future);
this.itemKeyMapper = new KafkaItemKeyMapper();
this.writer = new KafkaItemWriter<>();
this.writer.setKafkaTemplate(this.kafkaTemplate);
Expand Down

0 comments on commit 572a302

Please sign in to comment.