In [31]:
from confluent_kafka import Consumer, KafkaError, TopicPartition
import time
import requests
import csv
from helper_modules.fetch_from_API import *

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mlip-team25',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Enable auto commit
    'auto.commit.interval.ms': 1000,
}

consumer = Consumer(**conf)
topic = 'movielog25'

# Calculate start and end timestamps for the previous day
current_time = time.time()
one_day_seconds = 24 * 60 * 60
start_of_previous_day = current_time - (current_time % one_day_seconds) - one_day_seconds
end_of_previous_day = start_of_previous_day + one_day_seconds

# Find the starting offset for the previous day
topic_partition = TopicPartition(topic, 0)  # Assuming single partition, partition ID 0
topic_partition.offset = int(start_of_previous_day * 1000)  # Kafka expects milliseconds
offsets = consumer.offsets_for_times([topic_partition])

if offsets[0].offset == -1:
    print("No messages found for the specified time range.")
else:
    # Seek to the start of the previous day
    consumer.assign([offsets[0]])
    print('Reading Kafka Broker and filtering logs for the previous day')

    count = 0
    # Open a CSV file to write the validated data
    with open('/home/team25/Milestone2/_SHARED_DATA_POOL/rate_log.csv', mode='w', newline='') as file:
        writer = csv.writer(file)
        # Write CSV headers
        writer.writerow(["user_id","movie_id","rate"])
        try:
            while True:
                if count == 2000:
                    break

                msg = consumer.poll(1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # Once we reach the end of the data for the previous day, we can break out of the loop
                        print('End of partition reached or end of previous day {0}/{1}'.format(msg.topic(), msg.partition()))
                        break
                    else:
                        print(msg.error())
                    continue

                message_timestamp = msg.timestamp()[1]  # Get message timestamp
                # Break the loop if the message is beyond the end of the previous day
                if message_timestamp > end_of_previous_day * 1000:
                    break

                message = msg.value().decode('utf-8')
                
                if "GET /rate" in message:
                    count += 1

                    # rate is digit between 0-5, and is digit
                    rate = message[-1]
                    if (rate.isdigit() == False and rate >= 1 and rate <= 5):
                        continue

                    # user_id is int after " " is replaced
                    parts1 = message.split(',')
                    user_id = parts1[1].replace(" ","")
                    if (user_id.isdigit() == False):
                        continue
                    if (get_user_info(str(user_id)) == None):
                        continue

                    # movie_id can be found through api
                    parts2 = message.split('/')
                    movie_id = parts2[2][:-2].replace(" ","")
                    if (get_movie_info(str(movie_id)) == None):
                        continue
                    #print(user_id, movie_id, rate)
                    writer.writerow([user_id, movie_id, rate])



        finally:
            consumer.close()

Reading Kafka Broker and filtering logs for the previous day


NameError: name 'get_user_info' is not defined

In [27]:
from confluent_kafka import Consumer, KafkaError, TopicPartition
import time

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mlip-team25',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Enable auto commit
    'auto.commit.interval.ms': 1000,
}

consumer = Consumer(**conf)
topic = 'movielog25'

# Calculate start timestamp for the current day
current_time = time.time()
one_day_seconds = 24 * 60 * 60
start_of_current_day = current_time - (current_time % one_day_seconds)

# Adjusting to start from the beginning of the current day
topic_partition = TopicPartition(topic, 0)  # Assuming single partition, partition ID 0
topic_partition.offset = int(start_of_current_day * 1000)  # Convert to milliseconds for Kafka
offsets = consumer.offsets_for_times([topic_partition])

if offsets[0].offset == -1:
    print("No messages found for today.")
else:
    # If an offset is found, assign the consumer to start from this offset
    consumer.assign([offsets[0]])
    print('Reading Kafka Broker and filtering logs for today')

    count = 0
    try:
        while True:
            if count == 2000:
                break

            msg = consumer.poll(1.0)

            if msg is None:
                print("msg is None")
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Once we reach the end of the data for the previous day, we can break out of the loop
                    print('End of partition reached or end of previous day {0}/{1}'.format(msg.topic(), msg.partition()))
                    break
                else:
                    print(msg.error())
                continue

            message_timestamp = msg.timestamp()[1]  # Get message timestamp
            # Break the loop if the message is beyond the end of the previous day
            if message_timestamp > end_of_previous_day * 1000:
                break

            message = msg.value().decode('utf-8')
            
            if "GET /data/m" in message:
                print(message)
                count += 1
                rate = int(message[-1])

                parts1 = message.split(',')
                user_id = parts1[1]

                parts2 = message.split('/')
                movie_id = parts2[2][:-2]
                print(user_id, movie_id, rate)



    finally:
        consumer.close()

Reading Kafka Broker and filtering logs for today
