In [2]:
# --- START OF FILE csv_to_kafka.py ---

import json
import logging
import math
import os
import signal
import sys
import time

import pandas as pd
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError

# --- Logging configuration ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# --- Runtime configuration (each value may be overridden by an environment variable) ---
# Kafka bootstrap server, as host:port.
KAFKA_BROKER_URL = os.environ.get('KAFKA_BROKER_URL', 'localhost:9092')
# Topic that receives the player records.
KAFKA_TOPIC = os.environ.get('KAFKA_TOPIC', 'player-data-topic')
# Input CSV; a relative path is resolved against the current working directory.
CSV_FILE_PATH = os.environ.get('CSV_FILE_PATH', '../data/Cleaned_Data.csv')

# --- Kafka producer handle ---
# Remains None until the producer is created further down; the signal handler
# and the final cleanup both check it before flushing/closing.
producer = None

def signal_handler(sig, frame):
    """Flush and close the Kafka producer, then exit with status 0.

    Installed for SIGINT and SIGTERM. ``sys.exit`` raises ``SystemExit``, so
    any enclosing ``finally`` blocks still run while the interpreter unwinds.

    Args:
        sig: Number of the signal that was received (unused).
        frame: Stack frame that was executing when the signal arrived (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    global producer
    logger.info("Received interrupt signal. Cleaning up...")
    if producer:
        # Detach the global before cleaning up so later code that checks
        # `producer` (e.g. a final cleanup block) does not flush/close an
        # already-closed producer a second time.
        active_producer, producer = producer, None
        try:
            active_producer.flush(timeout=60)
        except Exception as flush_e:
            # A failed flush must not prevent close() or the exit below.
            logger.error(f"ERROR during producer flush: {flush_e}")
        finally:
            active_producer.close()
    logger.info("Cleanup completed. Exiting.")
    # sys.exit, not the interactive-only `exit` helper injected by `site`.
    sys.exit(0)

# Route both Ctrl+C (SIGINT) and termination requests (SIGTERM) through the
# same cleanup handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
del _signum

# --- Check for the Kafka topic and create it if it is missing ---
def _close_admin_client(admin_client):
    """Close a Kafka admin client, logging (not raising) any error from close()."""
    try:
        admin_client.close()
    except Exception as close_e:
        logger.warning(f"Error closing Kafka admin client: {close_e}")


def ensure_kafka_topic(max_retries=10, retry_delay_s=5):
    """Make sure KAFKA_TOPIC exists on the broker at KAFKA_BROKER_URL.

    Connects with a KafkaAdminClient, retrying up to ``max_retries`` times and
    sleeping ``retry_delay_s`` seconds between failed attempts. If the topic is
    not listed, it is created with 1 partition and replication factor 1.

    Best-effort by design: every failure is logged and swallowed, so this
    function does not raise ``Exception`` subclasses. Callers that need a hard
    guarantee must verify the topic themselves.

    Args:
        max_retries: Number of connection attempts before giving up.
        retry_delay_s: Seconds to wait between failed connection attempts.
    """
    from kafka.errors import TopicAlreadyExistsError

    admin_client = None
    try:
        # Retry because the broker may still be starting (e.g. in a distributed setup).
        logger.info(f"Attempting to connect to Kafka broker at {KAFKA_BROKER_URL} to check/create topic...")
        for attempt in range(1, max_retries + 1):
            try:
                admin_client = KafkaAdminClient(bootstrap_servers=[KAFKA_BROKER_URL], request_timeout_ms=5000)
                # list_topics() doubles as the connectivity check; keep its result.
                topic_list = admin_client.list_topics()
                logger.info("Successfully connected to Kafka for topic management.")
                break
            except Exception as e:
                # If the constructor succeeded but the check failed, close that
                # client now; otherwise each failed attempt leaks one.
                if admin_client is not None:
                    _close_admin_client(admin_client)
                    admin_client = None
                logger.warning(f"Attempt {attempt}/{max_retries} failed to connect to Kafka for topic check: {e}. Retrying...")
                if attempt < max_retries:  # no point sleeping after the last attempt
                    time.sleep(retry_delay_s)
        else:
            raise ConnectionError(f"Failed to connect to Kafka brokers after {max_retries} attempts.")

        if KAFKA_TOPIC not in topic_list:
            logger.info(f"Topic {KAFKA_TOPIC} does not exist. Creating...")
            # Adjust num_partitions and replication_factor as needed for your Kafka setup
            new_topic = NewTopic(name=KAFKA_TOPIC, num_partitions=1, replication_factor=1)
            try:
                admin_client.create_topics(new_topics=[new_topic], validate_only=False)
                logger.info(f"Topic {KAFKA_TOPIC} created successfully.")
            except TopicAlreadyExistsError:
                # Another client created it between our list and create calls.
                logger.info(f"Topic {KAFKA_TOPIC} already exists.")
        else:
            logger.info(f"Topic {KAFKA_TOPIC} already exists.")
    except Exception as e:
        logger.error(f"CRITICAL ERROR ensuring Kafka topic: {e}")
        # Intentionally not re-raised (best-effort); add `raise` here if a
        # missing topic should stop the script.
    finally:
        if admin_client is not None:
            _close_admin_client(admin_client)

# ensure_kafka_topic() catches and logs its own errors, so this handler only
# matters if that function is changed to re-raise.
try:
    ensure_kafka_topic()
except Exception as e:
    logger.critical(f"Could not ensure Kafka topic {KAFKA_TOPIC}: {e}. Exiting.")
    sys.exit(1)


# Create the producer and prove the broker is reachable before reading the CSV.
try:
    logger.info(f"Connecting to Kafka broker at {KAFKA_BROKER_URL}...")
    producer = KafkaProducer(
        bootstrap_servers=[KAFKA_BROKER_URL],
        # Dict payloads become UTF-8 JSON; ensure_ascii=False leaves non-ASCII
        # characters (e.g. accented player names) unescaped.
        value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
        retries=5,
        acks='all',
        request_timeout_ms=30000,  # Increase timeout for sending
    )
    # kafka-python's KafkaProducer has no list_topics(). partitions_for()
    # waits for the topic's metadata from the broker (raising on timeout),
    # which forces a real connection check.
    producer.partitions_for(KAFKA_TOPIC)
    logger.info("Successfully connected to Kafka producer.")
except Exception as e:
    logger.error(f"CRITICAL: Error connecting to Kafka producer: {e}")
    sys.exit(1)  # Exit if producer cannot be initialized

# --- Kafka delivery callbacks and async send helper ---
def on_send_success(record_metadata):
    """Success callback for a send future: log where the record landed, at DEBUG.

    Lazy %-style arguments mean the message is only formatted when DEBUG
    logging is enabled, so the per-record cost stays negligible for large files.

    Args:
        record_metadata: Delivery metadata; its ``topic``, ``partition`` and
            ``offset`` attributes are logged.
    """
    logger.debug(
        "Successfully sent message to topic %s partition %s offset %s",
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )

def on_send_error(excp):
    """Error callback for a send future: report the failed delivery at ERROR level.

    Args:
        excp: The exception the producer reported for the failed send.
    """
    failure_message = f"ERROR sending message to Kafka: {excp}"
    logger.error(failure_message)

def send_to_kafka_async(producer, topic, data, max_retries=3):
    """Hand one record to the producer without waiting for delivery.

    The outcome of the actual delivery is reported later through the
    ``on_send_success`` / ``on_send_error`` callbacks attached to the send
    future. This function performs no retries of its own; resending is left
    to the producer's ``retries`` configuration.

    Args:
        producer: Producer used for ``send``; the call is skipped if it is falsy.
        topic: Destination topic name.
        data: Record payload passed as ``value``; the call is skipped if it is
            falsy (``None`` or an empty dict).
        max_retries: Unused. Kept only so existing callers that pass it keep working.

    Returns:
        True if the record was handed to the producer; False if it was skipped
        or if preparing the send raised.
    """
    if not data or not producer:
        logger.warning("Attempted to send empty data or producer is not initialized.")
        return False
    try:
        future = producer.send(topic, value=data)
        future.add_callback(on_send_success)
        future.add_errback(on_send_error)
        return True
    except Exception as e:
        # Errors raised here happen *before* the record is queued (e.g.
        # serialization issues); delivery failures arrive via on_send_error.
        logger.error(f"Error preparing message for Kafka: {e}")
        return False


# --- Helper: turn pandas/NumPy cell values into JSON-safe Python values ---
def handle_nan(value):
    """Convert one cell value into something ``json.dumps`` emits as valid JSON.

    - Values ``pd.isna`` treats as missing (NaN, None, pd.NA, NaT) -> None.
    - NumPy scalars -> the equivalent built-in Python scalar (via ``.item()``).
    - Non-finite floats (+/-inf) of any width -> None, because ``json.dumps``
      would otherwise write the non-standard token ``Infinity``.
    - Anything else is returned unchanged.

    Args:
        value: A single cell taken from a DataFrame row (None when the
            column is absent, since rows are read with ``row.get``).

    Returns:
        A JSON-serializable Python scalar, or None.
    """
    if pd.isna(value):
        return None
    # Unwrap NumPy scalars first so the float check below also catches
    # float16/float32 infinities, which are not subclasses of built-in float.
    if hasattr(value, 'item'):
        value = value.item()
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value

# --- Read the CSV and publish each row to Kafka ---

# CSV columns copied into every message, in output order. A column missing from
# the CSV yields None for that key (row.get returns None), so every message has
# the same shape.
_PLAYER_FIELDS = (
    'player_id', 'name', 'player_club', 'age', 'position', 'market_value',
    'nationality', 'player_height', 'player_agent', 'strong_foot',
    'contract_value_time', 'appearances', 'PPG', 'goals', 'assists',
    'own_goals', 'substitutions_on', 'substitutions_off', 'yellow_cards',
    'second_yellow_cards', 'red_cards', 'penalty_goals', 'minutes_per_goal',
    'minutes_played',
)


def _build_player_record(row):
    """Build the JSON-ready message dict for one DataFrame row (NaN -> None)."""
    return {
        'processing_timestamp': int(time.time() * 1000),
        **{field: handle_nan(row.get(field)) for field in _PLAYER_FIELDS},
        'source_file': CSV_FILE_PATH,
    }


logger.info(f"Attempting to read data from file: {CSV_FILE_PATH}")

processed_rows = 0
successfully_prepared_sends = 0
failed_prepares = 0  # rows whose send_to_kafka_async() call returned False
rows_with_errors = 0  # rows skipped or failed before being handed to the producer

try:
    if not os.path.exists(CSV_FILE_PATH):
        logger.critical(f"Error: CSV file not found at {CSV_FILE_PATH}")
        # SystemExit is not caught by `except Exception`; the finally below still runs.
        sys.exit(1)

    # Let pandas infer column types; NaN and NumPy scalars are normalised per
    # cell by handle_nan(). If inference misbehaves, pass explicit nullable
    # dtypes (e.g. 'Int64', 'Float64') via read_csv(dtype=...).
    df = pd.read_csv(CSV_FILE_PATH)
    logger.info(f"Successfully read {len(df)} rows from {CSV_FILE_PATH}.")

    # iterrows() yields (index, Series) pairs, so row.get(column) tolerates
    # columns the CSV does not have.
    for index, row in df.iterrows():
        processed_rows += 1
        try:
            player_data_record = _build_player_record(row)

            # Basic validation: skip rows missing critical identifiers.
            if player_data_record['player_id'] is None or player_data_record['name'] is None:
                logger.warning(f"Skipping row {index} due to missing player_id or name.")
                rows_with_errors += 1
            elif send_to_kafka_async(producer, KAFKA_TOPIC, player_data_record):
                successfully_prepared_sends += 1
            else:
                failed_prepares += 1
                rows_with_errors += 1  # Count as an error if preparation failed
        except Exception as row_e:
            logger.error(f"ERROR processing row {index}: {row_e}")
            rows_with_errors += 1

        # Report progress on every 100th row read, whatever that row's outcome.
        if processed_rows % 100 == 0:
            logger.info(f"Processed {processed_rows} rows so far. Prepared {successfully_prepared_sends} messages for sending.")

except Exception as main_e:
    # logger.exception also records the traceback.
    logger.exception(f"An error occurred during file reading or processing: {main_e}")

finally:
    logger.info("-" * 30)
    logger.info("CSV processing finished.")
    logger.info(f"Total rows processed: {processed_rows}")
    logger.info(f"Rows skipped or failed during preparation: {rows_with_errors}")
    logger.info(f"Rows that failed hand-off to the producer: {failed_prepares}")
    logger.info(f"Attempted to send {successfully_prepared_sends} messages to Kafka.")

    if producer:
        logger.info("Flushing Kafka producer (waiting for pending messages)...")
        try:
            # flush() blocks until pending async sends complete (here: up to 60 s).
            producer.flush(timeout=60)
            logger.info("Kafka producer flushed.")
        except Exception as flush_e:
            logger.error(f"ERROR during producer flush: {flush_e}")
        finally:
            logger.info("Closing Kafka producer.")
            producer.close()

    logger.info("Script finished.")

# --- END OF FILE csv_to_kafka.py ---

2025-05-23 23:26:43,531 - INFO - Attempting to connect to Kafka broker at localhost:9092 to check/create topic...
2025-05-23 23:26:43,534 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
2025-05-23 23:26:43,565 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:9092 <checking_api_versions_recv> [IPv6 ('::1', 9092, 0, 0)]>: Broker version identified as 2.6
2025-05-23 23:26:43,566 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
2025-05-23 23:26:43,603 - INFO - <BrokerConnection client_id=kafka-python-2.1.5, node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
2025-05-23 23:26:43,605 - INFO - <BrokerConnection client_id=ka