In [1]:
import re
import subprocess
import time

# Paths to Kafka and ZooKeeper commands

In [2]:
# Command lines for the ZooKeeper/Kafka Windows batch scripts under C:\kafka.
# The two start commands pass the matching properties file as an argument;
# the two stop commands take no arguments.
ZOOKEEPER_START_COMMAND = (
    r"C:\kafka\bin\windows\zookeeper-server-start.bat"
    r" C:\kafka\config\zookeeper.properties"
)
KAFKA_START_COMMAND = (
    r"C:\kafka\bin\windows\kafka-server-start.bat"
    r" C:\kafka\config\server.properties"
)
ZOOKEEPER_STOP_COMMAND = r"C:\kafka\bin\windows\zookeeper-server-stop.bat"
KAFKA_STOP_COMMAND = r"C:\kafka\bin\windows\kafka-server-stop.bat"

# Helper functions to run Kafka commands

In [3]:
def run_kafka_command(command):
    """
    Run a command through the shell and return its captured standard output.

    Args:
        command: Command line to execute. It is handed to the shell unchanged
            (``shell=True``), so the caller is responsible for composing it
            safely.

    Returns:
        The command's stdout with leading/trailing whitespace stripped, or
        ``None`` if the command could not be executed at all. A non-zero exit
        status is reported by printing stderr, but stdout is still returned.
    """
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Decode leniently: with the default strict decoding a single byte
            # that is invalid in the locale encoding raises UnicodeDecodeError
            # and the whole output would be lost.
            errors="replace",
            shell=True,
        )
    except Exception as e:
        # Best-effort helper: report the failure instead of propagating it.
        print(f"Exception: {e}")
        return None

    if completed.returncode != 0:
        print(f"Error: {completed.stderr}")
    return completed.stdout.strip()

In [4]:
def start_process_in_new_terminal(command):
    """
    Launch ``command`` through ``start cmd /k`` without waiting for it.

    The spawned process is not tracked: nothing is returned and the call does
    not block on the launched command.
    """
    launcher = "start cmd /k {}".format(command)
    subprocess.Popen(launcher, shell=True)

def stop_process(command):
    """
    Announce a stop command, then run it through the shell and wait for it.

    The command's exit status is not inspected and nothing is returned.
    """
    announcement = "Stopping process with command: {}".format(command)
    print(announcement)
    subprocess.run(command, shell=True)

# --- Kafka and ZooKeeper Commands ---

## 0. Functions to Start and Stop Kafka


In [5]:
def start_kafka(wait_seconds=10):
    """
    Starts ZooKeeper and then Kafka, each in its own terminal window.

    Args:
        wait_seconds: How long to pause after launching ZooKeeper before
            launching Kafka. Defaults to 10, the previously hard-coded delay.

    Note:
        Both processes are only launched; this function does not verify that
        either one actually came up. Calling it again launches another pair
        of terminals.
    """
    print("Starting ZooKeeper...")
    start_process_in_new_terminal(ZOOKEEPER_START_COMMAND)

    # Fixed pause rather than a readiness probe: give ZooKeeper time to start
    # before the broker is launched.
    print(f"Waiting {wait_seconds} seconds for ZooKeeper to start...")
    time.sleep(wait_seconds)

    print("Starting Kafka...")
    start_process_in_new_terminal(KAFKA_START_COMMAND)
    print("ZooKeeper and Kafka are now running in separate terminals.")

In [6]:
def stop_kafka(wait_seconds=10):
    """
    Stops Kafka first, then ZooKeeper, using their stop scripts.

    Args:
        wait_seconds: How long to pause after the Kafka stop script before
            stopping ZooKeeper. Defaults to 10, the previously hard-coded
            delay.
    """
    print("Stopping Kafka...")
    stop_process(KAFKA_STOP_COMMAND)

    # Pause between the two stop scripts so Kafka has time to shut down
    # before ZooKeeper is stopped.
    print(f"Waiting {wait_seconds} seconds for Kafka to stop...")
    time.sleep(wait_seconds)

    print("Stopping ZooKeeper...")
    stop_process(ZOOKEEPER_STOP_COMMAND)
    print("Kafka and ZooKeeper have been stopped.")

