forked from datalad/datalad
/
gitrepo.py
4272 lines (3788 loc) · 161 KB
/
gitrepo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
"""Internal low-level interface to Git repositories
"""
import re
import time
import os
import os.path as op
import logging
from collections import (
OrderedDict,
)
from os import linesep
from os.path import (
join as opj,
exists,
isabs,
commonprefix,
relpath,
dirname,
curdir,
pardir,
sep
)
import posixpath
import threading
from functools import wraps
from weakref import (
finalize,
WeakValueDictionary
)
from datalad.log import log_progress
from datalad.support.due import due, Doi
from datalad import ssh_manager
from datalad.cmd import (
GitWitlessRunner,
WitlessProtocol,
BatchedCommand,
NoCapture,
StdOutErrCapture,
)
from datalad.config import (
ConfigManager,
_parse_gitconfig_dump,
write_config_section,
)
from datalad.dochelpers import exc_str
import datalad.utils as ut
from datalad.utils import (
Path,
PurePosixPath,
ensure_list,
optional_args,
on_windows,
getpwd,
posix_relpath,
ensure_dir,
generate_file_chunks,
ensure_unicode,
is_interactive,
)
# imports from same module:
from .external_versions import external_versions
from .exceptions import (
CommandError,
FileNotInRepositoryError,
GitIgnoreError,
InvalidGitReferenceError,
InvalidGitRepositoryError,
NoSuchPathError,
PathKnownToRepositoryError,
)
from .network import (
RI,
PathRI,
is_ssh
)
from .path import get_parent_paths
from .repo import (
PathBasedFlyweight,
RepoInterface,
path_based_str_repr,
)
from datalad.core.local.repo import repo_from_path
# shortcuts
# Precomputed "./" and "../" prefixes (with the platform separator); used by
# _normalize_path() to recognize explicitly CWD-relative path arguments.
_curdirsep = curdir + sep
_pardirsep = pardir + sep

# module-level logger for this file
lgr = logging.getLogger('datalad.gitrepo')
def to_options(split_single_char_options=True, **kwargs):
    """Transform keyword arguments into a list of cmdline options

    Imported from GitPython.

    Original copyright:
    Copyright (C) 2008, 2009 Michael Trier and contributors
    Original license:
    BSD 3-Clause "New" or "Revised" License

    Parameters
    ----------
    split_single_char_options: bool
      If True, a single-character option with a value yields two list
      elements ("-C", "value"); otherwise one fused element ("-Cvalue").
    kwargs:
      Each keyword becomes an option (underscores become dashes). A True
      value produces a bare flag, False/None suppress the option entirely,
      and a list/tuple value repeats the option once per element.

    Returns
    -------
    list
    """
    def render(name, value):
        # translate one keyword/value pair into zero, one or two tokens
        if len(name) == 1:
            if value is True:
                return ["-%s" % name]
            if value in (False, None):
                return []
            if split_single_char_options:
                return ["-%s" % name, "%s" % value]
            return ["-%s%s" % (name, value)]
        opt = "--%s" % name.replace('_', '-')
        if value is True:
            return [opt]
        if value is False or value is None:
            return []
        return ["%s=%s" % (opt, value)]

    options = []
    # deterministic output: process keywords in sorted order
    for key in sorted(kwargs):
        val = kwargs[key]
        values = val if isinstance(val, (list, tuple)) else [val]
        for v in values:
            options.extend(render(key, v))
    return options
