In [19]:
import json
import os
import csv
from confluent_kafka import Consumer, KafkaException, KafkaError
from pyarrow import fs
import pandas as pd
import subprocess

# Kafka consumer settings: broker address, consumer group, and where to start
# reading when the group has no committed offset yet.
conf = {
    'bootstrap.servers': "192.168.0.201:9092",
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)
consumer.subscribe(['RSH_KAFKA'])

# Environment for pyarrow's HadoopFileSystem: CLASSPATH is filled from
# `hdfs classpath --glob` and ARROW_LIBHDFS_DIR points at the native Hadoop libs.
# Both are set before the filesystem object is created.
HADOOP_DIR = "/home/hadoop/hadoop"
# check=True: fail loudly if the hdfs CLI errors instead of silently exporting
# an empty CLASSPATH (Popen().communicate() ignores the exit status).
classpath = subprocess.run([f"{HADOOP_DIR}/bin/hdfs", "classpath", "--glob"],
                           stdout=subprocess.PIPE, check=True).stdout
os.environ['CLASSPATH'] = classpath.decode('utf-8')
os.environ['ARROW_LIBHDFS_DIR'] = f"{HADOOP_DIR}/lib/native"
hdfs = fs.HadoopFileSystem(host='192.168.0.160', port=8020, user='hadoop')


# Buffer of decoded messages not yet written to HDFS, and the offset at which
# process_message flushes the next batch.
df = pd.DataFrame()
next_offset_threshold = 100

