Skip to content

Commit

Permalink
BUG: kafka event is not consistent.
Browse files Browse the repository at this point in the history
Sometimes the events are consumed in Task-Service, while other times it is consumed in TimeEntry-Service; whereas it should be consumed by both services!
  • Loading branch information
seyedali-dev committed Jun 4, 2024
1 parent 494341a commit aa1f744
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.seyed.ali.timeentryservice.event;

import com.seyed.ali.timeentryservice.model.enums.OperationType;
import com.seyed.ali.timeentryservice.model.payload.ProjectDTO;
import com.seyed.ali.timeentryservice.repository.TimeEntryRepository;
import com.seyed.ali.timeentryservice.service.interfaces.TimeEntryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

Expand All @@ -15,19 +15,19 @@
@RequiredArgsConstructor
public class ProjectEventListener {

private final TimeEntryService timeEntryService;
private final TimeEntryRepository timeEntryRepository;
private final ProjectEventService projectEventService;

@KafkaListener(
topics = "${spring.kafka.topic.name}",
groupId = "${spring.kafka.consumer.group-id}" // if we change the group-id name, the events from the previous data will also be logged, otherwise, only the new events will be logged!
)
public void listenProject(@Payload ConsumerRecord<String, ProjectDTO> record) {
public void handleProjectEvent(@Payload ConsumerRecord<String, ProjectDTO> record, @Header("OperationType") String operationType) {
ProjectDTO projectDTO = record.value();
log.info("Received message: {}", projectDTO);

this.timeEntryRepository.findByProjectId(projectDTO.getProjectId())
.forEach(this.timeEntryService::deleteTimeEntry);
switch (OperationType.valueOf(operationType)) {
case DELETE -> this.projectEventService.handleDeleteOperation(projectDTO);
case DETACH -> this.projectEventService.handleDetachOperation(projectDTO);
default -> log.warn("\"{}\" operation type not supported.", operationType);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.seyed.ali.timeentryservice.event;

import com.seyed.ali.timeentryservice.model.payload.ProjectDTO;
import com.seyed.ali.timeentryservice.repository.TimeEntryRepository;
import com.seyed.ali.timeentryservice.service.interfaces.TimeEntryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Optional;

@Slf4j
@Service
@RequiredArgsConstructor
public class ProjectEventService {

private final TimeEntryService timeEntryService;
private final TimeEntryRepository timeEntryRepository;

public void handleDeleteOperation(ProjectDTO projectDTO) {
log.info("\"Delete\" project, associated tasks & time entries: {}", projectDTO);
this.timeEntryRepository.findByProjectId(projectDTO.getProjectId())
.forEach(this.timeEntryService::deleteTimeEntry);
}

public void handleDetachOperation(ProjectDTO projectDTO) {
log.info("\"Detach\" project, associated tasks & time entries: {}", projectDTO);
this.timeEntryRepository.findByProjectId(projectDTO.getProjectId())
.forEach(timeEntry -> {
timeEntry.setProjectId(null);
timeEntry.setTaskId(null);
this.timeEntryRepository.save(timeEntry);
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.seyed.ali.timeentryservice.model.enums;

public enum OperationType {

DELETE, DETACH

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ public interface TimeEntryRepository extends JpaRepository<TimeEntry, String> {

List<TimeEntry> findByProjectId(String projectId);

List<TimeEntry> findByTaskId(String taskId);

}
2 changes: 1 addition & 1 deletion src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: project_group
group-id: project_group_timeentry
auto-offset-reset: earliest

#configure deserialize classes for key & value pair
Expand Down

0 comments on commit aa1f744

Please sign in to comment.