Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions official/recommendation/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
TEST_RATINGS_FILENAME = 'test-ratings.csv'
TEST_NEG_FILENAME = 'test-negative.csv'

TRAIN_DATA = 'train_data.csv'
TEST_DATA = 'test_data.csv'

USER = "user_id"
ITEM = "item_id"
RATING = "rating"
41 changes: 29 additions & 12 deletions official/recommendation/data_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@
from __future__ import division
from __future__ import print_function

import argparse
import collections
import os
import sys
import time
import zipfile

# pylint: disable=g-bad-import-order
import numpy as np
import pandas as pd
from six.moves import urllib # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order

from official.recommendation import constants # pylint: disable=g-bad-import-order
from official.recommendation import constants
from official.utils.flags import core as flags_core

# URL to download dataset
_DATA_URL = "http://files.grouplens.org/datasets/movielens/"
Expand Down Expand Up @@ -306,6 +310,10 @@ def main(_):

make_dir(FLAGS.data_dir)

assert FLAGS.dataset, (
"Please specify which dataset to download. "
"Two datasets are available: ml-1m and ml-20m.")

# Download the zip dataset
dataset_zip = FLAGS.dataset + ".zip"
file_path = os.path.join(FLAGS.data_dir, dataset_zip)
Expand Down Expand Up @@ -335,14 +343,23 @@ def _progress(count, block_size, total_size):
parse_file_to_csv(FLAGS.data_dir, FLAGS.dataset)


def define_data_download_flags():
"""Add flags specifying data download arguments."""
flags.DEFINE_string(
name="data_dir", default="/tmp/movielens-data/",
help=flags_core.help_wrap(
"Directory to download and extract data."))

flags.DEFINE_enum(
name="dataset", default=None,
enum_values=["ml-1m", "ml-20m"], case_sensitive=False,
help=flags_core.help_wrap(
"Dataset to be trained and evaluated. Two datasets are available "
": ml-1m and ml-20m."))


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--data_dir", type=str, default="/tmp/movielens-data/",
help="Directory to download data and extract the zip.")
parser.add_argument(
"--dataset", type=str, default="ml-1m", choices=["ml-1m", "ml-20m"],
help="Dataset to be trained and evaluated.")

FLAGS, unparsed = parser.parse_known_args()
tf.app.run(argv=[sys.argv[0]] + unparsed)
tf.logging.set_verbosity(tf.logging.INFO)
define_data_download_flags()
FLAGS = flags.FLAGS
absl_app.run(main)
92 changes: 35 additions & 57 deletions official/recommendation/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@
Load the training dataset and evaluation dataset from csv file into memory.
Prepare input for model training and evaluation.
"""
import time

import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf

from official.recommendation import constants # pylint: disable=g-bad-import-order

# The column names and types of csv file
_CSV_COLUMN_NAMES = [constants.USER, constants.ITEM, constants.RATING]
_CSV_TYPES = [[0], [0], [0]]

# The buffer size for shuffling train dataset.
_SHUFFLE_BUFFER_SIZE = 1024

Expand All @@ -37,7 +31,7 @@ class NCFDataSet(object):
"""A class containing data information for model training and evaluation."""

def __init__(self, train_data, num_users, num_items, num_negatives,
true_items, all_items):
true_items, all_items, all_eval_data):
"""Initialize NCFDataset class.

