@@ -63,22 +63,20 @@ def __repr__(self) -> str:
63
63
def _open_files(
    url_mapping: Mapping[str, Union[DataGranule, None]],
    fs: fsspec.AbstractFileSystem,
    *,
    pqdm_kwargs: Optional[Mapping[str, Any]] = None,
) -> List[fsspec.spec.AbstractBufferedFile]:
    """Open every URL in ``url_mapping`` through ``fs`` using ``pqdm``.

    Parameters:
        url_mapping: Maps each URL to its associated granule (or ``None``).
        fs: Filesystem whose ``open`` method is called once per URL.
        pqdm_kwargs: Extra keyword arguments forwarded to ``pqdm``. They are
            layered over the defaults ``exception_behaviour="immediate"`` and
            ``n_jobs=8``, so a caller-supplied value wins for either key.

    Returns:
        Whatever ``pqdm`` returns for the opened files; each successfully
        opened URL is wrapped in an ``EarthAccessFile`` together with its
        granule.
    """

    def _open_one(item: tuple[str, Optional[DataGranule]]) -> EarthAccessFile:
        # Each work item is a single (url, granule) pair taken from
        # ``url_mapping.items()``.
        link, granule = item
        handle = fs.open(link)
        return EarthAccessFile(handle, granule)  # type: ignore

    # Start from the defaults, then let caller-provided options override them.
    options: dict = {
        "exception_behaviour": "immediate",
        "n_jobs": 8,
    }
    options.update(pqdm_kwargs or {})

    return pqdm(url_mapping.items(), _open_one, **options)