def process_message(message):
    """Buffer one Kafka message in the global ``df`` and flush it to HDFS in offset batches.

    The message value is decoded as UTF-8 JSON and appended to ``df`` as one row.
    When the message offset reaches ``next_offset_threshold``, the buffered rows are
    written with ``write_to_hdfs`` (file named after that threshold), ``df`` is reset,
    and the threshold advances in steps of 100 until it lies past the current offset.
    Messages whose value is None are logged and skipped.

    Args:
        message: a message returned by ``Consumer.poll`` (only ``value()`` and
            ``offset()`` are used).

    Raises:
        UnicodeDecodeError / json.JSONDecodeError: if the value is not UTF-8 JSON.
    """
    global df, next_offset_threshold
    batch_size = 100
    offset = message.offset()
    raw_value = message.value()
    if raw_value is None:
        # Null-valued (tombstone) records carry no payload; skip them instead of
        # failing with AttributeError on None.decode().
        print(f"Skipped message with empty value: offset={offset}")
        return

    item = json.loads(raw_value.decode('utf-8'))

    # Convert item to DataFrame and concatenate with existing DataFrame
    item_df = pd.DataFrame([item])
    df = pd.concat([df, item_df], ignore_index=True)

    # Log the addition of a new item
    print(f"Added message to DataFrame: offset={offset}")

    # NOTE(review): offsets are per partition; if RSH_KAFKA has several partitions,
    # their messages share this single buffer and threshold — confirm single partition.
    if offset >= next_offset_threshold:
        write_to_hdfs(df, next_offset_threshold)
        df = pd.DataFrame()  # Reset DataFrame
        # Advance by as many whole batches as needed to move past `offset`.
        # A single `+= 100` lags behind when consumption resumes from a committed
        # offset far above the initial threshold (e.g. re-running with the same
        # group.id). Every following message would then trigger its own one-row
        # write, overwriting batch files from earlier runs.
        next_offset_threshold += batch_size * ((offset - next_offset_threshold) // batch_size + 1)


def write_to_hdfs(dataframe, offset):
    """Write ``dataframe`` to HDFS as CSV (header row, no index column).

    The destination is ``/user/rsh/tmp2_offset_{offset}.csv``. ``offset`` is used
    only to build that file name; callers in this notebook pass either a batch
    threshold or the string 'final'.
    """
    target_path = f"/user/rsh/tmp2_offset_{offset}.csv"
    with hdfs.open_output_stream(target_path) as out_stream:
        out_stream.write(dataframe.to_csv(index=False).encode())

# Drain the topic: poll until MAX_IDLE_POLLS consecutive one-second polls return
# nothing, then flush whatever is still buffered and close the consumer.
MAX_IDLE_POLLS = 10
idle_polls = 0
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # A single empty poll does not mean the topic is drained: right after
            # subscribe(), group join / partition assignment may take longer than
            # one poll timeout, and poll() returns None meanwhile.
            idle_polls += 1
            if idle_polls >= MAX_IDLE_POLLS:
                break
            continue
        idle_polls = 0
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
            else:
                raise KafkaException(msg.error())
        else:
            # Proper message
            process_message(msg)
except KeyboardInterrupt:
    print("Consumer interrupted by user.")
finally:
    try:
        # Write remaining messages for the last set to HDFS
        if not df.empty:
            write_to_hdfs(df, 'final')
    finally:
        # Close the consumer even if the final HDFS write fails, so it does not leak.
        consumer.close()
        print("Consumer closed.")


In [20]:
# Kafka consumer settings: broker address, consumer group, and where to start
# reading when the group has no committed offset yet.
conf = {
    'bootstrap.servers': "192.168.0.201:9092",
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)
consumer.subscribe(['RSH_KAFKA'])

In [21]:
# Environment for pyarrow's HadoopFileSystem: CLASSPATH is filled from
# `hdfs classpath --glob` and ARROW_LIBHDFS_DIR points at the native Hadoop libs.
# Both are set before the filesystem object is created.
HADOOP_DIR = "/home/hadoop/hadoop"
# check=True: fail loudly if the hdfs CLI errors instead of silently exporting
# an empty CLASSPATH (Popen().communicate() ignores the exit status).
classpath = subprocess.run([f"{HADOOP_DIR}/bin/hdfs", "classpath", "--glob"],
                           stdout=subprocess.PIPE, check=True).stdout
os.environ['CLASSPATH'] = classpath.decode('utf-8')
os.environ['ARROW_LIBHDFS_DIR'] = f"{HADOOP_DIR}/lib/native"
hdfs = fs.HadoopFileSystem(host='192.168.0.160', port=8020, user='hadoop')


# Buffer of decoded messages not yet written to HDFS, and the offset at which
# process_message flushes the next batch.
df = pd.DataFrame()
next_offset_threshold = 100

def process_message(message):
    """Buffer one Kafka message in the global ``df`` and flush it to HDFS in offset batches.

    The message value is decoded as UTF-8 JSON and appended to ``df`` as one row.
    When the message offset reaches ``next_offset_threshold``, the buffered rows are
    written with ``write_to_hdfs`` (file named after that threshold), ``df`` is reset,
    and the threshold advances in steps of 100 until it lies past the current offset.
    Messages whose value is None are logged and skipped.

    Args:
        message: a message returned by ``Consumer.poll`` (only ``value()`` and
            ``offset()`` are used).

    Raises:
        UnicodeDecodeError / json.JSONDecodeError: if the value is not UTF-8 JSON.
    """
    global df, next_offset_threshold
    batch_size = 100
    offset = message.offset()
    raw_value = message.value()
    if raw_value is None:
        # Null-valued (tombstone) records carry no payload; skip them instead of
        # failing with AttributeError on None.decode().
        print(f"Skipped message with empty value: offset={offset}")
        return

    item = json.loads(raw_value.decode('utf-8'))

    # Convert item to DataFrame and concatenate with existing DataFrame
    item_df = pd.DataFrame([item])
    df = pd.concat([df, item_df], ignore_index=True)

    # Log the addition of a new item
    print(f"Added message to DataFrame: offset={offset}")

    # NOTE(review): offsets are per partition; if RSH_KAFKA has several partitions,
    # their messages share this single buffer and threshold — confirm single partition.
    if offset >= next_offset_threshold:
        write_to_hdfs(df, next_offset_threshold)
        df = pd.DataFrame()  # Reset DataFrame
        # Advance by as many whole batches as needed to move past `offset`.
        # A single `+= 100` lags behind when consumption resumes from a committed
        # offset far above the initial threshold (e.g. this cell's run after the
        # earlier cell already committed offsets for the same group.id). Every
        # following message would then trigger its own one-row write, overwriting
        # batch files from earlier runs.
        next_offset_threshold += batch_size * ((offset - next_offset_threshold) // batch_size + 1)


def write_to_hdfs(dataframe, offset):
    """Write ``dataframe`` to HDFS as CSV (header row, no index column).

    The destination is ``/user/rsh/tmp2_offset_{offset}.csv``. ``offset`` is used
    only to build that file name; callers in this notebook pass either a batch
    threshold or the string 'final'.
    """
    target_path = f"/user/rsh/tmp2_offset_{offset}.csv"
    with hdfs.open_output_stream(target_path) as out_stream:
        out_stream.write(dataframe.to_csv(index=False).encode())


In [None]:
# Drain the topic: poll until MAX_IDLE_POLLS consecutive one-second polls return
# nothing, then flush whatever is still buffered and close the consumer.
MAX_IDLE_POLLS = 10
idle_polls = 0
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # A single empty poll does not mean the topic is drained: right after
            # subscribe(), group join / partition assignment may take longer than
            # one poll timeout, and poll() returns None meanwhile.
            idle_polls += 1
            if idle_polls >= MAX_IDLE_POLLS:
                break
            continue
        idle_polls = 0
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
            else:
                raise KafkaException(msg.error())
        else:
            # Proper message
            process_message(msg)
except KeyboardInterrupt:
    print("Consumer interrupted by user.")
finally:
    try:
        # Write remaining messages for the last set to HDFS
        if not df.empty:
            write_to_hdfs(df, 'final')
    finally:
        # Close the consumer even if the final HDFS write fails, so it does not leak.
        consumer.close()
        print("Consumer closed.")