Skip to content

Commit e54cecf

Browse files
committed
Idempotency
1 parent 03f9728 commit e54cecf

File tree

7 files changed

+25
-18
lines changed

7 files changed

+25
-18
lines changed

11-idempotency/idempotency-consumer/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<groupId>org.springframework.boot</groupId>
88
<artifactId>spring-boot-starter-parent</artifactId>
9-
<version>2.7.5</version>
9+
<version>3.3.1</version>
1010
<relativePath /> <!-- lookup parent from repository -->
1111
</parent>
1212
<groupId>com.course.kafka</groupId>
@@ -68,4 +68,4 @@
6868
</plugins>
6969
</build>
7070

71-
</project>
71+
</project>

11-idempotency/idempotency-consumer/src/main/java/com/course/kafka/config/CacheConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.course.kafka.config;
22

33
import java.time.Duration;
4+
import java.util.concurrent.TimeUnit;
45

56
import org.springframework.context.annotation.Bean;
67
import org.springframework.context.annotation.Configuration;
@@ -13,11 +14,14 @@
1314
public class CacheConfig {
1415
@Bean(name = "cachePurchaseRequest")
1516
public Cache<Integer, Boolean> cachePurchaseRequest() {
16-
return Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).maximumSize(1000).build();
17+
//return Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).maximumSize(1000).build();
18+
return Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
19+
.maximumSize(1000).build();
1720
}
1821

1922
@Bean(name = "cachePaymentRequest")
2023
public Cache<PaymentRequestCacheKey, Boolean> cachePaymentRequest() {
21-
return Caffeine.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).maximumSize(1000).build();
24+
return Caffeine.newBuilder().expireAfterWrite(2, TimeUnit.MINUTES)
25+
.maximumSize(1000).build();
2226
}
23-
}
27+
}

11-idempotency/idempotency-consumer/src/main/java/com/course/kafka/consumer/PurchaseRequestConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private boolean isExistsInCache(int purchaseRequestId) {
3333

3434
@KafkaListener(topics = "t-purchase-request")
3535
public void consume(String message) throws JsonMappingException, JsonProcessingException {
36-
var purchaseRequest = objectMapper.readValue(message, PurchaseRequest.class);
36+
PurchaseRequest purchaseRequest = objectMapper.readValue(message, PurchaseRequest.class);
3737

3838
var processed = isExistsInCache(purchaseRequest.getId());
3939

11-idempotency/idempotency-producer/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>org.springframework.boot</groupId>
77
<artifactId>spring-boot-starter-parent</artifactId>
8-
<version>2.7.5</version>
8+
<version>3.3.1</version>
99
<relativePath/> <!-- lookup parent from repository -->
1010
</parent>
1111
<groupId>com.course.kafka</groupId>
@@ -60,4 +60,4 @@
6060
</plugins>
6161
</build>
6262

63-
</project>
63+
</project>

11-idempotency/idempotency-producer/src/main/java/com/course/kafka/IdempotencyProducerApplication.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ public void run(String... args) throws Exception {
2424
PurchaseRequest pr2 = new PurchaseRequest(5552, "PR-Second", 992, "USD");
2525
PurchaseRequest pr3 = new PurchaseRequest(5553, "PR-Third", 993, "USD");
2626

27-
producer.send(pr1);
28-
producer.send(pr2);
29-
producer.send(pr3);
27+
producer.sendPurchaseRequest(pr1);
28+
producer.sendPurchaseRequest(pr2);
29+
producer.sendPurchaseRequest(pr3);
3030

3131
//sending the duplicate record again
32-
producer.send(pr1);
32+
producer.sendPurchaseRequest(pr1);
3333
}
3434

35-
}
35+
}

11-idempotency/idempotency-producer/src/main/java/com/course/kafka/producer/PurchaseRequestProducer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ public class PurchaseRequestProducer {
1717
@Autowired
1818
private ObjectMapper objectMapper;
1919

20-
public void send(PurchaseRequest purchaseRequest) throws JsonProcessingException {
21-
var json = objectMapper.writeValueAsString(purchaseRequest);
22-
kafkaTemplate.send("t-purchase-request", purchaseRequest.getPrNumber(), json);
20+
public void sendPurchaseRequest(PurchaseRequest purchaseRequest) throws JsonProcessingException {
21+
try {
22+
var json = objectMapper.writeValueAsString(purchaseRequest);
23+
kafkaTemplate.send("t-purchase-request", purchaseRequest.getPrNumber(), json);
24+
} catch (Exception e) {
25+
throw new RuntimeException(e);
26+
}
2327
}
24-
2528
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1+
server.port=8085

0 commit comments

Comments
 (0)