
ENH buffer openml stream rather than reading all at once #16084

Merged · 15 commits into scikit-learn:master · Apr 26, 2020

Conversation

@jnothman (Member) commented Jan 9, 2020

I've not benchmarked yet. This should reduce memory requirements when fetching from OpenML.

This no longer explicitly closes the open URL handler, since it is required until the ARFF has been completely read. We could return the stream object from _download_data_arff if we want to close it.

@thomasjpfan (Member)

Quick memory profiling on _download_data_arff (with mnist)

master

   459   84.168 MiB    0.004 MiB       @_retry_with_clean_cache(url, data_home)
   460                                 def _arff_load():
   461   84.176 MiB    0.008 MiB           with closing(_open_openml_url(url, data_home)) as response:
   462   84.176 MiB    0.000 MiB               if sparse is True:
   463                                             return_type = _arff.COO
   464                                         else:
   465   84.176 MiB    0.000 MiB                   return_type = _arff.DENSE_GEN
   466                             
   467  395.703 MiB  311.527 MiB               arff_file = _arff.loads(response.read().decode('utf-8'),
   468  395.703 MiB    0.000 MiB                                       encode_nominal=encode_nominal,
   469  477.031 MiB   81.328 MiB                                       return_type=return_type)
   470  477.031 MiB    0.000 MiB           return arff_file

This PR

   459   81.723 MiB    0.000 MiB       @_retry_with_clean_cache(url, data_home)
   460                                 def _arff_load():
   461   81.723 MiB    0.000 MiB           response = _open_openml_url(url, data_home)
   462   81.723 MiB    0.000 MiB           if sparse is True:
   463                                         return_type = _arff.COO
   464                                     else:
   465   81.723 MiB    0.000 MiB               return_type = _arff.DENSE_GEN
   466                             
   467   81.734 MiB    0.004 MiB           arff_file = _arff.load((line.decode('utf-8')
   468   81.734 MiB    0.004 MiB                                   for line in response),
   469   81.723 MiB    0.000 MiB                                  encode_nominal=encode_nominal,
   470   81.734 MiB    0.000 MiB                                  return_type=return_type)
   471   81.734 MiB    0.000 MiB           return arff_file

@jnothman (Member Author)

Thanks @thomasjpfan, though that's not the relevant portion, since now the content isn't even buffered until _convert_arff_data or _convert_arff_data_dataframe. So I'd be more interested in peak memory usage across the call.

@jnothman (Member Author)

master

In [2]: %memit fetch_openml('mnist_784', cache=True)
peak memory: 1337.64 MiB, increment: 1221.80 MiB

this branch

In [2]: %memit fetch_openml('mnist_784', cache=True)
peak memory: 953.20 MiB, increment: 841.33 MiB

@thomasjpfan (Member)

On both master and this PR, _convert_arff_data increments memory by 839 MB as expected. The biggest difference is that _download_data_arff no longer increments memory on this PR.

master

Line #    Mem usage    Increment   Line Contents
================================================
   736   85.578 MiB    0.000 MiB       arff = _download_data_arff(data_description['file_id'], return_sparse,
   737  484.016 MiB  398.438 MiB                                  data_home, encode_nominal=not as_frame)
...
   762  484.016 MiB    0.000 MiB           X, y = _convert_arff_data(arff['data'], col_slice_x,
   763 1323.121 MiB  839.105 MiB                                     col_slice_y, shape)

this PR

Line #    Mem usage    Increment   Line Contents
================================================
   737   81.617 MiB    0.000 MiB       arff = _download_data_arff(data_description['file_id'], return_sparse,
   738   81.758 MiB    0.141 MiB                                  data_home, encode_nominal=not as_frame)
...
   763   81.758 MiB    0.000 MiB           X, y = _convert_arff_data(arff['data'], col_slice_x,
   764  921.691 MiB  839.934 MiB                                     col_slice_y, shape)

This PR works as expected.

@thomasjpfan (Member) left a comment

I am happy with this. All it needs is a what's new entry in 0.23 tagged Efficiency.

@jnothman jnothman changed the title [WIP] ENH buffer openml stream rather than reading all at once [MRG] ENH buffer openml stream rather than reading all at once Jan 10, 2020
    return_type = _arff.DENSE_GEN

arff_file = _arff.load((line.decode('utf-8')
                        for line in response),
@jnothman (Member Author)

@shashanksingh28 if this PR is merged before #14800 I'd suggest just making a _check_md5 helper which takes an iterable of bytes as input and returns an iterable of bytes, checking the md5 on the way.
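A minimal sketch of what such a helper could look like (the name, signature, and error message here are illustrative only, not a settled API):

import hashlib

def _check_md5(byte_lines, expected_md5):
    # Illustrative helper: yield each chunk unchanged while accumulating
    # the md5, and verify once the stream has been fully consumed.
    md5 = hashlib.md5()
    for line in byte_lines:
        md5.update(line)
        yield line
    if md5.hexdigest() != expected_md5:
        raise ValueError('md5 checksum of downloaded file does not match '
                         'the expected value')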

Contributor

Sounds good. I would rather wait for this to go in first and md5 check after...

@@ -784,6 +789,8 @@ def fetch_openml(name=None, version='active', data_id=None, data_home=None,
        elif y.shape[1] == 0:
            y = None

    fp.close()  # explicitly close HTTP connection after parsing
Member

If any of the parsing fails, the connection would remain open.

Maybe, we can put the _convert_arff_data_dataframe and _convert_arff_data logic up:

fp, arff = _download_data_arff(data_description['file_id'], return_sparse, ...)

nominal_attributes = None
frame = None
with closing(fp):
    if as_frame:
        columns = data_columns + target_columns
        frame = _convert_arff_data_dataframe(arff, columns, features_dict)
    else:
        X, y = _convert_arff_data(arff['data'], col_slice_x, ...)

if as_frame:
    ...
else:
    ...

An alternative would be to indent the whole parsing logic.

data_downloaded = np.array(list(data_arff['data']), dtype='O')
fp.close()
Member

So that the connection closes even if the np.array call fails:

with closing(fp):
    data_downloaded = np.array(list(data_arff['data']), dtype='O')

@jnothman (Member Author)

Hmmm, good points. I also need to review how this interacts with the caching.

@jnothman (Member Author)

It looks like it should work okay with caching actually.

@jnothman (Member Author)

I've realised that the progressive loading of the file also breaks some of the usefulness of _retry_with_clean_cache.
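(For context, _retry_with_clean_cache behaves roughly like the following simplified sketch — not the exact scikit-learn implementation, and the cache-path handling is condensed to a single assumed path scheme:)

import os
import warnings
from functools import wraps
from urllib.error import HTTPError

def _retry_with_clean_cache(url, data_home):
    # Simplified: retry f once after clearing the cached file, but let
    # HTTP errors propagate untouched.
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kw):
            try:
                return f(*args, **kw)
            except HTTPError:
                raise
            except Exception:
                warnings.warn("Invalid cache, redownloading file",
                              RuntimeWarning)
                # assumed mapping from url to its cached file
                local_path = os.path.join(data_home, url.lstrip('/'))
                if os.path.exists(local_path):
                    os.unlink(local_path)
                return f(*args, **kw)
        return wrapper
    return decorator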

@thomasjpfan (Member)

We most likely need to refactor a little to get _retry_with_clean_cache to work. In its current decorator form, it needs to wrap a function that calls _convert_arff_data_dataframe or _convert_arff_data. Maybe this function can be called _download_and_parse_data_arff:

def fetch_openml(...):
    ...
    if as_frame:
        parse_arff = partial(_convert_arff_data_dataframe, columns=columns,
                             features_dict=features_dict)
    else:
        parse_arff = partial(_convert_arff_data, col_slice_x=..., ...)

    result = _download_and_parse_data_arff(..., parse_arff)
    ...

def _download_and_parse_data_arff(file_id, sparse, data_home, as_frame, parse_arff):
    url = _DATA_FILE.format(file_id)

    @_retry_with_clean_cache(url, data_home)
    def _download_parse_inner():
        arff_file = _download_data_arff(...)

        if as_frame:
            return parse_arff(arff_file)
        else:
            return parse_arff(arff_file['data'])

    return _download_parse_inner()

With this type of refactor, _convert_arff_data would need to be updated to accept ...

@jnothman (Member Author)

Here's my little refactor that ensures the failure will be retried with appropriate scope. Note that the retry business is not tested (should I bother?)

@jnothman (Member Author)

Another reviewer would be very welcome here!

@thomasjpfan (Member)

> Here's my little refactor that ensures the failure will be retried with appropriate scope. Note that the retry business is not tested (should I bother?)

The retry logic is independently tested, but not together with fetch_openml. If we want to test the streaming generator retry logic, we would need to mock out a request that fails midstream and succeeds on a retry. I do not think this is needed for this PR.
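(A rough sketch of what such a mock could look like — purely hypothetical, not part of this PR:)

class FlakyResponse:
    # Hypothetical stand-in for an HTTP response: iteration fails midway
    # on the first pass and succeeds on the retry.
    def __init__(self, lines):
        self.lines = lines
        self.failed_once = False

    def __iter__(self):
        for i, line in enumerate(self.lines):
            if not self.failed_once and i == len(self.lines) // 2:
                self.failed_once = True
                raise ConnectionResetError('simulated mid-stream failure')
            yield line

    def close(self):
        pass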

@NicolasHug Most of the diff of this PR is moving code around to allow for the stream close with a context manager. This is done to accommodate arff.load consuming a generator.
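In toy form (with io.BytesIO standing in for the real HTTP response), the constraint is that the lazy generator must be consumed while the stream is still open, i.e. inside the closing() block:

from contextlib import closing
from io import BytesIO

response = BytesIO(b'line1\nline2\n')  # stands in for the urlopen response
with closing(response) as stream:
    decoded = (line.decode('utf-8') for line in stream)  # lazy, reads nothing yet
    rows = list(decoded)  # must happen before the stream closes
print(rows)  # ['line1\n', 'line2\n']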

@NicolasHug (Member) left a comment

Looks good, minor concern about the retry logic now

@@ -58,6 +58,10 @@ Changelog
:func:`datasets.make_moons` now accept two-element tuple.
:pr:`15707` by :user:`Maciej J Mikulski <mjmikulski>`.

- |Efficiency| :func:`datasets.fetch_openml` no longer stores the full dataset
Member

Suggested change
- |Efficiency| :func:`datasets.fetch_openml` no longer stores the full dataset
- |Efficiency| :func:`datasets.fetch_openml` has reduced memory usage because it no longer stores the full dataset

Member

Addressed in latest commit

Comment on lines 489 to 490
# Note that if the data is dense, no reading is done until the data
# generator is iterated.
Member

I'm not sure how to interpret this comment

Member

Do you mean e.g. "Since we pass a generator, load() will read lines one by one and avoid excessive memory usage" ?

That'd be a useful comment IMHO

Comment on lines 465 to 476
col_slice_y = [int(features_dict[col_name]['index'])
               for col_name in target_columns]

col_slice_x = [int(features_dict[col_name]['index'])
               for col_name in data_columns]
for col_idx in col_slice_y:
    feat = features_list[col_idx]
    nr_missing = int(feat['number_of_missing_values'])
    if nr_missing > 0:
        raise ValueError('Target column {} has {} missing values. '
                         'Missing values are not supported for target '
                         'columns. '.format(feat['name'], nr_missing))
Member

The column validation is moved into this function, which is decorated by _retry_with_clean_cache.

So the ValueError raised here will cause the decorator to emit warn("Invalid cache, redownloading file", RuntimeWarning) and re-run the function after clearing the cache, which is not necessary.

Member Author

This is a fair point. I'll try to get some user-input validation out of the retry.

@jnothman (Member Author) commented Feb 2, 2020

Thanks for the review @NicolasHug. I've tried to address your comments, but haven't been able to test locally thanks to an OS update breaking my conda build...

@@ -51,6 +51,8 @@ def wrapper(*args, **kw):
                return f(*args, **kw)
            except HTTPError:
                raise
            except ValueError:
Member

except (HTTPError, ValueError):
    raise

Member

Should we also pass ArffException through?

Member Author

> Should we also pass ArffException through?

No, because they may be due to data corruption.

Member Author

That was my concern about not letting ValueError through too...

Member

There are some ValueErrors in _arff.py as well. In principle, we only want to retry when there is a parsing error or an error from _open_openml_url, which is scoped to:

def _load_arff(...):
    response = _open_openml_url(url, data_home)

    with closing(response):
        arff = _arff.load((line.decode('utf-8')
                           for line in response),
                          return_type=return_type,
                          encode_nominal=not as_frame)
    return arff

Can we place this in its own function and use the retry wrapper on this new function?

Member Author

No, that code only reads the headers and returns a generator, which is the basis of this change. The parsing errors will actually occur when that generator is iterated during conversion to arrays.

If the ValueErrors in _arff.py are not possible to raise due to data corruption, then the current solution is fine
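(A toy illustration of that laziness — not library code: the error surfaces only when the generator is consumed, outside the function a retry decorator would wrap.)

def parse(lines):
    # returns immediately; no line is read yet
    return (int(line) for line in lines)

gen = parse(['1', 'oops', '3'])  # no error raised here
next(gen)                        # -> 1
# a second next(gen) raises ValueError only now, outside parse()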

Member

The only ValueError I see from corruption may be

def _escape_sub_callback(match):
    s = match.group()
    if len(s) == 2:
        try:
            return _ESCAPE_SUB_MAP[s]
        except KeyError:
            raise ValueError('Unsupported escape sequence: %s' % s)

On a side note, we can define the _load_arff as follows:

def _load_arff(..., as_frame):
    response = _open_openml_url(url, data_home)

    with closing(response):
        arff = _arff.load((line.decode('utf-8')
                           for line in response),
                          return_type=return_type,
                          encode_nominal=not as_frame)
        if as_frame:
            return _convert_arff_data_dataframe(arff, columns, features_dict)
        else:
            return _convert_arff_data(arff['data'], col_slice_x,
                                      col_slice_y, shape)

@jnothman (Member Author) commented Feb 13, 2020 via email

@thomasjpfan (Member)

To be concrete, I was thinking of this: jnothman#8

@thomasjpfan (Member) left a comment

PR updated such that only errors from the downloading and parsing will trigger a redownload.

still LGTM


@jnothman (Member Author) commented Apr 2, 2020

Perhaps we can get another review here? Not essential, but a nice memory boost for fetch_openml, and unblocking work on the checksum PR.

@thomasjpfan (Member)

As an aside, this may help resolve #16629 and allow us to turn back on the memory profiler for gallery examples.

@jnothman jnothman requested a review from rth April 22, 2020 01:26
@rth (Member) left a comment

Thanks @jnothman! LGTM, after a somewhat superficial review. I think our openml fetcher code is non-trivial and sometimes difficult to follow. I wonder if type annotations could help readability, and whether we can move part of this logic upstream.

@rth rth changed the title [MRG] ENH buffer openml stream rather than reading all at once ENH buffer openml stream rather than reading all at once Apr 26, 2020
@rth rth merged commit a14953a into scikit-learn:master Apr 26, 2020
@rth (Member) commented Apr 26, 2020

Maybe I should have synced master, but CI was green so I merged it. Looking into a fix in #17047.

@jnothman (Member Author)

Great! Thanks!
