## Recommender Systems using Snorkel

In [1]:
pip install snorkel



In [2]:
import logging
import os

logging.basicConfig(level=logging.INFO)


if os.path.basename(os.getcwd()) == "snorkel-tutorials":
    os.chdir("recsys")

In [3]:
import calendar
import gzip
import json
import logging
import os
import pickle
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gdown
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras import backend as K

IS_TEST = os.environ.get("TRAVIS") == "true" or os.environ.get("IS_TEST") == "true"

YA_BOOKS_URL = "https://drive.google.com/uc?id=1gH7dG4yQzZykTpbHYsrw2nFknjUm0Mol"
YA_INTERACTIONS_URL = "https://drive.google.com/uc?id=1NNX7SWcKahezLFNyiW88QFPAqOAYP5qg"
YA_REVIEWS_URL = "https://drive.google.com/uc?id=1M5iqCZ8a7rZRtsmY5KQ5rYnP9S0bQJVo"
SMALL_DATA_URL = "https://drive.google.com/uc?id=1_UY4xTbk3o0xjGbVllQZC2bBt-WAwyF_"

BOOK_DATA = "data/goodreads_books_young_adult.json.gz"
INTERACTIONS_DATA = "data/goodreads_interactions_young_adult.json.gz"
REVIEWS_DATA = "data/goodreads_reviews_young_adult.json.gz"
SAMPLE_DATA = "data/sample_data.pkl"

In [4]:
def download_and_process_data() -> Tuple[Tuple[pd.DataFrame, ...], pd.DataFrame]:
    logging.info("Downloading raw data")
    maybe_download_files()
    if IS_TEST:
        return load_small_sample()
    logging.info("Processing book data")
    df_books, book_id_to_idx = process_books_data()
    logging.info("Processing interaction data")
    df_interactions, user_id_to_idx = process_interactions_data(book_id_to_idx)
    df_interactions_nz = df_interactions[df_interactions.rating != 0]
    ratings_map = {1: 0, 2: 0, 3: 0, 4: 1, 5: 1}
    df_interactions_nz["rating_4_5"] = df_interactions_nz.rating.map(ratings_map)
    logging.info("Processing review data")
    df_reviews = process_reviews_data(book_id_to_idx, user_id_to_idx)
    logging.info("Joining interaction data")
    # Compute book_idxs for each user.
    user_to_books = (
        df_interactions.groupby("user_idx")["book_idx"]
        .apply(tuple)
        .reset_index()
        .rename(columns={"book_idx": "book_idxs"})
    )
    data = user_to_books.merge(df_interactions_nz, on="user_idx", how="inner")[
        ["user_idx", "book_idxs", "book_idx", "rating_4_5"]
    ].merge(
        df_reviews[["user_idx", "book_idx", "review_text"]],
        on=["user_idx", "book_idx"],
        how="left",
    )
    data = data.rename(columns={"rating_4_5": "rating"})
    user_idxs = list(user_id_to_idx.values())
    return split_data(user_idxs, data), df_books

In [5]:
def maybe_download_files(data_dir: str = "data") -> None:
    if not os.path.exists(data_dir):
        os.makedirs(data_dir, exist_ok=True)
        if IS_TEST:
            # Sample data pickle
            gdown.download(SMALL_DATA_URL, output=SAMPLE_DATA, quiet=None)
        else:
            # Books
            gdown.download(YA_BOOKS_URL, output=BOOK_DATA, quiet=None)
            # Interactions
            gdown.download(YA_INTERACTIONS_URL, output=INTERACTIONS_DATA, quiet=None)
            # Reviews
            gdown.download(YA_REVIEWS_URL, output=REVIEWS_DATA, quiet=None)

