## 1. Imports

Importing required dependencies.

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import boto3
from botocore.config import Config
import dask.dataframe as dd
import os


#### Set up functions to import data from the cloud
Introduces helper functions for interacting with an R2 cloud storage bucket: download_from_r2 to download files and list_bucket_contents to list available files.


In [2]:
from dotenv import load_dotenv
import os

dotenv_path = '../.env'

if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path)
    print(f"Loaded .env file from: {os.path.abspath(dotenv_path)}")
else:
    # Try an alternative common path if the notebook CWD is the project root
    dotenv_path_alt = 'BookDB/.env'
    if os.path.exists(dotenv_path_alt):
        load_dotenv(dotenv_path_alt)
        print(f"Loaded .env file from: {os.path.abspath(dotenv_path_alt)}")
    else:
        print(f".env file not found at {dotenv_path} or {dotenv_path_alt}. Please check the path.")

# Test if credentials are loaded
R2_ENDPOINT_URL = os.getenv('R2_ENDPOINT_URL')
if R2_ENDPOINT_URL:
    print("R2_ENDPOINT_URL successfully loaded.")
else:
    print("R2_ENDPOINT_URL not found. Check your .env file and path.")

Loaded .env file from: /Users/anze/Documents/University/Y2S2/AI Machine Learning Foundations/Final Project/BookDB/.env
R2_ENDPOINT_URL successfully loaded.


In [3]:
def download_from_r2(object_name, local_path, bucket_name="bookdbio"):
    # ensure parent dir exists
    parent_dir = os.path.dirname(local_path)
    if parent_dir and not os.path.isdir(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)

    s3 = boto3.client('s3',
        endpoint_url = os.getenv('R2_ENDPOINT_URL'),
        aws_access_key_id = os.getenv('R2_ACCESS_KEY_ID'),
        aws_secret_access_key = os.getenv('R2_SECRET_ACCESS_KEY'),
        config = Config(signature_version='s3v4')
   )

    try:
        s3.download_file(bucket_name, object_name, local_path)
        print(f"Successfully downloaded {object_name} to {local_path}")
    except Exception as e:
        print(f"Download failed for {object_name}: {e}")

In [33]:
def list_bucket_contents(bucket_name="bookdbio"):
    """List all objects in the R2 bucket"""
    s3 = boto3.client('s3',
        endpoint_url = os.getenv('R2_ENDPOINT_URL'),
        aws_access_key_id = os.getenv('R2_ACCESS_KEY_ID'),
        aws_secret_access_key = os.getenv('R2_SECRET_ACCESS_KEY'),
        config = Config(signature_version='s3v4')
   )
    
    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        if 'Contents' in response:
            print("Available files in bucket:")
            for obj in response['Contents']:
                print(f"- {obj['Key']}")
        else:
            print("Bucket is empty")
    except Exception as e:
        print(f"Error listing bucket contents: {e}")

In [19]:
def upload_to_r2(files, bucket_name):
    """
    Upload multiple files to Cloudflare R2 bucket
    
    Args:
        files (list): List of tuples containing (file_path, object_name)
        bucket_name (str): R2 bucket name
    """
    # Configure R2 client
    s3 = boto3.client('s3',
        endpoint_url = os.getenv('R2_ENDPOINT_URL'),
        aws_access_key_id = os.getenv('R2_ACCESS_KEY_ID'),
        aws_secret_access_key = os.getenv('R2_SECRET_ACCESS_KEY'),
        config = Config(signature_version='s3v4')
   )

    for file_path, object_name in files:
        try:
            s3.upload_file(file_path, bucket_name, object_name)
            print(f"Successfully uploaded {file_path} to {object_name}")
        except Exception as e:
            print(f"Upload failed for {file_path}: {e}")


In [34]:
list_bucket_contents()

Available files in bucket:
- data/author_id_map.csv
- data/authors.parquet
- data/book_id_map.csv
- data/book_texts.parquet
- data/book_texts_reduced.parquet
- data/books.parquet
- data/books_dedup.parquet
- data/books_triplets.parquet
- data/books_triplets_reduced.parquet
- data/books_works.parquet
- data/interactions.parquet
- data/interactions_dedup.parquet
- data/interactions_prepared_ncf.parquet
- data/interactions_prepared_ncf_reduced.parquet
- data/item_id_map_reduced.csv
- data/new_authors.parquet
- data/new_books.parquet
- data/processed_training_pairs_parts_0_to_12.parquet
- data/reduce_books_df
- data/reduced_book_ids.csv
- data/reduced_books.parquet
- data/reduced_interactions.parquet
- data/reduced_reviews.parquet
- data/reduced_user_ids.csv
- data/reviews_dedup.parquet
- data/sampled_users_book.parquet
- data/training_pairs.parquet.zip
- data/user_id_map.csv
- data/user_id_map_reduced.csv
- db/bookdb.sql
- embeddings/SBERT_embeddings.parquet
- embeddings/gmf_book_embeddin

