/
api.py
1497 lines (1179 loc) · 49.1 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A Disdat API for creating, publishing, and finding bundles.
These calls are not thread safe. If they operate on a context, they require the user to specify the context.
This is unlike the CLI that maintains state on disk that keeps track of your current context between calls.
The API won't change the CLI's context and vice versa.
"""
import collections
import collections.abc
import errno
import getpass
import hashlib
import os
import shutil
import urllib
import urllib.parse

import disdat.common as common
import disdat.fs
from disdat import logger as _logger
from disdat.data_context import DataContext
from disdat.hyperframe import HyperFrameRecord, LineageRecord, parse_return_val
from disdat.utility.aws_s3 import s3_path_exists
PROC_ID_TRUNCATE_HASH = 10 # 10 ls hex digits
def _get_fs():
    """Return the DisdatFS singleton, ensuring the config singleton exists first.

    TODO: Do we need the config instance here? Most calls in fs / dockerize
    explicitly ask for an instance anyhow.

    Returns:
        `disdat.fs.DisdatFS`
    """
    # The config must be initialized before the filesystem object is built.
    disdat.fs.DisdatConfig.instance()
    fs = disdat.fs.DisdatFS()
    return fs
def set_aws_profile(aws_profile):
    """Select the AWS credentials profile by setting the AWS_PROFILE env var.

    Args:
        aws_profile (str): name of the profile to export into the environment.
    """
    os.environ.update({"AWS_PROFILE": aws_profile})
class Bundle(HyperFrameRecord):
    def __init__(
        self,
        local_context,
        name=None,
        data=None,
        processing_name=None,
        owner=None,
        tags=None,
        params=None,
        dependencies=None,
        code_method=None,
        vc_info=None,
        start_time=0,
        stop_time=0,
    ):
        """Create a bundle in a local context.

        There are three ways to create bundles:

        1.) Create a bundle with a single call. Must include a data field!
        b = api.Bundle('examples', name='propensity_model',owner='fred',data='/Users/fred/model.tgz')

        2.) Create a bundle using a context manager. The initial call requires only a context.
        with api.Bundle('examples') as b:
            b.add_data(file_list)
            b.add_code_ref('mymodule.mymethod')
            b.add_params({'path': path})
            b.add_tags(tags)

        3.) Open and close manually.
        b = api.Bundle('examples').open()
        ... (same adder calls) ...
        b.close()

        Once the bundle is "closed" it is written to disk and immutable. One may
        change anything about an "open" bundle except the context information.

        Default name: If you don't provide a name, Disdat tries to use the basename in `code_ref`.
        Default processing_name: If you don't provide a processing name, Disdat will use a default that
        takes into consideration your bundle's upstream inputs, parameters, and code reference.

        Args:
            local_context (Union[str, `disdat.data_context.DataContext`]): The local context name or context object
            name (str): Human name for this bundle.
            data (union(pandas.DataFrame, tuple, None, list, dict)): The data this bundle contains.
            processing_name (str): A name that indicates a bundle was made in an identical fashion.
            owner (str): The owner of the bundle. Default getpass.getuser()
            tags (dict): (str,str) dictionary of arbitrary user tags.
            params (dict(str:str)): Dictionary of parameters that <code_method> used to produce this output.
            dependencies (dict(str:bundle)): Dictionary of argname: bundle, Bundles used to produce this output.
            code_method (str): A reference to code that created this bundle. Default None
            vc_info (tuple): Version control information triple: e.g. tuple(git_repo, git_commit, branch)
            start_time (float): Start time of the process that produced the bundle.
            stop_time (float): Stop time of the process that produced the bundle.

        Raises:
            Exception: if the context cannot be found or `local_context` has the wrong type.
        """
        self._fs = _get_fs()

        try:
            if isinstance(local_context, DataContext):
                self.data_context = local_context
            elif isinstance(local_context, str):
                self.data_context = self._fs.get_context(local_context)
                if self.data_context is None:
                    raise Exception(
                        "Unable to create Bundle: no context found with name[{}]".format(
                            local_context
                        )
                    )
            else:
                raise Exception(
                    "Unable to create Bundle: local_context is not str or DataContext"
                )
        except Exception as e:
            # BUG FIX: the original format string had one placeholder but two
            # arguments (the error was silently dropped), and then returned,
            # leaving a half-constructed object that failed later with
            # AttributeError. Log the real error and re-raise instead.
            _logger.error(
                "Unable to allocate bundle in context {}: {}".format(local_context, e)
            )
            raise

        self._local_dir = None
        self._remote_dir = None
        self._closed = False  # Bundle is closed and immutable
        self._data = None  # The df, array, dictionary the user wants to store

        super(Bundle, self).__init__(
            human_name=name,
            owner=getpass.getuser() if owner is None else owner,
            processing_name=processing_name,
        )

        # Add the fields they have passed in.
        if tags is not None:
            self.add_tags(tags)
        if params is not None:
            self.add_params(params)
        if dependencies is not None:
            self.add_dependencies(dependencies.values(), dependencies.keys())
        if code_method is not None:
            self.add_code_ref(code_method)
        if vc_info is not None:
            # BUG FIX: add_git_info takes (repo, commit, branch); the original
            # passed the tuple as a single positional arg (TypeError at runtime).
            self.add_git_info(*vc_info)
        self.add_timing(start_time, stop_time)

        # Only close and make immutable if the user also adds the data field
        if data is not None:
            self.open()
            self.add_data(data)
            self.close()
    def __getstate__(self):
        """Manual serialization for pickling bundles.

        We need to remove references to the data context, the DisdatFS, and
        the underlying protobuf. Data contexts have DB connections, DisdatFS points
        to the current data context, and protobufs should be serialized to string
        using the protobuf serializer.

        Returns:
            dict: a picklable copy of __dict__ (context as name, pb as bytes, no fs).
        """
        state = self.__dict__.copy()
        # convert the data context object to a name
        state["data_context"] = self.data_context.get_local_name()
        # no need to carry around the fs
        del state["_fs"]
        # convert underlying pb to a string
        state["pb"] = self.pb.SerializeToString()
        return state

    def __setstate__(self, state):
        """Deserialize the context, fs, and pb fields (inverse of __getstate__)."""
        self.__dict__.update(state)
        # Restore a pointer to DisdatFS
        self._fs = _get_fs()
        # Restore a pointer to the data context (stored as its local name)
        assert isinstance(self.data_context, str)
        self.data_context = self._fs.get_context(self.data_context)
        # Restore pb object from its serialized byte-string form
        pb = self._pb_type()
        pb.ParseFromString(self.pb)
        self.pb = pb
def abandon(self):
"""Remove on-disk state of the bundle if it is abandoned before it is closed.
that were left !closed have their directories harvested.
NOTE: the user has the responsibility to make sure the bundle is not shared across
threads or processes and that they don't remove a directory out from under another
thread of control. E.g., you cannot place this code in __del__ and then _check_closed() b/c
a forked child process might have closed their copy while the parent deletes theirs.
"""
self._check_open()
_logger.debug(
f"Disdat api abandon bundle obj [{id(self)}] process[{os.getpid()}] uuid[{self.uuid}]"
)
try:
shutil.rmtree(self._local_dir, ignore_errors=True)
os.rmdir(self._local_dir)
# TODO: if people create s3 files, s3 file targets, inside of an s3 context,
# TODO: then we will have to clean those up as well.
except IOError as why:
_logger.error(
"Removal of bundle directory {} failed with error {}. Continuing removal...".format(
self._local_dir, why
)
)
    def _check_open(self):
        # Guard for mutating operations: only legal while the bundle is open.
        # NOTE: assert-based validation is stripped when running `python -O`.
        assert not self._closed, "Bundle must be open (not closed) for editing."

    def _check_closed(self):
        # Guard for read/transfer operations: only legal once closed/immutable.
        assert self._closed, "Bundle must be closed."
""" Getters """
@property
def closed(self):
return self._closed
@property
def data(self):
"""Return the presented data if closed (refreshing in case of localization / delocalization).
Or return the data currently attached to an open bundle (if not closed).
"""
if self._closed:
assert self.is_presentable
self._data = self.data_context.present_hfr(self) # actualize link urls
return self._data
@property
def name(self):
return self.pb.human_name
@property
def processing_name(self):
return self.pb.processing_name
@property
def owner(self):
return self.pb.owner
@property
def uuid(self):
return self.pb.uuid
@property
def creation_date(self):
return self.pb.lineage.creation_date
@property
def local_dir(self):
return self._local_dir
@property
def remote_dir(self):
return self._remote_dir
@property
def tags(self):
"""Return the tags that the user set
bundle.tags holds all of the tags, including the "hidden" parameter tags.
This accesses everything but the parameter tags.
bundle.params accesses everything but the user tags
"""
return {
k: v
for k, v in self.tag_dict.items()
if not k.startswith(common.BUNDLE_TAG_PARAMS_PREFIX)
}
@property
def params(self):
"""Return the tags that were parameters
This returns the string version of the parameters (how they were serialized into the bundle)
Note that we currently use Luigi Parameter parse and serialize to go from string and to string.
Luigi does so to interpret command-line arguments.
"""
return {
k[len(common.BUNDLE_TAG_PARAMS_PREFIX) :]: v
for k, v in self.tag_dict.items()
if k.startswith(common.BUNDLE_TAG_PARAMS_PREFIX)
}
@property
def dependencies(self):
"""Return the argnames and bundles used to produce this bundle.
Note: We do not return a bundle reference because it may not
be found in this context, but it remains a valid reference.
NOTE: At the moment, returns key=processing_name: value=uuid
Returns:
dict: (arg_name:(proc_name, uuid))
"""
found = {}
arg_names = [
"_arg_{}".format(i) for i in range(0, len(self.pb.lineage.depends_on))
]
for an, dep in zip(arg_names, self.pb.lineage.depends_on):
try:
if dep.arg_name:
found[dep.arg_name] = (dep.hframe_proc_name, dep.hframe_uuid)
else:
found[an] = (dep.hframe_proc_name, dep.hframe_uuid)
except ValueError as ve:
found[an] = (dep.hframe_proc_name, dep.hframe_uuid)
return found
    @property
    def code_ref(self):
        """
        Returns:
            str: The string representing the name of the code that produced this bundle
        """
        return self.pb.lineage.code_method

    @property
    def timing(self):
        """Return the recorded start and stop times.

        Returns:
            (float, float): (start_time, stop_time)
        """
        return self.pb.lineage.start_time, self.pb.lineage.stop_time

    @property
    def git_info(self):
        """Return the recorded code versioning information. Assumes
        a repo URL, commit hash, and branch name.

        Returns:
            (str, str, str): (repo, hash, branch)
        """
        return (
            self.pb.lineage.code_repo,
            self.pb.lineage.code_hash,
            self.pb.lineage.code_branch,
        )

    @property
    def is_presentable(self):
        """Bundles present as a set of possible types or just a HyperFrame.
        If there is a Python presentation, return True.

        Returns:
            (bool): Whether this bundle has a Python presentation
        """
        return super(Bundle, self).is_presentable()
""" Setters """
@name.setter
def name(self, name):
"""Add the name to the bundle
Args:
name (str): The "human readable" name of this data
Returns:
self
"""
self.pb.human_name = name
@processing_name.setter
def processing_name(self, processing_name):
"""Add the processing name to the bundle
Args:
processing_name (str): Another way to denote versioning "sameness"
Returns:
self
"""
self.pb.processing_name = processing_name
self.pb.lineage.hframe_proc_name = processing_name
def add_tags(self, tags):
"""Add tags to the bundle. Updates if existing.
Args:
k,v (dict): string:string dictionary
Returns:
self
"""
self._check_open()
super(Bundle, self).add_tags(tags)
return self
def add_params(self, params):
"""Add (str,str) params to bundle"""
self._check_open()
assert isinstance(params, dict)
params = {f"{common.BUNDLE_TAG_PARAMS_PREFIX}{k}": v for k, v in params.items()}
super(Bundle, self).add_tags(params)
return self
def add_dependencies(self, bundles, arg_names=None):
"""Add one or more upstream bundles as dependencies
Note: Metadata for correct re-use of re-execution semantics.
Args:
bundles (Union[list `api.Bundle`, `api.Bundle`]): Another bundle that may have been used to produce this one
arg_names (Union[list str, str]): Optional argument names of the dependencies. Default 'arg_<i>' used.
Returns:
self
"""
self._check_open()
curr_count = LineageRecord.dependency_count(self.pb.lineage)
if isinstance(bundles, collections.Iterable):
if arg_names is None:
arg_names = [
"_arg_{}".format(i)
for i in range(0 + curr_count, len(bundles) + curr_count)
]
LineageRecord.add_deps_to_lr(
self.pb.lineage,
[(b.processing_name, b.uuid, an) for an, b in zip(arg_names, bundles)],
)
else:
if arg_names is None:
arg_names = "_arg_{}".format(curr_count)
LineageRecord.add_deps_to_lr(
self.pb.lineage, [(bundles.processing_name, bundles.uuid, arg_names)]
)
return self
    def add_code_ref(self, code_ref):
        """Add a string referring to the code that generated this data.
        For example, if Python, one could use "package.module.class.method".

        Note: Optional metadata for lineage.

        Args:
            code_ref (str): String that refers to the code generating the data

        Returns:
            self
        """
        self._check_open()
        self.pb.lineage.code_method = code_ref
        return self

    def add_git_info(self, repo, commit, branch):
        """Record version-control information for the code that generated this data.

        Note: Optional metadata for lineage.

        Args:
            repo (str): Repository URL: e.g., "https://github.com/bamboozle/some-project"
            commit (str): Commit hash: e.g., "010b867012ee3f45c63d0435f4af1dc87612a0fd"
            branch (str): Repo branch: e.g., "HEAD"

        Returns:
            self
        """
        self._check_open()
        self.pb.lineage.code_repo = repo
        self.pb.lineage.code_hash = commit
        self.pb.lineage.code_branch = branch
        return self
def add_timing(self, start_time, stop_time):
"""The start and end of the processing.
Note: Optional metadata.
Args:
start_time (float): start timestamp
stop_time (float): end timestamp
"""
self._check_open()
self.pb.lineage.start_time = start_time
self.pb.lineage.stop_time = stop_time
return self
def add_data(self, data):
"""Attach data to a bundle. The bundle must be open and not closed.
One attaches one data item to a bundle (dictionary, list, tuple, scalar, or dataframe).
Calling this replaces the latest item -- only the latest will be included in the bundle on close.
Note: One uses `add_data_row` or `add_data` but not both. Adding a row after `add_data`
removes the data. Using `add_data` after `add_data_row` removes all previously added rows.
Args:
data (list|tuple|dict|scalar|`pandas.DataFrame`):
Returns:
self
"""
self._check_open()
if self._data is not None:
_logger.warning("Disdat API add_data replacing existing data on bundle")
self._data = data
return self
""" Alternative construction post allocation """
def fill_from_hfr(self, hfr):
"""Given an internal hyperframe, copy out the information to this user-side Bundle object.
Assume the user has set the data_context appropriately when creating the bundle object
The Bundle object inherits from HyperFrameRecord. So we needs to:
a.) set the pb to point to this pb, they may both point to the same pb, but this bundle will be *closed*
b.) init internal state
c.) Set bundle object specific fields. Note we do not set or clear the self.data_context
Args:
hfr:
Returns:
self: this bundle object with self.data containing the kind of object the user saved.
"""
self._check_open()
self._closed = True
self.pb = hfr.pb
self.init_internal_state()
self._data = self.data_context.present_hfr(hfr)
self._local_dir, self._remote_dir = self.data_context.util_get_managed_paths(
self.uuid
)
return self
""" Python Context Manager Interface """
def __enter__(self):
"""'open'"""
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
"""'close'
If there has been an exception, let the user deal with the written created bundle.
"""
self.close()
def open(self, force_uuid=None):
"""Management operations to open bundle for writing.
At this time all of the open operations, namely creating the managed path
occur in the default constructor or in the class fill_from_hfr constructor.
Args:
force_uuid (str): DEPRECATING - do not use. Force to open a bundle with a specific bundle.
Returns:
Bundle
"""
if self._closed:
_logger.error("Bundle is closed -- unable to re-open.")
assert False
self._local_dir, self.pb.uuid, self._remote_dir = (
self.data_context.make_managed_path(uuid=force_uuid)
)
return self
def close(self):
"""Write out this bundle as a hyperframe.
Parse the data, set presentation, create lineage, and
write to disk.
This closes the bundle so it may not be re-used.
Returns:
None
"""
def extract_human_name(code_ref):
return code_ref.split(".")[-1]
try:
presentation, frames = parse_return_val(
self.uuid, self._data, self.data_context
)
self.add_frames(frames)
self.pb.presentation = presentation
assert (
self.uuid != ""
), "Disdat API Error: Cannot close a bundle without a UUID."
self.pb.lineage.hframe_uuid = self.uuid
if self.name == "":
if self.code_ref != "":
self.name = extract_human_name(self.code_ref)
if self.processing_name == "":
self.processing_name = self.default_processing_name()
self._mod_finish(
new_time=True
) # Set the hash based on all the context now in the pb, record create time
self.data_context.write_hframe_local(self)
except Exception as error:
"""If we fail for any reason, remove bundle dir and raise"""
self.abandon()
raise
self._closed = True
return self
""" Convenience Routines """
def cat(self):
"""Return the data in the bundle
The data is already present in .data
"""
self._check_closed()
return self._data
def rm(self):
"""Remove bundle from the current context associated with this bundle object
Only remove this bundle with this uuid.
This only makes sense if the bundle is closed.
"""
self._check_closed()
self._fs.rm(uuid=self.uuid, data_context=self.data_context)
return self
def rmr(self):
"""Remove bundle from the current remote context associated with this bundle object
Only remove this bundle with this uuid.
WARNING: If all of your links are delocalized, this will permanently remove *all* of your data!
"""
self._check_closed()
self._fs.rmr(uuid=self.uuid, data_context=self.data_context)
return self
def commit(self):
"""Shortcut version of api.commit(uuid=bundle.uuid)
Returns:
self
"""
self._fs.commit(None, None, uuid=self.uuid, data_context=self.data_context)
return self
    def push(self, delocalize=False):
        """Shortcut version of api.push(uuid=bundle.uuid)

        Args:
            delocalize (bool): Remove all local files, default False

        Returns:
            (`disdat.api.Bundle`): this bundle

        Raises:
            RuntimeError: if the context has no remote bound.
        """
        self._check_closed()
        if self.data_context.get_remote_object_dir() is None:
            raise RuntimeError(
                "Not pushing: Current branch '{}/{}' has no remote".format(
                    self.data_context.get_remote_name(),
                    self.data_context.get_local_name(),
                )
            )
        self._fs.push(
            uuid=self.uuid, data_context=self.data_context, delocalize=delocalize
        )
        # if we removed files, update the presentation
        if delocalize:
            assert self.is_presentable
            self._data = self.data_context.present_hfr(self)  # actualize link urls
        return self

    def pull(self, localize=False):
        """Shortcut version of api.pull()

        Note if localizing, then we update this bundle to reflect the possibility of new, local links.

        Returns:
            self
        """
        self._check_closed()
        self._fs.pull(uuid=self.uuid, localize=localize, data_context=self.data_context)
        # if we pulled files, update the presentation
        if localize:
            assert self.is_presentable
            self._data = self.data_context.present_hfr(self)  # actualize link urls
        return self
    def localize(self, path):
        """Given a remote managed path in a bundle, retrieve the file.

        A local path should already be localized, but in case the user keeps a
        local path around, double check that it is local.

        Assumes the bundle has been pushed via b.push()

        Args:
            path (str): The managed s3 path

        Returns:
            str: The localized path
        """
        self._check_closed()
        scheme = urllib.parse.urlparse(path).scheme
        assert (
            scheme == "s3"
        ), "Bundle.localize: Bad scheme {}, requires an s3 path".format(scheme)
        assert (
            self._remote_dir is not None
        ), "Bundle.localize failed, bundle created in context without remote."
        assert path.startswith(
            self._remote_dir
        ), "Bundle.localize failed, {} is not remote managed path in {}".format(
            path, self._remote_dir
        )
        # Pull the one file.
        new_path = self.data_context.copy_in_files(
            path, urllib.parse.urljoin("file:", self._local_dir)
        )
        # Update the presentation so link urls point at the new local copy
        assert self.is_presentable
        self._data = self.data_context.present_hfr(self)  # actualize link urls
        return new_path
def delocalize(self, path):
"""Given a local managed path in a bundle, delete the local copy.
Tests that the metadata and this item have been pushed.
Args:
path: A managed local or s3 path.
force(bool): Delete the file without checking if the bundle is at the current remote.
Returns:
str: The path to the non-localized file or None if force is set and no remote available.
"""
self._check_closed()
scheme = urllib.parse.urlparse(path).scheme
assert (
scheme == "" or scheme == "file"
), "Bundle.delocalize: Bad scheme {}, requires a local path".format(scheme)
assert (
self._remote_dir is not None
), "Bundle.delocalize failed, bundle created in context without remote."
assert path.startswith(
self._local_dir
), "Bundle.delocalize failed, {} is not a managed path in {}".format(
path, self._remote_dir
)
hframe_file = os.path.join(self._remote_dir, "{}_hframe.pb".format(self.uuid))
assert s3_path_exists(
hframe_file
), "Bundle.delocalize failed. Bundle must be pushed to remote before use."
# Push the one file. This is a safety call.
# If we can't guarantee that we've made a copy, we don't remove the local copy.
new_path = self.data_context.copy_in_files(
urllib.parse.urljoin("file:", path), self._remote_dir
)
# Delete local copy
try:
os.remove(path)
except IOError as e:
print("delocalize: unable to remove {} due to {}".format(path, e))
# Update the presentation
assert self.is_presentable
self._data = self.data_context.present_hfr(self) # actualize link urls
return new_path
def get_directory(self, dir_name):
"""Returns path `<disdat-managed-directory>/<dir_name>`. This gives the user a local
output directory into which to write files. This is useful when a user needs to give an external tool, such
as Spark or Tensorflow, a directory to place output files. After this call, the directory
will exist in the local context.
It is the user's responsibility to add individual file links to the bundle.
It is an error to add a directory as a file link.
Arguments:
dir_name (str): A basedir of a directory to appear in the local bundle.
Returns:
str: A directory path managed by disdat
"""
self._check_open()
if dir_name[-1] == "/":
dir_name = dir_name[:-1]
# if the user erroneously passes in the directory of the bundle, return same
if dir_name == self._local_dir:
return self._local_dir
fqp = os.path.join(self._local_dir, dir_name.lstrip("/"))
try:
os.makedirs(fqp)
except OSError as why:
if not why.errno == errno.EEXIST:
_logger.error(
"Creating directory in bundle directory failed errno {}".format(
why.strerror
)
)
raise
except IOError as why:
_logger.error(
"Creating directory in bundle directory failed {}".format(why)
)
raise
return fqp
    def get_file(self, filename):
        """Create a "managed path" to store file `filename` directly in the local data context.

        This allows you to create versioned data sets without file copies and without worrying
        about where your data files are to be stored.

        To use, you must:
        a.) Write data into this location
        b.) Add this target to the bundle by including this file path

        TODO: for binary files add a binary=True, and return luigi.LocalTarget('test.npz', format=luigi.format.Nop)

        Arguments:
            filename (str): filename to create in the bundle

        Returns:
            str: the fully qualified managed path for `filename`
        """
        self._check_open()
        return os.path.join(self._local_dir, filename)

    def get_remote_directory(self, dir_name):
        """Returns path `<disdat-managed-remote-directory>/<dir_name>`. This gives the user a
        remote (e.g., s3) output directory into which to write files. This is useful when a user
        needs to give an external tool, such as Spark or Tensorflow, a directory to place output files.

        It is the user's responsibility to add individual file links to the bundle.
        It is an error to add a directory as a file link.

        Arguments:
            dir_name (str): A basedir of a directory to appear in the remote bundle.

        Returns:
            str: A directory path managed by disdat
        """
        self._check_open()
        fqp = os.path.join(self._remote_dir, dir_name.lstrip("/"))
        return fqp

    def get_remote_file(self, filename):
        """Create a "managed path" to store file `filename` directly in the remote data context.

        This allows you to create versioned data sets without file copies.

        To use, you must a.) write data into this path, and b.) add this
        target to the bundle by including its file path.

        Note: This requires that the local context has been bound to a remote data context.

        Arguments:
            filename (str): filename to create in the remote bundle

        Returns:
            str: the fully qualified remote (s3) path (the old docstring claimed
            a `luigi.s3.S3Target`, but the code returns a joined path string)
        """
        self._check_open()
        if not self.data_context.remote_ctxt_url:
            raise Exception("Managed S3 path creation requires a remote context")
        return os.path.join(self._remote_dir, filename)
@staticmethod
def calc_default_processing_name(code_ref, params, dep_proc_ids):
"""
Args:
code_ref (str): The code ref string
params (dict): argname:str
dep_proc_ids (dict): dict of argname:processing_id of upstream dependencies
Returns:
(str): The processing name
"""
def sorted_md5(input_dict):
ids = [input_dict[k] for k in sorted(input_dict)]
as_one_str = ".".join(ids)
return hashlib.md5(as_one_str.encode("utf-8")).hexdigest()
dep_hash = sorted_md5(dep_proc_ids)[:PROC_ID_TRUNCATE_HASH]
param_hash = sorted_md5(params)[:PROC_ID_TRUNCATE_HASH]
processing_id = code_ref + "_" + param_hash + "_" + dep_hash
return processing_id
def default_processing_name(self):
"""The default processing name defines the set of bundles that were ostensibly created
from the same code and parameters. These bundles share the
same code_ref, parameters, and dependency processing_names. Thus different versions
might very well be different because the code changed (but not the code_ref). Or the code
might read an external DB whose tables change.
The name is <code_ref>_<param_hash>_<hash(dependencies.processing_name)
Note: Cache the processing name so other bundles
Returns:
processing_name(str): The processing name.
"""
if self._closed:
return self.pb.processing_name
# Iterate through dependencies dict argname: (processing_name, uuid)
dep_proc_ids = {k: v[0] for k, v in self.dependencies.items()}
return Bundle.calc_default_processing_name(
self.code_ref, self.params, dep_proc_ids
)
def _get_context(context_name):
    """Retrieve a data context by name, raising if it cannot be found.

    Args:
        context_name(str): <remote context>/<local context> or <local context>

    Returns:
        (`disdat.data_context.DataContext`)

    Raises:
        RuntimeError: if the context does not exist and cannot be reloaded.
    """
    fs = _get_fs()
    data_context = fs.get_context(context_name)
    if data_context is None:
        # The context may exist on disk but not yet be loaded; try one reload.
        data_context = fs.reload_context(context_name)
    if data_context is None:
        error_msg = "Unable to perform operation: could not find context {}".format(
            context_name
        )
        _logger.error(error_msg)
        raise RuntimeError(error_msg)
    return data_context
def init():
    """Initialize disdat with a local context directory and default configs.

    Returns:
        None
    """
    disdat.fs.DisdatConfig.init()
def current_context():
    """Return the current context name (not object), or None on error."""
    fs = _get_fs()
    try:
        return fs.curr_context_name
    except Exception as se:
        # Same message as before; the original wrapped it in redundant parens.
        print("Current context failed due to error: {}".format(se))
        return None
def ls_contexts():
"""Return list of contexts and their remotes