In [None]:
%pip install minio

In [8]:
import os
import json
import time
import csv
from kafka import KafkaConsumer
from minio import Minio


# Inactivity bookkeeping used by the consumer loop in save_to_minio().
inactivity_timeout = 10           # seconds; compared against a time.time() difference
last_received_time = time.time()  # wall-clock reference point for the inactivity check

In [9]:
# Header row of the output CSV, in column order.
column_names = [
    "Index",
    "Type",
    "Title",
    "Director",
    "Cast",
    "Country",
    "Release Date",
    "Year",
    "Rating",
    "Duration",
    "Genre",
    "Description",
]

In [None]:
# MinIO connection. The endpoint and credentials can be supplied through the
# MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables so
# that real secrets never have to be saved in the notebook. When a variable is
# not set, the notebook's original local-development value is used.
minio_client = Minio(
    os.environ.get('MINIO_ENDPOINT', 'localhost:9000'),
    access_key=os.environ.get('MINIO_ACCESS_KEY', 'ROOTNAME'),
    secret_key=os.environ.get('MINIO_SECRET_KEY', 'CHANGEME123'),
    secure=False  # plain HTTP
)

# Bucket that receives the uploaded CSV.
bucket_name = 'my-kafka-bucket'

In [17]:
# Kafka consumer subscribed to the topic that the producer writes to.
consumer = KafkaConsumer(
    "NewTopic",  # topic name; replace with the actual topic
    auto_offset_reset="earliest",
    group_id="your_consumer_group",  # consumer group ID
    bootstrap_servers=["localhost:9092"],  # same broker list as the producer
)

In [12]:
# Local filesystem path of the intermediate CSV file that save_to_minio() writes
# and then uploads. Set the KAFKA_OUTPUT_CSV environment variable to relocate it;
# when the variable is not set, the notebook's original location is used.
kafka_server_path = os.environ.get(
    'KAFKA_OUTPUT_CSV',
    'C:\\Users\\cihat\\OneDrive\\Masaüstü\\pythonwork\\kafka-server\\kafka_output.csv'
)

In [13]:
def create_bucket_if_not_exists():
    """Make sure the bucket named by ``bucket_name`` exists on ``minio_client``.

    Creates the bucket when it is missing. In both cases a one-line status
    message is printed.
    """
    if minio_client.bucket_exists(bucket_name):
        print(f"Bucket {bucket_name} already exists.")
        return

    minio_client.make_bucket(bucket_name)
    print(f"Bucket {bucket_name} created.")

In [None]:
def _message_to_row(index, message_data):
    """Return one CSV row: ``index`` followed by the payload value of every other column.

    Columns that are missing from ``message_data`` become empty strings.
    """
    # column_names[0] is the "Index" column; its value is the running counter,
    # not something read from the payload.
    return [index] + [message_data.get(column, '') for column in column_names[1:]]


def save_to_minio():
    """Drain the Kafka consumer into a local CSV file, then upload the file to MinIO.

    Steps:
      1. Ensure the target bucket exists.
      2. Write the header (``column_names``) to ``kafka_server_path``.
      3. Poll ``consumer`` in a loop. Every message whose value is UTF-8 JSON
         decoding to a dict becomes one CSV row. Messages with no value, invalid
         UTF-8/JSON, or a non-dict payload are reported and skipped so that one
         bad record does not abort the run.
      4. Stop when a poll returns nothing and more than ``inactivity_timeout``
         seconds have passed since the last non-empty poll (or since this
         function started, if nothing has arrived yet).
      5. Upload the CSV as object ``kafka_output.csv`` in ``bucket_name``.

    Updates the module-level ``last_received_time``. Returns ``None``.
    """
    global last_received_time  # refreshed below whenever a batch arrives

    # Ensure the bucket exists before uploading data
    create_bucket_if_not_exists()

    # Start the inactivity window now, not when the notebook's setup cell ran.
    last_received_time = time.time()

    # Open the CSV file at the specified path (write mode: any previous content is replaced)
    with open(kafka_server_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)

        # Write the header with column names
        writer.writerow(column_names)
        print("Header written to CSV.")

        index = 1  # 1-based row number stored in the "Index" column

        while True:
            # Wait up to 10 seconds (10000 ms) for a batch of messages
            messages = consumer.poll(timeout_ms=10000)

            if not messages:
                print("No messages received.")
                if time.time() - last_received_time > inactivity_timeout:
                    print("Inactivity timeout reached, stopping consumer.")
                    break
                continue

            # A batch arrived: restart the inactivity window.
            last_received_time = time.time()

            # poll() returns a mapping of partition -> list of messages
            for messages_list in messages.values():
                for message in messages_list:
                    print(f"Processing message: {message.value}")

                    if message.value is None:
                        print("Skipping message with no value.")
                        continue

                    try:
                        message_data = json.loads(message.value.decode('utf-8'))
                    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                        print(f"Skipping undecodable message: {exc}")
                        continue

                    if not isinstance(message_data, dict):
                        print("Skipping message whose JSON payload is not an object.")
                        continue

                    row = _message_to_row(index, message_data)
                    writer.writerow(row)
                    print(f"Row written: {row}")
                    index += 1

        # Push buffered rows to disk before the file is measured and re-read below.
        csvfile.flush()
        os.fsync(csvfile.fileno())

    file_size = os.path.getsize(kafka_server_path)
    print(f"File size before upload: {file_size} bytes")

    # NOTE: the header row is always written, so the upload happens even when
    # no data rows were received.
    if file_size > 0:
        with open(kafka_server_path, 'rb') as f:
            minio_client.put_object(
                bucket_name,
                'kafka_output.csv',  # Object name in MinIO
                f,  # File object
                file_size  # File size in bytes
            )
        print("Data uploaded to MinIO as kafka_output.csv")
    else:
        print("File is empty, skipping upload.")

# Run the pipeline: consume from Kafka into the CSV file, then upload the
# (non-empty) file to MinIO. The call blocks until the polling loop inside
# save_to_minio() exits.
save_to_minio()

In [None]:
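# Debugging aid (left disabled): uncomment to print the raw value of every
# message read directly from the consumer.
#for c in consumer:
#    print(c.value)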