In [2]:
# 필요한 라이브러리 선언
import csv
import json, os, subprocess

import pandas as pd
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from pyarrow import fs

# Global settings shared by the cells below.

# Administrative region codes sent to the API as the `zcode` query parameter.
city_code = [11,26,27,28,29,30,31,36,41,43,44,46,47,48,50,51,52]

# API service key, stored in its URL-encoded form (%2B, %3D, ...).
# SECURITY: a credential should not live in the notebook. Export
# EV_CHARGER_SERVICE_KEY (already URL-encoded) to override the embedded value;
# the literal is kept only as a fallback so existing runs keep working.
# TODO(review): rotate this key and remove the fallback.
service_key = os.environ.get(
  'EV_CHARGER_SERVICE_KEY',
  '34lIyWAciAsJlGtIZ4ltpLy2sLZDR%2BBRWvAv8RgoADNEd%2BKCgHe84XiSRUwL8JMMIubzsFW3ddjcNlhZHhvJIQ%3D%3D',
)

# Request template for str.format(); the three remaining `{}` placeholders are
# pageNo, numOfRows and zcode, in that order. Braces in the key are doubled so
# an overridden key can never be mistaken for a placeholder.
url = (
  'https://apis.data.go.kr/B552584/EvCharger/getChargerStatus'
  '?serviceKey=' + service_key.replace('{', '{{').replace('}', '}}') +
  '&pageNo={}&numOfRows={}&period=5&zcode={}&dataType=JSON'
)

# Per-user settings @@@@@@@@@@@@@@@@@@ EDIT BEFORE RUNNING @@@@@@@@@@@@@@@@@@
# HDFS directory
hdfs_dir = "encore_20240614_khy"
# Kafka topic
kafka_topic = "encore_20240614_khy"

In [None]:
PAGE_SIZE = 50     # rows requested per page when downloading a region
DATA_DIR = 'data'  # local directory that receives one CSV per region code


def _fetch_page(page_no, num_rows, zcode):
  """Request one page from the API through curl and return the decoded JSON.

  Raises subprocess.CalledProcessError when curl itself fails, and
  json.JSONDecodeError when the response body is not JSON.
  """
  result = subprocess.run(
    ['curl', url.format(page_no, num_rows, zcode)],
    capture_output=True,
    encoding='utf-8',  # decode the body as UTF-8 regardless of the kernel's locale
    check=True,        # fail here instead of on an empty stdout further down
  )
  return json.loads(result.stdout)


# Create the output directory once instead of re-checking it for every region.
os.makedirs(DATA_DIR, exist_ok=True)

for cc in city_code:
  csv_path = f'{DATA_DIR}/{cc}.csv'

  # Small probe request, used only to learn how many rows the region has.
  total_cnt = int(_fetch_page(1, 5, cc)['totalCount'])
  # Ceiling division: exactly enough pages, and zero pages for zero rows.
  # (`total_cnt // 50 + 1` asked for an extra page on multiples of 50.)
  max_page = (total_cnt + PAGE_SIZE - 1) // PAGE_SIZE

  data_per_cc = []
  for page_no in range(1, max_page + 1):
    data_per_cc.extend(_fetch_page(page_no, PAGE_SIZE, cc)['items']['item'])

  if not data_per_cc:
    # Nothing to write for this region. Remove a CSV left by an earlier run so
    # stale rows are not picked up by the upload step.
    if os.path.exists(csv_path):
      os.remove(csv_path)
    print(f'zcode {cc}: no rows returned, skipping')
    continue

  # Header = every key seen, in first-seen order. For a uniform schema this is
  # exactly the first item's keys.
  fieldnames = list(dict.fromkeys(key for item in data_per_cc for key in item))

  # The csv module quotes embedded commas/quotes/newlines and accepts None or
  # non-string values, which a bare ','.join() does not.
  with open(csv_path, 'w', encoding='utf-8', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames, restval='', lineterminator='\n')
    writer.writeheader()
    writer.writerows(data_per_cc)

