In [2]:
import os
import polars as pl
import csv

# Paths and filtering threshold for the Amazon Books review data
# (the cells below read meta_Books.json and Books_5.json from raw_data_path).
raw_data_path = './raw_data/'
target_path = './dataset/'
# Minimum number of reviews an item / a reviewer must have to be kept.
filter_size = 5

# The writer cells put their output in target_path; create it up front so
# they do not fail with FileNotFoundError on a fresh checkout.
os.makedirs(target_path, exist_ok=True)

In [3]:
# Read the book metadata (newline-delimited JSON) with polars and preview
# the first rows.
meta_books_file = os.path.join(raw_data_path, 'meta_Books.json')
meta_books = pl.read_ndjson(meta_books_file)
display(meta_books.head())

category,tech1,description,fit,title,also_buy,tech2,brand,feature,rank,also_view,main_cat,similar_item,date,price,asin,imageURL,imageURLHighRes
list[str],str,list[str],str,str,list[str],str,str,list[null],str,list[str],str,str,str,str,str,list[null],list[null]
[],"""""","[""It is a biology book with God&apos;s perspective.""]","""""","""Biology Gods Living Creation T…","[""0669009075"", ""B000K2P5SA"", … ""0743252012""]","""""","""Keith Graham""",[],"""1,349,781 in Books (""","[""0019777701"", ""B000AUCX7I"", … ""B0095ZCRCK""]","""Books""","""""","""""","""$39.94""","""0000092878""",[],[]
"[""Books"", ""New, Used & Rental Textbooks"", ""Medicine & Health Sciences""]","""""",[],"""""","""Mksap 16 Audio Companion: Medi…",[],"""""","""Acp""",[],"""1,702,625 in Books (""","[""B01MUCYEV7"", ""B01KUGTY6O""]","""Books""","""""","""""","""""","""000047715X""",[],[]
"[""Books"", ""Arts & Photography"", ""Music""]","""""","[""Discography of American Punk, Hardcore, and Power Pop""]","""""","""Flex! Discography of North Ame…",[],"""""","""Burkhard Jarisch""",[],"""6,291,012 in Books (""",[],"""Books""","""""","""""","""$199.99""","""0000004545""",[],[]
"[""Books"", ""Arts & Photography"", ""Music""]","""""","[""This is a collection of classic gospel hymns that many churches still enjoy singing today.""]","""""","""Heavenly Highway Hymns: Shaped…",[],"""""","""Stamps/Baxter""",[],"""2,384,057 in Books (""","[""0006180116"", ""0996092730"", … ""0871482215""]","""Books""","""""","""""","""""","""0000013765""",[],[]
[],"""""",[],"""""","""Georgina Goodman Nelson Womens…",[],"""""","""""",[],"""11,735,726 in Books (""",[],"""Books""","""""","""""","""$164.10""","""0000000116""",[],[]


In [4]:
# Build asin -> categories and asin -> brand lookups from the book metadata
# and write them to asin2category.tsv / asin2brand.tsv under target_path.

# Not populated in this cell; kept only so the names stay defined.
category2asin = {}
brand2asin = {}

asin2category = {}
asin2brand = {}

# Pull only the columns that are actually read instead of materialising
# every column of every row as a Python dict.
has_brand = 'brand' in meta_books.columns
wanted_columns = ['asin', 'category'] + (['brand'] if has_brand else [])

for record in meta_books.select(wanted_columns).iter_rows():
    asin, categories = record[0], record[1]
    brand = record[2] if has_brand else None

    # Keep only non-empty category lists
    if categories and isinstance(categories, list):
        asin2category[asin] = categories

    # Keep only non-empty brands (None and "" are skipped)
    if brand:
        asin2brand[asin] = brand

# Make sure the output directory exists before opening files in it.
os.makedirs(target_path, exist_ok=True)

# The encoding is pinned to UTF-8: category and brand strings may contain
# non-ASCII characters, and the locale default is platform dependent.

# Write asin2category.tsv
# NOTE: the category list is written as its Python list repr (unchanged).
with open(os.path.join(target_path, 'asin2category.tsv'), 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, delimiter='\t')
    for asin, categories in asin2category.items():
        writer.writerow([asin, categories])

# Write asin2brand.tsv
with open(os.path.join(target_path, 'asin2brand.tsv'), 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, delimiter='\t')
    writer.writerows(asin2brand.items())


In [5]:
# Read the review records (newline-delimited JSON) with polars and preview
# the first rows.
reviews_file = os.path.join(raw_data_path, 'Books_5.json')
review_df = pl.read_ndjson(reviews_file)
display(review_df.head())

overall,vote,verified,reviewTime,reviewerID,asin,style,reviewerName,reviewText,summary,unixReviewTime
f64,str,bool,str,str,str,struct[1],str,str,str,i64
5.0,,False,"""03 30, 2005""","""A1REUF3A1YCPHM""","""0001713353""","{"" Hardcover""}","""TW Ervin II""","""The King, the Mice and the Che…","""A story children will love and…",1112140800
5.0,,True,"""06 20, 2016""","""AVP0HXC9FG790""","""0001713353""",,"""Amazon Customer""","""The kids loved it!""","""Five Stars""",1466380800
5.0,,True,"""01 24, 2016""","""A324TTUBKTN73A""","""0001713353""","{"" Paperback""}","""Tekla Borner""","""My students (3 & 4 year olds) …","""Five Stars""",1453593600
5.0,,False,"""07 9, 2015""","""A2RE7WG349NV5D""","""0001713353""","{"" Paperback""}","""Deborah K Woroniecki""","""LOVE IT""","""Five Stars""",1436400000
5.0,,True,"""01 18, 2015""","""A32B7QIUDQCD0E""","""0001713353""",,"""E""","""Great!""","""Five Stars""",1421539200


In [6]:
# Assign a dense integer id to every reviewer and every item (asin) and
# persist both mappings as TSV files under target_path.
#
# maintain_order=True numbers the ids in order of first appearance in
# review_df. A plain unique() makes no ordering promise, so the ids (and the
# files written below) could differ from one run to the next.

# Create reviewerID to incremental id mapping
reviewer_ids = review_df['reviewerID'].unique(maintain_order=True)
reviewer_id_map = {reviewer_id: idx for idx, reviewer_id in enumerate(reviewer_ids)}

# Create asin to incremental id mapping
asin_ids = review_df['asin'].unique(maintain_order=True)
asin_id_map = {asin: idx for idx, asin in enumerate(asin_ids)}

# Save mappings to files (one "<original id>\t<integer id>" row per entry)
with open(os.path.join(target_path, 'reviewer2idx.tsv'), 'w', newline='', encoding='utf-8') as f:
    csv.writer(f, delimiter='\t').writerows(reviewer_id_map.items())

with open(os.path.join(target_path, 'asin2idx.tsv'), 'w', newline='', encoding='utf-8') as f:
    csv.writer(f, delimiter='\t').writerows(asin_id_map.items())

In [7]:
# Keep only reviews whose item AND whose reviewer each have at least
# filter_size reviews. Both counts are taken on the unfiltered review_df
# (a single pass, not an iterative k-core). A window count keeps the original
# row order and adds no helper columns to the result.
filtered_reviews = review_df.filter(
    (pl.len().over('asin') >= filter_size)
    & (pl.len().over('reviewerID') >= filter_size)
)

# Split the remaining users 80 / 10 / 10 into train / valid / test.
# The user list is taken in first-appearance order and then shuffled with a
# fixed seed, so the split is random but identical on every run.
split_seed = 42
user_ids = (
    filtered_reviews['reviewerID']
    .unique(maintain_order=True)
    .shuffle(seed=split_seed)
)
num_users = len(user_ids)
split_1 = int(num_users * 0.8)
split_2 = int(num_users * 0.9)

train_users = set(user_ids[:split_1])
valid_users = set(user_ids[split_1:split_2])
test_users = set(user_ids[split_2:])

# Each user's reviews go entirely into one of the three partitions.
train_data = filtered_reviews.filter(pl.col('reviewerID').is_in(train_users))
valid_data = filtered_reviews.filter(pl.col('reviewerID').is_in(valid_users))
test_data = filtered_reviews.filter(pl.col('reviewerID').is_in(test_users))

In [8]:
# Build one row per validation user holding the integer ids of the items the
# user reviewed, ordered by review time (same ordering as the training cell).
#
# The asin -> id lookup uses replace_strict on the flat column instead of a
# Python lambda per group; it raises if an asin is missing from asin_id_map.
#
# NOTE(review): given_user_history and predicting_items contain the same full
# list for every user, so the "history" already includes the items to be
# predicted. Confirm that the consumer of validation.tsv splits it itself.
validation_df = (
    valid_data
    .sort('unixReviewTime', maintain_order=True)
    .with_columns(
        pl.col('asin').replace_strict(asin_id_map, return_dtype=pl.Int64).alias('item_id')
    )
    .group_by('reviewerID', maintain_order=True)
    .agg([
        pl.col('item_id').alias('given_user_history'),
        pl.col('item_id').alias('predicting_items'),
    ])
    .rename({'reviewerID': 'user_id'})
    .with_columns([
        pl.col('user_id').replace_strict(reviewer_id_map)
    ])
)

display(validation_df.head())


user_id,given_user_history,predicting_items
i64,list[i64],list[i64]
1127753,"[99711, 186712, … 65864]","[99711, 186712, … 65864]"
1278283,"[85903, 97934, … 381682]","[85903, 97934, … 381682]"
1410122,"[517959, 266308, … 437201]","[517959, 266308, … 437201]"
1572592,"[494178, 160130, … 82798]","[494178, 160130, … 82798]"
1029558,"[196981, 102686, … 667060]","[196981, 102686, … 667060]"


In [9]:
# Build one row per test user holding the integer ids of the items the user
# reviewed, ordered by review time (same ordering as the training cell).
#
# The asin -> id lookup uses replace_strict on the flat column instead of a
# Python lambda per group; it raises if an asin is missing from asin_id_map.
#
# NOTE(review): given_user_history and predicting_items contain the same full
# list for every user, so the "history" already includes the items to be
# predicted. Confirm that the consumer of test.tsv splits it itself.
test_df = (
    test_data
    .sort('unixReviewTime', maintain_order=True)
    .with_columns(
        pl.col('asin').replace_strict(asin_id_map, return_dtype=pl.Int64).alias('item_id')
    )
    .group_by('reviewerID', maintain_order=True)
    .agg([
        pl.col('item_id').alias('given_user_history'),
        pl.col('item_id').alias('predicting_items'),
    ])
    .rename({'reviewerID': 'user_id'})
    .with_columns([
        pl.col('user_id').replace_strict(reviewer_id_map)
    ])
)

display(test_df.head())


user_id,given_user_history,predicting_items
i64,list[i64],list[i64]
352146,"[107523, 697973, … 609187]","[107523, 697973, … 609187]"
771994,"[271449, 287019, … 513613]","[271449, 287019, … 513613]"
1717550,"[479653, 189544, … 91557]","[479653, 189544, … 91557]"
859283,"[399253, 277131, … 543189]","[399253, 277131, … 543189]"
562745,"[286817, 666397, … 81544]","[286817, 666397, … 81544]"


In [12]:
# Build the training examples: for every training user, each proper prefix
# of the time-ordered item history becomes the input and the item that
# follows the prefix becomes the single target.

# Step 1: one time-ordered list of integer item ids per user.
# maintain_order=True keeps the sort stable for reviews that share a
# timestamp and keeps the group order deterministic.
train_histories = (
    train_data
    .sort('unixReviewTime', maintain_order=True)
    .with_columns(
        pl.col('asin').replace_strict(asin_id_map, return_dtype=pl.Int64).alias('item_id')
    )
    .group_by('reviewerID', maintain_order=True)
    .agg(pl.col('item_id').alias('given_user_history'))
    .with_columns(
        pl.col('reviewerID').replace_strict(reviewer_id_map).alias('user_id')
    )
)

# Step 2: expand every history into (prefix, next item) pairs.
# A history of length n yields n - 1 examples; users with a single item
# contribute none.
train_sequences = []
for user_id, history in train_histories.select('user_id', 'given_user_history').iter_rows():
    for i in range(1, len(history)):
        train_sequences.append({
            'user_id': user_id,
            'given_user_history': history[:i],
            'predicting_items': history[i]
        })

# Step 3: convert to a dataframe. The explicit schema keeps the column names
# and dtypes stable even when no sequence was produced.
train_df = pl.DataFrame(
    train_sequences,
    schema={
        'user_id': pl.Int64,
        'given_user_history': pl.List(pl.Int64),
        'predicting_items': pl.Int64,
    },
)

display(train_df.head())



user_id,given_user_history,predicting_items
i64,list[i64],i64
1813400,[152005],147511
1813400,"[152005, 147511]",123243
1813400,"[152005, 147511, 123243]",33343
1813400,"[152005, 147511, … 33343]",484312
1813400,"[152005, 147511, … 484312]",375991


In [13]:
# Write the three splits as TSV files under target_path. List columns are
# serialised as "[id1,id2,...]" text because a delimited text file cannot
# hold nested values.

def _id_list_as_text(column):
    """Return an expression rendering the list-of-ints `column` as '[1,2,3]' text.

    The result keeps the name of `column`, so it replaces that column when
    passed to `with_columns`. An empty list is rendered as '[]'.
    """
    joined = pl.col(column).cast(pl.List(pl.String)).list.join(',')
    return pl.format('[{}]', joined).alias(column)


# Training rows carry a single target item, so it is only cast to text.
train_df.with_columns(
    _id_list_as_text('given_user_history'),
    pl.col('predicting_items').cast(pl.String),
).write_csv(os.path.join(target_path, 'training.tsv'), separator='\t')

validation_df.with_columns(
    _id_list_as_text('given_user_history'),
    _id_list_as_text('predicting_items'),
).write_csv(os.path.join(target_path, 'validation.tsv'), separator='\t')

test_df.with_columns(
    _id_list_as_text('given_user_history'),
    _id_list_as_text('predicting_items'),
).write_csv(os.path.join(target_path, 'test.tsv'), separator='\t')