Merge pull request #3822 from Raahul-Singh/test_parfive_1.1_rc2
Compatibility with parfive 1.1
nabobalis committed Apr 15, 2020
2 parents fceea3c + 17f2e5c commit 7334d78
Showing 8 changed files with 64 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog/3822.feature.rst
@@ -0,0 +1 @@
+ Add support for parfive 1.1. This caps the number of open connections to JSOC at 10 when downloading files.
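
A sketch of how the new cap surfaces to users, assuming the fetch() signature introduced in sunpy/net/jsoc/jsoc.py below; the Notify address and time range are placeholders (JSOC requires a registered email):

    from sunpy.net import attrs as a
    from sunpy.net.jsoc import JSOCClient

    client = JSOCClient()
    responses = client.search(a.Time('2014/1/1T00:00:00', '2014/1/1T00:01:30'),
                              a.jsoc.Series('hmi.M_45s'),
                              a.jsoc.Notify('you@example.com'))  # placeholder address
    # max_conn * max_splits must stay at or below JSOC's limit of 10;
    # anything larger triggers a SunpyUserWarning and falls back to 4 * 2 = 8.
    files = client.fetch(responses, max_conn=4, max_splits=2)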
2 changes: 1 addition & 1 deletion setup.cfg
@@ -39,7 +39,7 @@ install_requires =
matplotlib>=2.1.2
pandas>=0.23.0
astropy>=3.2
- parfive[ftp]>=1.0
+ parfive[ftp]>=1.0.2
importlib_resources;python_version<"3.7"

[options.extras_require]
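The dependency floor moves from parfive 1.0 to 1.0.2. A quick check of an environment against the new pin might look like this (a sketch, assuming parfive exposes __version__):

    import parfive
    from pkg_resources import parse_version

    # setup.cfg now requires parfive[ftp]>=1.0.2
    assert parse_version(parfive.__version__) >= parse_version("1.0.2")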
2 changes: 1 addition & 1 deletion sunpy/database/tests/test_tables.py
@@ -161,7 +161,7 @@ def test_entries_from_fido_search_result(fido_search_result):
# 2 entries from norh
assert entries[60] == DatabaseEntry(
source='NAOJ', provider='NRO', physobs="",
fileid=("ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/"
fileid=("ftp://solar-pub.nao.ac.jp/"
"pub/nsro/norh/data/tcx/2012/01/tca120101"),
observation_time_start=datetime(2012, 1, 1, 0, 0),
observation_time_end=datetime(2012, 1, 2, 0, 0),
4 changes: 2 additions & 2 deletions sunpy/net/dataretriever/sources/norh.py
@@ -89,7 +89,7 @@ def _get_url_for_timerange(self, timerange, **kwargs):
timerange = TimeRange(timerange.start.strftime('%Y-%m-%d'),
timerange.end)

- norh = Scraper(BASEURL, freq=freq)
+ norh = Scraper(BASEURL, freq=freq, append_login=False)
# TODO: warn user that some files may have not been listed, like for example:
# tca160504_224657 on ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2016/05/
# as it doesn't follow pattern.
@@ -98,7 +98,7 @@ def _get_url_for_timerange(self, timerange, **kwargs):

def _get_time_for_url(self, urls):
freq = urls[0].split('/')[-1][0:3] # extract the frequency label
- crawler = Scraper(BASEURL, freq=freq)
+ crawler = Scraper(BASEURL, freq=freq, append_login=False)
times = list()
for url in urls:
t0 = crawler._extractDateURL(url)
12 changes: 6 additions & 6 deletions sunpy/net/dataretriever/tests/test_norh.py
@@ -19,16 +19,16 @@
@pytest.mark.remote_data
@pytest.mark.parametrize("timerange,url_start,url_end", [
(TimeRange('2012/4/21', '2012/4/21'),
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421',
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421'
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421',
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421'
),
(TimeRange('2012/12/1', '2012/12/2'),
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121201',
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121202'
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121201',
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/12/tca121202'
),
(TimeRange('2012/3/7', '2012/3/14'),
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120307',
- 'ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120314'
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120307',
+ 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/03/tca120314'
)
])
def test_get_url_for_time_range(timerange, url_start, url_end):
25 changes: 19 additions & 6 deletions sunpy/net/jsoc/jsoc.py
@@ -453,7 +453,7 @@ def request_data(self, jsoc_response, **kwargs):
return requests

def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
- downloader=None, wait=True, sleep=10):
+ downloader=None, wait=True, sleep=10, max_conn=4, **kwargs):
"""
Make the request for the data in a JSOC response and wait for it to be
staged and then download the data.
@@ -477,7 +477,7 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
and the existing file will be overwritten, if `'unique'` the filename
will be modified to be unique.
- max_conns : `int`
+ max_conn : `int`
Maximum number of download connections.
downloader : `parfive.Downloader`, optional
@@ -500,6 +500,9 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,
# Make staging request to JSOC
responses = self.request_data(jsoc_response)

+ defaults = {'max_splits': 2}
+ defaults.update(kwargs)
+
# Make response iterable
if not isiterable(responses):
responses = [responses]
@@ -513,10 +516,10 @@ def fetch(self, jsoc_response, path=None, progress=True, overwrite=False,

return self.get_request(responses, path=path, overwrite=overwrite,
progress=progress, downloader=downloader,
- wait=wait)
+ wait=wait, max_conn=max_conn, **defaults)

def get_request(self, requests, path=None, overwrite=False, progress=True,
- downloader=None, wait=True):
+ downloader=None, wait=True, max_conn=4, **kwargs):
"""
Query JSOC to see if the request(s) is ready for download.
@@ -557,6 +560,8 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
"""
c = drms.Client()

+ kwargs['max_splits'] = kwargs.get('max_splits', 2)
+
# Convert Responses to a list if not already
if isinstance(requests, str) or not isiterable(requests):
requests = [requests]
@@ -595,10 +600,18 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
fname = os.path.expanduser(fname)
paths.append(fname)

+ if max_conn * kwargs['max_splits'] > 10:
+     warnings.warn(("JSOC does not support more than 10 parallel connections. " +
+                    "Changing the number of parallel connections to 8."), SunpyUserWarning)
+     kwargs['max_splits'] = 2
+     max_conn = 4
+
dl_set = True
if not downloader:
dl_set = False
- downloader = Downloader(progress=progress, overwrite=overwrite)
+ downloader = Downloader(progress=progress, overwrite=overwrite, max_conn=max_conn)
+ else:
+     downloader.max_conn = max_conn

urls = []
for request in requests:
@@ -615,7 +628,7 @@ def get_request(self, requests, path=None, overwrite=False, progress=True,
print_message = "{0} URLs found for download. Full request totalling {1}MB"
print(print_message.format(len(urls), request._d['size']))
for aurl, fname in zip(urls, paths):
- downloader.enqueue_file(aurl, filename=fname)
+ downloader.enqueue_file(aurl, filename=fname, **kwargs)

if dl_set and not wait:
return Results()
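
The connection guard above reduces to the following standalone check (a sketch with a hypothetical helper name, not sunpy API; the fallback values match the get_request defaults):

    def clamp_jsoc_connections(max_conn, max_splits, limit=10):
        # JSOC counts max_conn * max_splits simultaneous connections.
        if max_conn * max_splits > limit:
            # Fall back to 4 connections with 2 splits each (8 total).
            return 4, 2
        return max_conn, max_splits

    print(clamp_jsoc_connections(5, 5))  # -> (4, 2), capped with a warning in sunpy
    print(clamp_jsoc_connections(4, 2))  # -> (4, 2), already within the limit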
24 changes: 24 additions & 0 deletions sunpy/net/jsoc/tests/test_jsoc.py
@@ -7,9 +7,11 @@
import astropy.units as u
import pytest
from parfive import Results
+ from unittest import mock

from sunpy.net.jsoc import JSOCClient, JSOCResponse
import sunpy.net.attrs as a
+ from sunpy.util.exceptions import SunpyUserWarning

client = JSOCClient()

@@ -346,3 +348,25 @@ def test_can_handle_query_no_series():
assert not JSOCClient._can_handle_query(a.Time("2020/01/02", "2020/01/03"))
assert not JSOCClient._can_handle_query(a.Wavelength(17.1*u.nm))
assert JSOCClient._can_handle_query(a.jsoc.Series("hmi.M_45s"))


+ @pytest.mark.remote_data
+ def test_max_parallel_connections():
+     responses = client.search(
+         a.Time('2014/1/1T1:00:36', '2014/1/1T01:01:38'),
+         a.jsoc.Series('hmi.M_45s'), a.jsoc.Notify('jsoc@cadair.com'),
+         a.jsoc.Protocol("as-is"))
+
+     path = tempfile.mkdtemp()
+
+     with mock.patch(
+             "parfive.Downloader.download",
+             new_callable=mock.MagicMock
+     ) as download:
+
+         download.side_effect = ["Mocked Downloader"]
+
+         with pytest.warns(SunpyUserWarning):
+             client.fetch(responses, path=path, max_conn=5, max_splits=5)
+
+     assert download.called
11 changes: 10 additions & 1 deletion sunpy/util/scraper.py
@@ -75,6 +75,10 @@ def __init__(self, pattern, **kwargs):
end=self.pattern[milliseconds.end():]
))

+ self.append_login = True
+ if 'append_login' in kwargs:
+     self.append_login = kwargs['append_login']

def matches(self, filepath, date):
return date.strftime(self.pattern) == filepath

@@ -263,8 +267,13 @@ def _ftpfileslist(self, timerange):
if (datehref >= timerange.start and
datehref <= timerange.end):
filesurls.append(fullpath)
- filesurls = ['ftp://anonymous:data@sunpy.org@' + "{0.netloc}{0.path}".format(urlsplit(url))
+
+ login = ''
+ if self.append_login:
+     login = "anonymous:data@sunpy.org@"
+ filesurls = [f'ftp://{login}' + "{0.netloc}{0.path}".format(urlsplit(url))
for url in filesurls]

return filesurls

def _localfilelist(self, timerange):
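
A short sketch of the new append_login flag, using an illustrative pattern rather than the exact BASEURL from norh.py:

    from sunpy.util.scraper import Scraper

    pattern = 'ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/%Y/%m/tca%y%m%d'
    scraper = Scraper(pattern, append_login=False)
    # _ftpfileslist() now builds URLs like
    #   ftp://solar-pub.nao.ac.jp/pub/nsro/norh/data/tcx/2012/04/tca120421
    # instead of prefixing the anonymous login:
    #   ftp://anonymous:data@sunpy.org@solar-pub.nao.ac.jp/...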