def _normalize_path(base_dir, path):
    """Helper to check paths passed to methods of this class.

    Checks whether `path` is beneath `base_dir` and normalizes it.

    Additionally paths are converted into relative paths with respect to
    `base_dir`, considering PWD in case of relative paths. This
    is intended to be used in repository classes, which means that
    `base_dir` usually will be the repository's base directory.

    Parameters
    ----------
    base_dir: str
        directory to serve as base to normalized, relative paths
    path: str
        path to be normalized

    Returns
    -------
    str:
        path, that is a relative path with respect to `base_dir`

    Raises
    ------
    FileNotInRepositoryError
        if the (resolved) `path` does not lie within `base_dir`
    """
    if not path:
        return path

    pathobj = Path(path)

    # do absolute() in addition to always get an absolute path
    # even with non-existing base_dirs on windows
    base_dir = str(Path(base_dir).resolve().absolute())  # realpath OK

    # path = normpath(path)
    # Note: disabled normpath, because it may break paths containing symlinks;
    # But we don't want to realpath relative paths, in case cwd isn't the
    # correct base.

    if pathobj.is_absolute():
        # path might already be a symlink pointing to annex etc,
        # so realpath only its directory, to get "inline" with
        # realpath(base_dir) above
        path = str(pathobj.parent.resolve() / pathobj.name)  # realpath OK
    # Executive decision was made to not do this kind of magic!
    #
    # elif commonprefix([realpath(getpwd()), base_dir]) == base_dir:
    #     # If we are inside repository, rebuilt relative paths.
    #     path = opj(realpath(getpwd()), path)
    #
    # BUT with relative curdir/pardir start it would assume relative to curdir
    #
    elif path.startswith(_curdirsep) or path.startswith(_pardirsep):
        # an explicit "./" or "../" prefix is interpreted relative to the
        # process working directory
        path = str(Path(getpwd()).resolve() / pathobj)  # realpath OK
    else:
        # We were called from outside the repo. Therefore relative paths
        # are interpreted as being relative to self.path already.
        return path

    # Containment test. Note: the previous commonprefix()-based check was
    # unsafe, since commonprefix() compares characters, not path components,
    # and would wrongly accept e.g. '/tmp/repository/f' as lying under
    # '/tmp/repo'. Require an exact match or a separator-bounded prefix.
    base_with_sep = base_dir if base_dir.endswith(sep) else base_dir + sep
    if not (path == base_dir or path.startswith(base_with_sep)):
        raise FileNotInRepositoryError(msg="Path outside repository: %s"
                                           % base_dir, filename=path)

    return relpath(path, start=base_dir)
@optional_args
def normalize_path(func):
    """Decorator providing unified path conversion for a single file

    Unlike normalize_paths, intended to be used for functions dealing with a
    single filename at a time

    Note
    ----
    This is intended to be used within the repository classes and therefore
    returns a class method!

    The decorated function must accept a path as its first positional
    argument (after 'self'), and the class `func` belongs to must expose a
    'path' attribute, which serves as the normalization base.
    """
    @wraps(func)
    def _normalized(self, file_, *args, **kwargs):
        # normalize against the repository root before delegating
        return func(self, _normalize_path(self.path, file_), *args, **kwargs)

    return _normalized
@optional_args
def normalize_paths(func, match_return_type=True, map_filenames_back=False,
                    serialize=False):
    """Decorator to provide unified path conversions.

    Note
    ----
    This is intended to be used within the repository classes and therefore
    returns a class method!

    The decorated function is expected to take a path or a list of paths at
    first positional argument (after 'self'). Additionally the class `func`
    is a member of, is expected to have an attribute 'path'.

    Accepts either a list of paths or a single path in a str. Passes a list
    to decorated function either way, but would return based on the value of
    match_return_type and possibly input argument.

    If a call to the wrapped function includes normalize_path and it is False
    no normalization happens for that function call (used for calls to wrapped
    functions within wrapped functions, while possible CWD is within a
    repository)

    Parameters
    ----------
    match_return_type : bool, optional
      If True, and a single string was passed in, it would return the first
      element of the output (after verifying that it is a list of length 1).
      It makes easier to work with single files input.
    map_filenames_back : bool, optional
      If True and returned value is a dictionary, it assumes to carry entries
      one per file, and then filenames are mapped back to as provided from the
      normalized (from the root of the repo) paths
    serialize : bool, optional
      Loop through files giving only a single one to the function one at a time.
      This allows to simplify implementation and interface to annex commands
      which do not take multiple args in the same call (e.g. checkpresentkey)

    Raises
    ------
    ValueError
      If `files` is neither a str nor a list.
    """
    @wraps(func)
    def _wrap_normalize_paths(self, files, *args, **kwargs):
        # per-call opt-out of normalization via normalize_paths=False
        normalize = _normalize_path if kwargs.pop('normalize_paths', True) \
            else lambda rpath, filepath: filepath

        if files:
            if isinstance(files, str):
                files_new = [normalize(self.path, files)]
                single_file = True
            elif isinstance(files, list):
                files_new = [normalize(self.path, path) for path in files]
                single_file = False
            else:
                raise ValueError("_files_decorator: Don't know how to handle "
                                 "instance of %s." % type(files))
        else:
            # tristate: None means "no files given at all"
            single_file = None
            files_new = []

        if map_filenames_back:
            def remap_filenames(out):
                """Helper to map files back to non-normalized paths"""
                if isinstance(out, dict):
                    assert(len(out) == len(files_new))
                    files_ = [files] if single_file else files
                    mapped = out.__class__()
                    for fin, fout in zip(files_, files_new):
                        mapped[fin] = out[fout]
                    return mapped
                else:
                    return out
        else:
            remap_filenames = lambda x: x

        if serialize:  # and not single_file:
            # one call per file, collect results
            result = [
                func(self, f, *args, **kwargs)
                for f in files_new
            ]
        else:
            result = func(self, files_new, *args, **kwargs)

        if single_file is None:
            # no files were provided, nothing we can do really
            return result
        elif (result is None) or not match_return_type or not single_file:
            # If function doesn't return anything or no denormalization
            # was requested or it was not a single file
            return remap_filenames(result)
        elif single_file:
            if len(result) != 1:
                # Magic doesn't apply
                return remap_filenames(result)
            elif isinstance(result, (list, tuple)):
                return result[0]
            elif isinstance(result, dict) and tuple(result)[0] == files_new[0]:
                # assume that returned dictionary has files as keys.
                return tuple(result.values())[0]
            else:
                # no magic can apply
                return remap_filenames(result)
        else:
            # all values of single_file (None/True/False) are handled above,
            # so this is unreachable; previously this *returned* a
            # RuntimeError instance instead of raising it -- fixed to raise,
            # so a logic regression fails loudly
            raise RuntimeError("should have not got here... check logic")

    return _wrap_normalize_paths