## 2. Data Loading and Initial Exploration

This section focuses on loading the primary datasets required for the reranker data engineering pipeline. We will download data from Cloudflare R2 storage, load them into Dask DataFrames for efficient handling of potentially large datasets, and perform some initial exploration.

The core datasets are:
* `reduced_books.parquet`: Contains metadata for books (e.g., title, authors, genres).
* `reduced_interactions.parquet`: Contains user-book interaction data (e.g., ratings, reads).
* `new_authors.parquet`: Contains author information.
* `book_texts_reduced.parquet`: Contains the textual content or descriptions of books.

We will use helper functions defined earlier (`download_from_r2`, `list_bucket_contents`, `upload_to_r2`) to manage data transfer with the R2 bucket.

### 1. 'reduced_books'

Reduced set of books: contains book meta data.

In [12]:
download_from_r2("data/reduced_books.parquet", "data/reduced_books.parquet")

Successfully downloaded data/reduced_books.parquet to data/reduced_books.parquet


In [4]:
books_df = dd.read_parquet("data/reduced_books.parquet")

In [None]:
books_df.head() 

### 2. 'reduced_interactions'

Reduced set of interactions: contains data about user-book interaction data (e.g., ratings, reads)

In [9]:
download_from_r2("data/reduced_interactions.parquet", "data/reduced_interactions.parquet")

Successfully downloaded data/reduced_interactions.parquet to data/reduced_interactions.parquet


In [5]:
interactions_df = dd.read_parquet("data/reduced_interactions.parquet")

In [None]:
interactions_df.head()

In [6]:
# Count the number of unique user IDs in the interactions dataframe
unique_users_count = interactions_df['user_id'].nunique().compute()
print(f"Number of unique user IDs: {unique_users_count}")


Number of unique user IDs: 205242


### 3. 'book_texts_reduced.parquet'

Reduced set of book descriptions: contains text describing each book, used in a later script to generate training pairs.

In [30]:
download_from_r2("data/book_texts_reduced.parquet", "data/book_texts_reduced.parquet")

Successfully downloaded data/book_texts_reduced.parquet to data/book_texts_reduced.parquet


## 3. Data Preprocessing and Feature Engineering

With the data loaded, this section focuses on transforming and engineering features to prepare for generating training pairs for the cross-encoder reranker model.

The key steps include:
* **Book Metadata Preparation (`reduced_books_df`):**
  * Dropping irrelevant columns.
  * Extracting and cleaning genre information from `popular_shelves`.
  * Integrating author names using the `authors_df`.
  * The goal is to create a concise `reduced_books_df` containing `book_id`, `title`, `description`, `genre`, and `authors`, which will be crucial for constructing user contexts and book descriptions.
* **User Interaction Data Preparation (`interactions_df`):**
  * Identifying unique users.
  * Creating a `user_books_df` that lists books read by each user, sorted by rating. This helps in identifying positive examples for training.
  * Sampling a subset of users (`sampled_users_book`) for more manageable processing, if necessary.

These processed DataFrames will then be saved to Parquet format for use in the subsequent training data generation script.

### 1. 'reduced_books'

Preparing book metadat dataset so it can be used to generate user contexts for the cross-encoder. This is done by:
* Dropping unnecessary columns
* Extracting book genres
* Adding book authors

In [7]:
def analyze_dataframe(df):
    # Get column info
    cols = df.columns
    dtypes = df.dtypes
    
    # Calculate total rows
    total_rows = len(df.compute())
    
    # Initialize lists to store results
    results = []
    
    # Analyze each column
    for col in cols:
        # Count non-null values
        non_null_count = df[col].count().compute()
        null_count = total_rows - non_null_count
        null_percentage = (null_count / total_rows) * 100
        
        results.append({
            'Column': col,
            'Data Type': str(dtypes[col]),
            'Non-Null Count': non_null_count,
            'Null Count': null_count,
            'Null Percentage': f'{null_percentage:.2f}%'
        })
    
    # Convert results to pandas DataFrame for better display
    results_df = pd.DataFrame(results)
    return results_df.sort_values('Null Percentage', ascending=False)

# Display the analysis
print("DataFrame Analysis:")
display(analyze_dataframe(books_df))

DataFrame Analysis:


Unnamed: 0,Column,Data Type,Non-Null Count,Null Count,Null Percentage
0,isbn,string,17663,0,0.00%
15,publisher,string,17663,0,0.00%
27,title,string,17663,0,0.00%
26,work_id,string,17663,0,0.00%
25,ratings_count,string,17663,0,0.00%
24,book_id,int64,17663,0,0.00%
23,image_url,string,17663,0,0.00%
22,url,string,17663,0,0.00%
21,publication_year,string,17663,0,0.00%
20,edition_information,string,17663,0,0.00%


