In [None]:
# Enable IPython's autoreload extension in mode 2, so edits to imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import csv
import io
import json
import multiprocessing as mp
import re
import subprocess as sp
import tarfile
from pathlib import Path
from typing import Dict, List, Tuple, Union, Optional

import numpy as np
import pandas as pd
from loguru import logger


In [None]:
import waac
import waac.config as config

In [None]:
# Project directories come from the shared config module.
ROOT_DIR = config.ROOT_DIR
DATA_DIR = config.DATA_DIR
DOWNLOAD_DIR = DATA_DIR / "download"

# IMDb bulk datasets: the public download endpoint and a dated local snapshot.
IMDB_REMOTE_URI_PREFIX = "https://datasets.imdbws.com/"
# Derived from DATA_DIR rather than one user's absolute home path. It is kept as
# a string ending in "20240503_" because dataset file names are appended to it
# by plain string concatenation.
# NOTE(review): assumes DATA_DIR is the repository's `data/` directory — confirm.
IMDB_LOCAL_URI_PREFIX = str(DATA_DIR / "imdb" / "20240503_")

# Seeded generator so that random sampling is reproducible across runs.
rng = np.random.default_rng(seed=16042024)

In [None]:
# Switch between the local snapshot and the public IMDb endpoint.
LOCAL = True

if LOCAL:
    IMDB_BASE_URI_PREFIX = IMDB_LOCAL_URI_PREFIX
else:
    IMDB_BASE_URI_PREFIX = IMDB_REMOTE_URI_PREFIX

In [None]:
# Collect the raw text files from the download directory and list them.
text_files = [*DATA_DIR.glob("download/*.txt")]
for f in text_files:
    print(f)

In [None]:
# Load the movie catalogue; the file is read as Latin-1.
fp = DOWNLOAD_DIR / "movie_titles.txt"
movie_titles_df = waac.txt_to_df(
    fp,
    config.raw_data_column_names[fp.name],
    encoding="latin-1",
)

movie_titles_df["movie_ID"] = movie_titles_df["movie_ID"].astype(int)
# Stored as float because some release years are missing. The "NULL" marker is
# mapped to NaN explicitly: `Series.replace("NULL", None)` means "pad-fill from
# the previous row" on older pandas releases, which would silently invent years.
movie_titles_df["year_of_release"] = (
    movie_titles_df["year_of_release"].replace({"NULL": np.nan}).astype(float)
)

In [None]:
# Print the dtype / non-null summary, then render the frame itself.
movie_titles_df.info()
display(movie_titles_df)

In [None]:
# Summary statistics for the numeric columns only.
movie_titles_df.describe(include="number")

Reference: [IMDb Non-Commercial Datasets](https://developer.imdb.com/non-commercial-datasets/)

In [None]:
# File names of the IMDb dataset dumps; each is appended to the chosen URI prefix.
SUB_URLS = [
    f"{dataset}.tsv.gz"
    for dataset in (
        "name.basics",
        "title.akas",
        "title.basics",
        "title.crew",
        "title.episode",
        "title.principals",
        "title.ratings",
    )
]
# LOCAL_IMBD_FILE_PREFIX = DATA_DIR / "imdb"

In [None]:
# Show the IMDb metadata object exposed by the project config.
waac.config.imdb_metadata

## Merging Data: Approach

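1. Find movies in `movie_titles.txt` that are also in the IMDb datasets
    1. Match on release year and title against `title.basics`, falling back to the alternative titles in `title.akas`.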

In [None]:
# Show the dataset file names that can be loaded.
print(SUB_URLS)

In [None]:
imdb_metadata = {}
CHUNK_SIZE = 1_000
imdb_data = {}
# Load only SUB_URLS[1:3], i.e. title.akas and title.basics.
for sub_url in SUB_URLS[1:3]:
    url = IMDB_BASE_URI_PREFIX + sub_url
    print(url)
    # `\N` marks missing values in these files. Quote handling is switched off
    # because the dumps are plain tab-separated text: a title that contains a
    # literal '"' would otherwise make pandas merge fields or whole rows.
    imdb_data[sub_url] = pd.read_table(
        url,
        compression="gzip",
        na_values=r"\N",
        quoting=csv.QUOTE_NONE,
    )

In [None]:
# Inspect the loaded title.akas table.
imdb_data["title.akas.tsv.gz"]

In [None]:
# Inspect the loaded title.basics table.
imdb_data["title.basics.tsv.gz"]

In [None]:
# Column names available on the movie catalogue frame.
movie_titles_df.columns

In [None]:
# Scratch check: build (i, i) pairs. The tuple must be parenthesised inside the
# comprehension; a bare `i, i for ...` is a SyntaxError.
[(i, i) for i in range(4)]

In [None]:
# Number of CPUs reported by the OS; `mp` is imported with the other modules in
# the notebook's import cell.
mp.cpu_count()

In [None]:
# Group the catalogue by release year, newest first, keeping that order.
x = (
    movie_titles_df
    .sort_values("year_of_release", ascending=False)
    .groupby("year_of_release", sort=False)
)

# Iterating a groupby yields (key, sub-frame) pairs; show each group's type.
{year: type(group) for year, group in x}


In [None]:
# Match every catalogue movie to an IMDb `tconst`.
#
# match_source records how the id was found:
#   1 - unique hit on primaryTitle/originalTitle in title.basics (same year)
#   2 - unique row in title.akas for that year's titles
#   3 - several title.akas rows that all point at the same IMDb id
# Movies with no candidate, or with several distinct candidates, keep
# tconst=None and match_source=None.
results = []
n = len(movie_titles_df)
title_basic = imdb_data["title.basics.tsv.gz"]
# Indexing for quick filtering
title_basic_reindexed = title_basic.set_index("startYear")
# Years that actually occur in IMDb. NaN is dropped so a movie with a missing
# release year is never paired with IMDb rows whose year is also missing.
imdb_years = set(title_basic_reindexed.index.dropna())
title_aka = imdb_data["title.akas.tsv.gz"]
print(f"title.basics shape: {title_basic.shape}")

matches_so_far = 0
for i, row in enumerate(movie_titles_df.itertuples()):
    tconst, match_source = None, None

    if i % 500 == 0:
        print(f"# matches so far: {matches_so_far}")
        print(f"{i+1}/{n}: {row}")

    wanted_title = row.title.lower()
    if row.year_of_release in imdb_years:
        # A list indexer always returns a DataFrame, even when only one IMDb
        # title exists for that year (a scalar label would return a Series).
        title_basic_filtered_year = title_basic_reindexed.loc[[row.year_of_release]]
    else:
        # Unknown or missing year: empty candidate set instead of a KeyError.
        title_basic_filtered_year = title_basic_reindexed.iloc[0:0]

    title_basic_filtered_title = title_basic_filtered_year.loc[
        (title_basic_filtered_year.primaryTitle.str.lower() == wanted_title)
        | (title_basic_filtered_year.originalTitle.str.lower() == wanted_title)
    ]

    if len(title_basic_filtered_title) == 1:
        tconst = title_basic_filtered_title.iloc[0].tconst
        match_source = 1  # "basic"
    elif len(title_basic_filtered_title) > 1:
        # Several IMDb titles share this name and year; left unmatched.
        pass
    else:
        # Filter the aka df for movies from that year by using the titleID
        aka_filtered_year = title_aka.loc[
            title_aka.titleId.isin(title_basic_filtered_year.tconst)
        ]
        aka_filtered_title = aka_filtered_year.loc[
            aka_filtered_year.title.str.lower() == wanted_title
        ]
        if len(aka_filtered_title) == 1:
            tconst = aka_filtered_title.iloc[0].titleId
            match_source = 2  # "aka"
        elif len(aka_filtered_title) > 1:
            # The same title is often listed once per region/language, so only
            # the number of distinct IMDb ids matters, not the number of rows.
            candidate_ids = aka_filtered_title.titleId.astype(str).str.lower().unique()
            if len(candidate_ids) == 1:
                tconst = candidate_ids[0]
                match_source = 3  # "aka after duplicates"
            # More than one distinct id: ambiguous, left unmatched.

    results.append({"movie_ID": row.movie_ID, "tconst": tconst, "match_source": match_source})
    matches_so_far += (tconst is not None)
    
    

In [None]:
# Loop index left over from the most recently executed loop.
i

In [None]:
# First row (as a namedtuple) of the last title-filtered candidate frame.
list(title_basic_filtered_title.itertuples())[0]

In [None]:
# Lower-cased title of the last row bound by a loop.
row.title.lower()

In [None]:
# Peek at the first five lines of every downloaded text file.
for text_file in text_files:
    print(text_file)
    # Opened as Latin-1 like the other loading cells in this notebook; the
    # platform default encoding can fail on non-ASCII bytes.
    with text_file.open(encoding="latin-1") as f:
        for _ in range(5):
            print(f.readline())

In [None]:
# Column-name schemas for the raw files, keyed by file name.
config.raw_data_column_names

Extract tar files

In [None]:
fp_to_extract = DOWNLOAD_DIR / "training_set.tar"

# The archive won't exist anymore if it has already been extracted. Start from
# an empty listing so `tar_file_names` is always defined for the cells below.
tar_file_names = []
if fp_to_extract.exists():
    with tarfile.open(fp_to_extract, "r") as t:
        tar_file_names = t.getnames()
else:
    logger.warning(f"{fp_to_extract} not found; no archive members listed.")


In [None]:
# Number of archive members, plus the first and last two names.
print(len(tar_file_names))
print(tar_file_names[:2])
print(tar_file_names[-2:])

In [None]:
N_MOVIES = 100

# Sample distinct archive members. `Generator.choice` samples with replacement
# by default, which could pick the same file more than once. The size is capped
# so the cell also works when fewer than N_MOVIES text files are listed.
rating_files = [x for x in tar_file_names if x.endswith(".txt")]
movies_to_extract = rng.choice(
    rating_files,
    size=min(N_MOVIES, len(rating_files)),
    replace=False,
)

In [None]:
# How many members were sampled, and a preview of the first five.
display(len(movies_to_extract))
movies_to_extract[:5]

In [None]:
# with tarfile.open(DATA_DIR / "download" / "training_set.tar", "r") as t:
#     t.extractall(path=DATA_DIR / "download", members=movies_to_extract)

Load data to dataframe

In [None]:
# Read the raw catalogue lines. The path is built from DOWNLOAD_DIR instead of
# one user's absolute home directory.
# NOTE(review): assumes DOWNLOAD_DIR is the repository's `data/download` — confirm.
with (DOWNLOAD_DIR / "movie_titles.txt").open("r", encoding="latin-1") as fp:
    lines = fp.readlines()

In [None]:
# Number of lines currently held in `lines`.
print(len(lines))

In [None]:
def load_df_from_txt(fp: Union[str, Path], schema: Union[Dict, List]) -> pd.DataFrame:
    """Load a comma-separated text file into a single DataFrame.

    Two layouts are supported, selected by the type of ``schema``:

    * ``dict`` with exactly one entry, ``{header_name: [column, ...]}``:
      the file is grouped. A line made of digits followed by ``:`` opens a
      group, and every following line is one comma-separated record of that
      group. The integer header is stored in the first column, named
      ``header_name``.
    * ``list`` of column names: every non-blank line is one record and no
      group header lines are expected.

    The file is read as Latin-1. Blank lines are skipped, line endings are
    removed, and field values are kept as strings (only the group header is
    converted to ``int``).

    Parameters
    ----------
    fp : str or Path
        Location of the text file.
    schema : dict or list
        Column layout as described above.

    Returns
    -------
    pd.DataFrame
        One row per record with a fresh ``RangeIndex``; empty (but with the
        expected columns) when the file holds no records.

    Raises
    ------
    TypeError
        If ``fp`` is not a str/Path or ``schema`` is not a dict/list.
    AssertionError
        If a dict ``schema`` does not contain exactly one key.
    ValueError
        If a grouped file has a record before its first group header.
    """
    if isinstance(fp, str):
        fp = Path(fp)
    if not isinstance(fp, Path):
        raise TypeError(f"`fp` must be a str or Path. Got {type(fp)}.")
    if not isinstance(schema, (dict, list)):
        raise TypeError(f"`schema` must be Dict or List. Got {type(schema)}")

    grouped = isinstance(schema, dict)
    if grouped:
        assert len(schema) == 1, f"`schema` must contain only one key. Got {len(schema)}."
        header_row_name, value_cols = next(iter(schema.items()))
        # The single value holds the record columns; accept a lone name too.
        value_cols = [value_cols] if isinstance(value_cols, str) else list(value_cols)
        col_names = [header_row_name, *value_cols]
    else:
        col_names = list(schema)

    # A group header is a whole line of the form "<digits>:".
    header_pattern = re.compile(r"(\d+):")

    # Records are gathered in one list and turned into a frame once, rather
    # than building and concatenating one small frame per group.
    records = []
    current_header = None
    with fp.open("r", encoding="latin-1") as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip("\r\n")
            if not line.strip():
                continue
            if not grouped:
                records.append(line.split(","))
                continue
            if match := header_pattern.fullmatch(line.strip()):
                current_header = int(match[1])
                continue
            if current_header is None:
                raise ValueError(
                    f"{fp}: line {line_no} is a record but no group header precedes it."
                )
            records.append([current_header, *line.split(",")])

    return pd.DataFrame(records, columns=col_names)

    

In [None]:
# List the text files in the download directory.
list((DATA_DIR / "download").glob("*.txt"))

In [None]:
# Walk the downloaded text files and load only qualifying.txt, using the
# column schema registered for it in the project config.
for fp in (DATA_DIR / "download").glob("*.txt"):
    if fp.name == "qualifying.txt":
        schema = config.raw_data_column_names[fp.name]
        df = load_df_from_txt(fp, schema=schema)

In [None]:
# Inspect the frame currently bound to `df`.
df

In [None]:
# Walk the downloaded text files and read the raw lines of qualifying.txt only.
for fp in (DATA_DIR / "download").glob("*.txt"):
    if fp.name == "qualifying.txt":
        with fp.open(encoding="latin-1") as f:
            lines = f.readlines()
            n = len(lines)
    
            

In [None]:
# Matches the digits at the start of a line that is followed by ":" and a
# newline. Written as a raw string so `\d` is not an invalid string escape.
pattern = re.compile(r"^\d+(?=:\n)")

In [None]:
# Pair every line number with the result of matching `pattern` against that
# line (None where the line does not match).
starts = list(zip(range(len(lines)), map(pattern.match, lines)))

In [None]:
# (line_no, movie_ID)
# (line_no, movie_ID) for every line that matches `pattern`.
# Computed directly from `lines` rather than from the previous value of
# `starts`, so re-running this cell gives the same result instead of failing.
starts = [
    (line_no, int(match[0]))
    for line_no, match in enumerate(map(pattern.match, lines))
    if match
]

In [None]:
# One slice of `lines` per entry in `starts`: from its line up to the next
# entry's line; the last slice runs to the end of the file.
slices = [slice(begin, end) for (begin, _), (end, _) in zip(starts, starts[1:])]
if starts:
    # Guarded so an input without any matching line gives no slices instead
    # of an IndexError.
    slices.append(slice(starts[-1][0], None))

In [None]:
# Last three slices.
slices[-3:]

In [None]:
# Number of slices.
len(slices)

In [None]:
# Build one frame per slice; the list is pre-sized so each result is stored at
# the position of its slice.
n = len(slices)
df_list = [None] * n
for i, s in enumerate(slices):
    if i % 1000 == 0:
        print(f"{i+1}/{n}")
    subset = lines[s]
    # header is always movie_ID
    movie_ID = int(subset.pop(0).split(":")[0])
    df_temp = pd.DataFrame.from_records(
        [line.strip().split(",") for line in subset],
        columns=config.raw_data_column_names["qualifying.txt"]["movie_ID"],
    ).assign(movie_ID=movie_ID)
    df_list[i] = df_temp

In [None]:
# Number of per-slice frames.
len(df_list)

In [None]:
# Stack the per-slice frames; each frame keeps its own row index.
df = pd.concat(df_list)

In [None]:
# Inspect the frame currently bound to `df`.
df