# Extract Kafka log for ratings


In [None]:
from kafka import KafkaConsumer
import re

TOPIC_NAME = "movielog15"

# Number of rating log lines to collect before the extraction stops.
TARGET_RATING_COUNT = 80000

# Regular expression for getting rating logs.
# Expected shape: <anything>,<numeric user id>,GET /rate/<movie id>=<numeric rating>
rate_pattern = re.compile(r'^.*?,\d+,GET /rate/.*?=\d+$')

# Create Kafka consumer
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='rating-group',
    value_deserializer=lambda x: x.decode('utf-8')
)

# Process and store logs
matching_count = 0  # messages that matched rate_pattern and were written out
total_count = 0     # every message read from the topic, matching or not
try:
    with open('extracted_ratings.txt', 'w') as output_file:
        for message in consumer:
            total_count += 1
            line = message.value
            if not rate_pattern.match(line):
                continue

            output_file.write(line + '\n')
            matching_count += 1
            if matching_count % 100 == 0:
                print(f"Matched {matching_count} rating logs out of {total_count} total logs processed")

            # The count only changes on a match, so checking here is enough.
            if matching_count >= TARGET_RATING_COUNT:
                break
finally:
    # Always release the broker connection, even if the loop is interrupted
    # or raises, so the consumer does not linger in the group.
    consumer.close()

# Report the count actually reached rather than a hard-coded number.
print(f"Extracted exactly {matching_count} rating logs out of {total_count} total logs processed")
print("Extracted ratings saved to 'extracted_ratings.txt'")

# Convert Kafka log to CSV

In [None]:
import csv

# Convert each extracted rating log line into one CSV row.
# Lines are parsed as: <time>,<user id>,GET /rate/<movie id>=<rating>
# where <movie id> is '+'-separated words whose last word is taken as the year.
with open('extracted_ratings.txt', 'r') as infile, open('extracted_ratings.csv', 'w', newline='') as outfile:
    fieldnames = ['user_time', 'user_id', 'movie_id', 'movie_title', 'year', 'rating']
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)

    writer.writeheader()

    request_prefix = 'GET /rate/'

    for line in infile:
        # Only the first two commas are field delimiters; anything after them
        # belongs to the request, so a comma inside a movie id is preserved.
        parts = line.strip().split(',', 2)

        if len(parts) < 3:
            print(f"Skipping malformed line: {line}")
            continue

        timestamp = parts[0]
        userid = parts[1]
        request = parts[2].strip()

        if not request.startswith(request_prefix):
            print(f"Skipping unexpected request format: {line}")
            continue

        # The rating follows the LAST '=', so an '=' inside the movie id
        # cannot shift the rating into the wrong field.
        movieid, separator, rating = request[len(request_prefix):].rpartition('=')

        if not separator:
            print(f"Skipping malformed movie path: {line}")
            continue

        # Last '+'-separated token is the year; the rest is the title.
        # With no '+', the whole id becomes the year and the title is empty.
        title_words, _, year = movieid.rpartition('+')
        movietitle = title_words.replace('+', ' ')

        try:
            writer.writerow({
                'user_time': timestamp,
                'user_id': userid,
                'movie_id': movieid,
                'movie_title': movietitle,
                'year': year,
                'rating': rating
            })
        except csv.Error as e:
            # Keep going on a row the csv writer rejects, but let real I/O
            # failures (e.g. disk full) propagate instead of hiding them.
            print(f"Error processing line: {line}, Error: {str(e)}")
            continue


# Extract movie details from API

In [None]:
import requests
import pandas as pd