if "2.24.0" <= external_versions["cmd:git"] < "2.25.0":
    # An unintentional change in Git 2.24.0 led to `ls-files -o` traversing
    # into untracked submodules when multiple pathspecs are given, returning
    # repositories that are deeper than the first level. This helper filters
    # these deeper levels out so that save_() doesn't fail trying to add them.
    #
    # This regression fixed with upstream's 072a231016 (2019-12-10).
    def _prune_deeper_repos(repos):
        """Return only the shallowest repository of every nested chain

        Parameters
        ----------
        repos : iterable
          Repository paths (anything with a sensible str()).
        """
        firstlevel_repos = []
        prev = None
        for repo in sorted(repos):
            rstr = str(repo)
            # a true sub-repository must match up to a path separator (or be
            # an exact duplicate); a plain startswith() prefix test would
            # also wrongly prune a sibling like 'sub2' next to 'sub'
            if prev is not None and (
                    rstr == prev or rstr.startswith(prev + os.sep)):
                continue
            prev = rstr
            firstlevel_repos.append(repo)
        return firstlevel_repos
else:
    def _prune_deeper_repos(repos):
        """No-op variant for Git versions without the ls-files regression"""
        return repos
class GitProgress(WitlessProtocol):
    """Reduced variant of GitPython's RemoteProgress class

    Parses Git's stderr progress reports (e.g. "Receiving objects: 50%
    (1/2)") and feeds them into datalad's log_progress() machinery; any
    stderr content that is not a progress report is passed on unchanged.

    Original copyright:
    Copyright (C) 2008, 2009 Michael Trier and contributors
    Original license:
    BSD 3-Clause "New" or "Revised" License
    """
    # inform super-class to capture stderr
    proc_err = True

    # one bit per operation/stage; BEGIN/END are stage bits, the rest
    # identify the kind of operation being reported
    _num_op_codes = 10
    BEGIN, END, COUNTING, COMPRESSING, WRITING, RECEIVING, RESOLVING, FINDING_SOURCES, CHECKING_OUT, ENUMERATING = \
        [1 << x for x in range(_num_op_codes)]
    STAGE_MASK = BEGIN | END
    OP_MASK = ~STAGE_MASK

    DONE_TOKEN = 'done.'
    TOKEN_SEPARATOR = ', '

    # operation bit -> (verb, unit) pair used to label progress bars
    _known_ops = {
        COUNTING: ("Counting", "Objects"),
        ENUMERATING: ("Enumerating", "Objects"),
        COMPRESSING: ("Compressing", "Objects"),
        WRITING: ("Writing", "Objects"),
        RECEIVING: ("Receiving", "Objects"),
        RESOLVING: ("Resolving", "Deltas"),
        FINDING_SOURCES: ("Finding", "Sources"),
        CHECKING_OUT: ("Check out", "Things"),
    }

    __slots__ = ('_seen_ops', '_pbars', '_encoding')

    # matches e.g. "Counting objects: 4, done."
    re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
    # matches e.g. "Compressing objects: 50% (1/2)"
    re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")

    def __init__(self, *args):
        super().__init__(*args)
        # buffer carrying an incomplete line between pipe_data_received()
        # invocations, so partial progress reports can be re-assembled
        self._unprocessed = None

    def connection_made(self, transport):
        # (re)initialize per-process state: which operations were seen,
        # and which progress bars are currently open
        super().connection_made(transport)
        self._seen_ops = []
        self._pbars = set()

    def process_exited(self):
        # take down any progress bars that were not closed orderly
        for pbar_id in self._pbars:
            log_progress(
                lgr.info,
                pbar_id,
                'Finished',
            )
        super().process_exited()

    def pipe_data_received(self, fd, byts):
        """Intercept stderr chunks and strip/translate progress reports"""
        # progress reports only come from stderr
        if fd != 2:
            # let the base class decide what to do with it
            super().pipe_data_received(fd, byts)
            return
        for line in byts.splitlines(keepends=True):
            # put any unprocessed content back in front
            line = self._unprocessed + line if self._unprocessed else line
            self._unprocessed = None
            if not self._parse_progress_line(line):
                # anything that doesn't look like a progress report
                # is retained and returned
                # in case of partial progress lines, this can lead to
                # leakage of progress info into the output, but
                # it is better to enable better (maybe more expensive)
                # subsequent filtering than hidding lines with
                # unknown, potentially important info
                lgr.debug('Non-progress stderr: %s', line)
                if line.endswith((b'\r', b'\n')):
                    # complete non-progress line, pass on
                    super().pipe_data_received(fd, line)
                else:
                    # an incomplete line, maybe the next batch completes
                    # it to become a recognizable progress report
                    self._unprocessed = line

    def _parse_progress_line(self, line):
        """Process a single line

        Parameters
        ----------
        line : bytes

        Returns
        -------
        bool
          Flag whether the line was recognized as a Git progress report.
        """
        # handle
        # Counting objects: 4, done.
        # Compressing objects: 50% (1/2)
        # Compressing objects: 100% (2/2)
        # Compressing objects: 100% (2/2), done.
        line = line.decode(self.encoding) if isinstance(line, bytes) else line
        if line.startswith(('warning:', 'error:', 'fatal:')):
            return False

        # find escape characters and cut them away - regex will not work with
        # them as they are non-ascii. As git might expect a tty, it will send them
        # NOTE: the loop has no break, so last_valid_index ends up pointing
        # at the left-most control character; everything from there on is cut
        last_valid_index = None
        for i, c in enumerate(reversed(line)):
            if ord(c) < 32:
                # its a slice index
                last_valid_index = -i - 1
            # END character was non-ascii
        # END for each character in line
        if last_valid_index is not None:
            line = line[:last_valid_index]
        # END cut away invalid part
        line = line.rstrip()

        cur_count, max_count = None, None
        # try the percentage form first, then the absolute-count form
        match = self.re_op_relative.match(line)
        if match is None:
            match = self.re_op_absolute.match(line)

        if not match:
            return False
        # END could not get match

        op_code = 0
        _remote, op_name, _percent, cur_count, max_count, message = match.groups()

        # get operation id
        if op_name == "Counting objects":
            op_code |= self.COUNTING
        elif op_name == "Compressing objects":
            op_code |= self.COMPRESSING
        elif op_name == "Writing objects":
            op_code |= self.WRITING
        elif op_name == 'Receiving objects':
            op_code |= self.RECEIVING
        elif op_name == 'Resolving deltas':
            op_code |= self.RESOLVING
        elif op_name == 'Finding sources':
            op_code |= self.FINDING_SOURCES
        elif op_name == 'Checking out files':
            op_code |= self.CHECKING_OUT
        elif op_name == 'Enumerating objects':
            op_code |= self.ENUMERATING
        else:
            # Note: On windows it can happen that partial lines are sent
            # Hence we get something like "CompreReceiving objects", which is
            # a blend of "Compressing objects" and "Receiving objects".
            # This can't really be prevented.
            lgr.debug(
                'Output line matched a progress report of an unknown type: %s',
                line)
            # TODO investigate if there is any chance that we might swallow
            # important info -- until them do not flag this line
            # as progress
            return False
        # END handle op code

        # progress bar id is unique per protocol instance and operation
        pbar_id = 'gitprogress-{}-{}'.format(id(self), op_code)

        op_props = self._known_ops[op_code]

        # figure out stage: first report of an operation opens its bar
        if op_code not in self._seen_ops:
            self._seen_ops.append(op_code)
            op_code |= self.BEGIN
            log_progress(
                lgr.info,
                pbar_id,
                'Start {} {}'.format(
                    op_props[0].lower(),
                    op_props[1].lower(),
                ),
                label=op_props[0],
                unit=' {}'.format(op_props[1]),
                total=float(max_count) if max_count else None,
            )
            self._pbars.add(pbar_id)
        # END begin opcode

        if message is None:
            message = ''
        # END message handling

        # a trailing 'done.' marks the end of this operation
        done_progress = False
        message = message.strip()
        if message.endswith(self.DONE_TOKEN):
            op_code |= self.END
            message = message[:-len(self.DONE_TOKEN)]
            done_progress = True
        # END end message handling
        message = message.strip(self.TOKEN_SEPARATOR)

        if cur_count and max_count:
            log_progress(
                lgr.info,
                pbar_id,
                line,
                update=float(cur_count),
                noninteractive_level=logging.DEBUG,
            )

        if done_progress:
            log_progress(
                lgr.info,
                pbar_id,
                'Finished {} {}'.format(
                    op_props[0].lower(),
                    op_props[1].lower(),
                ),
                noninteractive_level=logging.DEBUG,
            )
            self._pbars.discard(pbar_id)
        return True
class StdOutCaptureWithGitProgress(GitProgress):
    """GitProgress variant that additionally captures the process' stdout"""
    proc_out = True
class FetchInfo(dict):
    """
    dict that carries results of a fetch operation of a single head

    Reduced variant of GitPython's RemoteProgress class

    Original copyright:
    Copyright (C) 2008, 2009 Michael Trier and contributors
    Original license:
    BSD 3-Clause "New" or "Revised" License
    """
    NEW_TAG, NEW_HEAD, HEAD_UPTODATE, TAG_UPDATE, REJECTED, FORCED_UPDATE, \
        FAST_FORWARD, ERROR = [1 << x for x in range(8)]

    _re_fetch_result = re.compile(r'^\s*(.) (\[?[\w\s\.$@]+\]?)\s+(.+) [-> ]+ ([^\s]+)( \(.*\)?$)?')

    _flag_map = {
        '!': ERROR,
        '+': FORCED_UPDATE,
        '*': 0,
        '=': HEAD_UPTODATE,
        ' ': FAST_FORWARD,
        '-': TAG_UPDATE,
    }

    _operation_map = {
        NEW_TAG: 'new-tag',
        NEW_HEAD: 'new-branch',
        HEAD_UPTODATE: 'uptodate',
        TAG_UPDATE: 'tag-update',
        REJECTED: 'rejected',
        FORCED_UPDATE: 'forced-update',
        FAST_FORWARD: 'fast-forward',
        ERROR: 'error',
    }

    @classmethod
    def _from_line(cls, line):
        """Parse information from the given line as returned by git-fetch -v
        and return a new FetchInfo object representing this information.

        Raises ValueError if the line does not look like a fetch result.
        """
        parsed = cls._re_fetch_result.match(line)
        if parsed is None:
            raise ValueError("Failed to parse line: %r" % line)

        ctrl, operation, local_ref_str, remote_ref_str, note = parsed.groups()

        # the leading control character determines the base flags
        if ctrl not in cls._flag_map:
            raise ValueError(
                "Control character %r unknown as parsed from line %r"
                % (ctrl, line))
        flags = cls._flag_map[ctrl]

        # the operation column may refine the flags further; this makes no
        # sense for symbolic refs, but is parsed anyway
        for token, flag in (('rejected', cls.REJECTED),
                            ('new tag', cls.NEW_TAG),
                            ('tag update', cls.TAG_UPDATE),
                            ('new branch', cls.NEW_HEAD)):
            if token in operation:
                flags |= flag

        # a '<old>..<new>' (or '...') span in the operation column reveals
        # the previously referenced commit
        old_commit = None
        if '...' in operation or '..' in operation:
            span_token = '..' if ctrl == ' ' else '...'
            old_commit = operation.split(span_token)[0]

        return cls(
            ref=remote_ref_str.strip(),
            local_ref=local_ref_str.strip(),
            # convert flag int into a list of operation labels
            operations=[cls._operation_map[o]
                        for o in cls._operation_map.keys()
                        if flags & o],
            note=note,
            old_commit=old_commit,
        )
