# 01. Load & Validate (MovieLens 25M)
- **Goal:** load the raw CSVs, run sanity checks, and save cleaned Parquet files; the final cells also derive per-user and per-movie summary tables.


In [1]:
from pathlib import Path
import pandas as pd
import numpy as np

In [2]:
# Directory layout: raw CSVs are read from RAW, processed outputs are written to OUT.
RAW = Path("../data/raw")
OUT = Path("../data/processed")
OUT.mkdir(parents=True, exist_ok=True)  # create the output directory (and parents) if absent

ratings_path = RAW.joinpath("ratings.csv")
movies_path = RAW.joinpath("movies.csv")
tags_path = RAW.joinpath("tags.csv")  # not included in the existence check below

# Fail fast, reporting the resolved path of the first required file that is missing.
for p in (ratings_path, movies_path):
    if p.exists():
        continue
    raise FileNotFoundError(f"Missing file: {p.resolve()}")

## 1. Load Data

In [3]:
# Explicit column dtypes for the ratings and tags tables, declared up front.
ratings_dtypes = {
    "userId": "int32",
    "movieId": "int32",
    "rating": "float32",
    "timestamp": "int64",
}
tags_dtypes = {
    "userId": "int32",
    "movieId": "int32",
    "tag": "string",
    "timestamp": "int64",
}

movies = pd.read_csv(movies_path)
ratings = pd.read_csv(ratings_path, dtype=ratings_dtypes)

# The tags file is optional: `tags` stays None when it does not exist.
tags = pd.read_csv(tags_path, dtype=tags_dtypes) if tags_path.exists() else None

# Report the shape of each loaded table (or " None" for absent tags).
print(f"Movies: {movies.shape}")
print(f"Ratings: {ratings.shape}")
tags_shape_text = " None" if tags is None else f" {tags.shape}"
print(f"tags:" + tags_shape_text)

Movies: (62423, 3)
Ratings: (25000095, 4)
tags: (1093360, 4)


## 2. Basic Schema Check

In [4]:
# Columns each table must contain; extra columns are tolerated (subset check).
expected_movies_cols = {"movieId", "title", "genres"}
expected_ratings_cols = {"userId", "movieId", "rating", "timestamp"}

# On failure, the assertion message is the frame's actual column index.
for frame, required_cols in (
    (movies, expected_movies_cols),
    (ratings, expected_ratings_cols),
):
    assert required_cols <= set(frame.columns), frame.columns

## 3. Convert timestamps (behavioral analysis needs real datetimes, not epoch seconds)

In [5]:
# Replace the integer `timestamp` column (interpreted as seconds since the epoch)
# with a timezone-aware UTC datetime column.
#
# The conversion is guarded on the presence of `timestamp`, so re-running this cell
# after it has already executed is a no-op instead of raising KeyError on the
# already-removed column. `DataFrame.pop` returns the column and removes it from the
# frame in one step; the new column is appended last, so the resulting column order
# is unchanged (..., rated_at / ..., tagged_at).
if "timestamp" in ratings.columns:
    ratings["rated_at"] = pd.to_datetime(ratings.pop("timestamp"), unit="s", utc=True)

# `tags` is None when the optional tags file was not loaded.
if tags is not None and "timestamp" in tags.columns:
    tags["tagged_at"] = pd.to_datetime(tags.pop("timestamp"), unit="s", utc=True)

## 4. Sanity checks

In [6]:
# Distinct rating values, sorted; used for the scale checks below.
unique_ratings = np.sort(ratings["rating"].unique())

# Orphans: rating rows whose movieId has no entry in the movies table.
movie_ids = set(movies["movieId"].astype(int).unique())
orphan_mask = ~ratings["movieId"].isin(movie_ids)

# All checks gathered in one mapping; key order determines the display order.
checks = {
    # cardinalities
    "n_users_ratings": ratings["userId"].nunique(),
    "n_movies_ratings": ratings["movieId"].nunique(),
    "n_movies_table": movies["movieId"].nunique(),
    # rating scale: extremes, and whether every distinct value is a multiple of 0.5
    "rating_min": float(unique_ratings.min()),
    "rating_max": float(unique_ratings.max()),
    "rating_step_ok": bool(np.allclose(np.mod(unique_ratings * 2, 1), 0)),
    # time range covered by the ratings
    "rated_at_min": ratings["rated_at"].min(),
    "rated_at_max": ratings["rated_at"].max(),
    # ratings without movie metadata
    "orphan_ratings_rows": int(orphan_mask.sum()),
    # rows repeating an earlier (user, movie, time) combination
    "duplicate_triplets": int(
        ratings.duplicated(subset=["userId", "movieId", "rated_at"]).sum()
    ),
}

pd.Series(checks)