82
80
83
81
84
82
def make_instance (
@@ -344,6 +342,7 @@ def open(
344
342
self ,
345
343
granules : Union [List [str ], List [DataGranule ]],
346
344
provider : Optional [str ] = None ,
345
+ * ,
347
346
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
348
347
) -> List [fsspec .spec .AbstractBufferedFile ]:
349
348
"""Returns a list of file-like objects that can be used to access files
@@ -361,14 +360,15 @@ def open(
361
360
A list of "file pointers" to remote (i.e. s3 or https) files.
362
361
"""
363
362
if len (granules ):
364
- return self ._open (granules , provider , pqdm_kwargs )
363
+ return self ._open (granules , provider , pqdm_kwargs = pqdm_kwargs )
365
364
return []
366
365
367
366
@singledispatchmethod
def _open(
    self,
    granules: Union[List[str], List[DataGranule]],
    provider: Optional[str] = None,
    *,
    pqdm_kwargs: Optional[Mapping[str, Any]] = None,
) -> List[Any]:
    """Base implementation of the ``_open`` single-dispatch method.

    This body only runs when no registered implementation handles the
    call, and it always fails.

    Raises:
        NotImplementedError: Always.
    """
    message = "granules should be a list of DataGranule or URLs"
    raise NotImplementedError(message)
@@ -378,7 +378,8 @@ def _open_granules(
378
378
self ,
379
379
granules : List [DataGranule ],
380
380
provider : Optional [str ] = None ,
381
- threads : int = 8 ,
381
+ * ,
382
+ pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
382
383
) -> List [Any ]:
383
384
fileset : List = []
384
385
total_size = round (sum ([granule .size () for granule in granules ]) / 1024 , 2 )
@@ -411,7 +412,7 @@ def _open_granules(
411
412
fileset = _open_files (
412
413
url_mapping ,
413
414
fs = s3_fs ,
414
- threads = threads ,
415
+ pqdm_kwargs = pqdm_kwargs ,
415
416
)
416
417
except Exception as e :
417
418
raise RuntimeError (
@@ -420,19 +421,19 @@ def _open_granules(
420
421
f"Exception: { traceback .format_exc ()} "
421
422
) from e
422
423
else :
423
- fileset = self ._open_urls_https (url_mapping , threads = threads )
424
- return fileset
424
+ fileset = self ._open_urls_https (url_mapping , pqdm_kwargs = pqdm_kwargs )
425
425
else :
426
426
url_mapping = _get_url_granule_mapping (granules , access = "on_prem" )
427
- fileset = self ._open_urls_https (url_mapping , threads = threads )
428
- return fileset
427
+ fileset = self ._open_urls_https (url_mapping , pqdm_kwargs = pqdm_kwargs )
428
+
429
+ return fileset
429
430
430
431
@_open .register
431
432
def _open_urls (
432
433
self ,
433
434
granules : List [str ],
434
435
provider : Optional [str ] = None ,
435
- threads : int = 8 ,
436
+ * ,
436
437
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
437
438
) -> List [Any ]:
438
439
fileset : List = []
@@ -460,7 +461,6 @@ def _open_urls(
460
461
fileset = _open_files (
461
462
url_mapping ,
462
463
fs = s3_fs ,
463
- threads = threads ,
464
464
pqdm_kwargs = pqdm_kwargs ,
465
465
)
466
466
except Exception as e :
@@ -481,15 +481,16 @@ def _open_urls(
481
481
raise ValueError (
482
482
"We cannot open S3 links when we are not in-region, try using HTTPS links"
483
483
)
484
- fileset = self ._open_urls_https (url_mapping , threads , pqdm_kwargs )
484
+ fileset = self ._open_urls_https (url_mapping , pqdm_kwargs = pqdm_kwargs )
485
485
return fileset
486
486
487
487
def get (
488
488
self ,
489
489
granules : Union [List [DataGranule ], List [str ]],
490
- local_path : Union [Path , str , None ] = None ,
490
+ local_path : Optional [ Union [Path , str ] ] = None ,
491
491
provider : Optional [str ] = None ,
492
492
threads : int = 8 ,
493
+ * ,
493
494
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
494
495
) -> List [str ]:
495
496
"""Retrieves data granules from a remote storage system.
@@ -503,7 +504,11 @@ def get(
503
504
504
505
Parameters:
505
506
granules: A list of granules(DataGranule) instances or a list of granule links (HTTP).
506
- local_path: Local directory to store the remote data granules.
507
+ local_path: Local directory to store the remote data granules. If not
508
+ supplied, defaults to a subdirectory of the current working directory
509
+ of the form `data/YYYY-MM-DD-UUID`, where `YYYY-MM-DD` is the year,
510
+ month, and day of the current date, and `UUID` is the first 6 hex digits
511
+ of a UUID4 value.
507
512
provider: a valid cloud provider, each DAAC has a provider code for their cloud distributions
508
513
threads: Parallel number of threads to use to download the files;
509
514
adjust as necessary, default = 8.
@@ -514,26 +519,28 @@ def get(
514
519
Returns:
515
520
List of downloaded files
516
521
"""
522
+ if not granules :
523
+ raise ValueError ("List of URLs or DataGranule instances expected" )
524
+
517
525
if local_path is None :
518
- today = datetime .datetime .today ().strftime ("%Y-%m-%d" )
526
+ today = datetime .datetime .now ().strftime ("%Y-%m-%d" )
519
527
uuid = uuid4 ().hex [:6 ]
520
528
local_path = Path .cwd () / "data" / f"{ today } -{ uuid } "
521
- elif isinstance (local_path , str ):
522
- local_path = Path (local_path )
523
529
524
- if len (granules ):
525
- files = self ._get (granules , local_path , provider , threads , pqdm_kwargs )
526
- return files
527
- else :
528
- raise ValueError ("List of URLs or DataGranule instances expected" )
530
+ pqdm_kwargs = {
531
+ "n_jobs" : threads ,
532
+ ** (pqdm_kwargs or {}),
533
+ }
534
+
535
+ return self ._get (granules , Path (local_path ), provider , pqdm_kwargs = pqdm_kwargs )
529
536
530
537
@singledispatchmethod
531
538
def _get (
532
539
self ,
533
540
granules : Union [List [DataGranule ], List [str ]],
534
541
local_path : Path ,
535
542
provider : Optional [str ] = None ,
536
- threads : int = 8 ,
543
+ * ,
537
544
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
538
545
) -> List [str ]:
539
546
"""Retrieves data granules from a remote storage system.
@@ -566,7 +573,7 @@ def _get_urls(
566
573
granules : List [str ],
567
574
local_path : Path ,
568
575
provider : Optional [str ] = None ,
569
- threads : int = 8 ,
576
+ * ,
570
577
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
571
578
) -> List [str ]:
572
579
data_links = granules
@@ -590,7 +597,7 @@ def _get_urls(
590
597
else :
591
598
# if we are not in AWS
592
599
return self ._download_onprem_granules (
593
- data_links , local_path , threads , pqdm_kwargs
600
+ data_links , local_path , pqdm_kwargs = pqdm_kwargs
594
601
)
595
602
596
603
@_get .register
@@ -599,7 +606,7 @@ def _get_granules(
599
606
granules : List [DataGranule ],
600
607
local_path : Path ,
601
608
provider : Optional [str ] = None ,
602
- threads : int = 8 ,
609
+ * ,
603
610
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
604
611
) -> List [str ]:
605
612
data_links : List = []
@@ -615,7 +622,7 @@ def _get_granules(
615
622
for granule in granules
616
623
)
617
624
)
618
- total_size = round (sum ([ granule .size () for granule in granules ] ) / 1024 , 2 )
625
+ total_size = round (sum (granule .size () for granule in granules ) / 1024 , 2 )
619
626
logger .info (
620
627
f" Getting { len (granules )} granules, approx download size: { total_size } GB"
621
628
)
@@ -642,7 +649,7 @@ def _get_granules(
642
649
# if the data are cloud-based, but we are not in AWS,
643
650
# it will be downloaded as if it was on prem
644
651
return self ._download_onprem_granules (
645
- data_links , local_path , threads , pqdm_kwargs
652
+ data_links , local_path , pqdm_kwargs = pqdm_kwargs
646
653
)
647
654
648
655
def _download_file (self , url : str , directory : Path ) -> str :
@@ -684,7 +691,7 @@ def _download_onprem_granules(
684
691
self ,
685
692
urls : List [str ],
686
693
directory : Path ,
687
- threads : int = 8 ,
694
+ * ,
688
695
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
689
696
) -> List [Any ]:
690
697
"""Downloads a list of URLS into the data directory.
@@ -711,25 +718,26 @@ def _download_onprem_granules(
711
718
712
719
arguments = [(url , directory ) for url in urls ]
713
720
714
- results = pqdm (
715
- arguments ,
716
- self ._download_file ,
717
- n_jobs = threads ,
718
- argument_type = "args" ,
719
- ** pqdm_kwargs ,
720
- )
721
- return results
721
+ pqdm_kwargs = {
722
+ "exception_behaviour" : "immediate" ,
723
+ ** (pqdm_kwargs or {}),
724
+ # We don't want a user to be able to override the following kwargs,
725
+ # which is why they appear *after* spreading pqdm_kwargs above.
726
+ "argument_type" : "args" ,
727
+ }
728
+
729
+ return pqdm (arguments , self ._download_file , ** pqdm_kwargs )
722
730
723
731
def _open_urls_https (
724
732
self ,
725
733
url_mapping : Mapping [str , Union [DataGranule , None ]],
726
- threads : int = 8 ,
734
+ * ,
727
735
pqdm_kwargs : Optional [Mapping [str , Any ]] = None ,
728
736
) -> List [fsspec .AbstractFileSystem ]:
729
737
https_fs = self .get_fsspec_session ()
730
738
731
739
try :
732
- return _open_files (url_mapping , https_fs , threads , pqdm_kwargs )
740
+ return _open_files (url_mapping , https_fs , pqdm_kwargs = pqdm_kwargs )
733
741
except Exception :
734
742
logger .exception (
735
743
"An exception occurred while trying to access remote files via HTTPS"
0 commit comments