class PushInfo(dict):
    """dict that carries results of a push operation of a single head

    Reduced variant of GitPython's RemoteProgress class

    Original copyright:
    Copyright (C) 2008, 2009 Michael Trier and contributors
    Original license:
    BSD 3-Clause "New" or "Revised" License
    """
    NEW_TAG, NEW_HEAD, NO_MATCH, REJECTED, REMOTE_REJECTED, REMOTE_FAILURE, DELETED, \
        FORCED_UPDATE, FAST_FORWARD, UP_TO_DATE, ERROR = [1 << x for x in range(11)]

    _flag_map = {'X': NO_MATCH,
                 '-': DELETED,
                 '*': 0,
                 '+': FORCED_UPDATE,
                 ' ': FAST_FORWARD,
                 '=': UP_TO_DATE,
                 '!': ERROR}

    _operation_map = {
        NEW_TAG: 'new-tag',
        NEW_HEAD: 'new-branch',
        NO_MATCH: 'no-match',
        REJECTED: 'rejected',
        REMOTE_REJECTED: 'remote-rejected',
        REMOTE_FAILURE: 'remote-failure',
        DELETED: 'deleted',
        FORCED_UPDATE: 'forced-update',
        FAST_FORWARD: 'fast-forward',
        UP_TO_DATE: 'uptodate',
        ERROR: 'error',
    }

    @classmethod
    def _from_line(cls, line):
        """Create a new PushInfo instance as parsed from line which is expected to be like
        refs/heads/master:refs/heads/master 05d2687..1d0568e as bytes

        Raises ValueError on an unknown control character.
        """
        ctrl, from_to, summary = line.split('\t', 3)

        # the leading control character determines the base flags
        if ctrl not in cls._flag_map:
            raise ValueError("Control character %r unknown as parsed from line %r" % (ctrl, line))
        flags = cls._flag_map[ctrl]

        src_ref_str, dst_ref_str = from_to.split(':')

        # the summary column either names the kind of update in brackets,
        # or carries a '<old>..<new>' commit span
        old_commit = None
        if summary.startswith('['):
            for token, flag in (("[rejected]", cls.REJECTED),
                                ("[remote rejected]", cls.REMOTE_REJECTED),
                                ("[remote failure]", cls.REMOTE_FAILURE),
                                ("[no match]", cls.ERROR),
                                ("[new tag]", cls.NEW_TAG),
                                ("[new branch]", cls.NEW_HEAD)):
                if token in summary:
                    flags |= flag
                    break
            # uptodate encoded in control character
        else:
            # fast-forward or forced update - was encoded in control character,
            # but we parse the old and new commit
            span_token = '..' if ctrl == ' ' else '...'
            old_sha, _new_sha = summary.split(' ')[0].split(span_token)
            # have to use constructor here as the sha usually is abbreviated
            old_commit = old_sha

        return cls(
            from_ref=src_ref_str.strip(),
            to_ref=dst_ref_str.strip(),
            # convert flag int into a list of operation labels
            operations=[cls._operation_map[o]
                        for o in cls._operation_map.keys()
                        if flags & o],
            note=summary.strip(),
            old_commit=old_commit,
        )
@path_based_str_repr
class GitRepo(RepoInterface, metaclass=PathBasedFlyweight):
"""Representation of a git repository
"""
# We must check git config to have name and email set, but
# should do it once
_config_checked = False
# Begin Flyweight:
_unique_instances = WeakValueDictionary()
GIT_MIN_VERSION = "2.19.1"
git_version = None
    def _flyweight_invalid(self):
        # A cached flyweight instance must be dropped once the underlying
        # repository stopped being a valid Git repo (e.g. was deleted).
        return not self.is_valid_git()
    @classmethod
    def _flyweight_reject(cls, id_, *args, **kwargs):
        # Hook deciding whether a cached flyweight instance for `id_` may be
        # reused with the given constructor arguments; returning a message
        # string would reject reuse. Currently always accepts (returns None).
        # TODO:
        # This is a temporary approach. See PR # ...
        # create = kwargs.pop('create', None)
        # kwargs.pop('path', None)
        # if create and kwargs:
        #     # we have `create` plus options other than `path`
        #     return "Call to {0}() with args {1} and kwargs {2} conflicts " \
        #            "with existing instance {3}." \
        #            "This is likely to be caused by inconsistent logic in " \
        #            "your code." \
        #            "".format(cls, args, kwargs, cls._unique_instances[id_])
        pass
# End Flyweight
    def __hash__(self):
        # the flyweight key is already determining unique instances
        # add the class name to distinguish from strings of a path
        # NOTE(review): relies on the instance's weakref carrying a `key`
        # attribute -- presumably set up by the PathBasedFlyweight
        # machinery; confirm against datalad.support.repo
        return hash((self.__class__.__name__, self.__weakref__.key))
    @classmethod
    def _check_git_version(cls):
        # Enforce the minimum supported Git version (raises via
        # external_versions.check on violation) and cache the detected
        # version on the class so the check runs only once.
        external_versions.check("cmd:git", min_version=cls.GIT_MIN_VERSION)
        cls.git_version = external_versions['cmd:git']
    # This is the least common denominator to claim that a user
    # used DataLad.
    # For now citing Zenodo's all (i.e., latest) version
    @due.dcite(Doi("10.5281/zenodo.808846"),
               # override path since there is no need ATM for such details
               path="datalad",
               description="DataLad - Data management and distribution platform")
    def __init__(self, path, runner=None, create=True,
                 git_opts=None, repo=None, fake_dates=False,
                 create_sanity_checks=True,
                 **kwargs):
        """Creates representation of git repository at `path`.

        Can also be used to create a git repository at `path`.

        Parameters
        ----------
        path: str
          path to the git repository; In case it's not an absolute path,
          it's relative to PWD
        runner: optional
          ignored; retained for backward compatibility of the signature
        create: bool, optional
          if true, creates a git repository at `path` if there is none. Also
          creates `path`, if it doesn't exist.
          If set to false, an exception is raised in case `path` doesn't exist
          or doesn't contain a git repository.
        git_opts: dict, optional
          options for the git-init call (merged with `kwargs`)
        repo: git.Repo, optional
          This argument is ignored, and must not be combined with `create`.
        fake_dates: bool, optional
          if true, configure the repository to use fake (deterministic)
          commit dates
        create_sanity_checks: bool, optional
          Whether to perform sanity checks during initialization (when
          `create=True` and target path is not a valid repo already), such as
          that new repository is not created in the directory where git already
          tracks some files.
        kwargs:
          keyword arguments serving as additional options to the git-init
          command. Therefore, it makes sense only if called with `create`.

          Generally, this way of passing options to the git executable is
          (or will be) used a lot in this class. It's a transformation of
          python-style keyword arguments (or a `dict`) to command line arguments,
          provided by GitPython.

          A single character keyword will be prefixed by '-', multiple characters
          by '--'. An underscore in the keyword becomes a dash. The value of the
          keyword argument is used as the value for the corresponding command
          line argument. Assigning a boolean creates a flag.

          Examples:
          no_commit=True => --no-commit
          C='/my/path'   => -C /my/path

        Raises
        ------
        NoSuchPathError
          if `create` is false and `path` does not exist
        InvalidGitRepositoryError
          if `create` is false and `path` is not a valid git repository
        TypeError
          if `repo` is combined with `create`
        """
        # A lock to prevent multiple threads performing write operations in parallel
        self._write_lock = threading.Lock()

        # run the (class-level, one-time) git version check lazily
        if self.git_version is None:
            self._check_git_version()

        # BEGIN Repo validity test
        # We want to fail early for tests, that would be performed a lot. In
        # particular this is about GitRepo.is_valid_repo. We would use the
        # latter to decide whether or not to call GitRepo() only for __init__ to
        # then test the same things again. If we fail early we can save the
        # additional test from outer scope.
        self.path = path

        # Note, that the following three path objects are used often and
        # therefore are stored for performance. Path object creation comes with
        # a cost. Most notably, this is used for validity checking of the
        # repository.
        self.pathobj = ut.Path(self.path)
        self.dot_git = self._get_dot_git(self.pathobj, ok_missing=True)
        self._valid_git_test_path = self.dot_git / 'HEAD'

        _valid_repo = self.is_valid_git()

        do_create = False
        if create and not _valid_repo:
            if repo is not None:
                # `repo` passed with `create`, which doesn't make sense
                raise TypeError("argument 'repo' must not be used with 'create'")
            do_create = True
        else:
            # Note: We used to call gitpy.Repo(path) here, which potentially
            # raised NoSuchPathError or InvalidGitRepositoryError. This is
            # used by callers of GitRepo.__init__() to detect whether we have a
            # valid repo at `path`. Now, with switching to lazy loading property
            # `repo`, we detect those cases without instantiating a
            # gitpy.Repo().
            if not exists(path):
                raise NoSuchPathError(path)
            if not _valid_repo:
                raise InvalidGitRepositoryError(path)
        # END Repo validity test

        # So that we "share" control paths with git/git-annex
        if ssh_manager:
            ssh_manager.ensure_initialized()

        # note: we may also want to distinguish between a path to the worktree
        # and the actual repository

        # Could be used to e.g. disable automatic garbage and autopacking
        # ['-c', 'receive.autogc=0', '-c', 'gc.auto=0']
        self._GIT_COMMON_OPTIONS = []

        # merge explicit git_opts with any extra keyword arguments
        if git_opts is None:
            git_opts = {}
        if kwargs:
            git_opts.update(kwargs)

        self._cfg = None
        self._git_runner = GitWitlessRunner(cwd=self.path)

        if do_create:  # we figured it out earlier
            # we briefly need a runner to create the repo, and cannot
            # use the config manager runner yet, as it would try to
            # access the repo config which didn't materialize yet
            self._create_empty_repo(path, create_sanity_checks, **git_opts)
            # after creation we need to reconsider .git path
            self.dot_git = self._get_dot_git(self.pathobj, ok_missing=True)

        # with DryRunProtocol path might still not exist
        if exists(self.path):
            self.inode = os.stat(self.path).st_ino
        else:
            self.inode = None

        if fake_dates:
            self.configure_fake_dates()
        # Set by fake_dates_enabled to cache config value across this instance.
        self._fake_dates_enabled = None

        # Finally, register a finalizer (instead of having a __del__ method).
        # This will be called by garbage collection as well as "atexit". By
        # keeping the reference here, we can also call it explicitly.
        # Note, that we can pass required attributes to the finalizer, but not
        # `self` itself. This would create an additional reference to the object
        # and thereby preventing it from being collected at all.
        self._finalizer = finalize(self, GitRepo._cleanup, self.path)
@property
def bare(self):
if self.config.getbool("core", "bare") and \
self.pathobj == self.dot_git:
return True
elif not self.config.getbool("core", "bare") and \
not self.pathobj == self.dot_git:
return False
else:
raise InvalidGitRepositoryError("GitRepo contains inconsistent hints"
" on whether or not it is a bare "
"repository.")
def _create_empty_repo(self, path, sanity_checks=True, **kwargs):
if not op.lexists(path):
os.makedirs(path)
elif sanity_checks:
# Verify that we are not trying to initialize a new git repository
# under a directory some files of which are already tracked by git
# use case: https://github.com/datalad/datalad/issues/3068
try:
stdout, _ = self._call_git(
['-C', path, 'ls-files'],
expect_fail=True,
read_only=True,
)
if stdout:
raise PathKnownToRepositoryError(
"Failing to initialize new repository under %s where "
"following files are known to a repository above: %s"
% (path, stdout)
)
except CommandError:
# assume that all is good -- we are not under any repo
pass
cmd = ['-C', path, 'init']