Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka consumer detail #116

Open
yeomko22 opened this issue Sep 20, 2021 · 4 comments
Open

kafka consumer detail #116

yeomko22 opened this issue Sep 20, 2021 · 4 comments

Comments

@yeomko22
Copy link
Owner

multithread consumer

  • 파티션을 여러개로 운영하는 경우 데이터 병렬 처리를 위해 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋다.
  • 토픽은 1개 이상의 파티션으로 이루어져 있으며, 1개의 파티션은 1개의 컨슈머가 할당되어 데이터를 처리할 수 있다.
  • n개 스레드를 가진 1개의 프로세스를 운영하거나, 1개 스레드를 가진 n개의 프로세스를 운영하는 방법이 있으며, 이는 개발자 선택이다.
  • 컨슈머 스레드 비정상 종료일 경우 프로세스가 종료될 수 있고, 이럴 경우 다른 스레드에 영향을 줄 수 있다.
  • 데이터의 중복 또는 유실을 조심해라

컨슈머 운영 전략

  • 멀티 워커 스레드 전략: 컨슈머 스레드는 1개만 실행, 데이터 처리를 담당하는 워커 스레드는 여러개를 실행하는 방법
  • 컨슈머 멀티 스레드 전략: 컨슈머 인스턴스에서 poll 메서드를 호출하는 스레드를 여러개 띄워 사용하는 방법
@yeomko22
Copy link
Owner Author

kafka consumer multi worker method

  • 브로커로부터 전달받은 레코드들을 병렬로 처리한다면 1개의 컨슈머 스레드로부터 받은 데이터들을 더욱 빠르게 처리할 수 있다.
  • ExecutorService라는 자바 라이브러리를 사용하면 병렬처리 스레드를 효율적으로 생성하고 관리할 수 있다.
  • Executors.newCachedThreadPool을 활용해서 쉽게 쓰레드 풀을 만들 수 있다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
for (ConsumerRecord<String, String> record: records) {
    ConsumerWorker worker = new ConsumerWorker(record.value());
    executorService.execute(worker);
}
  • 쓰레드를 사용하면 데이터 처리가 끝나지 않았음에도 커밋을 한다. 따라서 리밸런싱, 컨슈머 장애 시에 데이터 유실이 발생할 수 있다.
  • 쓰레드 별로 처리 시간이 달라질 수 있다. 중복이나 역전 현상이 발생해도 빠른 속도가 필요할 때 적합하다.
  • 은행 잔고처럼 순서가 민감한 경우에는 데이터 역전을 예방해야한다.

@yeomko22
Copy link
Owner Author

Kafka consumer multi thread method

  • 하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당
  • 하나의 컨슈머는 여러 파티션에 할당
  • 컨슈머 스레드를 늘려서 각 파티션에 할당하면 레코드 병렬 처리가 가능
  • 토픽 파티션 개수 만큼 컨슈머 스레드를 운영하는 것
ExcutorService executorSrervice = Executors.newCachedThreadPool();
for (int i=0; i<CONSUMER_COUNT; i++) {
    ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
    executorService.execute(worker);
}

@yeomko22
Copy link
Owner Author

컨슈머 랙

  • 컨슈머 랙: 토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이
  • 컨슈머 랙은 컨슈머 그룹과 토픽, 파티션 별로 생성된다. 1개 토픽에 3개 파티션, 1개의 컨슈머 그룹이 구독을 하면 컨슈머 랙은 총 3개
  • 프로듀서가 보내는 양이 컨슈머 처리량보다 크다면 컨슈머 랙은 늘어난다.
  • 컨슈머 처리량이 프로듀서가 보내는 양보다 크면 컨슈머 랙은 줄어들다가 0이 된다.

컨슈머 metrics()를 이용한 컨슈머 랙 조회

for (Map.Extry<MetricName, ? extends Metric> entry : kafkaConsumer.metrics().entrySet()) {
    if ("records-lag-max".equl(etnry.getKey().name()) |
         "records-lag".equl(etnry.getKey().name()) |
         "records-lag-avg".equl(etnry.getKey().name())) {
        Metric metric = entry.getValue();
        logger.info("{}:{}", entry.getKey().name(), metric.metricValue());
    }
}
  • 그 외에도 datadog, confluent control center와 같은 카프카 종합 모니터링 툴을 사용하면 된다.

카프카 버로우

  • 카프카 버로우는 링크드인에서 공개한 오픈소스로 restapi를 통해서 컨슈머 그룹별 컨슈머 랙을 조회할 수 있다.
  • 다수의 클러스터를 동시에 연결하여 컨슈머 랙을 확인한다.
  • 그러나 버로우 기본 알람 모듈을 사용하기 보다는 컨슈머 랙 지표를 별도 저장소(influxDB, elasticsearch)에 저장하고 대쉬보드 (grafana, kibana)를 통해 조회, 알람 설정하는 것이 편리하다.

18

@yeomko22
Copy link
Owner Author

컨슈머 배포 프로세스

  • 중단 배포: 지연 발생해도 서비스 운영에 이슈가 없을 경우 사용
  • 무중단 배포: 중단 발생시 서비스에 영향이 클 경우 사용

무중단 배포

  • 중단 불가한 애플리케이션의 신규 로직 배포가 필요한 경우 무중단 배포가 필요
  • 블루/그린, 롤링, 카나리 사용이 가능

컨슈머 카나리 배포

  • 100개 파티션으로 운영하는 토픽이 있을 경우 1개 파티션에 컨슈머를 따로 배정하여 일부 데이터에 대해 신규 버전의 애플리케이션이 우선적으로 처리하는 방식으로 테스트 할 수 있다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant