In [1]:
from kafka import KafkaConsumer
import json

In [3]:
# Stream messages from the Kafka topic and print each one as it arrives.
# The loop has no exit condition; it runs until the cell is interrupted.
bootstrap_servers = "mydb.iptime.org:9092"
topic_name = "smartcar-data"

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers = bootstrap_servers,
    auto_offset_reset = "latest",
    enable_auto_commit = True,
    group_id = "smartcar-group",
    # Each record value is UTF-8 encoded JSON; decode it into a Python object.
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("데이터 가져오는중")

try:
    for message in consumer:
        print(f"Received message: {message.value}")
finally:
    # Interrupting the kernel is the only way out of the loop above, so
    # release the consumer here; otherwise it stays open in the kernel
    # after the cell stops. The interrupt itself still propagates.
    consumer.close()

데이터 가져오는중
Received message: {'car_id': '3134/100000', 'speed': 103.0, 'latitude': 35.017916, 'longitude': 129.140776, 'timestamp': 1746608038.6631258}
Received message: {'car_id': '3135/100000', 'speed': 102.5, 'latitude': 35.017883, 'longitude': 129.140586, 'timestamp': 1746608039.1661892}
Received message: {'car_id': '3136/100000', 'speed': 102.9, 'latitude': 35.017854, 'longitude': 129.140392, 'timestamp': 1746608039.669245}
Received message: {'car_id': '3137/100000', 'speed': 102.6, 'latitude': 35.017828, 'longitude': 129.140199, 'timestamp': 1746608040.1723366}
Received message: {'car_id': '3138/100000', 'speed': 102.9, 'latitude': 35.017803, 'longitude': 129.140005, 'timestamp': 1746608040.6750576}
Received message: {'car_id': '3139/100000', 'speed': 103.1, 'latitude': 35.017787, 'longitude': 129.139807, 'timestamp': 1746608041.1781933}
Received message: {'car_id': '3140/100000', 'speed': 102.9, 'latitude': 35.017768, 'longitude': 129.139611, 'timestamp': 1746608041.681296}
Recei

KeyboardInterrupt: 

In [7]:
# Collect 100 messages from the Kafka topic and save them to a local file,
# one JSON document per line.
bootstrap_servers = "mydb.iptime.org:9092"
topic_name = "smartcar-data"
out_file = 'smartcar_100.json'

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers = bootstrap_servers,
    auto_offset_reset = "latest",
    # NOTE(review): no group_id is passed here. Per the kafka-python docs,
    # offset commits are disabled when group_id is None, so this flag
    # presumably has no effect - confirm whether a group_id was intended.
    enable_auto_commit = True,
    # Each record value is UTF-8 encoded JSON; decode it into a Python object.
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("데이터 가져오는중")

buffer = []
try:
    for message in consumer:
        buffer.append(message.value)
        if len(buffer) == 100:
            break
finally:
    # Release the consumer whether the batch completed or the cell was
    # interrupted; it is not needed for the file write below.
    consumer.close()

# Write the batch as JSON Lines (one record per line).
with open(out_file, 'w', encoding='utf-8') as f:
    for item in buffer:
        json.dump(item, f)
        f.write('\n')

print(f"✅ 메시지 {len(buffer)}개 저장 완료: {out_file}")

데이터 가져오는중
✅ 메시지 100개 저장 완료: smartcar_100.json


In [None]:
from kafka import KafkaConsumer
import json
import time
from datetime import datetime
import subprocess
import os
import traceback

# Periodic batch job: every cycle, collect up to 100 messages from Kafka,
# stage them in a local JSON Lines file, upload that file to HDFS, then sleep.
bootstrap_servers = "mydb.iptime.org:9092"
topic_name = "smartcar-data"
save_interval_sec = 12 * 60 * 60  # 12 hours between batches
hdfs_dir = "/user/hadoop/smartcar-data"
local_dir = "/tmp/kafka_buffer"

# Make sure the local staging directory exists.
os.makedirs(local_dir, exist_ok=True)

while True:
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'smartcar_100_{timestamp}.json'
    local_path = os.path.join(local_dir, filename)
    buffer = []
    consumer = None  # stays None if the KafkaConsumer constructor itself fails

    print(f"[{timestamp}] Kafka에서 메시지 100개 수집 시작...")

    try:
        consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset="latest",
            # NOTE(review): no group_id is passed; per the kafka-python docs,
            # offset commits are disabled in that case - confirm intent.
            enable_auto_commit=True,
            # Each record value is UTF-8 encoded JSON.
            value_deserializer=lambda x: json.loads(x.decode("utf-8"))
        )

        for message in consumer:
            buffer.append(message.value)
            if len(buffer) == 100:
                break

    except Exception as e:
        print(f"[!] Kafka 소비 중 오류 발생: {e}")
        traceback.print_exc()

    finally:
        # A fresh consumer is created on every cycle, so each one must be
        # closed here; otherwise open consumers pile up across iterations.
        if consumer is not None:
            try:
                consumer.close()
            except Exception as e:
                # A failed close must not prevent saving what was collected.
                print(f"[!] Kafka 컨슈머 종료 중 오류 발생: {e}")

        # Runs in `finally` so that a partial batch is still saved when the
        # consume loop was cut short by an error or an interrupt.
        if buffer:
            # Stage the batch locally as JSON Lines (one record per line).
            with open(local_path, 'w', encoding='utf-8') as f:
                for item in buffer:
                    json.dump(item, f)
                    f.write('\n')

            print(f"[✓] 메시지 {len(buffer)}개 저장 완료: {local_path}")

            try:
                # Create the HDFS target directory; a non-zero exit status is
                # deliberately ignored here (check=False).
                subprocess.run(['hdfs', 'dfs', '-mkdir', '-p', hdfs_dir], check=False)

                # Upload the staged file to HDFS.
                subprocess.run(['hdfs', 'dfs', '-put', local_path, hdfs_dir], check=True)
            except (subprocess.CalledProcessError, OSError) as e:
                # OSError covers the `hdfs` executable being missing or not
                # runnable, which would otherwise kill the whole loop. The
                # local file is kept so the batch is not lost.
                print(f"[!] HDFS 업로드 실패: {e}")
            else:
                print(f"[→] HDFS 적재 완료: {hdfs_dir}/{filename}")
                # Remove the staged copy only after a successful upload.
                os.remove(local_path)
        else:
            print("[!] 저장할 메시지가 없어 파일 생성 및 HDFS 업로드 생략됨")

    print("[⏳] 12시간 대기...\n")
    time.sleep(save_interval_sec)
