Wide Deep refactor and deep movies #4506

Merged
robieta merged 39 commits into master from feat/wide_deep_movielens on Jun 20, 2018

Conversation

6 participants
@robieta
Collaborator

robieta commented Jun 11, 2018

In order to prep wide-deep to run on more substantial hardware, this PR adds the Kaggle movie dataset as an option, and refactors the wide-deep code to allow for multiple datasets.

@robieta robieta requested review from karmel and tensorflow/tf-garden-team as code owners Jun 11, 2018

@googlebot googlebot added the cla: yes label Jun 11, 2018

@k-w-w
Member

k-w-w left a comment

A few things. Looks great overall.

build_estimator_fn, flags_obj, tensors_to_log, early_stop=False):
"""Define training loop."""
# Clean up the model directory if present
shutil.rmtree(flags_obj.model_dir, ignore_errors=True)


@k-w-w

k-w-w Jun 11, 2018

Member

Given that none of our other models remove the model directory, can we remove this line?


@robieta

robieta Jun 12, 2018

Collaborator

I added a --clean flag to the base flags. That way it's possible to get this behavior, but by default we don't blow away the user's progress.
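
For reference, a minimal sketch of how such a flag can be wired up, using absl flags directly; the flag definition and the maybe_clean helper are illustrative, not the exact code added to the base flags:

from absl import flags
import shutil

flags.DEFINE_boolean(
    name="clean", default=False,
    help="If set, delete model_dir before training starts.")


def maybe_clean(flags_obj):
  # Only wipe existing checkpoints when the user explicitly asks for it.
  if flags_obj.clean:
    shutil.rmtree(flags_obj.model_dir, ignore_errors=True)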

Args:
model: Estimator object
model_type: string indicating model type. "wide", "deep" or "wide_deep"
export_dir: directory to export the model.


@k-w-w

k-w-w Jun 11, 2018

Member

Don't forget to add model_column_fn to args

import numpy as np
import pandas as pd
import six
from six.moves import urllib # pylint: disable=redefined-builtin


@k-w-w

k-w-w Jun 11, 2018

Member

Not used?

_RATINGS_SMALL = "ratings_small.csv"
_METADATA = "movies_metadata.csv"
_FILES = ["credits.csv", "keywords.csv", "links.csv", "links_small.csv",
_METADATA, "ratings.csv", "ratings_small.csv"]


@k-w-w

k-w-w Jun 11, 2018

Member

change "ratings.csv" and "ratings_small.csv" to _RATINGS and _RATINGS_SMALL

"rating": tf.FixedLenFeature([1], dtype=tf.float32)
}

_BUFFER_SIZE = {


@k-w-w

k-w-w Jun 11, 2018

Member

What is this buffer size/subdir used for?


@robieta

robieta Jun 12, 2018

Collaborator

It is used to validate the binary representation of the data. So if the file construction is interrupted midway through, it won't have the same number of bytes and the program will know to reconstruct it.
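
A minimal sketch of that validation step; _buffer_is_valid is an illustrative name, and it assumes tf.gfile.Stat reports the on-disk byte count:

import tensorflow as tf


def _buffer_is_valid(buffer_path, expected_size):
  # A write that was interrupted midway leaves a file whose byte count does
  # not match the expected size, which signals that it must be rebuilt.
  if not tf.gfile.Exists(buffer_path):
    return False
  return tf.gfile.Stat(buffer_path).length == expected_size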

@karmel

Member

karmel commented Jun 11, 2018

Haven't read through all of this yet, but it appears to be the MovieLens dataset, not a Kaggle dataset. Is that correct? If so, @yhliang2018 used the same for the recommendation model, IIRC. If the same, let's have one download script. If different... can it be the same?

'set the --data_dir argument to the correct path.' % data_file)

def parse_csv(value):
print('Parsing', data_file)


@qlzh727

qlzh727 Jun 12, 2018

Member

Please use tf.logging.info here instead of print.

for line in temp_eval_file:
line = line.strip()
line = line.replace(', ', ',')
if not line or ',' not in line:


@qlzh727

qlzh727 Jun 12, 2018

Member

Should you log a warning here, since this line is a bit abnormal?


@robieta

robieta Jun 12, 2018

Collaborator

This code predates me (I'm just moving it. And indeed, git isn't respecting git mv...), but I think it's just coercing the file into the exact expected format. It looks like there are two empty lines and one with some sort of label that aren't actual data points.

line = line.replace(', ', ',')
if not line or ',' not in line:
continue
if line[-1] == '.':


@qlzh727

qlzh727 Jun 12, 2018

Member

What's the case that it has a full stop as line ending? Does it count as a normal data?


@robieta

robieta Jun 12, 2018

Collaborator

Yes, the last column is >50k. or <=50k., so this is just getting rid of those periods.
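
Putting those answers together, the cleanup loop in the excerpts above amounts to roughly the following (illustrative; the write step is elided):

for line in temp_eval_file:
  line = line.strip()
  line = line.replace(', ', ',')
  if not line or ',' not in line:
    continue  # two blank lines and one non-data, label-like line are skipped
  if line[-1] == '.':
    line = line[:-1]  # the label column is written '>50K.' / '<=50K.'
  # ... the cleaned line is then written back out to the eval csv.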

def download(data_dir):
"""Download census data if it is not already present."""
if not tf.gfile.Exists(data_dir):
tf.gfile.MkDir(data_dir)


@qlzh727

qlzh727 Jun 12, 2018

Member

Use MakeDirs to create parent directories if they do not exist.
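
A minimal sketch of the suggested change, mirroring the excerpt above with the TF 1.x tf.gfile API:

import tensorflow as tf


def download(data_dir):
  """Download census data if it is not already present."""
  # MakeDirs also creates missing parent directories, unlike MkDir.
  if not tf.gfile.Exists(data_dir):
    tf.gfile.MakeDirs(data_dir)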


def build_model_columns():
"""Builds a set of wide and deep feature columns."""
# Continuous columns


@qlzh727

qlzh727 Jun 12, 2018

Member

What is "Continuous column"? Do u mean numeric column with continuous number?


@robieta

robieta Jun 12, 2018

Collaborator

Changed to # Continuous variable columns for clarity.

columns = tf.decode_csv(value, record_defaults=_CSV_COLUMN_DEFAULTS)
features = dict(zip(_CSV_COLUMNS, columns))
labels = features.pop('income_bracket')
return features, tf.equal(labels, '>50K')


@qlzh727

qlzh727 Jun 12, 2018

Member

What's the meaning of the second return value? Probably worth a comment.
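
For context, an annotated copy of the parse_csv excerpt above; the added comment spells out what the second return value is (everything else is unchanged):

def parse_csv(value):
  columns = tf.decode_csv(value, record_defaults=_CSV_COLUMN_DEFAULTS)
  features = dict(zip(_CSV_COLUMNS, columns))
  labels = features.pop('income_bracket')
  # The second return value is a boolean tensor that is True when the income
  # bracket is '>50K', i.e. the binary classification label for the model.
  return features, tf.equal(labels, '>50K')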

if flags_obj.download_if_missing:
census_dataset.download(flags_obj.data_dir)

train_file = os.path.join(flags_obj.data_dir, 'adult.data')


@qlzh727

qlzh727 Jun 12, 2018

Member

It would be nice to import the const value from the dataset module.


temp_dir = tempfile.mkdtemp()
try:
subprocess.call(args=["kaggle", "datasets", "download", "-d",


@qlzh727

qlzh727 Jun 12, 2018

Member

Err, it would be nice to have a Python API rather than calling the CLI from Python.

assert set(files) == set(_FILES)
for i in files:
tf.gfile.Copy(os.path.join(temp_dir, i), os.path.join(data_dir, i))
print(i.ljust(20), "copied")


@qlzh727

qlzh727 Jun 12, 2018

Member

tf.logging.info?

metadata["budget"] = metadata["budget"].astype("float")

metadata["genres"] = metadata["genres"].fillna("[]").apply(
ast.literal_eval).apply(


@qlzh727

qlzh727 Jun 12, 2018

Member

I am a bit confused about this. What's the type of each element in x?
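
For context, each raw genres entry is (assuming the Kaggle metadata format) a string holding a Python-literal list of dicts, so ast.literal_eval turns it into a real list that the following apply can map over; the row below is made up:

import ast

raw = "[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}]"
genres = ast.literal_eval(raw)       # -> list of dicts
names = [g['name'] for g in genres]  # -> ['Animation', 'Comedy']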

@yhliang2018
Contributor

yhliang2018 left a comment

Good example of handling a large dataset. Thanks! I will try it on the recommendation model.

raise

if tf.gfile.Exists(data_dir):
tf.gfile.DeleteRecursively(data_dir)


@yhliang2018

yhliang2018 Jun 12, 2018

Contributor

Could we reuse the downloaded data, rather than deleting it?

from official.utils.flags import core as flags_core


_RATINGS = "ratings.csv"


@yhliang2018

yhliang2018 Jun 12, 2018

Contributor

Looks like all the other .py files (like census_dataset.py) use single quotes for strings. Should we keep it consistent?

dataset = dataset.batch(batch_size)
return dataset.prefetch(1)

return input_fn


@yhliang2018

yhliang2018 Jun 12, 2018

Contributor

Is this _write_to_buffer the magic here? For the recommendation model with numpy array data, should I replace the dataframe with a numpy array and perform deserialization on the serialized binary file?
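
A minimal sketch of the serialization side for numpy-style data, assuming the TF 1.x TFRecord API; write_arrays_to_buffer is an illustrative name, not the PR's _write_to_buffer:

import tensorflow as tf


def write_arrays_to_buffer(arrays, buffer_path):
  """Serializes a dict of equal-length 1-D numpy arrays to a TFRecord file."""
  num_rows = next(iter(arrays.values())).shape[0]
  with tf.python_io.TFRecordWriter(buffer_path) as writer:
    for i in range(num_rows):
      feature = {
          key: tf.train.Feature(
              float_list=tf.train.FloatList(value=[float(values[i])]))
          for key, values in arrays.items()
      }
      example = tf.train.Example(features=tf.train.Features(feature=feature))
      writer.write(example.SerializeToString())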

@yhliang2018

Contributor

yhliang2018 commented Jun 12, 2018

@karmel Yes, the recommendation model uses the MovieLens dataset. But I see the two datasets have different fields and preprocessing pipelines. Eventually, we only use three columns of the MovieLens dataset, which is simpler compared to the Kaggle dataset.

@robieta
Collaborator

robieta left a comment

Addressed some PR comments. Based on the discussion, I will refactor to use the straight MovieLens dataset and then ping everyone. Thanks for the comments.

robieta added some commits Jun 7, 2018

@robieta robieta force-pushed the feat/wide_deep_movielens branch from 4bc60ff to 6471f80 Jun 15, 2018

robieta added some commits Jun 15, 2018

@robieta

Collaborator

robieta commented Jun 18, 2018

Well, this PR has rather sprawled.

Wide Deep and Recommendation now both use the same MovieLens base dataset, with application-specific additional code in their respective folders. They also both use utils.data.file_io to run their datasets from binary files rather than from_tensor_slices().

@karmel @yhliang2018 @qlzh727 PTAL
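
For reference, the read side of "running from binary files" looks roughly like the sketch below, assuming a feature map such as the rating FixedLenFeature shown earlier; make_input_fn and its arguments are illustrative names:

import tensorflow as tf


def make_input_fn(buffer_path, feature_map, batch_size):
  def input_fn():
    # Stream serialized examples from the on-disk buffer instead of holding
    # the whole dataset in memory via from_tensor_slices().
    dataset = tf.data.TFRecordDataset(buffer_path)
    dataset = dataset.map(
        lambda serialized: tf.parse_single_example(serialized, feature_map))
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(1)
  return input_fn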

robieta added some commits Jun 18, 2018

# pylint: enable=g-bad-import-order

from official.utils.flags import core as flags_core


@qlzh727

qlzh727 Jun 18, 2018

Member

There should be two blank lines between the imports and the constants.

NUM_ITEM_IDS = 3952

MAX_RATING = 5


@qlzh727

qlzh727 Jun 18, 2018

Member

No need for double blank lines between the constants.

whole number ratings while the 20m dataset allows half integer ratings.
"""
if dataset not in DATASETS:
raise ValueError("dataset must be one of: {}".format(",".join(DATASETS)))


@qlzh727

qlzh727 Jun 18, 2018

Member

Can you also log the value of the input param "dataset"? This might be helpful for troubleshooting typos.

tf.gfile.DeleteRecursively(temp_dir)


def _write_csv(input_path, output_path, names, skip_first, separator=","):


@qlzh727

qlzh727 Jun 18, 2018

Member

From the comment, it seems that the function mainly does transformation, e.g. encoding. Should we just call it _transform_csv or _cleanup_csv?

continue # ignore existing labels in the csv

line = line.decode("utf-8", errors="ignore")
fields = line.split(separator)


@qlzh727

qlzh727 Jun 18, 2018

Member

Just want to make sure that we process data that uses ',' as the separator correctly. Usually a field whose content contains ',' will be quoted, and in this case we don't handle that correctly.


@robieta

robieta Jun 18, 2018

Collaborator

Yes. One of the two datasets uses commas as a separator, and has already quoted the fields with commas in the literal text. This is there to avoid double quoting those entries.
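
A small illustration of that quoting behaviour with the standard csv module; the row below is made up:

import csv
import io

# The third field contains a comma and is therefore quoted in the raw file.
line = '31,"Dangerous Minds (1995)","Drama, Crime"\n'

naive = line.strip().split(',')
proper = next(csv.reader(io.StringIO(line)))

print(naive)   # ['31', '"Dangerous Minds (1995)"', '"Drama', ' Crime"']
print(proper)  # ['31', 'Dangerous Minds (1995)', 'Drama, Crime']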

"rebuilding buffer.".format(buffer_path, actual_size, expected_size))
tf.gfile.Remove(buffer_path)

if dataframe is None:


@qlzh727

qlzh727 Jun 18, 2018

Member

Probably this check should happen first, to avoid unnecessarily removing the existing file.


@robieta

robieta Jun 18, 2018

Collaborator

I don't think this can be reordered. If a valid buffer does exist, we want to exit early. Note that the removal doesn't depend on the content of the input dataframe. And if the check fails, the removal is absolutely necessary, because otherwise we would concatenate onto an incomplete buffer fragment.

tf.gfile.MakeDirs(buffer_folder)
buffer_path = os.path.join(buffer_folder, str(uuid.uuid4()))
_GARBAGE_COLLECTOR.register(buffer_path)
write_to_buffer(dataframe, buffer_path, columns)


@qlzh727

qlzh727 Jun 18, 2018

Member

This method has a return value; shouldn't we just return it here?

return buffer_path


def iter_shard_dataframe(df, rows_per_core=1000):


@qlzh727

qlzh727 Jun 18, 2018

Member

I think this class and its methods need some unit tests.


@robieta

robieta Jun 18, 2018

Collaborator

I think you're right.

def _shard_dict_to_examples(shard_dict):
"""Converts a dict of arrays into a list of example bytes."""
n = [i for i in shard_dict.values()][0].shape[0]
feature_list = [{} for _ in range(n)]


@qlzh727

qlzh727 Jun 18, 2018

Member

probably can be changed to [{}] * n


@robieta

robieta Jun 18, 2018

Collaborator
a = [{}] * 5
a[0]["foo"] = 1
print(a)   # [{'foo': 1}, {'foo': 1}, {'foo': 1}, {'foo': 1}, {'foo': 1}]

b = [{} for _ in range(5)]
b[0]["foo"] = 1
print(b)   # [{'foo': 1}, {}, {}, {}, {}]


@qlzh727

qlzh727 Jun 18, 2018

Member

Ah, thanks for the explanation.


def _shard_dict_to_examples(shard_dict):
"""Converts a dict of arrays into a list of example bytes."""
n = [i for i in shard_dict.values()][0].shape[0]


@qlzh727

qlzh727 Jun 18, 2018

Member

I am a bit confused about this: are you assuming all the items in shard_dict have the same size and shape?


@robieta

robieta Jun 18, 2018

Collaborator

The pandas dataframes cannot be passed to map because they cannot be pickled. So instead I convert them into a dict with the column names and the underlying arrays. Once inside the map function, I infer the shape from the first one. In order to do this safely, I have an assert len(set([v.shape[0] for v in inp.values()])) == 1 check before entering the map. The reason I've done this is that I try, whenever possible, to avoid throwing exceptions inside of a multiprocessing pool.
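
A condensed sketch of that conversion-and-check step; _dataframe_to_shard_dict is an illustrative name, and the sharding itself is discussed further down:

def _dataframe_to_shard_dict(df):
  """Converts a DataFrame into a picklable dict of numpy arrays."""
  # The DataFrame can't be passed to Pool.map here, so hand workers the
  # column names and the underlying arrays instead.
  inp = {name: df[name].values for name in df.columns}

  # Validate lengths before entering the pool; an exception raised inside a
  # worker process is much harder to debug than one raised here.
  assert len(set([v.shape[0] for v in inp.values()])) == 1
  return inp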

robieta added some commits Jun 18, 2018

]),

dict(row_count=10, cpu_count=4, expected=[
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]


@qlzh727

qlzh727 Jun 19, 2018

Member

Is there any reason that groups 2 and 4 have one more item? Is it random when assigning items to buckets? Should it have a deterministic order?


@qlzh727

qlzh727 Jun 19, 2018

Member

Can you address this comment?


@robieta

robieta Jun 19, 2018

Collaborator

Sorry I missed this. Sharding is done using np.linspace. I believe you see this pattern because it does the spacing in floats and then casts to ints. It should be deterministic.
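
A tiny reproduction of that behaviour; shard_indices is a hypothetical helper, and the printed result matches the row_count=10, cpu_count=4 test case above:

import numpy as np


def shard_indices(row_count, cpu_count):
  # linspace spaces the boundaries in floats and the int cast truncates,
  # which is why some shards get one extra row; the result is still
  # deterministic for a given (row_count, cpu_count).
  boundaries = np.linspace(0, row_count, cpu_count + 1).astype(int)
  return [list(range(start, stop))
          for start, stop in zip(boundaries[:-1], boundaries[1:])]

print(shard_indices(10, 4))  # [[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]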

}


class FixedCoreCount(object):


@qlzh727

qlzh727 Jun 19, 2018

Member

To make this shorter, you can use a context manager like the one below:

import multiprocessing
from contextlib import contextmanager

@contextmanager
def fixed_core_count(cpu_count):
  # Patch the reported core count, restoring it even if the body raises.
  old_count_fn = multiprocessing.cpu_count
  multiprocessing.cpu_count = lambda: cpu_count
  try:
    yield
  finally:
    multiprocessing.cpu_count = old_count_fn
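
For example, a test can then pin the visible core count for a single block (df here stands for any test DataFrame):

with fixed_core_count(4):
  # multiprocessing.cpu_count() reports 4 inside this block, then reverts.
  shards = list(iter_shard_dataframe(df, rows_per_core=1000))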


@robieta

robieta Jun 19, 2018

Collaborator

This is much better. Changed.

def _serialize_deserialize(self, num_cores=1):
n = 20

np.random.seed(34523)


@qlzh727

qlzh727 Jun 19, 2018

Member

What is this magic seed? Is it chosen randomly, or can we use a more ordinary one, e.g. 0 or 1?

self._test_sharding(**_TEST_CASES[7])

def _serialize_deserialize(self, num_cores=1):
n = 20


@qlzh727

qlzh727 Jun 19, 2018

Member

What is n? The number of rows?

@qlzh727
Member

qlzh727 left a comment

Please wait for the final approval from @karmel

@karmel

karmel approved these changes Jun 20, 2018

@robieta robieta merged commit 20070ca into master Jun 20, 2018

4 checks passed

Lint Python Files: Internal CI build successful
Ubuntu Python2: Internal CI build successful
Ubuntu Python3: Internal CI build successful
cla/google: All necessary CLAs are signed

@robieta robieta deleted the feat/wide_deep_movielens branch Jun 20, 2018

djoshea added a commit to djoshea/models that referenced this pull request Jul 19, 2018

Wide Deep refactor and deep movies (tensorflow#4506)
* begin branch

* finish download script

* rename download to dataset

* intermediate commit

* intermediate commit

* misc tweaks

* intermediate commit

* intermediate commit

* intermediate commit

* delint and update census test.

* add movie tests

* delint

* fix py2 issue

* address PR comments

* intermediate commit

* intermediate commit

* intermediate commit

* finish wide deep transition to vanilla movielens

* delint

* intermediate commit

* intermediate commit

* intermediate commit

* intermediate commit

* fix import

* add default ncf csv construction

* change default on download_if_missing

* shard and vectorize example serialization

* fix import

* update ncf data unittests

* delint

* delint

* more delinting

* fix wide-deep movielens serialization

* address PR comments

* add file_io tests

* investigate wide-deep test failure

* remove hard coded path and properly use flags.

* address file_io test PR comments

* missed a hash_bucked_size

Jay-Roberts pushed a commit to FLIR/tensorflow-models that referenced this pull request Oct 18, 2018