Args:
Expand All @@ -50,17 +44,19 @@ def __init__(self, train_data, num_users, num_items, num_negatives,
evaluation. Each entry is a latest positive instance for one user.
all_items: A nested list, all items for evaluation, and each entry is the
evaluation items for one user.
all_eval_data: A numpy array of eval/test dataset.
"""
self.train_data = train_data
self.num_users = num_users
self.num_items = num_items
self.num_negatives = num_negatives
self.eval_true_items = true_items
self.eval_all_items = all_items
self.all_eval_data = all_eval_data


def load_data(file_name):
"""Load data from a csv file which splits on \t."""
"""Load data from a csv file which splits on tab key."""
lines = tf.gfile.Open(file_name, "r").readlines()

# Process the file line by line
Expand Down Expand Up @@ -122,13 +118,11 @@ def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
all_items.append(items) # All items (including positive and negative items)
all_test_data.extend(users_items) # Generate test dataset

# Save test dataset into csv file
np.savetxt(constants.TEST_DATA, np.asarray(all_test_data).astype(int),
fmt="%i", delimiter=",")

# Create NCFDataset object
ncf_dataset = NCFDataSet(
train_data, num_users, num_items, num_negatives, true_items, all_items)
train_data, num_users, num_items, num_negatives, true_items, all_items,
np.asarray(all_test_data)
)

return ncf_dataset

Expand All @@ -144,6 +138,9 @@ def generate_train_dataset(train_data, num_items, num_negatives):
num_items: An integer, the number of items in positive training instances.
num_negatives: An integer, the number of negative training instances
following positive training instances. It is 4 by default.

Returns:
A numpy array of training dataset.
"""
all_train_data = []
# A set with user-item tuples
Expand All @@ -158,13 +155,10 @@ def generate_train_dataset(train_data, num_items, num_negatives):
j = np.random.randint(num_items)
all_train_data.append([u, j, 0])

# Save the train dataset into a csv file
np.savetxt(constants.TRAIN_DATA, np.asarray(all_train_data).astype(int),
fmt="%i", delimiter=",")
return np.asarray(all_train_data)


def input_fn(training, batch_size, repeat=1, ncf_dataset=None,
num_parallel_calls=1):
def input_fn(training, batch_size, ncf_dataset, repeat=1):
"""Input function for model training and evaluation.

The train input consists of 1 positive instance (user and item have
Expand All @@ -176,55 +170,39 @@ def input_fn(training, batch_size, repeat=1, ncf_dataset=None,
Args:
training: A boolean flag for training mode.
batch_size: An integer, batch size for training and evaluation.
ncf_dataset: An NCFDataSet object, which contains the information about
training and test data.
repeat: An integer, how many times to repeat the dataset.
ncf_dataset: An NCFDataSet object, which contains the information to
generate negative training instances.
num_parallel_calls: An integer, number of cpu cores for parallel input
processing.

Returns:
dataset: A tf.data.Dataset object containing examples loaded from the files.
"""
# Default test file name
file_name = constants.TEST_DATA

# Generate random negative instances for training in each epoch
if training:
t1 = time.time()
generate_train_dataset(
train_data = generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items,
ncf_dataset.num_negatives)
file_name = constants.TRAIN_DATA
tf.logging.info(
"Generating training instances: {:.1f}s".format(time.time() - t1))

# Create a dataset containing the text lines.
dataset = tf.data.TextLineDataset(file_name)

# Test dataset only has two fields while train dataset has three
num_cols = len(_CSV_COLUMN_NAMES) - 1
# Shuffle the dataset for training
if training:
# Get train features and labels
train_features = [
(constants.USER, np.expand_dims(train_data[:, 0], axis=1)),
(constants.ITEM, np.expand_dims(train_data[:, 1], axis=1))
]
train_labels = [
(constants.RATING, np.expand_dims(train_data[:, 2], axis=1))]

dataset = tf.data.Dataset.from_tensor_slices(
(dict(train_features), dict(train_labels))
)
dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
num_cols += 1

def _parse_csv(line):
"""Parse each line of the csv file."""
# Decode the line into its fields
fields = tf.decode_csv(line, record_defaults=_CSV_TYPES[0:num_cols])
fields = [tf.expand_dims(field, axis=0) for field in fields]

# Pack the result into a dictionary
features = dict(zip(_CSV_COLUMN_NAMES[0:num_cols], fields))
# Separate the labels from the features for training
if training:
labels = features.pop(constants.RATING)
return features, labels
# Return features only for test/prediction
return features

# Parse each line into a dictionary
dataset = dataset.map(_parse_csv, num_parallel_calls=num_parallel_calls)
else:
# Create eval/test dataset
test_user = ncf_dataset.all_eval_data[:, 0]
test_item = ncf_dataset.all_eval_data[:, 1]
test_features = [
(constants.USER, np.expand_dims(test_user, axis=1)),
(constants.ITEM, np.expand_dims(test_item, axis=1))]

dataset = tf.data.Dataset.from_tensor_slices(dict(test_features))

# Repeat and batch the dataset
dataset = dataset.repeat(repeat)
Expand Down
103 changes: 103 additions & 0 deletions official/recommendation/dataset_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Unit tests for dataset.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

import numpy as np
import tensorflow as tf # pylint: disable=g-bad-import-order

from official.recommendation import dataset

_TRAIN_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_train_ratings.csv")
_TEST_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_eval_ratings.csv")
_TEST_NEG_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_eval_negative.csv")
_NUM_NEG = 4


class DatasetTest(tf.test.TestCase):

def test_load_data(self):
data = dataset.load_data(_TEST_FNAME)
self.assertEqual(len(data), 2)

self.assertEqual(data[0][0], 0)
self.assertEqual(data[0][2], 1)

self.assertEqual(data[-1][0], 1)
self.assertEqual(data[-1][2], 1)

def test_data_preprocessing(self):
ncf_dataset = dataset.data_preprocessing(
_TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)

# Check train data preprocessing
self.assertAllEqual(np.array(ncf_dataset.train_data)[:, 2],
np.full(len(ncf_dataset.train_data), 1))
self.assertEqual(ncf_dataset.num_users, 2)
self.assertEqual(ncf_dataset.num_items, 175)

# Check test dataset
test_dataset = ncf_dataset.all_eval_data
first_true_item = test_dataset[100]
self.assertEqual(first_true_item[1], ncf_dataset.eval_true_items[0])
self.assertEqual(first_true_item[1], ncf_dataset.eval_all_items[0][-1])

last_gt_item = test_dataset[-1]
self.assertEqual(last_gt_item[1], ncf_dataset.eval_true_items[-1])
self.assertEqual(last_gt_item[1], ncf_dataset.eval_all_items[-1][-1])

test_list = test_dataset.tolist()

first_test_items = [x[1] for x in test_list if x[0] == 0]
self.assertAllEqual(first_test_items, ncf_dataset.eval_all_items[0])

last_test_items = [x[1] for x in test_list if x[0] == 1]
self.assertAllEqual(last_test_items, ncf_dataset.eval_all_items[-1])

def test_generate_train_dataset(self):
# Check train dataset
ncf_dataset = dataset.data_preprocessing(
_TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)

train_dataset = dataset.generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items, _NUM_NEG)

# Each user has 1 positive instance followed by _NUM_NEG negative instances
train_data_0 = train_dataset[0]
self.assertEqual(train_data_0[2], 1)
for i in range(1, _NUM_NEG + 1):
train_data = train_dataset[i]
self.assertEqual(train_data_0[0], train_data[0])
self.assertNotEqual(train_data_0[1], train_data[1])
self.assertEqual(0, train_data[2])

train_data_last = train_dataset[-1 - _NUM_NEG]
self.assertEqual(train_data_last[2], 1)
for i in range(-1, -_NUM_NEG):
train_data = train_dataset[i]
self.assertEqual(train_data_last[0], train_data[0])
self.assertNotEqual(train_data_last[1], train_data[1])
self.assertEqual(0, train_data[2])


if __name__ == "__main__":
tf.test.main()
Loading