## 1. List all Kafka topics

In [7]:
def list_topics(bootstrap_server="192.168.0.61:9092"):
    """
    Lists all Kafka topics known to the given broker.

    Args:
        bootstrap_server: ``host:port`` of the broker to query. Defaults to
            the address previously hard-coded here; pass e.g.
            ``"localhost:9092"`` to target a local broker.
    """
    command = f"kafka-topics.bat --bootstrap-server {bootstrap_server} --list"

    print("Listing topics...")
    output = run_kafka_command(command)
    if output is None:
        # None means the command could not be run at all; do not claim that
        # the cluster simply has no topics.
        print("Failed to list topics.")
        return
    print(output or "No topics found.")

## 2. Delete a topic

In [8]:
def delete_topic(topic_name, bootstrap_server="192.168.0.61:9092"):
    """
    Deletes a Kafka topic.

    Args:
        topic_name: Name of the topic to delete. Must consist only of ASCII
            letters, digits, ``.``, ``_`` and ``-`` (1-249 characters), which
            is the character set Kafka accepts for topic names.
        bootstrap_server: ``host:port`` of the broker to contact. Defaults to
            the address previously hard-coded here.
    """
    # The name is interpolated into a shell command line, so reject anything
    # outside Kafka's legal topic-name characters before building the command.
    if not isinstance(topic_name, str) or not re.fullmatch(r"[A-Za-z0-9._-]{1,249}", topic_name):
        print(f"Invalid topic name: {topic_name!r}")
        return

    command = (
        f"kafka-topics.bat --bootstrap-server {bootstrap_server} "
        f"--delete --topic {topic_name}"
    )
    print(f"Deleting topic: {topic_name}...")
    output = run_kafka_command(command)
    if output is None:
        # The command could not be run; do not report success.
        print(f"Failed to delete topic '{topic_name}'.")
        return
    print(output or f"Topic '{topic_name}' deleted successfully.")

## 3. Create a topic

In [9]:
def create_topic(topic_name, partitions=1, replication_factor=1,
                 bootstrap_server="192.168.0.61:9092"):
    """
    Creates a Kafka topic.

    Args:
        topic_name: Name of the topic to create. Must consist only of ASCII
            letters, digits, ``.``, ``_`` and ``-`` (1-249 characters), which
            is the character set Kafka accepts for topic names.
        partitions: Value passed to ``--partitions``.
        replication_factor: Value passed to ``--replication-factor``.
        bootstrap_server: ``host:port`` of the broker to contact. Defaults to
            the address previously hard-coded here.
    """
    # The name is interpolated into a shell command line, so reject anything
    # outside Kafka's legal topic-name characters before building the command.
    if not isinstance(topic_name, str) or not re.fullmatch(r"[A-Za-z0-9._-]{1,249}", topic_name):
        print(f"Invalid topic name: {topic_name!r}")
        return

    command = (
        f"kafka-topics.bat --bootstrap-server {bootstrap_server} --create --topic {topic_name} "
        f"--partitions {partitions} --replication-factor {replication_factor}"
    )
    print(f"Creating topic: {topic_name}...")
    output = run_kafka_command(command)
    if output is None:
        # The command could not be run; do not report success.
        print(f"Failed to create topic '{topic_name}'.")
        return
    print(output or f"Topic '{topic_name}' created successfully.")

## 4. Set retention time for a topic