In [6]:
def process_books_data(
    book_path: str = BOOK_DATA, min_ratings: int = 100, max_ratings: int = 15000
) -> Tuple[pd.DataFrame, Dict[int, int]]:
    books = load_data(book_path, None)
    df_books = pd.DataFrame(books)
    df_books = df_books[
        [
            "authors",
            "average_rating",
            "book_id",
            "country_code",
            "description",
            "is_ebook",
            "language_code",
            "ratings_count",
            "similar_books",
            "text_reviews_count",
            "title",
        ]
    ]
    df_books = df_books.astype(
        dict(
            average_rating=float,
            book_id=int,
            is_ebook=bool,
            ratings_count=int,
            text_reviews_count=int,
        )
    )
    # Turns author role dict into list of <= 5 authors for simplicity.
    df_books.authors = df_books.authors.map(
        lambda l: [pair["author_id"] for pair in l[:5]]
    )
    df_books["first_author"] = df_books.authors.map(lambda l: int(l[0]))

    df_books = df_books[
        (df_books.ratings_count >= min_ratings)
        & (df_books.ratings_count <= max_ratings)
    ]

    book_id_to_idx = {v: i for i, v in enumerate(df_books.book_id)}
    df_books["book_idx"] = df_books.book_id.map(book_id_to_idx)
    return df_books, book_id_to_idx


In [7]:
def process_interactions_data(
    book_id_to_idx: Dict[int, int],
    interactions_path: str = INTERACTIONS_DATA,
    min_user_count: int = 25,
    max_user_count: int = 200,
    max_to_load: int = 5_000_000,
) -> Tuple[pd.DataFrame, Dict[int, int]]:
    interactions = load_data(
        interactions_path,
        max_to_load,
        dict(book_id=set(map(str, book_id_to_idx.keys()))),
    )
    df_interactions = pd.DataFrame(interactions)
    df_interactions = df_interactions[
        ["book_id", "is_read", "rating", "review_id", "user_id"]
    ]
    df_interactions = df_interactions.astype(
        dict(book_id=int, is_read=bool, rating=int)
    )
    df_interactions["book_idx"] = df_interactions.book_id.map(book_id_to_idx)
    user_counts = df_interactions.groupby(["user_id"]).size()
    user_mask = (user_counts >= min_user_count) & (user_counts <= max_user_count)
    users_filt = user_counts[user_mask].index
    user_id_to_idx = {v: i for i, v in enumerate(users_filt)}
    df_interactions = df_interactions[
        df_interactions.user_id.isin(set(user_id_to_idx.keys()))
    ]
    df_interactions["user_idx"] = df_interactions.user_id.map(user_id_to_idx)
    return df_interactions, user_id_to_idx


def process_reviews_data(
    book_id_to_idx: Dict[int, int],
    user_id_to_idx: Dict[int, int],
    reviews_path: str = REVIEWS_DATA,
) -> pd.DataFrame:
    reviews = load_data(
        reviews_path,
        None,
        dict(
            book_id=set(map(str, book_id_to_idx.keys())),
            user_id=set(user_id_to_idx.keys()),
        ),
    )
    df_reviews = pd.DataFrame(reviews)
    df_reviews["book_idx"] = df_reviews.book_id.astype("int").map(book_id_to_idx)
    df_reviews["user_idx"] = df_reviews.user_id.map(user_id_to_idx)
    return df_reviews


def split_data(user_idxs, data: pd.DataFrame) -> Tuple[pd.DataFrame, ...]:
    user_idxs_train, user_idxs_test = train_test_split(user_idxs, test_size=0.05)
    user_idxs_train, user_idxs_dev = train_test_split(user_idxs_train, test_size=0.01)
    user_idxs_train, user_idxs_val = train_test_split(user_idxs_train, test_size=0.01)

    data_train = data[data.user_idx.isin(set(user_idxs_train))].drop("rating", axis=1)
    data_test = data[data.user_idx.isin(set(user_idxs_test))]
    data_dev = data[data.user_idx.isin(set(user_idxs_dev))]
    data_val = data[data.user_idx.isin(set(user_idxs_val))]
    return data_train, data_test, data_dev, data_val