n_users_ratings                           162541
n_movies_ratings                           59047
n_movies_table                             62423
rating_min                                   0.5
rating_max                                   5.0
rating_step_ok                              True
rated_at_min           1995-01-09 11:46:49+00:00
rated_at_max           2019-11-21 09:15:03+00:00
orphan_ratings_rows                            0
duplicate_triplets                             0
dtype: object

## 5. Fixes / cleaning rules (minimal + explicit)

In [7]:
# Rule 1: remove ratings whose movie is missing from the movies table
# (only when the sanity checks counted at least one such row).
has_orphans = checks["orphan_ratings_rows"] > 0
if has_orphans:
    ratings = ratings.loc[~orphan_mask].copy()

# Rule 2: keep the first row of each repeated (user, movie, time) triplet.
has_duplicates = checks["duplicate_triplets"] > 0
if has_duplicates:
    dedup_keys = ["userId", "movieId", "rated_at"]
    ratings = ratings.drop_duplicates(subset=dedup_keys).copy()

# Rule 3 (always applied): keep only ratings within the closed interval [0.5, 5.0].
in_scale = ratings["rating"].between(0.5, 5.0)
ratings = ratings[in_scale].copy()

## 6. Save processed Parquet (fast & SQL-ready later)

In [13]:
# Writer options shared by every table: no index column, fastparquet engine.
parquet_opts = {"index": False, "engine": "fastparquet"}

movies_out = OUT / "movies.parquet"
ratings_out = OUT / "ratings.parquet"

# Write both core tables first, then report them, so nothing is printed
# unless both writes succeeded.
for frame, target in ((movies, movies_out), (ratings, ratings_out)):
    frame.to_parquet(target, **parquet_opts)

for target in (movies_out, ratings_out):
    print("saved:", target)

# Tags are written only when the optional tags table was loaded.
if tags is not None:
    tags_out = OUT / "tags.parquet"
    tags.to_parquet(tags_out, **parquet_opts)
    print("saved:", tags_out)

saved: ../data/processed/movies.parquet
saved: ../data/processed/ratings.parquet
saved: ../data/processed/tags.parquet


In [14]:
# Preview the first rows of the cleaned ratings frame (bare last expression -> rich display).
ratings.head()

Unnamed: 0,userId,movieId,rating,rated_at
0,1,296,5.0,2006-05-17 15:34:04+00:00
1,1,306,3.5,2006-05-17 12:26:57+00:00
2,1,307,5.0,2006-05-17 12:27:08+00:00
3,1,665,5.0,2006-05-17 15:13:40+00:00
4,1,899,3.5,2006-05-17 12:21:50+00:00


In [15]:
# Read the saved tables back from disk with the same engine that wrote them.
read_opts = {"engine": "fastparquet"}
ratings_pq = pd.read_parquet(OUT / "ratings.parquet", **read_opts)
movies_pq = pd.read_parquet(OUT / "movies.parquet", **read_opts)

# Show shapes and the ratings dtypes as they come back from Parquet.
shapes = (ratings_pq.shape, movies_pq.shape)
print(*shapes)
print(ratings_pq.dtypes)
ratings_pq.head()

(25000095, 4) (62423, 3)
userId                    int32
movieId                   int32
rating                  float32
rated_at    datetime64[ns, UTC]
dtype: object


Unnamed: 0,userId,movieId,rating,rated_at
0,1,296,5.0,2006-05-17 15:34:04+00:00
1,1,306,3.5,2006-05-17 12:26:57+00:00
2,1,307,5.0,2006-05-17 12:27:08+00:00
3,1,665,5.0,2006-05-17 15:13:40+00:00
4,1,899,3.5,2006-05-17 12:21:50+00:00


---

In [16]:
# Work on a copy of the reloaded ratings; `r` is also used by the movie-level aggregation.
r = ratings_pq.copy()

# One row per user: output column -> (source column, aggregation).
user_agg_spec = {
    "n_ratings": ("rating", "size"),
    "mean_rating": ("rating", "mean"),
    "std_rating": ("rating", "std"),
    "first_rating": ("rated_at", "min"),
    "last_rating": ("rated_at", "max"),
    "n_unique_movies": ("movieId", "nunique"),
}
user_behavior = r.groupby("userId").agg(**user_agg_spec).reset_index()

# Whole days between a user's first and last rating, floored at zero.
activity_span = user_behavior["last_rating"] - user_behavior["first_rating"]
user_behavior["active_days"] = activity_span.dt.days.clip(lower=0)

# The +1 keeps the denominator positive for users active within a single day.
user_behavior["ratings_per_day"] = user_behavior["n_ratings"].div(
    user_behavior["active_days"] + 1
)

