<a href="https://colab.research.google.com/github/team05-MLOps-pipeline/kafka-consumer-db-dag-/blob/main/%5Bairflow%5D_kafka_hdfs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from airflow.operators.python_operator import PythonOperator
from airflow import DAG
from datetime import datetime, timedelta
from confluent_kafka import Consumer, KafkaError, TopicPartition
from hdfs import InsecureClient
import json

# DAG configuration
default_args = {
    'owner': 'kafka_to_hdfs_dag',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 23),
    'retries': 3,
    # Wait 5 minutes before retrying a failed task (this is NOT the schedule).
    'retry_delay': timedelta(minutes=5),
}

# Runs every 5 minutes without backfilling missed intervals.
# max_active_runs=1 prevents overlapping runs: each run starts reading from the
# consumer group's committed offset, so two concurrent runs would copy the
# same Kafka messages into HDFS twice.
dag = DAG('kafka_to_hdfs_dag',
          default_args=default_args,
          catchup=False,
          max_active_runs=1,
          schedule_interval=timedelta(minutes=5))



def _decode_message(raw):
    """Parse a raw Kafka message value into a JSON value.

    The value is decoded as UTF-8 first. That is correct for any valid JSON
    payload, including raw non-ASCII text such as Korean. If parsing fails,
    the legacy ``unicode_escape`` decoding is tried. This keeps producers
    working that send payloads with backslash-escaped quotes.

    Args:
        raw: The message value (``bytes``), or ``None`` for an empty value.

    Returns:
        The parsed JSON value.

    Raises:
        ValueError: if the value is empty or cannot be parsed with either
            decoding (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are
            both ``ValueError`` subclasses).
    """
    if raw is None:
        raise ValueError("message has no value")
    try:
        return json.loads(raw.decode('utf-8'))
    except ValueError:
        # Legacy path: payloads such as {\"a\": 1} only parse after unescaping.
        return json.loads(raw.decode('unicode_escape'))


def _consume_range(consumer, start, end, poll_timeout=10, max_idle_polls=3):
    """Poll the consumer's assigned partition from *start* up to offset *end*.

    Args:
        consumer: A consumer already assigned to one partition at *start*.
        start: First offset to read.
        end: Exclusive upper bound (the high watermark captured before reading).
        poll_timeout: Seconds each ``poll`` call waits for a message.
        max_idle_polls: Number of consecutive empty polls tolerated before
            giving up. This keeps the task from hanging when *end* is never
            reached exactly.

    Returns:
        ``(records, next_offset)``. *records* are the parsed JSON values.
        *next_offset* is one past the last message seen, or *start* if no
        message was seen; it is the offset to commit.
    """
    records = []
    next_offset = start
    idle_polls = 0
    while next_offset < end:
        msg = consumer.poll(timeout=poll_timeout)
        if msg is None:
            print("No message received.")
            idle_polls += 1
            if idle_polls >= max_idle_polls:
                break
            continue
        idle_polls = 0

        if msg.error():
            # Report the error instead of silently ignoring it; keep polling.
            print(f"Kafka error: {msg.error()}")
            continue

        next_offset = msg.offset() + 1
        try:
            records.append(_decode_message(msg.value()))
        except ValueError as exc:
            # Skip (but report) a malformed message. Otherwise it would block
            # the partition forever, because offsets are committed manually.
            print(f"Skipping undecodable message at offset {msg.offset()}: {exc}")
    return records, next_offset


def kafka_to_hdfs():
    """Copy pending messages from one Kafka partition into a new HDFS file.

    The task reads partition 0 of topic ``euntest`` for consumer group
    ``devinu``. It starts at the group's committed offset and stops at the
    high watermark observed at the start of the run. Because that upper bound
    is fixed, the task terminates even while producers keep writing.

    Each message is parsed as JSON, and all records are written as JSON Lines
    to ``/test/<YYYYmmddHHMMSS>.txt``. The consumed offset is committed only
    after the write succeeds, so the messages from a failed run are re-read on
    retry (at-least-once delivery).

    If the group has no committed offset yet, nothing is copied. Instead the
    group is anchored at the current end of the partition, matching
    ``auto.offset.reset='latest'``.

    Kafka and HDFS client errors propagate so that Airflow retries the task.
    """
    topic_name = "euntest"
    partition = 0

    kafka_conf = {
        'bootstrap.servers': 'ip:9092',
        'group.id': 'devinu',
        'auto.offset.reset': 'latest',
        # Offsets are committed explicitly, only after the HDFS write succeeds.
        'enable.auto.commit': False,
    }

    # Hadoop (WebHDFS) client; 'ip' is a placeholder for the NameNode host.
    hdfs_client = InsecureClient('http://ip:9870', user='root')

    file_name = datetime.now().strftime("%Y%m%d%H%M%S") + '.txt'
    hdfs_path = '/test/' + file_name

    consumer = Consumer(kafka_conf)
    try:
        tp = TopicPartition(topic_name, partition)
        committed = consumer.committed([tp], timeout=10)[0].offset
        low, high = consumer.get_watermark_offsets(tp, timeout=10)

        if committed < 0:
            # No committed offset yet ('latest'): start at the current end so
            # the commit below anchors the group there for the next run.
            start = high
        else:
            # Offsets below the low watermark were already removed by retention.
            start = max(committed, low)

        consumer.assign([TopicPartition(topic_name, partition, start)])
        records, next_offset = _consume_range(consumer, start, high)

        if records:
            with hdfs_client.write(hdfs_path, encoding='utf-8', overwrite=True) as writer:
                for record in records:
                    # One JSON document per line; keep non-ASCII text readable.
                    writer.write(json.dumps(record, ensure_ascii=False) + '\n')
            print(f"Saved {len(records)} messages to HDFS")

        if next_offset != committed:
            consumer.commit(offsets=[TopicPartition(topic_name, partition, next_offset)],
                            asynchronous=False)
    finally:
        consumer.close()


# Run the kafka_to_hdfs function defined above as this DAG's single task.
kafka_to_hdfs_task = PythonOperator(
    dag=dag,
    python_callable=kafka_to_hdfs,
    task_id='kafka_to_hdfs_task',
)