Compatibility with parfive 1.1 #3822

Merged: 4 commits, Apr 15, 2020
Changes from all commits
1 change: 1 addition & 0 deletions changelog/3822.feature.rst
@@ -0,0 +1 @@
Add support for parfive 1.1. This limits the number of open connections to JSOC to 10 when downloading files.
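For context, the 10-connection cap described above is the product of parfive's two concurrency settings; a minimal sketch of the arithmetic, using the defaults this PR adopts in jsoc.py below:

# parfive opens up to max_conn simultaneous connections and may split each
# file into max_splits ranged requests, so the peak number of open
# connections is their product. This PR defaults to 4 connections with
# 2 splits each, i.e. 8, which stays under JSOC's limit of 10.
max_conn, max_splits = 4, 2
assert max_conn * max_splits <= 10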
2 changes: 1 addition & 1 deletion setup.cfg
@@ -39,7 +39,7 @@ install_requires =
matplotlib>=2.1.2
pandas>=0.23.0
astropy>=3.2
-parfive[ftp]>=1.0
+parfive[ftp]>=1.0.2
importlib_resources;python_version<"3.7"

[options.extras_require]
2 changes: 1 addition & 1 deletion sunpy/database/tests/test_tables.py
@@ -161,7 +161,7 @@ def test_entries_from_fido_search_result(fido_search_result):
# 2 entries from norh
assert entries[60] == DatabaseEntry(
source='NAOJ', provider='NRO', physobs="",
fileid=("ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/"
fileid=("ftp://solar-pub.nao.ac.jp/"
"pub/nsro/norh/data/tcx/2012/01/tca120101"),
observation_time_start=datetime(2012, 1, 1, 0, 0),
observation_time_end=datetime(2012, 1, 2, 0, 0),
4 changes: 2 additions & 2 deletions sunpy/net/dataretriever/sources/norh.py
@@ -89,7 +89,7 @@ def _get_url_for_timerange(self, timerange, **kwargs):
timerange = TimeRange(timerange.start.strftime('%Y-%m-%d'),
timerange.end)

-norh = Scraper(BASEURL, freq=freq)
+norh = Scraper(BASEURL, freq=freq, append_login=False)
# TODO: warn user that some files may have not been listed, like for example:
# tca160504_224657 on ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2016/05/
# as it doesn't follow pattern.
@@ -98,7 +98,7 @@ def _get_url_for_timerange(self, timerange, **kwargs):

def _get_time_for_url(self, urls):
freq = urls[0].split('/')[-1][0:3] # extract the frequency label
-crawler = Scraper(BASEURL, freq=freq)
+crawler = Scraper(BASEURL, freq=freq, append_login=False)
times = list()
for url in urls:
t0 = crawler._extractDateURL(url)
12 changes: 6 additions & 6 deletions sunpy/net/dataretriever/tests/test_norh.py
@@ -19,16 +19,16 @@
@pytest.mark.remote_data
@pytest.mark.parametrize("timerange,url_start,url_end", [
(TimeRange('2012/4/21', '2012/4/21'),
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421',
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421'
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421',
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421'
),
(TimeRange('2012/12/1', '2012/12/2'),
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121201',
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121202'
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121201',
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121202'
),
(TimeRange('2012/3/7', '2012/3/14'),
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120307',
-'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120314'
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120307',
+'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120314'
)
])
def test_get_url_for_time_range(timerange, url_start, url_end):
25 changes: 19 additions & 6 deletions sunpy/net/jsoc/jsoc.py
@@ -453,7 +453,7 @@ def request_data(self, jsoc_response, **kwargs):
return requests

def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
-downloader=None, wait=True, sleep=10):
+downloader=None, wait=True, sleep=10, max_conn=4, **kwargs):
"""
Make the request for the data in a JSOC response and wait for it to be
staged and then download the data.
@@ -477,7 +477,7 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
and the existing file will be overwritten, if `'unique'` the filename
will be modified to be unique.

-max_conns : `int`
+max_conn : `int`
Maximum number of download connections.

downloader : `parfive.Downloader`, optional
@@ -500,6 +500,9 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
# Make staging request to JSOC
responses = self.request_data(jsoc_response)

defaults = {'max_splits': 2}
defaults.update(kwargs)

# Make response iterable
if not isiterable(responses):
responses = [responses]
@@ -513,10 +516,10 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,

return self.get_request(responses, path=path, overwrite=overwrite,
progress=progress, downloader=downloader,
-wait=wait)
+wait=wait, max_conn=max_conn, **defaults)

def get_request(self, requests, path=None, overwrite=False, progress=True,
-downloader=None, wait=True):
+downloader=None, wait=True, max_conn=4, **kwargs):
"""
Query JSOC to see if the request(s) is ready for download.

@@ -557,6 +560,8 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
"""
c = drms.Client()

kwargs['max_splits'] = kwargs.get('max_splits', 2)

# Convert Responses to a list if not already
if isinstance(requests, str) or not isiterable(requests):
requests = [requests]
@@ -595,10 +600,18 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
fname = os.path.expanduser(fname)
paths.append(fname)

if max_conn * kwargs['max_splits'] > 10:
warnings.warn(("JSOC does not support more than 10 parallel connections. " +
"Changing the number of parallel connections to 8."), SunpyUserWarning)
kwargs['max_splits'] = 2
max_conn = 4

dl_set = True
if not downloader:
dl_set = False
-downloader = Downloader(progress=progress, overwrite=overwrite)
+downloader = Downloader(progress=progress, overwrite=overwrite, max_conn=max_conn)
else:
downloader.max_conn = max_conn

urls = []
for request in requests:
@@ -615,7 +628,7 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
print_message = "{0} URLs found for download. Full request totalling {1}MB"
print(print_message.format(len(urls), request._d['size']))
for aurl, fname in zip(urls, paths):
-downloader.enqueue_file(aurl, filename=fname)
+downloader.enqueue_file(aurl, filename=fname, **kwargs)

if dl_set and not wait:
return Results()
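For review convenience, the connection guard added to get_request above condenses to the following standalone sketch; the helper name _cap_jsoc_connections and its limit argument are illustrative, not part of this diff:

import warnings

from sunpy.util.exceptions import SunpyUserWarning

def _cap_jsoc_connections(max_conn, max_splits, limit=10):
    # Peak simultaneous connections is max_conn * max_splits. JSOC rejects
    # clients that open more than the limit at once, so fall back to this
    # PR's defaults of 4 connections x 2 splits = 8.
    if max_conn * max_splits > limit:
        warnings.warn("JSOC does not support more than 10 parallel connections. "
                      "Changing the number of parallel connections to 8.",
                      SunpyUserWarning)
        return 4, 2
    return max_conn, max_splits

fetch forwards max_conn plus a max_splits default of 2 into get_request, which applies this check before passing max_conn to parfive's Downloader and max_splits on to each enqueue_file call.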
24 changes: 24 additions & 0 deletions sunpy/net/jsoc/tests/test_jsoc.py
@@ -7,9 +7,11 @@
import astropy.units as u
import pytest
from parfive import Results
from unittest import mock

from sunpy.net.jsoc import JSOCClient, JSOCResponse
import sunpy.net.attrs as a
from sunpy.util.exceptions import SunpyUserWarning

client = JSOCClient()

@@ -346,3 +348,25 @@ def test_can_handle_query_no_series():
assert not JSOCClient._can_handle_query(a.Time("2020/01/02", "2020/01/03"))
assert not JSOCClient._can_handle_query(a.Wavelength(17.1*u.nm))
assert JSOCClient._can_handle_query(a.jsoc.Series("hmi.M_45s"))


@pytest.mark.remote_data
def test_max_parallel_connections():
responses = client.search(
a.Time('2014/1/1T1:00:36', '2014/1/1T01:01:38'),
a.jsoc.Series('hmi.M_45s'), a.jsoc.Notify('jsoc@cadair.com'),
a.jsoc.Protocol("as-is"))

path = tempfile.mkdtemp()

with mock.patch(
"parfive.Downloader.download",
new_callable=mock.MagicMock
) as download:

download.side_effect = ["Mocked Downloader"]

with pytest.warns(SunpyUserWarning):
client.fetch(responses, path=path, max_conn=5, max_splits=5)

assert download.called
11 changes: 10 additions & 1 deletion sunpy/util/scraper.py
@@ -75,6 +75,10 @@ def __init__(self, pattern, **kwargs):
end=self.pattern[milliseconds.end():]
))

self.append_login = True
if 'append_login' in kwargs:
    self.append_login = kwargs['append_login']

def matches(self, filepath, date):
return date.strftime(self.pattern) == filepath

@@ -263,8 +267,13 @@ def _ftpfileslist(self, timerange):
if (datehref >= timerange.start and
datehref <= timerange.end):
filesurls.append(fullpath)
-filesurls = ['ftp://anonymous:data@sunpy.org@' + "{0.netloc}{0.path}".format(urlsplit(url))

+login = ''
+if self.append_login:
+    login = "anonymous:data@sunpy.org@"
+filesurls = [f'ftp://{login}' + "{0.netloc}{0.path}".format(urlsplit(url))
for url in filesurls]

return filesurls

def _localfilelist(self, timerange):
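Finally, a quick usage sketch of the new append_login flag; the pattern string and frequency label here are illustrative stand-ins for the BASEURL defined in norh.py:

from sunpy.time import TimeRange
from sunpy.util.scraper import Scraper

# Illustrative NoRH-style pattern; Scraper formats {freq} from its kwargs.
pattern = 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/%Y/%m/{freq}%y%m%d'

# append_login=False yields bare ftp:// URLs; the default (True) keeps the
# old behaviour of prefixing the anonymous login anonymous:data@sunpy.org@.
norh = Scraper(pattern, freq='tca', append_login=False)
urls = norh.filelist(TimeRange('2012/4/21', '2012/4/22'))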