In [None]:
import json
import os
import re
import threading
import time
from datetime import datetime
from io import BytesIO

import requests
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from minio import Minio
from minio.error import S3Error

# MinIO and Kafka configuration.
# Every setting can be overridden through an environment variable; the literal
# defaults reproduce the original hard-coded values when nothing is set.
# NOTE(review): the secret_key and news_api_key defaults are credentials
# committed to source -- rotate them and supply real values via the environment.
minio_url = os.environ.get("MINIO_URL", "minio:9000")
access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
secret_key = os.environ.get("MINIO_SECRET_KEY", "SU2orange!")
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "broker:29092")
kafka_topic = os.environ.get("KAFKA_TOPIC", "news")
news_api_key = os.environ.get("NEWS_API_KEY", "9a09fc4de8ac40779d994d79451cfae3")

# Initialize Kafka Producer; each message value is serialized as UTF-8 JSON.
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Initialize Kafka Admin Client and create topic if it doesn't exist
admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers)
topic = NewTopic(name=kafka_topic, num_partitions=1, replication_factor=1)
try:
    admin_client.create_topics(new_topics=[topic], validate_only=False)
except TopicAlreadyExistsError:
    # Expected on every run after the first one; not a failure.
    print(f"Topic '{kafka_topic}' already exists")
except Exception as e:
    # Best-effort: report any other failure and carry on, as the original
    # script did, so a transient admin error does not abort start-up.
    print(f"Error creating topic '{kafka_topic}': {e}")
else:
    print(f"Topic '{kafka_topic}' created successfully")

# Initialize MinIO Client (secure=False: the connection does not use TLS)
minio_client = Minio(
    minio_url,
    access_key=access_key,
    secret_key=secret_key,
    secure=False
)

def fetch_news(api_key, producer, kafka_topic, news_topic, timeout=10):
    """Fetch articles about *news_topic* from NewsAPI and publish them to Kafka.

    Each article returned by the API is sent as one message to *kafka_topic*
    and the producer is flushed once after the whole batch.

    Args:
        api_key: NewsAPI key, sent as the ``apiKey`` query parameter.
        producer: Object exposing ``send(topic, value=...)`` and ``flush()``.
        kafka_topic: Name of the Kafka topic the articles are sent to.
        news_topic: Search query passed to the API as ``q``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        None. Outcomes (success, no articles, HTTP or network error) are
        reported with ``print``.
    """
    try:
        # Pass the query via ``params`` so it is URL-encoded, and always set a
        # timeout so a stalled connection cannot block the caller forever.
        response = requests.get(
            "https://newsapi.org/v2/everything",
            params={"q": news_topic, "apiKey": api_key},
            timeout=timeout,
        )
    except requests.RequestException as e:
        print(f"Error fetching news: {e}")
        return

    if response.status_code != 200:
        print(f"Error fetching news: {response.status_code}")
        return

    articles = response.json().get('articles', [])
    if not articles:
        print(f"No articles found for topic: {news_topic}")
        return

    for article in articles:
        producer.send(kafka_topic, value=article)
    producer.flush()
    print(f"Sent {len(articles)} articles to Kafka topic '{kafka_topic}'")

