Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka producer api #110

Open
yeomko22 opened this issue Sep 18, 2021 · 0 comments
Open

kafka producer api #110

yeomko22 opened this issue Sep 18, 2021 · 0 comments

Comments

@yeomko22
Copy link
Owner

카프카 클라이언트

  • 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리는 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용해서 애플리케이션을 개발한다.
  • 프로듀서 API: 전송 시 리더 파티션을 가지고 있는 카프카 브로커와 통신, 데이터를 직렬화 하여 전송, 자바 기본형 데이터 뿐만 아니라 동영상, 이미지 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.
public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
  • 실제 운영 환경에서는 데이터가 정상적으로 전송되었는지 확인하는 로직, 환경에 따른 추가적인 프로듀서 선택 옵션들의 설정 등과 같은 로직들이 포함된다.
  • 애플리케이션 단에서 토픽에 레코드를 전송하기 전에 카프카 커맨드를 이용해서 미리 토픽을 만들어준다.

11

  • 프로듀서 애플리케이션은 카프카 클러스터로 레코드를 전송한다.
  • 이 떄 내부적으로 파티셔너, 어큐물레이터, 센더를 거치게 된다.
  • 레코드 생성 시 파티션을 직접 지정하거나 타임스탬프 지정, 메세지 키 지정 등이 가능하다.
  • 프로듀서 인스턴스의 send()를 호출하게 되면 먼저 파티셔너에서 토픽의 어느 파티션으로 전송할 것인지 결정된다.
  • 파티션이 구분된 데이터는 어큐물레이터에 토픽별로 버퍼에 쌓인 다음 배치 단위로 전송된다.

프로듀서 필수 옵션

  • bootstrap.servers: 프카프카 클러스터에 속한 ㅍ브로커의 호스트 이름:포트를 1개 이상 작성
  • key.serializer: 레코드의 메시지 키를 직렬화 하는 클래스를 지정
  • value.serializer: 레코드의 메시지 값을 직렬화하는 클래스 지정

프로듀서 선택 옵션

  • acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용
    • 0, 1, -1 중 택 1, 1은 리더 파티션에 데이터가 저장되면 전송 성공, 0은 프로듀서가 전송한 즉시 브로커에 저장 여부와 상관없이 성공, -1은 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공하는 것으로 판단
  • buffer.memory: 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리 양
  • retries: 브로커로부터 에러를 받고 난 뒤 재전송 시도 횟수 설정
  • batch.size: 배치로 전송할 레코드 최대 용량 지정, 기본값은 16384
  • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간, 기본값은 0
  • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티션 클래스
  • enable.idempotence: 멱등성 프로듀서로 동작할 지 여부를 설정
  • transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정

메시지 키를 가진 데이터 전송 프로듀서

// key 설정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");

// partition 지정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "Pangyo", "23");

커스텀 파티셔너

  • Partitioner 인터페이스를 직접 구현해서 특정 키가 특정 파티션에 배정되도록 적용할 수 있다.

브로커 정상 전송 여부 확인 프로듀서

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
  • producer.send(record).get() 함수를 호출하면 동기적으로 프로듀서가 보낸 데이터의 결과를 확인할 수 있다.
  • 레코드가 정상적으로 브로커에 적재되면 토픽 이름-파티션번호-오프셋번호 가 리턴된다. ex) test-1@3
  • 콜백 함수를 만든 뒤, send 함수에 콜백 함수를 추가해서 전송하면 비동기적으로 데이터 전송 결과를 확인할 수 있다.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant