In [None]:
import getpass
import json
import os
from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError
from kafka import KafkaConsumer

In [None]:
#Consumer
def consume_kafka_messages(topic="ready_data", bootstrap_servers="localhost:9094"):
    """Yield the JSON-decoded value of every message read from a Kafka topic.

    Args:
        topic: Name of the topic to subscribe to.
        bootstrap_servers: Kafka bootstrap server address passed to the consumer.

    Yields:
        The deserialized payload (``message.value``) of each consumed message.

    The consumer is closed when the generator finishes, is closed explicitly
    (``gen.close()``), or is garbage-collected, so the broker connection is
    not leaked when the caller stops iterating early.
    """
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        # Message payloads are UTF-8 encoded JSON documents.
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        api_version=(3, 8, 0),
    )
    print(f"Connected to Kafka topic: {topic}")

    try:
        for message in consumer:
            yield message.value
    finally:
        # Runs on normal exhaustion, on GeneratorExit and on errors alike.
        consumer.close()

In [None]:
# Elasticsearch connection settings.
# Secrets must not live in the notebook: the password is read from the
# ES_PASSWORD environment variable, and the user is prompted for it when the
# variable is not set. Host, user and CA certificate path can be overridden
# through the environment as well; the defaults match the local development setup.
# NOTE(review): a password that was previously committed in this cell should be
# rotated on the cluster.
ES_HOST = os.environ.get("ES_HOST", "https://localhost:9200")
ES_USERNAME = os.environ.get("ES_USERNAME", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD") or getpass.getpass("Elasticsearch password: ")
ES_CA_CERTS = os.environ.get(
    "ES_CA_CERTS",
    r"C:\Users\toshiba\Downloads\elasticsearch-8.17.0-windows-x86_64\elasticsearch-8.17.0\config\certs\http_ca.crt",
)

# Shared client used by the index-creation and bulk-insert cells.
es = Elasticsearch(
    ES_HOST,
    basic_auth=(ES_USERNAME, ES_PASSWORD),
    ca_certs=ES_CA_CERTS,
)

In [None]:
# Create the tweets index with an explicit mapping if it does not exist yet.
# An explicit existence check is used rather than ignoring HTTP 400 responses,
# because a 400 is also what Elasticsearch returns for a malformed mapping and
# that error must not be silently discarded.
# NOTE(review): `index_name` is not assigned in any cell visible here — confirm
# it is defined (e.g. in a configuration cell) before this cell runs.
tweet_mappings = {
    "properties": {
        "text": {"type": "text"},
        "hashtags": {"type": "keyword"},
        "coordinates": {"type": "geo_point"},
        "created_at": {"type": "date", "format": "strict_date_optional_time"},
        "sentiment": {"type": "keyword"},
    }
}

if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, mappings=tweet_mappings)

In [None]:
def _parse_coordinates(raw):
    """Convert a raw coordinates value into a ``{"lat", "lon"}`` dict, or None.

    ``None`` and the ``["unknown"]`` placeholder mean "no location". Values that
    cannot be converted (non-numeric, too short, wrong type) are reported and
    treated as missing.
    NOTE(review): element 0 is taken as latitude and element 1 as longitude —
    confirm the producer does not emit GeoJSON-style [lon, lat] pairs.
    """
    if raw is None or raw == ["unknown"]:
        return None
    try:
        return {"lat": float(raw[0]), "lon": float(raw[1])}
    except (ValueError, TypeError, IndexError):
        print(f"Invalid coordinates: {raw}")
        return None


def _normalize_created_at(raw):
    """Re-format a ``%Y-%m-%dT%H:%M:%S%z`` timestamp as ISO-8601, or return None.

    A UTC offset of ``+00:00`` is rendered as ``Z``. Unparseable or missing
    values are reported and yield None so the caller can skip the record.
    """
    try:
        parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S%z")
    except (ValueError, TypeError):
        print(f"Invalid date format: {raw}")
        return None
    return parsed.isoformat().replace("+00:00", "Z")


def _flush_actions(actions):
    """Bulk-index the buffered actions and always empty the buffer afterwards.

    ``raise_on_error=False`` makes the helper attempt every document and return
    the per-document errors instead of aborting at the first failing chunk.
    The buffer is cleared even on failure: the actions carry no ``_id``, so
    re-sending a partially indexed batch would create duplicate documents.
    """
    if not actions:
        return
    try:
        indexed, errors = bulk(es, actions, raise_on_error=False)
        if errors:
            print(f"Failed to index some documents: {errors}")
        print(f"Inserted {indexed} documents into the index '{index_name}'.")
    finally:
        actions.clear()


def process_and_insert_data(batch_size=1000):
    """Consume tweets from Kafka and index them into Elasticsearch in batches.

    Each message is turned into an index action for ``index_name`` holding the
    text, de-duplicated hashtags, optional coordinates, normalized
    ``created_at`` and the sentiment (kept only when it is "positive" or
    "negative"). Messages whose ``created_at`` cannot be parsed are skipped.

    Args:
        batch_size: Number of buffered actions that triggers a bulk request.

    Interrupting the run (``KeyboardInterrupt``, e.g. "Interrupt kernel") stops
    consumption and flushes whatever is still buffered, since the Kafka stream
    itself does not end on its own.
    """
    actions = []

    try:
        for tweet in consume_kafka_messages():
            coordinates = _parse_coordinates(tweet.get("coordinates"))

            created_at = _normalize_created_at(tweet.get("created_at"))
            if created_at is None:
                continue

            sentiment = tweet["sentiment"]
            actions.append({
                "_index": index_name,
                "_source": {
                    "text": tweet["text"],
                    # dict.fromkeys de-duplicates while keeping first-seen order.
                    "hashtags": list(dict.fromkeys(tweet["hashtags"])),
                    "coordinates": coordinates,
                    "created_at": created_at,
                    "sentiment": sentiment if sentiment in ("positive", "negative") else None,
                },
            })

            # After accumulating a batch, insert into Elasticsearch.
            if len(actions) >= batch_size:
                _flush_actions(actions)
    except KeyboardInterrupt:
        print("Interrupted; flushing buffered documents before exit.")

    # Insert any remaining actions if fewer than batch_size are left.
    _flush_actions(actions)