In [8]:
books_df.set_index('book_id', inplace=True)


Unnamed: 0_level_0,isbn,text_reviews_count,series,country_code,language_code,popular_shelves,asin,is_ebook,average_rating,kindle_asin,similar_books,description,format,link,authors,publisher,num_pages,publication_day,isbn13,publication_month,edition_information,publication_year,url,image_url,ratings_count,work_id,title,title_without_series
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1
,string,string,object,string,string,object,string,string,string,string,object,string,string,string,object,string,string,string,string,string,string,string,string,string,string,string,string,string
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
books_df.head()

Defines extract_genres. This function processes the popular_shelves column (which typically contains a list of dictionaries with shelf names and counts) to identify relevant genre keywords.

* It uses a predefined list of genre_keywords and ignore_keywords.
* It iterates through shelves, normalizes names, and checks for keyword presence.

In [9]:

def extract_genres(popular_shelves):
    """
    Extracts potential genres from a list of popular shelves dictionaries,
    adding only the base genre keyword found.

    Args:
        popular_shelves: A list of dictionaries, where each dictionary has
                         'count' and 'name' keys.

    Returns:
        A list of unique base genre names found, or an empty list on error.
    """
    try:
        if not isinstance(popular_shelves, np.ndarray) or len(popular_shelves) == 0:
            return []
        
        # Use a set to store unique base genres found
        found_genres = set() 
        
        genre_keywords = [
            'action', 'adventure', 'comedy', 'crime', 'mystery', 'textbook', 'children', 'mathematics', 'fantasy',
            'historical', 'horror', 'romance', 'satire', 'science fiction',
            'scifi', 'speculative fiction', 'thriller', 'western', 'paranormal',
            'dystopian', 'urban fantasy', 'contemporary', 'young adult', 'ya',
            'middle grade', 'children\'s', 'literary fiction', 'magic realism',
            'historical fiction', 'gothic', 'suspense', 'biography', 'memoir',
            'nonfiction', 'poetry', 'drama', 'historical romance',
            'fantasy romance', 'romantic suspense', 'science fiction romance',
            'contemporary romance', 'paranormal romance', 'epic fantasy',
            'dark fantasy', 'sword and sorcery', 'steampunk', 'cyberpunk',
            'apocalyptic', 'post-apocalyptic', 'alternate history',
            'superhero', 'mythology', 'fairy tales', 'folklore', 'war',
            'military fiction', 'spy fiction', 'political fiction', 'social science fiction',
            'techno-thriller', 'medical thriller', 'legal thriller',
            'psychological thriller', 'cozy mystery', 'hardboiled', 'noir',
            'coming-of-age', 'lgbtq+', 'christian fiction', 'religious fiction',
            'humor', 'travel', 'food', 'cooking', 'health', 'self-help',
            'business', 'finance', 'history', 'science', 'technology', 'nature',
            'art', 'music', 'philosophy', 'education', 'true crime', 'spiritual',
            'anthology', 'short stories', 'plays', 'screenplays', 'graphic novel',
            'comics', 'manga', 'erotica', 'new adult', 'chick lit', 'womens fiction',
            'sports fiction', 'family saga', ' Regency romance', 'literature'
        ]
        # Sort keywords by length descending to match longer phrases first (e.g., "science fiction" before "science")
        genre_keywords.sort(key=len, reverse=True)

        ignore_keywords = ['to-read', 'owned', 'hardcover', 'shelfari-favorites', 'series', 'might-read',
                           'dnf-d', 'hambly-barbara', 'strong-females', 'first-in-series',
                           'no-thanks-series-collections-boxes', 'entertaining-but-limited',
                           'kate-own', 'e-book', 'compliation', 'my-books',
                           'books-i-own-but-have-not-read', 'everything-owned', 'books-to-find',
                           'i-own-it', 'favorite', 'not-read', 'read-some-day', 'library',
                           'audiobooks', 'status-borrowed', 'owned-books',
                           'spec-fic-awd-locus-nom', '01', 'hardbacks', 'paper', 'german',
                           'hardback', 'physical-scifi-fantasy', 'childhood-favorites',
                           'bundle-same-author', 'aa-sifi-fantasy', 'ready-to-read',
                           'bought-on-flee-markets', 'fantasy-general', 'hardcopy', 'box-2',
                           'unfinished', 'magic', 'duplicates', 'favorites', 'books-i-own',
                           'fantasy-classic', 'own-hard-copy', 'fantasy-read',
                           'book-club-edition', 'sci-fi-or-fantasy', 'fiction-fantasy',
                           'fiction-literature-poetry', 'paused-hiatus', 'status—borrowed',
                           'recs-fantasy', 'fantasy-scifi', 'omnibus', 'speculative',
                           'sf--fantasy', 'in-my-home-library', 'fant-myth-para-vamps',
                           'read-in-my-20s']

        for shelf in popular_shelves:
            if not isinstance(shelf, dict) or 'name' not in shelf:
                continue
            
            shelf_name = shelf['name'].lower().strip() # Normalize shelf name

            # Skip if shelf name contains any ignore keywords
            if any(ignore in shelf_name for ignore in ignore_keywords):
                continue

            # Check if any genre keyword is present in the shelf name
            for keyword in genre_keywords:
                # Use word boundaries or careful checks to avoid partial matches (e.g., 'art' in 'heart')
                # Simple substring check for now, might need refinement depending on data
                if keyword in shelf_name: 
                    found_genres.add(keyword) # Add the base keyword
                    # Optional: break here if you only want the first/longest match per shelf
                    # break 

        return sorted(list(found_genres))
    except Exception as e:
        print(f"Error in extract_genres function: {e}")
        # Log the error message
        logging.error("Error in extract_genres function", exc_info=True)
        return []