def upload_to_minio(article, news_topic):
    """Store one article as a JSON object in MinIO.

    The bucket name is *news_topic* with every non-word character replaced by
    ``_``; the bucket is created when it does not exist yet. The object is
    written as ``<YYYYMMDD>/<sanitized title>-<HHMMSS>.json``, with date and
    time taken from the article's ``publishedAt`` field.

    Args:
        article: Dict with at least a ``publishedAt`` ISO-8601 string; the
            ``title`` entry is used for the object name when present.
        news_topic: Topic name the bucket name is derived from.

    Raises:
        KeyError: If ``publishedAt`` is missing.
        ValueError: If ``publishedAt`` cannot be parsed.
        Any exception raised by ``minio_client.put_object`` propagates.
    """
    bucket_name = re.sub(r'[^\w]', '_', news_topic)

    # Parse the published date. Only strip a real trailing "Z"; cutting the
    # last character unconditionally would corrupt timestamps without it.
    raw_timestamp = article['publishedAt']
    if raw_timestamp.endswith('Z'):
        raw_timestamp = raw_timestamp[:-1]
    published_date = datetime.fromisoformat(raw_timestamp)
    folder_name = published_date.strftime("%Y%m%d")

    try:
        if not minio_client.bucket_exists(bucket_name):
            minio_client.make_bucket(bucket_name)
            print(f"Bucket '{bucket_name}' created.")
    except S3Error as e:
        # Best-effort: report and still attempt the upload below.
        print(f"Error creating bucket: {e}")

    # A missing or null title would make re.sub raise; use a placeholder.
    raw_title = article.get('title') or 'untitled'
    title = re.sub(r'[^\w\s]', '_', raw_title).replace(' ', '_')
    object_name = f"{folder_name}/{title}-{published_date.strftime('%H%M%S')}.json"

    article_bytes = json.dumps(article).encode('utf-8')

    minio_client.put_object(
        bucket_name,
        object_name,
        BytesIO(article_bytes),
        length=len(article_bytes),
        content_type='application/json'
    )

    print(f"Uploaded article to {bucket_name}/{object_name}")

def produce_news(news_topic="olympics", interval_seconds=300):
    """Fetch news for *news_topic* forever, pausing between rounds.

    Calls ``fetch_news`` with the module-level API key, producer and Kafka
    topic, then sleeps. This function never returns.

    Args:
        news_topic: Search query handed to ``fetch_news``.
        interval_seconds: Seconds to sleep between two fetches.
    """
    while True:
        fetch_news(news_api_key, producer, kafka_topic, news_topic)
        time.sleep(interval_seconds)

def consume_news(news_topic="olympics", group_id="olympics"):
    """Consume articles from the Kafka topic and upload each one to MinIO.

    Subscribes to the module-level ``kafka_topic`` starting from the earliest
    offset, decodes every message as UTF-8 JSON and passes it to
    ``upload_to_minio``. The consumer is always closed on exit.

    Args:
        news_topic: Topic name forwarded to ``upload_to_minio``.
        group_id: Kafka consumer group id.
    """
    consumer = KafkaConsumer(
        kafka_topic,
        bootstrap_servers=kafka_bootstrap_servers,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=group_id
    )

    print(f"Listening for messages on topic '{kafka_topic}'...")
    try:
        for message in consumer:
            try:
                upload_to_minio(message.value, news_topic)
            except (KeyError, TypeError, ValueError, S3Error) as e:
                # One malformed article or failed upload must not end the
                # whole consumer loop; report it and move on.
                print(f"Skipping article that could not be stored: {e!r}")
    except KeyboardInterrupt:
        print("Stopping consumer...")
    finally:
        consumer.close()

if __name__ == "__main__":
    # Start producer and consumer in separate threads.
    # Both targets loop forever, so they run as daemon threads: otherwise the
    # interpreter would keep waiting for them after Ctrl-C in the main thread.
    # NOTE(review): daemon threads are stopped abruptly at interpreter exit,
    # so the consumer's ``finally`` clean-up may not run on shutdown.
    producer_thread = threading.Thread(target=produce_news, daemon=True)
    consumer_thread = threading.Thread(target=consume_news, daemon=True)

    producer_thread.start()
    consumer_thread.start()

    try:
        producer_thread.join()
        consumer_thread.join()
    except KeyboardInterrupt:
        # KeyboardInterrupt is delivered to the main thread only.
        print("Stopping...")

Topic 'news' may already exist or an error occurred: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='news', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='news', error_code=36, error_message="Topic 'news' already exists.")])'.
Listening for messages on topic 'news'...
Sent 100 articles to Kafka topic 'news'
Uploaded article to olympics/20240726/Dengue_Fever_Threatens_to_Gate_Crash_the_2024_Summer_Olympics-193842.json
Uploaded article to olympics/20240718/Xfinity_is_showing_the_Olympics_with_Dolby_Vision_and_Dolby_Atmos_on_its_Stream_app-140046.json
Uploaded article to olympics/20240729/Saboteurs_Cut_Internet_Cables_in_Latest_Disruption_During_Paris_Olympics-130732.json
Uploaded article to olympics/20240726/How_to_stream_the_Olympics_like_a_champ-120000.json
Uploaded article to olympi