
Added TFRecord support as a preprocessing cache format #1194

Merged · 21 commits · Jun 8, 2021

Conversation

zhisbug (Contributor) commented Jun 3, 2021

This PR adds support for TFRecordDataset when the backend is Ray.

The functionality is complete but several problems need to be addressed before merging:

  • I have to comment out this line in order to make the Ray backend + Dask work on images. This issue is unrelated to the feature introduced by this PR. I did some diagnosis and found that a previous commit, 30d164e7cc3fa7d1c45286727c0183f8eefa8e39, caused the issue.

  • There is still an unknown issue when writing images into tfrecords. This line dies when dumping multiple Dask DF partitions to disk. This only happens when we use images. Still under investigation. [Fixed]

  • Some minor bugs in type conversion, which I will fix after running some tests.

zhisbug (Contributor, Author) commented Jun 4, 2021

@tgaddair
All issues addressed -- you should observe a consistent speedup over the Parquet dataset.

On my cluster with 2x 2080 Ti GPUs and a 32-core CPU, I observe a steady 5.5-6 iter/s throughout training, with batch_size=256 on the small.yml pipeline.

As a baseline, training with ParquetDataset ran at ~2.5 iter/s in early epochs, slowed to 5-7 s/iter, and ended at 9 s/iter.

tgaddair (Collaborator) left a comment

Nice! This is very awesome. Just a few comments.

Review threads:
  • ludwig/data/dataframe/dask_df_utils.py (outdated, resolved)
  • ludwig/data/dataframe/pandas.py (outdated, resolved)
  • ludwig/data/dataset/tfrecord.py (outdated, resolved)
  • ludwig/data/dataset/tfrecord.py (resolved)
  • ludwig/data/dataset/tfrecord.py (outdated, resolved)
if shard_count > 1:
    dataset = dataset.shard(shard_count, cur_shard)
total_samples = self.size
local_samples = int(total_samples / shard_count) if shard_count else total_samples
tgaddair (Collaborator):

This is a fine heuristic. Basically, we want the biggest buffer that will fit in memory, but not larger.
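
For illustration, a minimal sketch of that buffer-sizing idea; the cap value and helper name are assumptions for this sketch, not the PR's actual code.

import tensorflow as tf

# Assumed upper bound on samples that comfortably fit in memory (illustrative).
MAX_SHUFFLE_BUFFER = 100_000

def shuffled(dataset: tf.data.Dataset, local_samples: int) -> tf.data.Dataset:
    # Shuffle over as many samples as the shard holds, but never more than the cap.
    buffer_size = min(local_samples, MAX_SHUFFLE_BUFFER)
    return dataset.shuffle(buffer_size)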

compression_level=compression_level)


def get_schema(df, columns=None):
tgaddair (Collaborator):

This should be fine for now. One thing we can improve in the future is to use Ludwig's training_set_metadata to more precisely obtain the data type of each column. But if this works, this is fine for now.
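
For reference, a rough sketch of dtype-based schema inference; the helper name and type mapping are assumptions for illustration (the PR's get_schema instead inspects the first row of the partition).

import numpy as np
import pandas as pd

def infer_schema(df: pd.DataFrame, columns=None):
    """Map each column to a coarse TFRecord feature type."""
    schema = {}
    for col in (columns or df.columns):
        dtype = df[col].dtype
        if np.issubdtype(dtype, np.integer):
            schema[col] = "int64"   # would become tf.train.Int64List
        elif np.issubdtype(dtype, np.floating):
            schema[col] = "float"   # would become tf.train.FloatList
        else:
            schema[col] = "bytes"   # strings / encoded arrays -> tf.train.BytesList
    return schema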

    lambda x: tf.data.TFRecordDataset(x, compression_type="GZIP"),
    num_parallel_calls=tf.data.AUTOTUNE)
# Fetch one element to get the parser.
features, feature_lists = self._detect_schema(dataset)
tgaddair (Collaborator):

Instead of detecting the schema here, what if we just stored it in the metadata at write time?
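
As a concrete example of that suggestion, a minimal sketch of persisting the schema next to the tfrecord shards; the meta.json filename and layout are assumptions, not part of the PR.

import json
import os

def write_schema(schema: dict, tfrecord_dir: str):
    # Persist the schema at write time so readers can skip detection.
    with open(os.path.join(tfrecord_dir, "meta.json"), "w") as f:
        json.dump({"schema": schema}, f)

def read_schema(tfrecord_dir: str) -> dict:
    with open(os.path.join(tfrecord_dir, "meta.json")) as f:
        return json.load(f)["schema"]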

tgaddair (Collaborator) commented Jun 4, 2021

A couple of observations in testing:

  1. Serialization is very slow. For a 1 GB dataset, it took about an hour to write to disk (vs. 30s for Parquet).
  2. TFRecordWriter and Reader may not work with remote filesystems automatically. The solution used by pandas-tfrecords was to write locally and then put the chunk in S3, as shown here: https://github.com/schipiga/pandas-tfrecords/blob/master/pandas_tfrecords/to_tfrecords.py#L58. We could do something similar (a rough sketch of the pattern follows below). They also do the same to download chunks at read time (though I believe TF has utilities to read from remote storage): https://github.com/schipiga/pandas-tfrecords/blob/master/pandas_tfrecords/from_tfrecords.py#L163
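
To make the write-local-then-upload pattern concrete, a minimal sketch; the fsspec calls and names here are assumptions for illustration, not the PR's final approach.

import os
import tempfile

import fsspec
import tensorflow as tf

def write_tfrecord_chunk(serialized_examples, remote_path):
    # Write the shard to a local temp file first, then copy it to the
    # remote filesystem (e.g. an s3:// path) with fsspec.
    with tempfile.TemporaryDirectory() as tmpdir:
        local_path = os.path.join(tmpdir, os.path.basename(remote_path))
        with tf.io.TFRecordWriter(local_path, options="GZIP") as writer:
            for example in serialized_examples:
                writer.write(example)
        fs, _ = fsspec.core.url_to_fs(remote_path)
        fs.put(local_path, remote_path)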

tgaddair (Collaborator) commented Jun 4, 2021

Got an error when writing out the test set:

Traceback (most recent call last):
  File "higgs_preprocess.py", line 39, in <module>
    dataset=dataset_path,
  File "/Users/tgaddair/repos/ludwig/ludwig/api.py", line 1281, in preprocess
    random_seed=random_seed
  File "/Users/tgaddair/repos/ludwig/ludwig/data/preprocessing.py", line 1454, in preprocess_for_training
    processed = cache.put(*processed)
  File "/Users/tgaddair/repos/ludwig/ludwig/data/cache/manager.py", line 71, in put
    TEST,
  File "/Users/tgaddair/repos/ludwig/ludwig/data/dataset/tfrecord.py", line 211, in save
    self.backend.df_engine.to_tfrecord(dataset, dataset_tfrecord_fp)
  File "/Users/tgaddair/repos/ludwig/ludwig/data/dataframe/dask.py", line 96, in to_tfrecord
    compression_level=9)
  File "/Users/tgaddair/repos/ludwig/ludwig/data/dataframe/dask_df_utils.py", line 54, in dask_to_tfrecords
    out = out.compute()
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/base.py", line 281, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/base.py", line 563, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/local.py", line 487, in get_async
    raise_exception(exc, tb)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/local.py", line 317, in reraise
    raise exc
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/dask/utils.py", line 37, in apply
    return func(*args)
  File "/Users/tgaddair/repos/ludwig/ludwig/data/dataframe/pandas.py", line 79, in pandas_df_to_tfrecords
    schema = get_schema(df, columns)
  File "/Users/tgaddair/repos/ludwig/ludwig/data/dataframe/pandas.py", line 89, in get_schema
    for col, val in df.iloc[0].to_dict().items():
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/pandas/core/indexing.py", line 879, in __getitem__
    return self._getitem_axis(maybe_callable, axis=axis)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/pandas/core/indexing.py", line 1496, in _getitem_axis
    self._validate_integer(key, axis)
  File "/Users/tgaddair/.venv/ludwig/stable/lib/python3.7/site-packages/pandas/core/indexing.py", line 1437, in _validate_integer
    raise IndexError("single positional indexer is out-of-bounds")
IndexError: single positional indexer is out-of-bounds

zhisbug (Contributor, Author) commented Jun 4, 2021

A couple of observations in testing:

  1. Serialization is very slow. For a 1 GB dataset, it took about an hour to write to disk (vs. 30s for Parquet).

Does Parquet actually read and serialize the images? If it only saves a link (i.e., a file path) to the image, then this is expected, as TFRecords shift the reading overhead from training time to dataset construction time.

  2. TFRecordWriter and Reader may not work with remote filesystems automatically. The solution used by pandas-tfrecords was to write locally and then put the chunk in S3, as shown here: https://github.com/schipiga/pandas-tfrecords/blob/master/pandas_tfrecords/to_tfrecords.py#L58. We could do something similar. They also do the same to download chunks at read time (though I believe TF has utilities to read from remote storage): https://github.com/schipiga/pandas-tfrecords/blob/master/pandas_tfrecords/from_tfrecords.py#L163

Yeah, it would be easy to add S3 support. Does the Parquet dataset support S3?

tgaddair (Collaborator) commented Jun 4, 2021

For my current test, I'm using a tabular dataset consisting of many small rows. I think the bottleneck is the per-row serialization.

The current Parquet writer/reader supports S3 and other distributed filesystems through fsspec. We can also use this library for TFRecord.
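
As a rough sketch of what the read path could look like with fsspec (the glob pattern, cache directory, and helper name are placeholders/assumptions):

import os

import fsspec
import tensorflow as tf

def stage_tfrecords(remote_glob, cache_dir):
    # fsspec gives a uniform API over local, S3, GCS, etc., so remote
    # shards can be staged locally before building the tf.data pipeline.
    os.makedirs(cache_dir, exist_ok=True)
    fs, path_glob = fsspec.core.url_to_fs(remote_glob)  # e.g. "s3://bucket/cache/*.tfrecords"
    for remote_path in fs.glob(path_glob):
        fs.get(remote_path, cache_dir + "/")
    return tf.data.Dataset.list_files(os.path.join(cache_dir, "*.tfrecords"))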

zhisbug (Contributor, Author) commented Jun 5, 2021

For my current test, I'm using a tabular dataset consisting of many small rows. I think the bottlenecks is the per-row serialization.

Just a random note: a critical performance factor for serialization is setting the df_engine parallelism, such as here: https://github.com/ludwig-ai/ludwig/blob/master/ludwig/backend/ray.py#L44

In my case it is set to the number of CPUs Ray auto-detected (20), and serialization takes <3 minutes on the pet-adoption-finder dataset...
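
For illustration, a minimal sketch of matching the Dask partition count to the Ray-detected CPU count before writing; the helper is an assumption, not Ludwig's actual code.

import dask.dataframe as dd
import ray

def repartition_for_writing(ddf: dd.DataFrame) -> dd.DataFrame:
    # Assumes ray.init() has already been called. One partition per CPU
    # means one tfrecord shard can be written per worker slot in parallel.
    parallelism = int(ray.cluster_resources().get("CPU", 1))
    return ddf.repartition(npartitions=parallelism)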

zhisbug (Contributor, Author) commented Jun 5, 2021

Got an error when writing out the test set:
IndexError: single positional indexer is out-of-bounds
[full traceback quoted above]

All comments addressed except this test set issue.
It looks like something errors out when we try to infer the schema from the pandas DF partition of the original Dask DF.

Could you let me know how to reproduce it so I can take a look?

tgaddair (Collaborator) commented Jun 7, 2021

Nice work @zhisbug. I'll try to put together a small repro for the issue above. In the meantime, it looks like there are a few failing tests we should fix:

https://github.com/ludwig-ai/ludwig/pull/1194/checks?check_run_id=2751504369#step:9:319


# interleave the tfrecord files for parallel reading
dataset = files.interleave(
    lambda x: tf.data.TFRecordDataset(x, compression_type=self.compression_type),
tgaddair (Collaborator):

Looks like this should work with S3 and other remote paths natively: https://blog.min.io/hyper-scale-machine-learning-with-minio-and-tensorflow/
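
A minimal sketch of reading shards directly from an S3 path with tf.data; the bucket path is a placeholder, and this assumes TensorFlow's S3 filesystem support (or tensorflow-io) is available.

import tensorflow as tf

files = tf.data.Dataset.list_files("s3://my-bucket/cache/*.tfrecords")
dataset = files.interleave(
    lambda x: tf.data.TFRecordDataset(x, compression_type="GZIP"),
    num_parallel_calls=tf.data.AUTOTUNE,
)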

tgaddair (Collaborator) commented Jun 8, 2021

@zhisbug I fixed the issue above; it was caused by an empty test set. The lazy-load check issue has also been resolved. We should be good to land once tests pass.

tgaddair changed the title from "New feature: TFRecordDataset" to "Added TFRecord support as a preprocessing cache format" on Jun 8, 2021
tgaddair merged commit eb4fa5f into ludwig-ai:master on Jun 8, 2021