In [10]:
def set_retention(topic_name, retention_ms, bootstrap_server="192.168.0.61:9092"):
    """
    Sets the retention time (``retention.ms``) for a Kafka topic.

    Args:
        topic_name: Name of the topic to alter. Must consist only of ASCII
            letters, digits, ``.``, ``_`` and ``-`` (1-249 characters), which
            is the character set Kafka accepts for topic names.
        retention_ms: Retention in milliseconds, as an int or a string holding
            an integer (the menu passes raw ``input()`` text). ``-1`` is
            accepted; smaller values are rejected.
        bootstrap_server: ``host:port`` of the broker to contact. Defaults to
            the address previously hard-coded here.
    """
    # Both values are interpolated into a shell command line, so validate them
    # before building the command.
    if not isinstance(topic_name, str) or not re.fullmatch(r"[A-Za-z0-9._-]{1,249}", topic_name):
        print(f"Invalid topic name: {topic_name!r}")
        return
    try:
        # Going through str() rejects floats such as 1.5 instead of silently
        # truncating them.
        retention_value = int(str(retention_ms).strip())
    except ValueError:
        retention_value = None
    if retention_value is None or retention_value < -1:
        print(f"Invalid retention time (expected an integer >= -1): {retention_ms!r}")
        return

    command = (
        f"kafka-configs.bat --bootstrap-server {bootstrap_server} --alter --entity-type topics "
        f"--entity-name {topic_name} --add-config retention.ms={retention_value}"
    )
    print(f"Setting retention.ms={retention_value} for topic: {topic_name}...")
    output = run_kafka_command(command)
    if output is None:
        # The command could not be run; do not report success.
        print(f"Failed to set retention for topic '{topic_name}'.")
        return
    print(output or f"Retention set for topic '{topic_name}'.")

## 5. Describe topic configuration

In [11]:
def describe_topic(topic_name, bootstrap_server="192.168.0.61:9092"):
    """
    Describes a Kafka topic's configuration via ``kafka-configs.bat``.

    Args:
        topic_name: Name of the topic to describe. Must consist only of ASCII
            letters, digits, ``.``, ``_`` and ``-`` (1-249 characters), which
            is the character set Kafka accepts for topic names.
        bootstrap_server: ``host:port`` of the broker to contact. Defaults to
            the address previously hard-coded here.
    """
    # The name is interpolated into a shell command line, so reject anything
    # outside Kafka's legal topic-name characters before building the command.
    if not isinstance(topic_name, str) or not re.fullmatch(r"[A-Za-z0-9._-]{1,249}", topic_name):
        print(f"Invalid topic name: {topic_name!r}")
        return

    command = (
        f"kafka-configs.bat --bootstrap-server {bootstrap_server} --describe --entity-type topics "
        f"--entity-name {topic_name}"
    )
    print(f"Describing topic: {topic_name}...")
    output = run_kafka_command(command)
    if output is None:
        # The command could not be run; that is different from "no config".
        print(f"Failed to describe topic '{topic_name}'.")
        return
    print(output or f"No configuration found for topic '{topic_name}'.")

## 6. Consume messages from a topic

In [12]:
def consume_messages(topic_name, group_id, bootstrap_server="192.168.0.61:9092",
                     timeout_ms=10000):
    """
    Consumes messages from a Kafka topic using the Kafka CLI tools.

    Args:
        topic_name: Topic to read from. Must consist only of ASCII letters,
            digits, ``.``, ``_`` and ``-`` (1-249 characters), which is the
            character set Kafka accepts for topic names.
        group_id: Consumer group to read as; passed to the consumer via
            ``--group``. Restricted here to letters, digits, ``.``, ``_`` and
            ``-`` because it is interpolated into a shell command line.
        bootstrap_server: ``host:port`` of the broker to contact. Defaults to
            the address previously hard-coded here.
        timeout_ms: Passed as ``--timeout-ms`` so the console consumer exits
            once no message has arrived for this long. Without it the consumer
            never terminates and the captured output would never be returned.
    """
    if not isinstance(topic_name, str) or not re.fullmatch(r"[A-Za-z0-9._-]{1,249}", topic_name):
        print(f"Invalid topic name: {topic_name!r}")
        return
    if not isinstance(group_id, str) or not re.fullmatch(r"[A-Za-z0-9._-]+", group_id):
        print(f"Invalid consumer group ID: {group_id!r}")
        return

    command = (
        f"kafka-console-consumer.bat --bootstrap-server {bootstrap_server} "
        f"--topic {topic_name} --group {group_id} --from-beginning "
        f"--timeout-ms {int(timeout_ms)}"
    )
    print(f"Consuming messages from topic '{topic_name}' with group ID '{group_id}'...")

    # run_kafka_command reports its own failures and returns None for them,
    # so no extra exception handling is needed around the call.
    output = run_kafka_command(command)
    if output:
        print(f"Messages consumed:\n{output}")
    else:
        print("No messages consumed or an error occurred.")