In [None]:
# Environment variables libhdfs needs before pyarrow can open an HDFS connection.
classpath = subprocess.Popen(["/home/hadoop/hadoop/bin/hdfs", "classpath", "--glob"],stdout=subprocess.PIPE).communicate()[0]
os.environ['CLASSPATH'] = classpath.decode('utf-8')
os.environ['ARROW_LIBHDFS_DIR'] = "/home/hadoop/hadoop/lib/native"
hdfs = fs.HadoopFileSystem(host='192.168.0.160', port=8020, user='hadoop')


# Copy each region's CSV from the local data directory to HDFS, replacing
# empty cells with '-'.
for cc in city_code:
  local_path = f'data/{cc}.csv'
  if not os.path.exists(local_path):
    print(f'{local_path} not found, skipping')
    continue

  # dtype=str keeps every field exactly as it was written. With pandas' default
  # type inference, zero-padded values lose their leading zeros and an integer
  # column containing blanks is read as float, so it is re-serialized with a
  # trailing ".0".
  df = pd.read_csv(local_path, dtype=str).fillna('-')

  # write to hdfs
  with hdfs.open_output_stream(f'/{hdfs_dir}/{cc}.csv') as f:
    df.to_csv(f, index=False)

In [6]:
class Hdfs2Kafka(object):
  """Reads CSV files from HDFS and publishes their rows to a Kafka topic."""

  # Seconds to wait for broker metadata and admin operations before giving up,
  # so an unreachable broker raises instead of hanging the notebook.
  _KAFKA_TIMEOUT_SEC = 10

  def __init__(self, hdfs_host='192.168.0.160', hdfs_port=8020, hdfs_user='hadoop',
               kafka_brokers='192.168.0.201:9092'):
    """Open the HDFS connection and create the Kafka producer.

    Args:
      hdfs_host: HDFS namenode host.
      hdfs_port: HDFS namenode port.
      hdfs_user: user name used for the HDFS connection.
      kafka_brokers: value for the producer's `bootstrap.servers` setting.

    The defaults are the addresses this notebook was written against.
    """
    # libhdfs reads CLASSPATH / ARROW_LIBHDFS_DIR from the environment.
    classpath = subprocess.Popen(['/home/hadoop/hadoop/bin/hdfs', 'classpath', '--glob'], stdout=subprocess.PIPE).communicate()[0]
    os.environ['CLASSPATH'] = classpath.decode('utf-8')
    os.environ['ARROW_LIBHDFS_DIR'] = '/home/hadoop/hadoop/lib/native'
    self._hdfs = fs.HadoopFileSystem(host=hdfs_host, port=hdfs_port, user=hdfs_user)

    # Kept on the instance so createKafkaTopic can build an admin client
    # against the same brokers.
    self._kafka_conf = {'bootstrap.servers': kafka_brokers}
    self._producer = Producer(**self._kafka_conf)

  def getHdFileInfo(self, filename):
    """Print type, path, size and modification time of an HDFS path."""
    f_Info = self._hdfs.get_file_info(filename)
    print(f"type: {str(f_Info.type)}")
    print(f"path: {f_Info.path}")
    print(f"size: {f_Info.size}")
    print(f"last_modified: {f_Info.mtime}")

  def readHdFile(self, filename):
    """Read a UTF-8 text file from HDFS and return its lines split on ','.

    Returns:
      A list with one list of fields per line. Every line of the file is
      included and the split is a plain str.split(','); quoted fields are not
      interpreted.
    """
    with self._hdfs.open_input_file(filename) as f:
      read_data = f.read().decode('utf-8').splitlines()
      return [line.split(',') for line in read_data]

  def sendData2Kafka(self, topic, list_line):
    """Publish one message per row, dropping the fields at index 2 and 3.

    Args:
      topic: Kafka topic to produce to.
      list_line: iterable of rows, each a list of field strings.

    Messages are queued asynchronously and flushed once after the last row.
    Flushing after every message forces a broker round trip per row.
    """
    for data in list_line:
      # Re-join and re-split so a field that itself contains commas is counted
      # as several fields, exactly as before.
      str_tmp = ",".join(data).split(",")
      modified_data = ",".join(str_tmp[:2]) + "," + ",".join(str_tmp[4:])
      print(modified_data)
      payload = modified_data.encode('utf-8')

      # Serve delivery callbacks for messages that have already been sent.
      self._producer.poll(0)
      try:
        self._producer.produce(topic, payload, callback=kafka_producer_call)
      except BufferError:
        # Local queue is full: wait for it to drain, then retry once.
        self._producer.flush()
        self._producer.produce(topic, payload, callback=kafka_producer_call)

    # Block until every queued message has been delivered or has failed.
    self._producer.flush()

  def checkKafkaTopic(self, topic):
    """Return True when `topic` is present in the cluster metadata."""
    topic_list = self._producer.list_topics(timeout=self._KAFKA_TIMEOUT_SEC).topics
    if topic in topic_list:
      print(f"topic {topic} is exist")
      return True
    else:
      print(f"topic {topic} is not exist")
      return False

  def createKafkaTopic(self, topic, num_partitions=1, replication_factor=1):
    """Create `topic` on the cluster and wait for the broker to confirm.

    Producer has no create_topics(); topic management lives on AdminClient.

    Args:
      topic: name of the topic to create.
      num_partitions: partition count for the new topic.
      replication_factor: replication factor for the new topic. The default of
        1 is an assumption that suits a single-broker setup; confirm it for
        your cluster.

    Raises:
      KafkaException: if the broker rejects the request.
    """
    admin = AdminClient(self._kafka_conf)
    new_topic = NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)
    # create_topics() is asynchronous and returns one future per topic;
    # result() raises if the creation failed.
    for future in admin.create_topics([new_topic]).values():
      future.result(timeout=self._KAFKA_TIMEOUT_SEC)

