# Movie Recommendations: Data Collection & Processing
This notebook documents the process of collecting and processing movie data from Kafka.
We extract movie watch events and ratings, clean the data, and prepare it for model training.

## 1. Connecting to Kafka & Extracting Data
We first open an SSH tunnel so that the Kafka broker is reachable at `localhost:9092`, then verify the connection. The past movie interaction data is later read from the `movielog2` topic.

In [None]:
# Open an SSH tunnel to the Kafka server. Run this in a terminal, not in Jupyter
# (drop the leading "!"), because ssh prompts for a password.
!ssh -o ServerAliveInterval=60 -L 9092:localhost:9092 NetID@cs594.cs.uic.edu -NTf # replace NetID with your own NetID

(NetID@cs594.cs.uic.edu) Password: Connection to localhost port 9092 [tcp/XmlIpcRegSvc] succeeded!


In [None]:
# Verify Kafka Connection
!nc -zv localhost 9092

In [None]:
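# Activate the Python virtual environment. Do this in a terminal before starting Jupyter:
# each "!" command runs in its own subshell, so activating here does not affect the kernel.
!source kafka_lab_env/bin/activate  # On macOS/Linux
!kafka_lab_env/Scripts/activate  # On Windows
# Install the Kafka client into the environment of the running kernel
%pip install kafka-python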

## 2. Listing Available Kafka Topics
We check for available topics to find the one containing movie logs.

In [None]:
from kafka import KafkaAdminClient

# Connect to Kafka Admin
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092"
)

# Get all topics
topics = admin_client.list_topics()
print("Available Topics in Kafka VM:")
for topic in topics:
    print(topic)

## 3. Extracting Data from Kafka Topic 'movielog2'
This section uses a Kafka consumer to read all past data from the topic and save it to a CSV file.

In [None]:
from kafka import KafkaConsumer
import pandas as pd

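# Define Kafka Consumer
consumer = KafkaConsumer(
    'movielog2',  # Replace with your topic name
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # Read from the beginning
    enable_auto_commit=False,
    # Without a timeout the loop below blocks forever and the CSV is never written.
    # Stop iterating after 10 s with no new messages (adjust as needed).
    consumer_timeout_ms=10000
)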

# Initialize list to store messages
data_list = []

# Fetch and store messages
for message in consumer:
    data = message.value.decode('utf-8')  # Decode message from bytes to string
    print(f"Received: {data}")  # Print each message
    data_list.append([data])  # Append data as a list

# Convert to DataFrame
df = pd.DataFrame(data_list, columns=["Message"])

# Save to CSV
df.to_csv("movielog2_data.csv", index=False)

print("✅ All past data successfully saved to movielog2_data.csv")

## 4. Processing & Cleaning Extracted Kafka Data
We extract movie watch events and ratings from raw logs and structure them into a usable format.

In [None]:
import pandas as pd
import re

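# Read raw Kafka logs back through pandas so that the "Message" header row and the
# quotes that to_csv adds around comma-containing messages are stripped
lines = pd.read_csv("movielog2_data.csv")["Message"].astype(str).tolist()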
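
A note on the file's on-disk form: each Kafka message contains commas, so `to_csv` wraps it in double quotes and writes a `Message` header row. The sketch below uses a made-up message and an in-memory buffer (both are assumptions for illustration, not real log data) to compare the raw text lines with what `pd.read_csv` returns.

```python
import io
import pandas as pd

# Hypothetical message, assuming the "timestamp,user_id,request" layout
message = "2025-01-01T10:00:00,12345,GET /rate/the+example+movie+1999=4"

# Write the message the same way the extraction step does, but to memory
buffer = io.StringIO()
pd.DataFrame([[message]], columns=["Message"]).to_csv(buffer, index=False)

# Raw text: a header line, then the message wrapped in double quotes
raw_lines = buffer.getvalue().splitlines()
print(raw_lines)

# Parsed through pandas: the header and the quotes are gone
buffer.seek(0)
recovered = pd.read_csv(buffer)["Message"].tolist()
print(recovered)
```

Splitting a raw line on commas would leave a stray `"` at the start of the timestamp, which is why parsing the file through pandas is the safer route.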

## 5. Extract Movie Watch Data
This extracts timestamps, user IDs, and movie names from logs where users watched movies.

In [None]:
watch_data = []

for line in lines:
    parts = line.strip().split(",")

    if len(parts) < 3:
        continue

    timestamp = parts[0]
    user_id = parts[1]
    match = re.search(r'GET /data/m/(.+)/\d+\.mpg', parts[2])

    if match:
        movie_name = match.group(1).replace("+", " ")
        watch_data.append([timestamp, user_id, movie_name])

# Convert to DataFrame
watch_df = pd.DataFrame(watch_data, columns=["Timestamp", "User_ID", "Movie_Name"])

# Remove duplicate watches (keep only one entry per user per movie)
watch_df = watch_df.drop_duplicates(subset=["User_ID", "Movie_Name"])

print("✅ Movie watch data extracted!")
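
To see what the watch pattern captures, the following sketch applies the same regular expression to a single log line. The sample line is invented for illustration (the timestamp, user ID, and movie name are hypothetical) and assumes the `timestamp,user_id,request` layout that the loop above relies on.

```python
import re

# Hypothetical watch event, assuming the "timestamp,user_id,request" layout
sample_line = "2025-01-01T10:00:00,12345,GET /data/m/the+example+movie+1999/7.mpg"

parts = sample_line.strip().split(",")
match = re.search(r'GET /data/m/(.+)/\d+\.mpg', parts[2])

# group(1) is the movie identifier; "+" separators are turned into spaces
movie_name = match.group(1).replace("+", " ")
print(parts[0], parts[1], movie_name)
```

The trailing `/\d+\.mpg` part of the pattern matches the numbered video chunk, so it is excluded from the captured movie name.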

## 6. Extract Movie Rating Data
This extracts timestamps, user IDs, movie names, and ratings from logs where users rated movies.

In [None]:
rating_data = []

for line in lines:
    parts = line.strip().split(",")

    if len(parts) < 3:
        continue

    timestamp = parts[0]
    user_id = parts[1]
    match = re.search(r'GET /rate/(.+)=(\d+)', parts[2])

    if match:
        movie_name = match.group(1).replace("+", " ")
        rating = match.group(2)
        rating_data.append([timestamp, user_id, movie_name, rating])

# Convert to DataFrame
rating_df = pd.DataFrame(rating_data, columns=["Timestamp", "User_ID", "Movie_Name", "Rating"])

print("✅ Movie rating data extracted!")
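
The rating pattern can be checked the same way. This sketch runs it against one invented rating event and one invented watch event (both hypothetical, assuming the `timestamp,user_id,request` layout) to show that only the rating request matches.

```python
import re

rating_pattern = r'GET /rate/(.+)=(\d+)'

# Hypothetical log lines for illustration only
rating_line = "2025-01-01T10:05:00,12345,GET /rate/the+example+movie+1999=4"
watch_line = "2025-01-01T10:00:00,12345,GET /data/m/the+example+movie+1999/7.mpg"

rating_match = re.search(rating_pattern, rating_line.split(",")[2])
watch_match = re.search(rating_pattern, watch_line.split(",")[2])

# group(1) is the movie identifier, group(2) the rating as a string
rated_movie = rating_match.group(1).replace("+", " ")
rating = rating_match.group(2)
print(rated_movie, rating, watch_match)
```

Note that `group(2)` is a string; convert it with `int()` if numeric ratings are needed downstream.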

## 7. Merging Watch and Rating Data
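We merge both datasets on `User_ID` and `Movie_Name` with a left join, so every watched movie is kept and `Rating` is empty (NaN) when the user never rated it. Because both inputs have a `Timestamp` column, pandas renames them to `Timestamp_x` (watch) and `Timestamp_y` (rating) in the result.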

In [None]:
# Merge watch and rating data
final_df = pd.merge(watch_df, rating_df, on=["User_ID", "Movie_Name"], how="left")

# Save final merged file
final_df.to_csv("final_processed_data.csv", index=False)

print("✅ Final processed dataset saved as 'final_processed_data.csv'")
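
As a rough illustration of how the left join behaves, the sketch below merges two tiny hand-made DataFrames. All rows are invented; only the column names and the `pd.merge` arguments follow the cell above.

```python
import pandas as pd

# Hypothetical watch events: one user, two movies
watch_demo = pd.DataFrame(
    [["2025-01-01T10:00:00", "12345", "movie a"],
     ["2025-01-01T12:00:00", "12345", "movie b"]],
    columns=["Timestamp", "User_ID", "Movie_Name"],
)

# Hypothetical rating events: only "movie a" was rated
rating_demo = pd.DataFrame(
    [["2025-01-01T11:30:00", "12345", "movie a", 4]],
    columns=["Timestamp", "User_ID", "Movie_Name", "Rating"],
)

merged_demo = pd.merge(watch_demo, rating_demo,
                       on=["User_ID", "Movie_Name"], how="left")

# Both watches survive; the unrated movie has NaN in Rating and Timestamp_y
print(merged_demo)
```

If a user rated the same movie more than once, the left join produces one row per rating, so the merged table can contain more rows than `watch_df`.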