# --- Example Usage ---

In [None]:
if __name__ == "__main__":
    # Interactive text menu: loops until the user picks "9" or aborts a prompt
    # with Ctrl-C / end-of-input.
    while True:
        print("\n--- Kafka and ZooKeeper Management ---")
        print("[1] Start Kafka and ZooKeeper")
        print("[2] Stop Kafka and ZooKeeper")
        print("[3] List Kafka Topics")
        print("[4] Create a Kafka Topic")
        print("[5] Delete a Kafka Topic")
        print("[6] Set Retention for a Topic")
        print("[7] Describe a Topic")
        print("[8] Consume Messages from a Topic")
        print("[9] Quit")

        try:
            # Strip every answer so stray whitespace (e.g. "3 ") is not
            # treated as an invalid option or leaked into a command line.
            choice = input("Choose an option: ").strip()

            if choice == "1":
                start_kafka()
            elif choice == "2":
                stop_kafka()
            elif choice == "3":
                list_topics()
            elif choice == "4":
                topic = input("Enter topic name to create: ").strip()
                create_topic(topic)
            elif choice == "5":
                topic = input("Enter topic name to delete: ").strip()
                delete_topic(topic)
            elif choice == "6":
                topic = input("Enter topic name: ").strip()
                retention = input("Enter retention time in milliseconds: ").strip()
                set_retention(topic, retention)
            elif choice == "7":
                topic = input("Enter topic name: ").strip()
                describe_topic(topic)
            elif choice == "8":
                topic = input("Enter topic name to consume messages: ").strip()
                group_id = (
                    input("Enter consumer group ID (default: 'test-consumer-group'): ").strip()
                    or "test-consumer-group"
                )
                consume_messages(topic, group_id)
            elif choice == "9":
                print("Exiting.")
                break
            else:
                print("Invalid option. Try again.")
        except (EOFError, KeyboardInterrupt):
            # Leave the menu cleanly instead of ending the cell with a
            # traceback when the prompt or a running action is interrupted.
            print("\nExiting.")
            break



--- Kafka and ZooKeeper Management ---
[1] Start Kafka and ZooKeeper
[2] Stop Kafka and ZooKeeper
[3] List Kafka Topics
[4] Create a Kafka Topic
[5] Delete a Kafka Topic
[6] Set Retention for a Topic
[7] Describe a Topic
[8] Consume Messages from a Topic
[9] Quit
Starting ZooKeeper...
Waiting 10 seconds for ZooKeeper to start...
Starting Kafka...
ZooKeeper and Kafka are now running in separate terminals.

--- Kafka and ZooKeeper Management ---
[1] Start Kafka and ZooKeeper
[2] Stop Kafka and ZooKeeper
[3] List Kafka Topics
[4] Create a Kafka Topic
[5] Delete a Kafka Topic
[6] Set Retention for a Topic
[7] Describe a Topic
[8] Consume Messages from a Topic
[9] Quit
Starting ZooKeeper...
Waiting 10 seconds for ZooKeeper to start...
Starting Kafka...
ZooKeeper and Kafka are now running in separate terminals.

--- Kafka and ZooKeeper Management ---
[1] Start Kafka and ZooKeeper
[2] Stop Kafka and ZooKeeper
[3] List Kafka Topics
[4] Create a Kafka Topic
[5] Delete a Kafka Topic
[6] Set Ret