def kafka_producer_call(err, msg):
  """Callback passed to Producer.produce(): report the outcome of one message.

  Args:
    err: the delivery error, or None when the message was delivered.
    msg: the message the report refers to.

  Prints one failure line when `err` is set; otherwise prints a summary line
  followed by one "Message.<field>: <value>" line per field.
  """
  if err is not None:
    print(f"Failed to deliver message, err={str(err)} msg={str(msg)}")
    return

  print(f"Message produced, topic={msg.topic()} partition=[{msg.partition()}] @ offset={msg.offset()}")

  # Each accessor is called right before its line is printed, in this order.
  for field in ("topic", "timestamp", "key", "value", "partition", "offset"):
    shown = getattr(msg, field)()
    if field == "value":
      # The payload is bytes; show it as text.
      shown = shown.decode('utf-8')
    print(f"Message.{field}: {shown}")

In [8]:
# Build the HDFS-to-Kafka helper (opens the HDFS connection and the producer).
kafka_prod = Hdfs2Kafka()

# Make sure the target topic exists, creating it when it is missing.
topic_exists = kafka_prod.checkKafkaTopic(kafka_topic)
if not topic_exists:
  print("kafka topic is not exist, create topic")
  kafka_prod.createKafkaTopic(kafka_topic)
  print(f"kafka topic \"{kafka_topic}\" created")
else:
  print("kafka topic is exist, good to go")

topic encore_20240614_khy is exist


True

In [None]:
# Read each region's CSV back from HDFS and publish its rows to Kafka.
for cc in city_code:
  hdfs_path = f'/{hdfs_dir}/{cc}.csv'
  kafka_prod.sendData2Kafka(kafka_topic, kafka_prod.readHdFile(hdfs_path))