# Ten most active users by number of ratings.
user_behavior.sort_values(by="n_ratings", ascending=False).head(10)

Unnamed: 0,userId,n_ratings,mean_rating,std_rating,first_rating,last_rating,n_unique_movies,active_days,ratings_per_day
72314,72315,32202,3.080601,0.744733,2015-12-15 06:59:27+00:00,2019-10-08 10:10:29+00:00,32202,1393,23.10043
80973,80974,9178,3.28029,0.560175,2001-08-05 18:10:38+00:00,2015-12-24 20:44:49+00:00,9178,5254,1.746527
137292,137293,8913,3.184001,0.521824,2009-04-07 02:50:08+00:00,2019-11-16 02:06:19+00:00,8913,3874,2.300129
33843,33844,7919,2.580124,1.05745,2012-12-13 02:21:42+00:00,2019-11-03 16:26:55+00:00,7919,2516,3.146206
20054,20055,7488,3.208868,0.947946,2006-10-10 06:41:32+00:00,2015-02-12 06:35:57+00:00,7488,3046,2.457499
109730,109731,6647,2.816684,0.922711,2002-07-05 04:57:01+00:00,2019-11-19 15:04:59+00:00,6647,6346,1.047266
92045,92046,6564,3.475244,0.997729,2001-09-10 15:15:01+00:00,2019-11-19 03:46:34+00:00,6564,6643,0.987959
49402,49403,6553,1.522585,1.240511,2000-04-07 13:13:16+00:00,2019-11-15 04:11:30+00:00,6553,7160,0.915096
30878,30879,5693,2.876515,0.695103,2016-05-09 08:52:03+00:00,2018-04-05 19:05:41+00:00,5693,696,8.167862
115101,115102,5649,2.462825,1.42948,2016-06-25 04:55:26+00:00,2018-02-13 13:21:48+00:00,5649,598,9.430718


In [17]:
# Persist the per-user aggregates alongside the other processed tables (no index column written).
user_behavior.to_parquet(OUT / "user_behavior.parquet", index=False, engine="fastparquet")

In [18]:
# One row per movie: output column -> (source column, aggregation).
movie_agg_spec = {
    "n_ratings": ("rating", "size"),
    "mean_rating": ("rating", "mean"),
    "std_rating": ("rating", "std"),
    "n_unique_users": ("userId", "nunique"),
    "first_rated": ("rated_at", "min"),
    "last_rated": ("rated_at", "max"),
}
movie_exposure = r.groupby("movieId").agg(**movie_agg_spec).reset_index()

# Persist the table, then show the ten most-rated movies.
movie_exposure_out = OUT / "movie_exposure.parquet"
movie_exposure.to_parquet(movie_exposure_out, index=False, engine="fastparquet")
movie_exposure.sort_values(by="n_ratings", ascending=False).head(10)

Unnamed: 0,movieId,n_ratings,mean_rating,std_rating,n_unique_users,first_rated,last_rated
351,356,81491,4.048011,0.93881,81491,1996-03-01 00:00:00+00:00,2019-11-21 08:42:20+00:00
314,318,81482,4.413576,0.760444,81482,1996-03-01 00:00:00+00:00,2019-11-21 08:36:40+00:00
292,296,79672,4.188912,0.958838,79672,1996-02-29 16:48:44+00:00,2019-11-21 05:13:58+00:00
585,593,74127,4.151341,0.861879,74127,1996-03-01 00:00:00+00:00,2019-11-21 05:13:36+00:00
2480,2571,72674,4.154099,0.912874,72674,1999-03-16 21:57:49+00:00,2019-11-21 05:14:11+00:00
257,260,68717,4.120189,0.981762,68717,1996-03-01 00:00:00+00:00,2019-11-20 21:05:39+00:00
475,480,64144,3.679175,0.939784,64144,1996-03-01 00:00:00+00:00,2019-11-19 20:24:53+00:00
522,527,60411,4.247579,0.874753,60411,1996-03-01 00:00:00+00:00,2019-11-21 05:13:07+00:00
108,110,59184,4.002273,0.967533,59184,1996-02-17 12:17:36+00:00,2019-11-20 18:53:26+00:00
2867,2959,58773,4.228311,0.870319,58773,1999-10-04 18:47:15+00:00,2019-11-21 08:55:14+00:00


In [19]:
# Concentration of activity: order users from most to least active, accumulate their
# share of all ratings, and take the fraction of users whose cumulative share is
# still at or below one half.
u = user_behavior.sort_values(by="n_ratings", ascending=False)
ranked_counts = u["n_ratings"]
cum = ranked_counts.cumsum() / ranked_counts.sum()
share_users_for_50pct = cum.le(0.5).mean()
share_users_for_50pct

np.float64(0.11486947908527695)