In [None]:
books_df.head()

In [20]:
download_from_r2("data/new_authors.parquet", "data/new_authors.parquet")

Successfully downloaded data/new_authors.parquet to data/new_authors.parquet


In [10]:
authors_df = dd.read_parquet("data/new_authors.parquet")

In [11]:
# Create reduced DataFrame
reduced_books_df = books_df[['book_id', 'title', 'description']].copy()

# Modify extract_genres to return a string instead of a list
def extract_genres_string(shelves):
    genres = extract_genres(shelves)
    return ','.join(genres) if genres else ''

# Apply the modified function to get string representation of genres
reduced_books_df['genre'] = books_df['popular_shelves'].apply(extract_genres_string)

# Convert authors to string representation as well
def get_author_names(author_ids):
    author_names = []
    for author_id in author_ids:
        try:
            name = authors_df.loc[authors_df['author_id'] == author_id]['name'].compute().values[0]
            author_names.append(name)
        except:
            continue
    return ','.join(author_names)

reduced_books_df['authors'] = books_df['authors'].apply(get_author_names)

# Display sample of the reduced DataFrame
print("\nSample of reduced books DataFrame:")
print(reduced_books_df.head())

# Display genre distribution (need to split the strings for counting)
print("\nGenre distribution:")
genre_counts = reduced_books_df['genre'].apply(lambda x: x.split(',') if x else []).explode().value_counts()
print(genre_counts)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('popular_shelves', 'object'))

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('authors', 'object'))




Sample of reduced books DataFrame:
      book_id                                              title  \