In [8]:
def recall_batch(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    true_positives = K.sum(K.round(y_true * y_pred))
    all_positives = K.sum(y_true)
    return true_positives / (all_positives + K.epsilon())


def precision_batch(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    true_positives = K.sum(K.round(y_true * y_pred))
    predicted_positives = K.sum(K.round(y_pred))
    return true_positives / (predicted_positives + K.epsilon())


def f1_batch(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    prec = precision_batch(y_true, y_pred)
    rec = recall_batch(y_true, y_pred)
    return 2 * ((prec * rec) / (prec + rec + K.epsilon()))


def get_n_epochs() -> int:
    return 2 if IS_TEST else 30

In [9]:
# +
def save_small_sample():
    """Load full data, sample, and dump to file.."""
    (df_train, df_test, df_dev, df_valid), df_books = download_and_process_data()
    df_train = df_train.dropna().sample(frac=0.01)
    df_test = df_test.dropna().sample(frac=0.01)
    df_dev = df_dev.dropna().sample(frac=0.01)
    df_valid = df_valid.dropna().sample(frac=0.01)
    df_all = pd.concat([df_train, df_test, df_dev, df_valid], axis=0)
    df_books = df_books.merge(
        df_all[["book_idx"]].drop_duplicates(), on="book_idx", how="inner"
    )
    with open(SAMPLE_DATA, "wb") as f:
        pickle.dump(df_train, f)
        pickle.dump(df_test, f)
        pickle.dump(df_dev, f)
        pickle.dump(df_valid, f)
        pickle.dump(df_books, f)


def load_small_sample():
    """Load sample data."""
    with open(SAMPLE_DATA, "rb") as f:
        df_train = pickle.load(f)
        df_test = pickle.load(f)
        df_dev = pickle.load(f)
        df_valid = pickle.load(f)
        df_books = pickle.load(f)
        return (df_train, df_test, df_dev, df_valid), df_books


# -


def maybe_download_files(data_dir: str = "data") -> None:
    if not os.path.exists(data_dir):
        os.makedirs(data_dir, exist_ok=True)
        if IS_TEST:
            # Sample data pickle
            gdown.download(SMALL_DATA_URL, output=SAMPLE_DATA, quiet=None)
        else:
            # Books
            gdown.download(YA_BOOKS_URL, output=BOOK_DATA, quiet=None)
            # Interactions
            gdown.download(YA_INTERACTIONS_URL, output=INTERACTIONS_DATA, quiet=None)
            # Reviews
            gdown.download(YA_REVIEWS_URL, output=REVIEWS_DATA, quiet=None)


def get_timestamp(date_str: str) -> datetime.timestamp:
    month_to_int = dict((v, k) for k, v in enumerate(calendar.month_abbr))
    _, month, day, _, _, year = date_str.split()
    dt = datetime(year=int(year), month=month_to_int[month], day=int(day))
    return datetime.timestamp(dt)


def load_data(
    file_name: str, max_to_load: int = 100, filter_dict: Optional[dict] = None
) -> List[Dict[str, Any]]:
    count = 0
    data = []
    filter_dict = filter_dict or {}
    with gzip.open(file_name) as fin:
        for l in fin:
            d = json.loads(l)
            for k, v in filter_dict.items():
                if d[k] not in v:
                    break
            else:
                count += 1
                data.append(d)
                if (max_to_load is not None) and (count >= max_to_load):
                    break
    return data


In [10]:
#from utils import download_and_process_data

(df_train, df_test, df_dev, df_valid), df_books = download_and_process_data()

df_books.head()

INFO:root:Downloading raw data
INFO:root:Processing book data
INFO:numexpr.utils:NumExpr defaulting to 2 threads.
INFO:root:Processing interaction data
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if sys.path[0] == '':
INFO:root:Processing review data
INFO:root:Joining interaction data


Unnamed: 0,authors,average_rating,book_id,country_code,description,is_ebook,language_code,ratings_count,similar_books,text_reviews_count,title,first_author,book_idx
3,[293603],4.35,10099492,US,It all comes down to this.\nVlad's running out...,True,eng,152,"[25861113, 7430195, 18765937, 6120544, 3247550...",9,Twelfth Grade Kills (The Chronicles of Vladimi...,293603,0
4,[4018722],3.71,22642971,US,The future world is at peace.\nElla Shepherd h...,True,eng,1525,"[20499652, 17934493, 13518102, 16210411, 17149...",428,The Body Electric,4018722,1
5,[6537142],3.89,31556136,US,A gorgeously written and deeply felt literary ...,True,,109,[],45,Like Water,6537142,2
12,"[6455200, 5227552]",3.9,18522274,US,Zoe Vanderveen is on the run with her captor t...,True,en-US,191,"[25063023, 18553080, 17567752, 18126509, 17997...",6,"Volition (The Perception Trilogy, #2)",6455200,3
13,[187837],3.19,17262776,US,"The war is over, but for thirteen-year-old Rac...",True,eng,248,"[16153997, 10836616, 17262238, 16074827, 13628...",68,Little Red Lies,187837,4


In [11]:
df_dev.sample(frac=1, random_state=12).head()

Unnamed: 0,user_idx,book_idxs,book_idx,rating,review_text
567397,21906,"(25315, 10119, 8253, 6133, 13572, 13579, 8966,...",24514,1,'La lluvia en tu habitacion' no es una novela ...
449015,17311,"(21132, 19986, 2462, 6268, 14212, 5607, 7524, ...",7524,1,I loved this. Sweet little short story about a...
427620,16470,"(18839, 10934, 23247, 6363, 31045, 25682, 2457...",9849,0,I think this book can be summed up by the foll...
322779,12455,"(6401, 3276, 10432, 24284, 4285, 24352, 16476,...",4285,0,
411652,15883,"(30888, 20262, 17830, 30416, 12880, 1828, 7907...",8755,1,


In [12]:
POSITIVE = 1
NEGATIVE = 0
ABSTAIN = -1

In [13]:
from snorkel.labeling.lf import labeling_function

In [14]:
from snorkel.labeling.lf import labeling_function

book_to_first_author = dict(zip(df_books.book_idx, df_books.first_author))
first_author_to_books_df = df_books.groupby("first_author")[["book_idx"]].agg(set)
first_author_to_books = dict(
    zip(first_author_to_books_df.index, first_author_to_books_df.book_idx)
)


@labeling_function(
    resources=dict(
        book_to_first_author=book_to_first_author,
        first_author_to_books=first_author_to_books,
    )
)
def shared_first_author(x, book_to_first_author, first_author_to_books):
    author = book_to_first_author[x.book_idx]
    same_author_books = first_author_to_books[author]
    num_read = len(set(x.book_idxs).intersection(same_author_books))
    return POSITIVE if num_read > 15 else ABSTAIN

In [15]:
low_rating_strs = [
    "one star",
    "1 star",
    "two star",
    "2 star",
    "3 star",
    "three star",
    "3.5 star",
    "2.5 star",
    "1 out of 5",
    "2 out of 5",
    "3 out of 5",
]
high_rating_strs = ["5 stars", "five stars", "four stars", "4 stars", "4.5 stars"]


@labeling_function(
    resources=dict(low_rating_strs=low_rating_strs, high_rating_strs=high_rating_strs)
)
def stars_in_review(x, low_rating_strs, high_rating_strs):
    if not isinstance(x.review_text, str):
        return ABSTAIN
    for low_rating_str in low_rating_strs:
        if low_rating_str in x.review_text.lower():
            return NEGATIVE
    for high_rating_str in high_rating_strs:
        if high_rating_str in x.review_text.lower():
            return POSITIVE
    return ABSTAIN

In [16]:
from snorkel.preprocess import preprocessor
from textblob import TextBlob


@preprocessor(memoize=True)
def textblob_polarity(x):
    if isinstance(x.review_text, str):
        x.blob = TextBlob(x.review_text)
    else:
        x.blob = None
    return x


# Label high polarity reviews as positive.
@labeling_function(pre=[textblob_polarity])
def polarity_positive(x):
    if x.blob:
        if x.blob.polarity > 0.3:
            return POSITIVE
    return ABSTAIN


# Label high subjectivity reviews as positive.
@labeling_function(pre=[textblob_polarity])
def subjectivity_positive(x):
    if x.blob:
        if x.blob.subjectivity > 0.75:
            return POSITIVE
    return ABSTAIN


# Label low polarity reviews as negative.
@labeling_function(pre=[textblob_polarity])
def polarity_negative(x):
    if x.blob:
        if x.blob.polarity < 0.0:
            return NEGATIVE
    return ABSTAIN

In [17]:
from snorkel.labeling import PandasLFApplier, LFAnalysis

lfs = [
    stars_in_review,
    shared_first_author,
    polarity_positive,
    subjectivity_positive,
    polarity_negative,
]

applier = PandasLFApplier(lfs)
L_dev = applier.apply(df_dev)

100%|██████████| 8141/8141 [00:08<00:00, 925.71it/s]


In [18]:
LFAnalysis(L_dev, lfs).lf_summary(df_dev.rating.values)



Unnamed: 0,j,Polarity,Coverage,Overlaps,Conflicts,Correct,Incorrect,Emp. Acc.
stars_in_review,0,"[0, 1]",0.019285,0.005528,0.002457,132,25,0.840764
shared_first_author,1,[1],0.068296,0.003071,0.001351,356,200,0.640288
polarity_positive,2,[1],0.047169,0.0156,0.001106,301,83,0.783854
subjectivity_positive,3,[1],0.016828,0.012161,0.00172,103,34,0.751825
polarity_negative,4,[0],0.015477,0.002702,0.002457,65,61,0.515873


In [19]:
from snorkel.labeling.model import LabelModel

L_train = applier.apply(df_train)
label_model = LabelModel(cardinality=2, verbose=True)
label_model.fit(L_train, n_epochs=5000, seed=123, log_freq=20, lr=0.01)
preds_train = label_model.predict(L_train)

100%|██████████| 797253/797253 [14:44<00:00, 901.07it/s]
INFO:root:Computing O...
INFO:root:Estimating \mu...
  0%|          | 0/5000 [00:00<?, ?epoch/s]INFO:root:[0 epochs]: TRAIN:[loss=0.002]
  0%|          | 1/5000 [00:00<11:49,  7.05epoch/s]INFO:root:[20 epochs]: TRAIN:[loss=0.000]
INFO:root:[40 epochs]: TRAIN:[loss=0.000]
INFO:root:[60 epochs]: TRAIN:[loss=0.000]
INFO:root:[80 epochs]: TRAIN:[loss=0.000]
INFO:root:[100 epochs]: TRAIN:[loss=0.000]
  2%|▏         | 105/5000 [00:00<00:09, 523.86epoch/s]INFO:root:[120 epochs]: TRAIN:[loss=0.000]
INFO:root:[140 epochs]: TRAIN:[loss=0.000]
INFO:root:[160 epochs]: TRAIN:[loss=0.000]
INFO:root:[180 epochs]: TRAIN:[loss=0.000]
INFO:root:[200 epochs]: TRAIN:[loss=0.000]
  4%|▍         | 217/5000 [00:00<00:06, 771.35epoch/s]INFO:root:[220 epochs]: TRAIN:[loss=0.000]
INFO:root:[240 epochs]: TRAIN:[loss=0.000]
INFO:root:[260 epochs]: TRAIN:[loss=0.000]
INFO:root:[280 epochs]: TRAIN:[loss=0.000]
INFO:root:[300 epochs]: TRAIN:[loss=0.000]
  6%|▌

In [20]:
from snorkel.labeling import filter_unlabeled_dataframe

df_train_filtered, preds_train_filtered = filter_unlabeled_dataframe(
    df_train, preds_train, L_train
)
df_train_filtered["rating"] = preds_train_filtered

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  


In [22]:
import numpy as np
import tensorflow as tf
#from utils import precision_batch, recall_batch, f1_batch

n_books = max([max(df.book_idx) for df in [df_train, df_test, df_dev, df_valid]])


# Keras model to predict rating given book_idxs and book_idx.
def get_model(embed_dim=64, hidden_layer_sizes=[32]):
    # Compute embedding for book_idxs.
    len_book_idxs = tf.keras.layers.Input([])
    book_idxs = tf.keras.layers.Input([None])
    # book_idxs % n_books is to prevent crashing if a book_idx in book_idxs is > n_books.
    book_idxs_emb = tf.keras.layers.Embedding(n_books, embed_dim)(book_idxs % n_books)
    book_idxs_emb = tf.math.divide(
        tf.keras.backend.sum(book_idxs_emb, axis=1), tf.expand_dims(len_book_idxs, 1)
    )
    # Compute embedding for book_idx.
    book_idx = tf.keras.layers.Input([])
    book_idx_emb = tf.keras.layers.Embedding(n_books, embed_dim)(book_idx)
    input_layer = tf.keras.layers.concatenate([book_idxs_emb, book_idx_emb], 1)
    # Build Multi Layer Perceptron on input layer.
    cur_layer = input_layer
    for size in hidden_layer_sizes:
        tf.keras.layers.Dense(size, activation=tf.nn.relu)(cur_layer)
    output_layer = tf.keras.layers.Dense(1, activation=tf.nn.sigmoid)(cur_layer)
    # Create and compile keras model.
    model = tf.keras.Model(
        inputs=[len_book_idxs, book_idxs, book_idx], outputs=[output_layer]
    )
    model.compile(
        "Adagrad",
        "binary_crossentropy",
        metrics=["accuracy", f1_batch, precision_batch, recall_batch],
    )
    return model

In [23]:
# Generator to turn dataframe into data points.
def get_data_points_generator(df):
    def generator():
        for book_idxs, book_idx, rating in zip(df.book_idxs, df.book_idx, df.rating):
            # Remove book_idx from book_idxs so the model can't just look it up.
            book_idxs = tuple(filter(lambda x: x != book_idx, book_idxs))
            yield {
                "len_book_idxs": len(book_idxs),
                "book_idxs": book_idxs,
                "book_idx": book_idx,
                "label": rating,
            }
            if rating == 1:
                # Generate a random negative book_id not in book_idxs.
                random_negative = np.random.randint(0, n_books)
                while random_negative in book_idxs:
                    random_negative = np.random.randint(0, n_books)
                yield {
                    "len_book_idxs": len(book_idxs),
                    "book_idxs": book_idxs,
                    "book_idx": random_negative,
                    "label": 0,
                }

    return generator


def get_data_tensors(df):
    # Use generator to get data points each epoch, along with shuffling and batching.
    padded_shapes = {
        "len_book_idxs": [],
        "book_idxs": [None],
        "book_idx": [],
        "label": [],
    }
    dataset = (
        tf.data.Dataset.from_generator(
            get_data_points_generator(df), {k: tf.int64 for k in padded_shapes}
        )
        .shuffle(123)
        .repeat(None)
        .padded_batch(batch_size=256, padded_shapes=padded_shapes)
    )
    tensor_dict = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
    return (
        (
            tensor_dict["len_book_idxs"],
            tensor_dict["book_idxs"],
            tensor_dict["book_idx"],
        ),
        tensor_dict["label"],
    )

In [24]:
#from utils import get_n_epochs

model = get_model()

X_train, Y_train = get_data_tensors(df_train_filtered)
X_valid, Y_valid = get_data_tensors(df_valid)
model.fit(
    X_train,
    Y_train,
    steps_per_epoch=300,
    validation_data=(X_valid, Y_valid),
    validation_steps=40,
    epochs=get_n_epochs(),
    verbose=1,
)

Epoch 1/30



Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30





<keras.callbacks.History at 0x7f496c840c50>

In [25]:
X_test, Y_test = get_data_tensors(df_test)
_ = model.evaluate(X_test, Y_test, steps=30)





