# Notebook to filter the LFM2b dataset

Prerequisites:
Download the files 
- [listening-events.tsv.bz2](http://www.cp.jku.at/datasets/lfm-2b/chiir/listening-events.tsv.bz2)
- [users.tsv.bz2](http://www.cp.jku.at/datasets/lfm-2b/chiir/users.tsv.bz2)  

from http://www.cp.jku.at/datasets/LFM-2b/ to your preferred directory and extract them.

In case you do not want to reduce the time span of the listening events, unlike us,
download [listening-counts.tsv.bz2](http://www.cp.jku.at/datasets/lfm-2b/chiir/listening-counts.tsv.bz2) instead.

As the dataset is quite big (~90GB extracted), we filter all listening events that are not relevant for our use-case and will proceed with the preprocessing in another notebook.

Our main objective is to reduce the memory consumption when loading the dataset, such that anyone can create the dataset for themselves. We are aware that this leads to longer processing times, however, as this has to be done only once, this should be an acceptible tradeoff. 

To do so, please execute the following command to split the dataset into multiple smaller files:  
```/usr/bin/split listening-events.tsv -l 20000000```  
and move all files into a separate folder.

The current configuration requieres about 3 GB of free memory and takes ~2h to run.

In [1]:
data_dir = r"F:\Temp\data\lfm1y"
splits_dir = r"F:\Temp\data\lfm1y\splits"
filtered_splits_dir = r"F:\Temp\data\lfm1y\splits_filtered"

In [2]:
import os
import csv
import time
import glob
import numpy as np
import pandas as pd
import pickle as pkl
from tqdm import tqdm
from datetime import datetime,  timedelta
from scipy import sparse as sp
from collections import defaultdict

### Settings

In [3]:
# define time range of which to take interactions
# min_time = datetime.fromisoformat('2016-01-01')
# max_time = datetime.today()

def date_parser(time_str):
    return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")

# def time_ok(time_str):
#     return min_time <= datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") <= max_time

# filter users & tracks with too less interaction
min_interactions_user = 5
min_interactions_item = 5
min_interaction_count = 2

### Preprocess users

In [4]:
df_users = pd.read_csv(os.path.join(data_dir, "users.tsv"), 
                        sep="\t", engine="python", encoding='latin-1',
                        header=0)
df_users.columns = ["UserID", "Gender", "Country", "Age", "RegistrationDate"]
# df_users.columns = ["UserID", "Country", "Age", "Gender", "RegistrationDate"]

# keep users for which at least one attribute is available
user_mask = ~df_users["Country"].isna()
user_mask |= df_users["Gender"].isin(["f", "m"])
user_mask |= df_users["Age"] > 0

df_users = df_users[user_mask]
user_ids = df_users["UserID"]

# store old user indices and adjust user ids
df_users.reset_index(drop=True, inplace=True)
user_mapping = {a: b for a, b in zip(user_ids, df_users.index)}
df_users = df_users.assign(UserID = df_users.index)

df_users.to_csv(os.path.join(data_dir, "users-demo.tsv"), sep="\t", index=False)
df_users.head()

Unnamed: 0,UserID,Gender,Country,Age,RegistrationDate
0,0,f,US,27,2005-05-24 18:42:51
1,1,m,,32,2008-07-11 16:30:15
2,2,m,NL,37,2004-04-03 01:43:27
3,3,m,RU,24,2011-07-10 23:29:23
4,4,m,RO,23,2008-12-26 08:42:52


### Filter interactions
We first select only the users for which some demographic information is available. Moreover, we drop all interactions that did happen in our previously defined time frame. The results will again be stored in separate files.

In [5]:
os.makedirs(filtered_splits_dir, exist_ok=True)

first = True
fcount = 0
header = ["user_id", "item_id", "timestamp"]

item_indices = set()

files = list(sorted(glob.glob(os.path.join(splits_dir, "*"))))
for f in tqdm(files, desc="Parsing files"):
    if first:
        df = pd.read_csv(f, sep="\t", engine="c")
        df.columns = header
        first = False
    else:
        df = pd.read_csv(f, sep="\t", names=header, engine="c")
    
    # Filter by user id and timestamp
    uid_mask = df["user_id"].isin(user_ids)
#     df["time"] = pd.to_datetime(df['time'], format="%Y-%m-%d %H:%M:%S", errors="coerce")
#     time_mask = (min_time <= df["time"]) & (df["time"] <= max_time)
        
    mask = uid_mask #& time_mask
    if mask.sum() > 0:
        df = df.loc[mask, ["user_id", "item_id", "timestamp"]]
        df["user_id"] = df["user_id"].map(user_mapping)
        
        item_indices = item_indices.union(df["item_id"].unique())
        
        path = os.path.join(filtered_splits_dir, f"filtered_part_{fcount}.tsv")
        df.to_csv(path, sep="\t", index=False)
        fcount += 1

Parsing files: 100%|█████████████████████████████████████████████████████████████████| 136/136 [07:15<00:00,  3.20s/it]


In [6]:
# Store indices for later, to prevent unwanted execution time when running later cells...
with open(os.path.join(filtered_splits_dir, "item_indices.pkl"), "wb") as fh:
    pkl.dump(item_indices, fh)

### Generate interaction matrix

In [5]:
with open(os.path.join(filtered_splits_dir, "item_indices.pkl"), "rb") as fh:
    item_indices = pkl.load(fh)

In [6]:
n_users = len(df_users)
n_items = len(item_indices)
print(f"n_users={n_users}, n_items={n_items}")

n_users=4998, n_items=5846958


In [7]:
item_mapping = {idx: i for i, idx in enumerate(item_indices)}
user_mapping_filtered = {i: i for i in range(len(user_mapping))}

In [8]:
def update_mapping(initial_mapping, valid_item_indices):
    """ Function for adjusting the mappings we keep to refer back to the old data. """
    keep_mapping = {iid: i for i, iid in enumerate(valid_item_indices)}
    reverse_mapping = dict(map(reversed, initial_mapping.items()))
    return {reverse_mapping[k]: v for k, v in keep_mapping.items()}, keep_mapping

In [11]:
# generate interaction matrix for the individual files
files = sorted(glob.glob(os.path.join(filtered_splits_dir, "*.tsv")))
for i, f in enumerate(tqdm(files, desc="Parsing files")):
    df = pd.read_csv(f, sep="\t")
    
    d = defaultdict(lambda: 0)
    for uid, iid in zip(df["user_id"], df["item_id"]):
        d[(uid, iid)] += 1
        
    uids, iids = zip(*d.keys())
    iids = [item_mapping[i] for i in iids]
    data = list(d.values())
    
    interaction_matrix = sp.csr_matrix((data, (uids, iids)), 
                                       (n_users, n_items))

    sp.save_npz(os.path.join(filtered_splits_dir, f"interaction_matrix_part_{i}.npz"), interaction_matrix)

Parsing files: 100%|█████████████████████████████████████████████████████████████████| 136/136 [07:34<00:00,  3.34s/it]


In [9]:
# process the items in chunks when loading the previously stored matrix parts
items_per_chunk = 500_000

files = sorted(glob.glob(os.path.join(filtered_splits_dir, "*.npz")))
all_valid_items = np.array([], dtype=int)

im_full = []
chunk_steps = list(range(0, n_items, items_per_chunk))
for i in tqdm(chunk_steps, desc="Processing item chunks"):
    interaction_matrix = None
    for f in files:
        im_part = sp.load_npz(f)[:, i:i+items_per_chunk]
        if interaction_matrix is None:
            interaction_matrix = sp.csr_matrix(im_part.shape, dtype=int)
        interaction_matrix += im_part

    # Ignore "misclick" interactions
    interaction_matrix.data -= min_interaction_count - 1
    interaction_matrix.eliminate_zeros()

    # and binarize result
    interaction_matrix.data[:] = 1
        
    # determine items with enough interactions. this should already reduce the number of items considerably,
    # allowing us to store them in memory
    valid_items = np.argwhere(np.array(interaction_matrix.sum(axis=0)).flatten() >= min_interactions_item).flatten()
    im_full.append(interaction_matrix[:, valid_items])
    all_valid_items = np.concatenate([all_valid_items, valid_items + i])
    
im_full = sp.hstack(im_full).tocsr()

# moreover, determine which users are lacking interactions.
valid_users = np.argwhere(np.array(im_full.sum(axis=1)).flatten() >= min_interactions_user).flatten()
im_full = im_full[valid_users, :]
df_users = df_users.loc[valid_users, :]
df_users.reset_index(drop=True, inplace=True)

item_mapping, _ = update_mapping(item_mapping, all_valid_items)
user_mapping_filtered, user_keep_mapping = update_mapping(user_mapping_filtered, valid_users)
df_users["UserID"] = df_users["UserID"].map(user_keep_mapping)

Processing item chunks: 100%|██████████████████████████████████████████████████████████| 12/12 [00:58<00:00,  4.85s/it]


In [10]:
nu, ni = im_full.shape
print(f"Before filtering: n_users={nu}, n_items={ni}")

# filtering users may again lead to items with too few interactions, and vice-versa. 
while True:
    nu, ni = im_full.shape
    valid_items = np.argwhere(np.array(im_full.sum(axis=0)).flatten() >= min_interactions_item).flatten()
    im_full = im_full[:, valid_items]
    
    valid_users = np.argwhere(np.array(im_full.sum(axis=1)).flatten() >= min_interactions_user).flatten()
    im_full = im_full[valid_users, :]
    df_users = df_users.loc[valid_users, :]
    df_users.reset_index(drop=True, inplace=True)
    
    item_mapping, _ = update_mapping(item_mapping, valid_items)
    user_mapping_filtered, user_keep_mapping = update_mapping(user_mapping_filtered, valid_users)
    df_users["UserID"] = df_users["UserID"].map(user_keep_mapping)
    print(f"Performing min interaction filtering - remaining n_users={nu}, n_items={ni}")

    new_nu, new_ni = im_full.shape
    if new_nu == nu and new_ni == ni:
        break

print(f"After filtering: n_users={new_nu}, n_items={new_ni}")

Before filtering: n_users=4962, n_items=359258
Performing min interaction filtering - remaining n_users=4962, n_items=359258
Performing min interaction filtering - remaining n_users=4962, n_items=359254
After filtering: n_users=4962, n_items=359254


In [11]:
sp.save_npz(os.path.join(data_dir, "interaction_matrix_filtered.npz"), im_full)

df_users = df_users.loc[valid_users, :]
df_users.reset_index(drop=True, inplace=True)
df_users = df_users.assign(UserID=df_users.index)
df_users.to_csv(os.path.join(data_dir, "users_demo_filtered.tsv"), sep="\t", index=False)

print(f"Final matrix, density={im_full.nnz / (new_nu*new_ni):.3f}")
display(im_full)

Final matrix, density=0.003


<4962x359254 sparse matrix of type '<class 'numpy.intc'>'
	with 5934042 stored elements in Compressed Sparse Row format>

### Store mappings for later reference

In [12]:
reverse_mapping = dict(map(reversed, user_mapping.items()))
user_mapping = {reverse_mapping[k]: v for k, v in user_mapping_filtered.items()}

df_mapping = pd.DataFrame.from_dict(user_mapping, orient="index")
df_mapping.reset_index(inplace=True)
df_mapping.columns = ["old_user_id", "new_user_id"]
df_mapping.to_csv(os.path.join(data_dir, "user_mapping.csv"), index=False)

In [13]:
df_mapping = pd.DataFrame.from_dict(user_mapping_filtered, orient="index")
df_mapping.reset_index(inplace=True)
df_mapping.columns = ["old_user_id", "new_user_id"]
df_mapping.to_csv(os.path.join(data_dir, "user_mapping_filtered.csv"), index=False)

In [14]:
df_mapping = pd.DataFrame.from_dict(item_mapping, orient="index")
df_mapping.reset_index(inplace=True)
df_mapping.columns = ["old_item_id", "new_item_id"]
df_mapping.to_csv(os.path.join(data_dir, "item_mapping.csv"), index=False)

### Generating a new interaction_history.tsv file which includes a timestamp

In [15]:
im_full_coo = im_full.tocoo()

uids, iids = im_full_coo.row, im_full_coo.col

# set for fast (O(1)) lookup
interactions = set([tuple(el) for el in  np.stack([uids, iids], axis=1).tolist()])

In [17]:
full_df = None

# generate interaction matrix for the individual files
files = sorted(glob.glob(os.path.join(filtered_splits_dir, "*.tsv")))
for i, f in enumerate(tqdm(files, desc="Parsing files")):
    
    df = pd.read_csv(f, sep="\t")
    n_inter = len(df)
    
    # adjust user and item id's
    # Note: for elements that are not in the dictionaries, the result will be NaN
    df.user_id = df.user_id.map(user_mapping_filtered)
    df.item_id = df.item_id.map(item_mapping)

    # by dropping all entries, where either user or item was filtered due to not enough interactions,
    # we only keep interacitons that are in the interactions matrix
    df.dropna(inplace=True)
    
    # not sure why, but ids are floats here (dict mapping is int -> int)...
    df.user_id = df.user_id.astype(int)
    df.item_id = df.item_id.astype(int)
    
    # find first interaction by first ensuring that dataframe is sorted
    df.sort_values("timestamp", inplace=True)
    
    # we switch to numpy methods to get first timestamp of interaction to speed up compuations
    # check out https://stackoverflow.com/a/38145104 for some nice extra information
    arr_slice = df[["user_id", "item_id"]].values
    lidx = np.ravel_multi_index(arr_slice.T, arr_slice.max(0)+1)
    
    # this retrieves the first indices for every (user,item) interaction
    _, indices = np.unique(lidx, return_index=True)
    df = df.iloc[indices]
    
    # collect all partial DataFrames
    full_df = df if full_df is None else pd.concat([full_df, df])
        
# filtering a final time, now that all history parts are joined together
df = full_df
df.sort_values("timestamp", inplace=True)

arr_slice = df[["user_id", "item_id"]].values
lidx = np.ravel_multi_index(arr_slice.T, arr_slice.max(0)+1)
_, indices = np.unique(lidx, return_index=True)
df = df.iloc[indices]

# dropping bad interactions
df = df.assign(ui=list(tuple(el) for el in np.stack([df.user_id, df.item_id], axis=1)))
df = df.loc[df["ui"].isin(interactions)].drop(columns=["ui"])

Parsing files: 100%|█████████████████████████████████████████████████████████████████| 136/136 [06:00<00:00,  2.65s/it]


In [18]:
df

Unnamed: 0,user_id,item_id,timestamp
25695,0,106,2019-04-22 04:25:07
290113,0,281,2019-08-22 17:38:29
181817,0,327,2019-04-22 23:45:57
478737,0,352,2019-07-07 18:16:07
498331,0,412,2019-07-04 18:05:00
...,...,...,...
409031,4961,349834,2019-03-30 00:28:24
531500,4961,349835,2019-03-30 18:18:37
410349,4961,349836,2019-03-30 00:38:56
407967,4961,349838,2019-03-30 00:19:43


In [19]:
df.to_csv(os.path.join(data_dir, "interaction_history_filtered.tsv"), sep="\t", index=False)

In [20]:
im_full

<4962x359254 sparse matrix of type '<class 'numpy.intc'>'
	with 5934042 stored elements in Compressed Sparse Row format>

In [21]:
print("All interactions with their first timestep are maintained in the new dataframe:")
print(len(df) == im_full.nnz)

All interactions with their first timestep are maintained in the new dataframe:
True