3     6066819                               Best Friends Forever   
15      89375  90 Minutes in Heaven: A True Story of Death an...   
479  11731782                              Collide (Collide, #1)   
583     54270                                         Mein Kampf   
807     38568                         A Quick Bite (Argeneau #1)   

                                           description  \
3    Addie Downs and Valerie Adler were eight when ...   
15   As he is driving home from a minister's confer...   
479  Sherry has always known there was something ou...   
583  Madman, tyrant, animal - history has given Ado...   
807  That hot guy tied to Lissianna Argeneau's bed?...   

                                                 genre  \
3    coming-of-age,contemporary,drama,humor,mystery...   
15     biography,memoir,nonfiction,self-help,spiritual   
479  contemporary,dystopian,fant

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('genre', 'object'))



### Saving Processed Book Metadata

The `reduced_books_df` DataFrame, now containing cleaned and structured book metadata (ID, title, description, genres, authors), is saved to `data/reduce_books_df.parquet`. This file will be a key input for the `generate_training_data.py` script.

In [None]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import psutil

# Monitor memory usage
def print_memory_usage():
    process = psutil.Process()
    print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")

# Before processing
print_memory_usage()

# Set up progress bar
with ProgressBar():
    # Process your DataFrame
    result = reduced_books_df.compute()  # or whatever operation you're doing
    
    # Save to parquet with optimizations
    result.to_parquet(
        'data/reduce_books_df.parquet',
        compression='snappy',
        index=False,
        engine='pyarrow'
    )

# After processing
print_memory_usage()

In [23]:
test_reduced_books_read = dd.read_parquet('data/reduce_books_df.parquet')
print("Sample of re-read reduced_books_df:")
print(test_reduced_books_read.head())

Sample of re-read reduced_books_df:
    book_id                                              title  \
0   6066819                               Best Friends Forever   
1     89375  90 Minutes in Heaven: A True Story of Death an...   
2  11731782                              Collide (Collide, #1)   
3     54270                                         Mein Kampf   
4     38568                         A Quick Bite (Argeneau #1)   

                                         description  \
0  Addie Downs and Valerie Adler were eight when ...   
1  As he is driving home from a minister's confer...   
2  Sherry has always known there was something ou...   
3  Madman, tyrant, animal - history has given Ado...   
4  That hot guy tied to Lissianna Argeneau's bed?...   

                                               genre                  authors  
0  coming-of-age,contemporary,drama,humor,mystery...          Jennifer Weiner  
1    biography,memoir,nonfiction,self-help,spiritual  Don Piper,Cecil 

### 2. reduced_interactions

Now, we process the user-book interaction data. The primary goal is to create a DataFrame (`user_books_df`) where each row represents a user and contains a list of `book_id`s they have interacted with, preferably sorted by their preference (e.g., rating). This list will serve as the basis for identifying "positive" examples (books the user liked/read).

In [15]:
import pandas as pd

def get_sorted_books_for_user_group(df_user_group):
    """
    For a group of interactions belonging to a single user (as a pandas DataFrame),
    sorts them by rating and returns a list of book_ids.
    """
    # df_user_group is a pandas DataFrame
    return df_user_group.sort_values('rating', ascending=False)['book_id'].tolist()


user_books_series_dd = interactions_df.groupby('user_id').apply(
    get_sorted_books_for_user_group,
    meta=pd.Series(dtype='object', name='books_read_list') # Output is a Series of lists
)

user_books_ddf = user_books_series_dd.to_frame(name='books_read').reset_index()

user_books_df = user_books_ddf.compute()

user_books_df['num_books'] = user_books_df['books_read'].apply(len)

# Display sample of the DataFrame (matches original code's intent)
print("\nSample of user_books_df:")
print(user_books_df.head())


Sample of user_books_df:
                            user_id  \
0  000b29d6db10003f526b26f03198ade6   
1  000e8a319dcfa7d80e9a80921f8ca102   
2  001ea9b015d2e811ef63cf70410f30f8   
3  002263ca74da8c31029ae7ec5754083b   
4  003b6ee4e7f465867092d92de889c8a2   

                                          books_read  num_books  
0  [22738563, 4667024, 15881, 2, 20820994, 172282...         89  
1  [2657, 18512, 865, 7332, 31196, 4406, 68591, 1...         99  
2  [80674, 37741, 16299991, 10964, 15881, 6, 4374...        117  
3  [9717, 122403, 13496, 62291, 119073, 6483211, ...        188  
4  [85733, 10407279, 74586, 13497818, 4214, 35729...        143  


In [16]:
user_books_df.shape

(205242, 3)

### Sampling Users and Saving User-Book Lists

To manage computational resources , we sample 50,000 users from the `user_books_df`. The resulting DataFrame, `sampled_users_book`, contains `user_id` and their corresponding `books_read` lists. This is saved to `data/sampled_users_book.parquet` and will also be an input to the `generate_training_data.py` script.

In [17]:
# Sample 50,000 users from the pandas DataFrame
sampled_users_book_pd = user_books_df.sample(n=50000, random_state=42)

# If you need it back as a Dask DataFrame
sampled_users_book = dd.from_pandas(sampled_users_book_pd, npartitions=10)

In [23]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import psutil

# Monitor memory usage
def print_memory_usage():
    process = psutil.Process()
    print(f"Memory usage: {process.memory_info().rss / 1024 / 1024:.2f} MB")

# Before processing
print_memory_usage()

# Set up progress bar
with ProgressBar():
    # Process your DataFrame
    result = sampled_users_book.compute()  # or whatever operation you're doing
    
    # Save to parquet with optimizations
    result.to_parquet(
        'data/sampled_users_book.parquet',
        compression='snappy',
        index=False,
        engine='pyarrow'
    )

# After processing
print_memory_usage()

Memory usage: 42.66 MB
[########################################] | 100% Completed | 109.71 ms
Memory usage: 700.05 MB


In [24]:
test3 = dd.read_parquet("data/sampled_users_book.parquet")

In [25]:
test3.head()

Unnamed: 0,user_id,books_read,num_books
0,001af7947e217e17694c5a9c097afffb,"[57854, 34, 7332, 5470, 9646, 14142, 11138, 17...",38
1,0006260f85929db85eddee3a0bd0e504,"[29056083, 357, 5358, 78129, 375802, 10428708,...",20
2,000bcda59ab565512f51f9e1f531b5e5,"[862041, 2767052, 8933944, 3685, 1772910, 2641...",60
3,0005f52944ea1992e95d61f287acaea9,"[2219694, 169875, 18335634, 23705512, 7171637,...",65
4,000883382802f2d95a3dd545bb953882,"[22402154, 13372690, 13104080, 10429045, 67523...",155


## 4. Uploading Processed Data to Cloud Storage

The processed DataFrames (`reduce_books_df.parquet` and `sampled_users_book.parquet`), along with the `book_texts_reduced.parquet` (which was downloaded earlier), are essential inputs for the next stage. This step uploads these files to the R2 bucket, making them accessible for the training pair generation script or other downstream processes.

In [27]:
# Upload files
files_to_upload = [
    ("data/sampled_users_book.parquet", "data/sampled_users_book.parquet"),
    ("data/reduce_books_df.parquet", "data/reduce_books_df"),
]
upload_to_r2(files_to_upload, "bookdbio")

Successfully uploaded data/sampled_users_book.parquet to data/sampled_users_book.parquet
Successfully uploaded data/reduce_books_df.parquet to data/reduce_books_df


## 5. Data Verification Before Training Pair Generation

Before proceeding to generate training pairs, it's crucial to verify the structure and content of the input files:
* `data/sampled_users_book.parquet`: User IDs and their lists of read books.
* `data/reduce_books_df.parquet`: Book metadata (ID, title, description, genre, authors).
* `data/book_texts_reduced.parquet`: Book full texts or detailed descriptions.

This step ensures the data is in the expected format for the `generate_training_data.py` script.

In [None]:
download_from_r2("data/sampled_users_book.parquet")

In [33]:
import pandas as pd
import dask.dataframe as dd

# Load and check the input files
print("=== User-Book Interactions ===")
user_books = dd.read_parquet('data/sampled_users_book.parquet')
user_books_pd = user_books.compute()
print("Shape:", user_books_pd.shape)
print("\nColumns:", user_books_pd.columns.tolist())
print("\nSample:")
print(user_books_pd.head())
print("\nBooks_read column type:", type(user_books_pd['books_read'].iloc[0]))
print("Sample books_read value:", user_books_pd['books_read'].iloc[0])

print("\n=== Book Metadata ===")
books_df = dd.read_parquet('data/reduce_books_df.parquet')
books_pd = books_df.compute()
print("Shape:", books_pd.shape)
print("\nColumns:", books_pd.columns.tolist())
print("\nSample:")
print(books_pd.head())

print("\n=== Book Texts ===")
book_texts = dd.read_parquet('data/book_texts_reduced.parquet')
book_texts_pd = book_texts.compute()
print("Shape:", book_texts_pd.shape)
print("\nColumns:", book_texts_pd.columns.tolist())
print("\nSample:")
print(book_texts_pd.head())

=== User-Book Interactions ===
Shape: (50000, 3)

Columns: ['user_id', 'books_read', 'num_books']

Sample:
                            user_id  \
0  001af7947e217e17694c5a9c097afffb   
1  0006260f85929db85eddee3a0bd0e504   
2  000bcda59ab565512f51f9e1f531b5e5   
3  0005f52944ea1992e95d61f287acaea9   
4  000883382802f2d95a3dd545bb953882   

                                          books_read  num_books  
0  [57854, 34, 7332, 5470, 9646, 14142, 11138, 17...         38  
1  [29056083, 357, 5358, 78129, 375802, 10428708,...         20  
2  [862041, 2767052, 8933944, 3685, 1772910, 2641...         60  
3  [2219694, 169875, 18335634, 23705512, 7171637,...         65  
4  [22402154, 13372690, 13104080, 10429045, 67523...        155  

Books_read column type: <class 'str'>
Sample books_read value: [57854, 34, 7332, 5470, 9646, 14142, 11138, 17343, 30633, 92003, 23617, 46654, 15241, 18512, 77566, 538845, 1519, 665, 15997, 3836, 33, 597790, 30659, 102920, 23613, 103390, 5129, 11149, 100915, 767

## 6. Generating Training Pairs (External Script)

At this point, the necessary input files (`sampled_users_book.parquet`, `reduce_books_df.parquet`, and `book_texts_reduced.parquet`) are prepared. The next step is to generate the actual training pairs (user context, book text, label).

This task is performed by the `generate_training_data.py` script.

### `generate_training_data.py` Script Overview:

* **Purpose:** To create positive and negative training examples for a cross-encoder reranker.
* **Inputs:**
    * `data/sampled_users_book.parquet`: User IDs and their lists of read books (positive interactions).
    * `data/reduce_books_df.parquet`: Book metadata (title, authors, genres) used to create user contexts.
    * `data/book_texts_reduced.parquet`: Textual content/descriptions for each book.
* **Process (`cross_encoder_data_prep.py` functions):**
    1. For each user and each book they've read (positive example):
        * A "user context" is created. This context is a textual summary of the user's reading history (e.g., "Favorite books: [Book A] by [Author X] and [Book B] by [Author Y]. Favorite genres: [Genre 1] and [Genre 2]."). The target positive book is typically excluded from this context (leave-one-out).
        * A positive pair is formed: `(user_context, positive_book_text, label=1)`.
    2. For each positive example, a specified number of negative examples (`neg_ratio`) are generated:
        * Negative books are randomly sampled from the set of all books *not* read by the user.
        * Negative pairs are formed: `(user_context, negative_book_text, label=0)`. The user context remains the same as for the corresponding positive example.
* **Output:**
    * A Parquet file (e.g., `data/training_pairs.parquet/`) containing the generated training pairs, typically partitioned for easier handling. Each row includes `user_id`, `user_ctx`, `book_id`, `book_text`, and `label`.

    **To run this script (from the `BookDB/reranker/` directory):**
    ```bash
    python generate_training_data.py
    ```
    This script will read the input Parquet files from the `data/` subdirectory and write its output (the `training_pairs.parquet` directory) also to `data/`.

## 7. Post-processing Generated Training Pairs (External Script)

After `generate_training_data.py` has created the initial set of training pairs (e.g., in `data/training_pairs.parquet/`), we must inspect the training paris to make sure they have been generated correctly before being used as training data for the model.

In [28]:
import dask.dataframe as dd

# Define the base path to your Parquet directory
base_path = 'data/training_pairs.parquet/' # Make sure this ends with a slash

# List the specific parts you want to load
parts_to_load = [
    base_path + 'part.0.parquet',
]

# Load the specified parts
df_dd = dd.read_parquet(parts_to_load)


training_pairs_df = df_dd.compute()

print(f"Dask DataFrame loaded with {df_dd.npartitions} partitions.")

Dask DataFrame loaded with 1 partitions.


In [29]:
training_pairs_df.head(20)

Unnamed: 0,user_id,book_id,user_ctx,book_text,label
0,001af7947e217e17694c5a9c097afffb,57854,Favorite books: Tao Te Ching by Lao Tzu and Gi...,"Title: Tao Te Ching | Genres: history, literat...",1
1,001af7947e217e17694c5a9c097afffb,15808287,Favorite books: Tao Te Ching by Lao Tzu and Gi...,Title: Mrs. Lincoln's Dressmaker | Genres: bio...,0
2,001af7947e217e17694c5a9c097afffb,3692,Favorite books: Tao Te Ching by Lao Tzu and Gi...,Title: The Heart of the Matter | Genres: conte...,0
3,001af7947e217e17694c5a9c097afffb,603515,Favorite books: Tao Te Ching by Lao Tzu and Gi...,"Title: The Hound of Rowan (The Tapestry, #1) |...",0
4,001af7947e217e17694c5a9c097afffb,34,Favorite books: Tao Te Ching by Lao Tzu and Gi...,Title: The Fellowship of the Ring (The Lord of...,1
5,001af7947e217e17694c5a9c097afffb,73965,Favorite books: Tao Te Ching by Lao Tzu and Gi...,Title: Drinking: A Love Story | Genres: biogra...,0
6,001af7947e217e17694c5a9c097afffb,1215919,Favorite books: Tao Te Ching by Lao Tzu and Gi...,Title: Highlander Untamed (MacLeods of Skye Tr...,0
7,001af7947e217e17694c5a9c097afffb,218038,Favorite books: Tao Te Ching by Lao Tzu and Gi...,"Title: All About Love (Cynster, #6) | Genres: ...",0
8,001af7947e217e17694c5a9c097afffb,7332,Favorite books: Tao Te Ching by Lao Tzu and Gi...,"Title: The Silmarillion | Genres: adventure, a...",1
9,001af7947e217e17694c5a9c097afffb,455930,Favorite books: Tao Te Ching by Lao Tzu and Gi...,"Title: Echo Burning (Jack Reacher, #5) | Genre...",0


In [30]:
print(f"Shape of loaded data: {training_pairs_df.shape}")

Shape of loaded data: (936681, 5)


In [31]:
import pandas as pd
import dask.dataframe as dd


# Display basic information
print("Shape of the dataset:", training_pairs_df.shape)
print("\nColumns:", training_pairs_df.columns.tolist())
print("\nSample of the data:")
print(training_pairs_df.head())

# Check the distribution of labels
print("\nLabel distribution:")
print(training_pairs_df['label'].value_counts())

# Check some example user contexts and book texts
print("\nExample user context:")
print(training_pairs_df['user_ctx'].iloc[0])
print("\nExample book text:")
print(training_pairs_df['book_text'].iloc[0])

Shape of the dataset: (936681, 5)

Columns: ['user_id', 'book_id', 'user_ctx', 'book_text', 'label']

Sample of the data:
                            user_id   book_id  \
0  001af7947e217e17694c5a9c097afffb     57854   
1  001af7947e217e17694c5a9c097afffb  15808287   
2  001af7947e217e17694c5a9c097afffb      3692   
3  001af7947e217e17694c5a9c097afffb    603515   
4  001af7947e217e17694c5a9c097afffb        34   

                                            user_ctx  \
0  Favorite books: Tao Te Ching by Lao Tzu and Gi...   
1  Favorite books: Tao Te Ching by Lao Tzu and Gi...   
2  Favorite books: Tao Te Ching by Lao Tzu and Gi...   
3  Favorite books: Tao Te Ching by Lao Tzu and Gi...   
4  Favorite books: Tao Te Ching by Lao Tzu and Gi...   

                                           book_text  label  
0  Title: Tao Te Ching | Genres: history, literat...      1  
1  Title: Mrs. Lincoln's Dressmaker | Genres: bio...      0  
2  Title: The Heart of the Matter | Genres: conte...      0 

In [32]:
# Count positive labels per user
positive_counts = training_pairs_df[training_pairs_df['label'] == 1].groupby('user_id').size()
print("\nNumber of positive examples per user:")
print(positive_counts)

# Get some statistics about the positive counts
print("\nStatistics about positive examples per user:")
print(positive_counts.describe())

# Find users with very few positive examples
users_with_few_positives = positive_counts[positive_counts < 5]
print(f"\nNumber of users with less than 5 positive examples: {len(users_with_few_positives)}")
print("\nExample users with few positives:")
print(users_with_few_positives.head())



Number of positive examples per user:
user_id
0005f52944ea1992e95d61f287acaea9     65
0006260f85929db85eddee3a0bd0e504     20
0006db397ebf02b2e891d1048fb70dbc    166
0006de2967df1ec4432c51090803966e     76
000883382802f2d95a3dd545bb953882    154
                                   ... 
0a4decbebdcf1904ebea85c27c4118ef     75
0a664903364d6bb0ae20352854e001b3     73
0a678ecce2fdd2f3168073113dbca527    140
0a74ac6932b75d48032dc982bb0b9841    101
0a97f788f5707a7f116f5cc16875597e     46
Length: 1785, dtype: int64

Statistics about positive examples per user:
count    1785.000000
mean      133.652101
std       100.025313
min         1.000000
25%        77.000000
50%       109.000000
75%       163.000000
max      1478.000000
dtype: float64

Number of users with less than 5 positive examples: 14

Example users with few positives:
user_id
008c374625966c32477ebab37e835a4e    1
00e8157279aa30f4b919aea0a887f49a    2
01e2d286d0361edf8c62bc580d3baa18    1
02ac01d9ebc7165e80d8967f075adbd3    3
0378ae

**Note** What we found out from this step is that the 'generate_training_data.py' script was generating way too much data for us to feasably use in training the reranker (mistake in the way the script was written). Therefore, we reduced the training pairs dataset in a way that we ended up with around 200k-250k total pair, using multiple positive pairs from the same user as well. In order to have good data for the model to learn from, we sampled in such a way as to still have around 20k-30k unique users in the final set, meaning 20-30k unique user contexts, so the model would not overfit. For every positive pair there are 3 negative pairs, and we sampled multiple positives from each user and the corresponding number of negatives. 

### `process_parquet_chunks.py` Script Overview:

*   **Purpose:** To refine the training dataset generated by `generate_training_data.py`. It processes chunks of the training data (individual Parquet files within the `data/training_pairs.parquet/` directory) to apply specific sampling logic.
*   **Inputs:**
        *   Individual Parquet files from a directory (e.g., `data/training_pairs.parquet/part.0.parquet`, `data/training_pairs.parquet/part.1.parquet`, etc.).
*   **Process (`process_individual_chunk` function):**
    1.  For each user within a chunk:
        *   **Positive Samples:**
            *   If a user has more positive examples than `MAX_POSITIVES_TO_SAMPLE`, it samples `MAX_POSITIVES_TO_SAMPLE` of them.
            *   If a user has between `MIN_POSITIVES_TO_KEEP_USER` and `MAX_POSITIVES_TO_SAMPLE` (inclusive) positive examples, it keeps all of them.
                *   If a user has fewer than `MIN_POSITIVES_TO_KEEP_USER` positive examples, their positive examples from that chunk might be dropped (depending on the logic, it seems they are dropped).
            *   **Negative Samples:**
                *   For each selected positive sample, it samples `NEGATIVES_PER_POSITIVE` negative samples for that same user from the available negatives *within that chunk*.
    *   **Output:**
        *   A single combined Parquet file (e.g., `data/processed_training_pairs_parts_0_to_12.parquet`) containing the refined and sampled training data.

    **To run this script (from the `BookDB/reranker/` directory):**
    ```bash
    python process_parquet_chunks.py
    ```
    This script will read the specified `part.*.parquet` files from `data/training_pairs.parquet/`, process them, and save the combined, refined dataset to a new Parquet file in the `data/` directory.

## 8. Final Processed Data

After running `process_parquet_chunks.py`, the final, refined dataset (e.g., `data/processed_training_pairs_parts_0_to_12.parquet`) is ready for training the cross-encoder reranker model.

In [21]:
files_to_upload = [("data/processed_training_pairs_parts_0_to_12.parquet", "data/processed_training_pairs_parts_0_to_12.parquet")]
upload_to_r2(files_to_upload, "bookdbio")

Successfully uploaded data/processed_training_pairs_parts_0_to_12.parquet to data/processed_training_pairs_parts_0_to_12.parquet