def get_movie_details(movie_id, timeout=10):
    """Fetch the raw movie record for ``movie_id`` from the movie API.

    Args:
        movie_id: Identifier interpolated into the ``/movie/<id>`` URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block indefinitely on a stalled server.

    Returns:
        The decoded JSON body on an HTTP 200 response, otherwise ``None``
        (non-200 status, network failure, or a body that is not valid JSON).
    """
    url = f"http://128.2.204.215:8080/movie/{movie_id}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # One unreachable request must not abort a whole batch of lookups.
        print(f"Failed to retrieve data for movie_id: {movie_id} ({exc})")
        return None

    if response.status_code != 200:
        print(f"Failed to retrieve data for movie_id: {movie_id}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        # JSON decode errors raised by requests derive from ValueError.
        print(f"Failed to retrieve data for movie_id: {movie_id} ({exc})")
        return None

def _join_names(entries):
    """Join the ``name`` of each entry with ", "; ``None`` or empty gives ""."""
    return ', '.join(entry['name'] for entry in (entries or []))


def process_movie_details(movie_info):
    """Flatten a raw movie record into a single-level dict for tabular export.

    List-valued fields (genres, production companies/countries, spoken
    languages) are collapsed into a comma-separated string of their ``name``
    values. A list field that is missing or ``None`` becomes an empty string.
    Scalar fields missing from ``movie_info`` default to ``'N/A'``.

    Args:
        movie_info: Mapping holding the movie record; must contain ``'id'``.

    Returns:
        A dict with a fixed set of keys in a fixed order.

    Raises:
        KeyError: If ``movie_info`` has no ``'id'`` key.
    """
    genres = _join_names(movie_info.get('genres'))
    production_companies = _join_names(movie_info.get('production_companies'))
    production_countries = _join_names(movie_info.get('production_countries'))
    spoken_languages = _join_names(movie_info.get('spoken_languages'))

    return {
        'movie_id': movie_info['id'],
        'tmdb_id': movie_info.get('tmdb_id', 'N/A'),
        'imdb_id': movie_info.get('imdb_id', 'N/A'),
        'title': movie_info.get('title', 'N/A'),
        'original_title': movie_info.get('original_title', 'N/A'),
        'adult': movie_info.get('adult', 'N/A'),
        'budget': movie_info.get('budget', 'N/A'),
        'genres': genres,
        'homepage': movie_info.get('homepage', 'N/A'),
        'original_language': movie_info.get('original_language', 'N/A'),
        'overview': movie_info.get('overview', 'N/A'),
        'popularity': movie_info.get('popularity', 'N/A'),
        'poster_path': movie_info.get('poster_path', 'N/A'),
        'production_companies': production_companies,
        'production_countries': production_countries,
        'release_date': movie_info.get('release_date', 'N/A'),
        'revenue': movie_info.get('revenue', 'N/A'),
        'runtime': movie_info.get('runtime', 'N/A'),
        'spoken_languages': spoken_languages,
        'status': movie_info.get('status', 'N/A'),
        'vote_average': movie_info.get('vote_average', 'N/A'),
        'vote_count': movie_info.get('vote_count', 'N/A')
    }

# Load the movie ids to look up from the 'movie_id' column of movie_ids.csv.
ids_frame = pd.read_csv('movie_ids.csv')
movie_ids = ids_frame['movie_id'].tolist()

# Fetch every movie in order, keeping only the ones the API returned data for.
movie_data = []
for current_id in movie_ids:
    details = get_movie_details(current_id)
    if not details:
        continue
    movie_data.append(process_movie_details(details))

# Persist the flattened records, one row per movie, without the index column.
movie_df = pd.DataFrame(movie_data)
movie_df.to_csv('movie_details.csv', index=False)

print("Movie details saved to 'movie_details.csv'")


# Extract user details from API

In [None]:
import requests
import pandas as pd

def get_user_details(user_id, timeout=10):
    """Fetch the raw user record for ``user_id`` from the user API.

    Args:
        user_id: Identifier interpolated into the ``/user/<id>`` URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block indefinitely on a stalled server.

    Returns:
        The decoded JSON body on an HTTP 200 response, otherwise ``None``
        (non-200 status, network failure, or a body that is not valid JSON).
    """
    url = f"http://128.2.204.215:8080/user/{user_id}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # One unreachable request must not abort a whole batch of lookups.
        print(f"Failed to retrieve data for user_id: {user_id} ({exc})")
        return None

    if response.status_code != 200:
        print(f"Failed to retrieve data for user_id: {user_id}")
        return None

    try:
        return response.json()
    except ValueError as exc:
        # JSON decode errors raised by requests derive from ValueError.
        print(f"Failed to retrieve data for user_id: {user_id} ({exc})")
        return None

def process_user_details(user_info):
    """Reduce a raw user record to the four fields kept for export.

    ``user_id`` is required (a missing key raises ``KeyError``); ``age``,
    ``occupation`` and ``gender`` fall back to ``'N/A'`` when absent.
    """
    record = {'user_id': user_info['user_id']}
    for field in ('age', 'occupation', 'gender'):
        record[field] = user_info.get(field, 'N/A')
    return record

# Load the user ids to look up from the 'user_id' column of user_ids.csv.
ids_frame = pd.read_csv('user_ids.csv')
user_ids = ids_frame['user_id'].tolist()

# Fetch every user in order, keeping only the ones the API returned data for.
user_data = []
for current_id in user_ids:
    details = get_user_details(current_id)
    if not details:
        continue
    user_data.append(process_user_details(details))

# Persist the reduced records, one row per user, without the index column.
user_df = pd.DataFrame(user_data)
user_df.to_csv('user_details.csv', index=False)

print("User details saved to 'user_details.csv'")
