/
runner.go
2336 lines (2114 loc) · 61.9 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2017 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package kbfsgit
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
"github.com/keybase/client/go/kbfs/data"
"github.com/keybase/client/go/kbfs/idutil"
"github.com/keybase/client/go/kbfs/kbfsmd"
"github.com/keybase/client/go/kbfs/libfs"
"github.com/keybase/client/go/kbfs/libgit"
"github.com/keybase/client/go/kbfs/libkbfs"
"github.com/keybase/client/go/kbfs/tlf"
"github.com/keybase/client/go/kbfs/tlfhandle"
"github.com/keybase/client/go/libkb"
"github.com/keybase/client/go/logger"
"github.com/keybase/client/go/protocol/keybase1"
"github.com/pkg/errors"
billy "gopkg.in/src-d/go-billy.v4"
"gopkg.in/src-d/go-billy.v4/osfs"
gogit "gopkg.in/src-d/go-git.v4"
gogitcfg "gopkg.in/src-d/go-git.v4/config"
"gopkg.in/src-d/go-git.v4/plumbing"
gogitobj "gopkg.in/src-d/go-git.v4/plumbing/object"
gogitstor "gopkg.in/src-d/go-git.v4/plumbing/storer"
"gopkg.in/src-d/go-git.v4/storage"
"gopkg.in/src-d/go-git.v4/storage/filesystem"
)
const (
	// Commands git sends to this remote-helper process over stdin.
	gitCmdCapabilities = "capabilities"
	gitCmdList         = "list"
	gitCmdFetch        = "fetch"
	gitCmdPush         = "push"
	gitCmdOption       = "option"

	// Option names git may set via the "option" command.
	gitOptionVerbosity = "verbosity"
	gitOptionProgress  = "progress"
	gitOptionCloning   = "cloning"
	gitOptionPushcert  = "pushcert"
	gitOptionIfAsked   = "if-asked"

	// Event names used by the git-lfs custom transfer-agent protocol.
	gitLFSInitEvent      = "init"
	gitLFSUploadEvent    = "upload"
	gitLFSDownloadEvent  = "download"
	gitLFSCompleteEvent  = "complete"
	gitLFSTerminateEvent = "terminate"
	gitLFSProgressEvent  = "progress"

	// Debug tag ID for an individual git command passed to the process.
	ctxCommandOpID = "GITCMDID"

	// URL prefix and separator for "keybase://<tlfType>/<tlf>/<repo>" specs.
	kbfsgitPrefix = "keybase://"
	repoSplitter  = "/"
	kbfsRepoDir   = ".kbfs_git"

	// TLF type names as they appear in repo URLs.
	publicName  = "public"
	privateName = "private"
	teamName    = "team"

	// localRepoRemoteName is the name of the remote that gets added
	// locally to the config of the KBFS bare repo, pointing to the
	// git repo stored at the `gitDir` passed to `newRunner`.
	//
	// In go-git, there is no way to hook two go-git.Repository
	// instances together to do fetches/pulls between them. One of the
	// two repos has to be defined as a "remote" to the other one in
	// order to use the nice Fetch and Pull commands. (There might be
	// other more involved ways to transfer objects manually
	// one-by-one, but that seems like it would be pretty sad.)
	//
	// Since there is no standard remote protocol for keybase yet
	// (that's what we're building!), it's not supported by go-git
	// itself. That means our only option is to treat the local
	// on-disk repo as a "remote" with respect to the bare KBFS repo,
	// and do everything in reverse: for example, when a user does a
	// push, we actually fetch from the local repo and write the
	// objects into the bare repo.
	localRepoRemoteName = "local"

	packedRefsPath     = "packed-refs"
	packedRefsTempPath = "._packed-refs"

	// Maintenance limits and progress-printing thresholds; see the
	// usage sites for the precise semantics of each.
	defaultMaxLooseRefs             = 50
	defaultPruneMinLooseObjects     = -1
	defaultMaxObjectPacks           = 50
	minGCInterval                   = 7 * 24 * time.Hour
	unlockPrintBytesStatusThreshold = time.Second / 2
	gcPrintStatusThreshold          = time.Second
	maxCommitsToVisitPerRef         = 20
)
// ctxCommandTagKey is the context-key type used to tag each git
// command with a unique debug ID.
type ctxCommandTagKey int

const (
	// ctxCommandIDKey is the context key under which a per-command
	// debug tag is stored.
	ctxCommandIDKey ctxCommandTagKey = iota
)
// runnerProcessType distinguishes which kind of helper this process
// is running as: a plain git remote helper, a git-lfs transfer agent,
// or an LFS agent with progress reporting suppressed.
type runnerProcessType int

const (
	processGit runnerProcessType = iota
	processLFS
	processLFSNoProgress
)
// runner bridges a single git (or git-lfs) remote-helper invocation
// to a repo stored in KBFS. Commands are read from `input`, protocol
// responses are written to `output`, and human-readable progress text
// goes to `errput`.
type runner struct {
	config libkbfs.Config
	log    logger.Logger
	h      *tlfhandle.Handle // TLF that contains the repo
	remote string            // local name git assigned to this remote
	repo   string            // repo name within the TLF
	gitDir string            // path to the caller's local .git directory
	uniqID string            // unique ID for temp files in KBFS (device key + PID)
	input  io.Reader
	output io.Writer // protocol responses back to git
	errput io.Writer // progress/status text shown to the user
	// gcDone: NOTE(review): presumably set once repo GC has been
	// attempted for this invocation — confirm at its usage sites.
	gcDone      bool
	processType runnerProcessType
	verbosity   int64
	progress    bool
	cloning     bool
	// logSync/logSyncDone ensure the "Syncing with Keybase..."
	// message and its matching done/error line print at most once,
	// even though initRepoIfNeeded may be called multiple times.
	logSync     sync.Once
	logSyncDone sync.Once
	// printStageLock guards the stage-printing state below.
	printStageLock   sync.Mutex
	needPrintDone    bool
	stageStartTime   time.Time
	stageMemProfName string
	stageCPUProfPath string
}
// newRunnerWithType parses a "keybase://<tlfType>/<tlf>/<repo>" spec,
// resolves the TLF handle, and constructs a runner of the given
// process type. Verbosity starts at 1 and progress output is enabled
// by default.
func newRunnerWithType(ctx context.Context, config libkbfs.Config,
	remote, repo, gitDir string, input io.Reader, output, errput io.Writer,
	processType runnerProcessType) (
	*runner, error) {
	spec := strings.TrimPrefix(repo, kbfsgitPrefix)
	parts := strings.Split(spec, repoSplitter)
	if len(parts) != 3 {
		return nil, errors.Errorf("Repo should be in the format "+
			"%s<tlfType>%s<tlf>%s<repo>, but got %s",
			kbfsgitPrefix, repoSplitter, repoSplitter, spec)
	}

	var tlfType tlf.Type
	switch parts[0] {
	case publicName:
		tlfType = tlf.Public
	case privateName:
		tlfType = tlf.Private
	case teamName:
		tlfType = tlf.SingleTeam
	default:
		return nil, errors.Errorf("Unrecognized TLF type: %s", parts[0])
	}

	h, err := libkbfs.GetHandleFromFolderNameAndType(
		ctx, config.KBPKI(), config.MDOps(), config, parts[1], tlfType)
	if err != nil {
		return nil, err
	}

	// Combine the device's verifying key with the PID to get an ID
	// that's unique across devices and processes, for generating temp
	// files in KBFS.
	session, err := idutil.GetCurrentSessionIfPossible(
		ctx, config.KBPKI(), h.Type() == tlf.Public)
	if err != nil {
		return nil, err
	}
	uniqID := fmt.Sprintf("%s-%d", session.VerifyingKey.String(), os.Getpid())

	return &runner{
		config:      config,
		log:         config.MakeLogger(""),
		h:           h,
		remote:      remote,
		repo:        parts[2],
		gitDir:      gitDir,
		uniqID:      uniqID,
		input:       input,
		output:      output,
		errput:      errput,
		processType: processType,
		verbosity:   1,
		progress:    true,
	}, nil
}
// newRunner creates a new runner for git commands. It expects `repo`
// to be in the form "keybase://private/user/reponame". `remote`
// is the local name assigned to that URL, while `gitDir` is the
// filepath leading to the .git directory of the caller's local
// on-disk repo. Commands are read from `input`; protocol replies go
// to `output` and progress text to `errput`.
func newRunner(ctx context.Context, config libkbfs.Config,
	remote, repo, gitDir string, input io.Reader, output, errput io.Writer) (
	*runner, error) {
	return newRunnerWithType(
		ctx, config, remote, repo, gitDir, input, output, errput, processGit)
}
// handleCapabilities: from https://git-scm.com/docs/git-remote-helpers
//
// Advertises the commands this remote helper supports (fetch, push,
// option), one per line, terminated by a blank line. Each capability
// may be preceded with *, which marks them mandatory for git versions
// using the remote helper to understand.
func (r *runner) handleCapabilities() error {
	for _, capability := range []string{gitCmdFetch, gitCmdPush, gitCmdOption} {
		if _, err := r.output.Write([]byte(capability + "\n")); err != nil {
			return err
		}
	}
	// A blank line terminates the capability list.
	_, err := r.output.Write([]byte("\n"))
	return err
}
// getElapsedStr builds the suffix appended to errput messages at the
// end of a phase: the elapsed time (verbosity >= 2), the path of a
// heap profile written at the end of the phase (verbosity >= 3), and
// the CPU profile path if one was active (in which case the CPU
// profile is stopped here).
func (r *runner) getElapsedStr(
	ctx context.Context, startTime time.Time, profName string,
	cpuProfFullPath string) string {
	if r.verbosity < 2 {
		return ""
	}
	out := fmt.Sprintf(" [%s]", r.config.Clock().Now().Sub(startTime))

	if r.verbosity >= 3 {
		profName = filepath.Join(os.TempDir(), profName)
		f, err := os.Create(profName)
		if err != nil {
			r.log.CDebugf(ctx, err.Error())
		} else {
			// GC first so the heap profile reflects live data.
			runtime.GC()
			if err := pprof.WriteHeapProfile(f); err != nil {
				r.log.CDebugf(ctx, "Couldn't write heap profile: %+v", err)
			}
			f.Close()
		}
		out += " [memprof " + profName + "]"
	}

	if cpuProfFullPath != "" {
		pprof.StopCPUProfile()
		out += " [cpuprof " + cpuProfFullPath + "]"
	}
	return out
}
// printDoneOrErr writes "done." (or the error text if err is non-nil)
// to errput, along with the elapsed-time/profiling suffix from
// getElapsedStr. It is a no-op at verbosity 0.
func (r *runner) printDoneOrErr(
	ctx context.Context, err error, startTime time.Time) {
	if r.verbosity < 1 {
		return
	}
	profName := "mem.init.prof"
	elapsedStr := r.getElapsedStr(ctx, startTime, profName, "")
	var writeErr error
	if err != nil {
		_, writeErr = r.errput.Write([]byte(err.Error() + elapsedStr + "\n"))
	} else {
		_, writeErr = r.errput.Write([]byte("done." + elapsedStr + "\n"))
	}
	if writeErr != nil {
		// Log the write failure itself. (Previously this logged
		// `err`, which can be nil here, hiding the real problem.)
		r.log.CDebugf(ctx, "Couldn't write error: %+v", writeErr)
	}
}
// isManagedByApp reports whether the Keybase app manages this TLF's
// git repos (and thus whether we should avoid lazily creating them).
func (r *runner) isManagedByApp() bool {
	switch r.h.Type() {
	case tlf.Public:
		// Public TLFs are never managed by the app.
		return false
	case tlf.SingleTeam:
		// Single-team TLFs are always managed by the app.
		return true
	case tlf.Private:
		// Only single-user private TLFs are managed by the app: a
		// comma (multiple writers), a reader separator, or a space in
		// the canonical name means it's not.
		canon := string(r.h.GetCanonicalName())
		if strings.ContainsAny(canon, " ,"+tlf.ReaderSep) {
			return false
		}
		return true
	default:
		panic(fmt.Sprintf("Unexpected type: %s", r.h.Type()))
	}
}
// makeFS returns a KBFS filesystem rooted at this runner's repo. For
// TLFs managed by the Keybase app the repo must already exist; for
// all others it is created on demand.
func (r *runner) makeFS(ctx context.Context) (fs *libfs.FS, err error) {
	// Only allow lazy creates for TLFs that aren't managed by the
	// Keybase app.
	if r.isManagedByApp() {
		fs, _, err = libgit.GetRepoAndID(
			ctx, r.config, r.h, r.repo, r.uniqID)
	} else {
		fs, _, err = libgit.GetOrCreateRepoAndID(
			ctx, r.config, r.h, r.repo, r.uniqID)
	}
	if err != nil {
		return nil, err
	}
	return fs, nil
}
// initRepoIfNeeded initializes the bare repo in KBFS (or opens it if
// it already exists) and returns it along with the KBFS filesystem it
// lives in. `forCmd` customizes the storage layer: fetches get an
// on-demand storer so big repos aren't read into memory all at once.
func (r *runner) initRepoIfNeeded(ctx context.Context, forCmd string) (
	repo *gogit.Repository, fs *libfs.FS, err error) {
	// This function might be called multiple times per process, but
	// the subsequent calls will use the local cache. So only print
	// these messages once.
	if r.verbosity >= 1 {
		var startTime time.Time
		r.logSync.Do(func() {
			startTime = r.config.Clock().Now()
			_, err := r.errput.Write([]byte("Syncing with Keybase... "))
			if err != nil {
				r.log.CDebugf(ctx, "Couldn't write: %+v", err)
			}
		})
		defer func() {
			r.logSyncDone.Do(func() { r.printDoneOrErr(ctx, err, startTime) })
		}()
	}
	fs, err = r.makeFS(ctx)
	if err != nil {
		return nil, nil, err
	}
	// We don't persist remotes to the config on disk for two
	// reasons. 1) gogit/gcfg has a bug where it can't handle
	// backslashes in remote URLs, and 2) we don't want to persist the
	// remotes anyway since they'll contain local paths and wouldn't
	// make sense to other devices, plus that could leak local info.
	//
	// (Named `storer` to avoid shadowing the imported `storage`
	// package.)
	var storer storage.Storer
	storer, err = libgit.NewGitConfigWithoutRemotesStorer(fs)
	if err != nil {
		return nil, nil, err
	}
	if forCmd == gitCmdFetch {
		r.log.CDebugf(ctx, "Using on-demand storer")
		// Wrap it in an on-demand storer, so we don't try to read all
		// the objects of big repos into memory at once.
		storer, err = libgit.NewOnDemandStorer(storer)
		if err != nil {
			return nil, nil, err
		}
	}
	config, err := storer.Config()
	if err != nil {
		return nil, nil, err
	}
	if config.Pack.Window > 0 {
		// Turn delta compression off, both to avoid messing up the
		// on-demand storer, and to avoid the unnecessary computation
		// since we're not transferring the objects over a network.
		// TODO: this results in uncompressed local git repo after
		// fetches, so we should either run:
		//
		// `git repack -a -d -f --depth=250 --window=250` as needed.
		// (via https://stackoverflow.com/questions/7102053/git-pull-without-remotely-compressing-objects)
		//
		// or we should document that the user should do so.
		r.log.CDebugf(ctx, "Disabling pack compression by using a 0 window")
		config.Pack.Window = 0
		err = storer.SetConfig(config)
		if err != nil {
			return nil, nil, err
		}
	}
	// TODO: This needs to take a server lock when initializing a
	// repo.
	r.log.CDebugf(ctx, "Attempting to init or open repo %s", r.repo)
	repo, err = gogit.Init(storer, nil)
	if err == gogit.ErrRepositoryAlreadyExists {
		repo, err = gogit.Open(storer, nil)
	}
	if err != nil {
		return nil, nil, err
	}
	return repo, fs, nil
}
// percent returns n as a percentage of d (e.g. 50.0 when n is half of
// d). There is no special handling for d == 0; the result follows
// IEEE float division.
func percent(n int64, d int64) float64 {
	ratio := float64(n) / float64(d)
	return 100 * ratio
}
// humanizeBytes renders a byte count in human-readable units. When
// d == 1 it formats n alone ("X bytes", "X.XX KB", ...); otherwise it
// formats the pair as "n/d" with a unit chosen by the denominator so
// both numbers share a scale.
func humanizeBytes(n int64, d int64) string {
	const (
		kb = int64(1024)
		mb = kb * 1024
		gb = mb * 1024
	)
	// Counting mode: no denominator, just render n.
	if d == 1 {
		switch {
		case n < kb:
			return fmt.Sprintf("%d bytes", n)
		case n < mb:
			return fmt.Sprintf("%.2f KB", float64(n)/float64(kb))
		case n < gb:
			return fmt.Sprintf("%.2f MB", float64(n)/float64(mb))
		default:
			return fmt.Sprintf("%.2f GB", float64(n)/float64(gb))
		}
	}
	// Progress mode: units track the denominator.
	switch {
	case d < kb:
		return fmt.Sprintf("%d/%d bytes", n, d)
	case d < mb:
		return fmt.Sprintf(
			"%.2f/%.2f KB", float64(n)/float64(kb), float64(d)/float64(kb))
	case d < gb:
		return fmt.Sprintf(
			"%.2f/%.2f MB", float64(n)/float64(mb), float64(d)/float64(mb))
	default:
		return fmt.Sprintf(
			"%.2f/%.2f GB", float64(n)/float64(gb), float64(d)/float64(gb))
	}
}
// printStageEndIfNeeded should only be used to end stages started with
// printStageStart. It prints "done." (plus the elapsed/profile suffix
// from getElapsedStr) at most once per stage.
func (r *runner) printStageEndIfNeeded(ctx context.Context) {
	r.printStageLock.Lock()
	defer r.printStageLock.Unlock()
	// go-git grabs the lock right after plumbing.StatusIndexOffset, but before
	// sending the Done status update. As a result, it would look like we are
	// flushing the journal before plumbing.StatusIndexOffset is done. So
	// instead, print "done." only if it's not printed yet.
	if r.needPrintDone {
		elapsedStr := r.getElapsedStr(ctx,
			r.stageStartTime, r.stageMemProfName, r.stageCPUProfPath)
		_, err := r.errput.Write([]byte("done." + elapsedStr + "\n"))
		if err != nil {
			r.log.CDebugf(ctx, "Couldn't write: %+v", err)
		}
		r.needPrintDone = false
	}
}
// printStageStart ends any previous stage, writes `toPrint` to
// errput, and records the stage's start time and profile names for
// use by printStageEndIfNeeded. At verbosity >= 4 it also starts a
// CPU profile written under os.TempDir(). No-op if `toPrint` is
// empty.
func (r *runner) printStageStart(ctx context.Context,
	toPrint []byte, memProfName, cpuProfName string) {
	if len(toPrint) == 0 {
		return
	}
	// Must be called before taking the lock below, since it takes the
	// same lock.
	r.printStageEndIfNeeded(ctx)
	r.printStageLock.Lock()
	defer r.printStageLock.Unlock()
	r.stageStartTime = r.config.Clock().Now()
	r.stageMemProfName = memProfName
	if len(cpuProfName) > 0 && r.verbosity >= 4 {
		cpuProfPath := filepath.Join(
			os.TempDir(), cpuProfName)
		f, err := os.Create(cpuProfPath)
		if err != nil {
			// Clear the path so printStageEndIfNeeded won't report a
			// profile that doesn't exist.
			r.log.CDebugf(
				ctx, "Couldn't create CPU profile: %s", cpuProfName)
			cpuProfPath = ""
		} else {
			err := pprof.StartCPUProfile(f)
			if err != nil {
				r.log.CDebugf(ctx, "Couldn't start CPU profile: %+v", err)
			}
		}
		r.stageCPUProfPath = cpuProfPath
	}
	_, err := r.errput.Write(toPrint)
	if err != nil {
		r.log.CDebugf(ctx, "Couldn't write: %+v", err)
	}
	r.needPrintDone = true
}
// printGitJournalStart begins the journal-flush progress stage,
// describing the data as "signed" for public TLFs and "encrypted"
// otherwise. No-op at verbosity 0.
func (r *runner) printGitJournalStart(ctx context.Context) {
	if r.verbosity < 1 {
		return
	}
	verb := "encrypted"
	if r.h.Type() == tlf.Public {
		verb = "signed"
	}
	msg := fmt.Sprintf("Syncing %s data to Keybase: ", verb)
	r.printStageStart(ctx, []byte(msg), "mem.flush.prof", "")
}
// printGitJournalMessage erases the previous progress string (of
// length lastByteCount) and prints the current flush percentage and
// byte counts. It returns the length of the newly-printed string so
// the next update can erase it.
func (r *runner) printGitJournalMessage(
	ctx context.Context, lastByteCount int, totalSize, sizeLeft int64) int {
	flushed := totalSize - sizeLeft
	if flushed < 0 {
		flushed = 0
	}
	progressStr := fmt.Sprintf("(%.2f%%) %s... ",
		percent(flushed, totalSize), humanizeBytes(flushed, totalSize))
	if r.verbosity >= 1 && r.progress {
		backspaces := strings.Repeat("\b", lastByteCount)
		if _, err := r.errput.Write([]byte(backspaces + progressStr)); err != nil {
			r.log.CDebugf(ctx, "Couldn't write: %+v", err)
		}
	}
	return len(progressStr)
}
// printJournalStatus prints the TLF's journal-flush progress via the
// given printer callbacks, updating once per second until `doneCh` is
// closed. The caller should make sure doneCh is closed when the
// journal is all flushed. Prints nothing when there are no unflushed
// bytes at the start.
func (r *runner) printJournalStatus(
	ctx context.Context, jManager *libkbfs.JournalManager, tlfID tlf.ID,
	doneCh <-chan struct{}, printStart func(context.Context),
	printProgress func(context.Context, int, int64, int64) int,
	printEnd func(context.Context)) {
	// Finish any stage that's still open before (possibly) starting
	// this progress display.
	printEnd(ctx)
	// Note: the "first" status here gets us the number of unflushed
	// bytes left at the time we started printing. However, we don't
	// have the total number of bytes being flushed to the server
	// throughout the whole operation, which would be more
	// informative. It would be better to have that as the
	// denominator, but there's no easy way to get it right now.
	firstStatus, err := jManager.JournalStatus(tlfID)
	if err != nil {
		r.log.CDebugf(ctx, "Error getting status: %+v", err)
		return
	}
	// Nothing waiting to flush; stay silent.
	if firstStatus.UnflushedBytes == 0 {
		return
	}
	printStart(ctx)
	lastByteCount := printProgress(
		ctx, 0, firstStatus.UnflushedBytes, firstStatus.UnflushedBytes)
	r.log.CDebugf(ctx, "Waiting for %d journal bytes to flush",
		firstStatus.UnflushedBytes)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			status, err := jManager.JournalStatus(tlfID)
			if err != nil {
				r.log.CDebugf(ctx, "Error getting status: %+v", err)
				return
			}
			lastByteCount = printProgress(
				ctx, lastByteCount, firstStatus.UnflushedBytes,
				status.UnflushedBytes)
		case <-doneCh:
			// doneCh is closed. So assume journal flushing is done and
			// take the shortcut.
			_ = printProgress(
				ctx, lastByteCount, firstStatus.UnflushedBytes, 0)
			printEnd(ctx)
			return
		}
	}
}
// waitForJournalWithPrinters syncs all outstanding writes for this
// TLF, squashes them into a single revision via FinishSingleOp, and
// waits for the journal to fully flush, reporting progress through
// the given printer callbacks. It returns an error if the journal is
// still non-empty afterward. A missing journal manager or journal is
// treated as "nothing to flush", not an error.
func (r *runner) waitForJournalWithPrinters(
	ctx context.Context, printStart func(context.Context),
	printProgress func(context.Context, int, int64, int64) int,
	printEnd func(context.Context)) error {
	// See if there are any deleted repos to clean up before we flush
	// the journal.
	err := libgit.CleanOldDeletedReposTimeLimited(ctx, r.config, r.h)
	if err != nil {
		return err
	}
	rootNode, _, err := r.config.KBFSOps().GetOrCreateRootNode(
		ctx, r.h, data.MasterBranch)
	if err != nil {
		return err
	}
	err = r.config.KBFSOps().SyncAll(ctx, rootNode.GetFolderBranch())
	if err != nil {
		return err
	}
	jManager, err := libkbfs.GetJournalManager(r.config)
	if err != nil {
		r.log.CDebugf(ctx, "No journal server: %+v", err)
		return nil
	}
	_, err = jManager.JournalStatus(rootNode.GetFolderBranch().Tlf)
	if err != nil {
		r.log.CDebugf(ctx, "No journal: %+v", err)
		return nil
	}
	// Print progress in the background until the flush completes
	// (signaled by closing waitDoneCh below).
	printDoneCh := make(chan struct{})
	waitDoneCh := make(chan struct{})
	go func() {
		r.printJournalStatus(
			ctx, jManager, rootNode.GetFolderBranch().Tlf, waitDoneCh,
			printStart, printProgress, printEnd)
		close(printDoneCh)
	}()
	// This squashes everything written to the journal into a single
	// revision, to make sure that no partial states of the bare repo
	// are seen by other readers of the TLF. It also waits for any
	// necessary conflict resolution to complete.
	err = jManager.FinishSingleOp(ctx, rootNode.GetFolderBranch().Tlf,
		nil, keybase1.MDPriorityGit)
	if err != nil {
		return err
	}
	// Stop the printer goroutine and wait for its final update.
	close(waitDoneCh)
	<-printDoneCh
	// Make sure that everything is truly flushed.
	status, err := jManager.JournalStatus(rootNode.GetFolderBranch().Tlf)
	if err != nil {
		return err
	}
	if status.RevisionStart != kbfsmd.RevisionUninitialized {
		r.log.CDebugf(ctx, "Journal status: %+v", status)
		return errors.New("Journal is non-empty after a wait")
	}
	return nil
}
// waitForJournal flushes and waits for the journal using the standard
// git-command progress printers.
func (r *runner) waitForJournal(ctx context.Context) error {
	return r.waitForJournalWithPrinters(
		ctx, r.printGitJournalStart, r.printGitJournalMessage,
		r.printStageEndIfNeeded)
}
// handleList: From https://git-scm.com/docs/git-remote-helpers
//
// Lists the refs, one per line, in the format "<value> <name> [<attr>
// …]". The value may be a hex sha1 hash, "@<dest>" for a symref, or
// "?" to indicate that the helper could not get the value of the
// ref. A space-separated list of attributes follows the name;
// unrecognized attributes are ignored. The list ends with a blank
// line.
//
// Symbolic refs are held back and only printed (after all hash refs)
// once at least one hash ref has been seen, and never during a
// for-push list.
func (r *runner) handleList(ctx context.Context, args []string) (err error) {
	forPush := false
	if len(args) == 1 && args[0] == "for-push" {
		r.log.CDebugf(ctx, "Excluding symbolic refs during a for-push list")
		forPush = true
	} else if len(args) > 0 {
		return errors.Errorf("Bad list request: %v", args)
	}
	repo, _, err := r.initRepoIfNeeded(ctx, gitCmdList)
	if err != nil {
		return err
	}
	refs, err := repo.References()
	if err != nil {
		return err
	}
	var symRefs []string
	hashesSeen := false
	for {
		ref, err := refs.Next()
		// go-git signals end-of-iteration with io.EOF (possibly
		// wrapped).
		if errors.Cause(err) == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		value := ""
		switch ref.Type() {
		case plumbing.HashReference:
			value = ref.Hash().String()
			hashesSeen = true
		case plumbing.SymbolicReference:
			value = "@" + ref.Target().String()
		default:
			value = "?"
		}
		refStr := value + " " + ref.Name().String() + "\n"
		if ref.Type() == plumbing.SymbolicReference {
			// Don't list any symbolic references until we're sure
			// there's at least one object available. Otherwise
			// cloning an empty repo will result in an error because
			// the HEAD symbolic ref points to a ref that doesn't
			// exist.
			symRefs = append(symRefs, refStr)
			continue
		}
		r.log.CDebugf(ctx, "Listing ref %s", refStr)
		_, err = r.output.Write([]byte(refStr))
		if err != nil {
			return err
		}
	}
	if hashesSeen && !forPush {
		for _, refStr := range symRefs {
			r.log.CDebugf(ctx, "Listing symbolic ref %s", refStr)
			_, err = r.output.Write([]byte(refStr))
			if err != nil {
				return err
			}
		}
	}
	err = r.waitForJournal(ctx)
	if err != nil {
		return err
	}
	r.log.CDebugf(ctx, "Done waiting for journal")
	// A blank line terminates the ref list.
	_, err = r.output.Write([]byte("\n"))
	return err
}
// gogitStagesToStatus maps go-git status stages to the user-visible
// progress labels printed during fetches/pushes. NOTE: this map is
// not read-only — processGogitStatus overwrites the StatusFetch entry
// for public TLFs.
var gogitStagesToStatus = map[plumbing.StatusStage]string{
	plumbing.StatusCount:     "Counting and decrypting: ",
	plumbing.StatusRead:      "Reading and decrypting metadata: ",
	plumbing.StatusFixChains: "Fixing: ",
	plumbing.StatusSort:      "Sorting... ",
	plumbing.StatusDelta:     "Calculating deltas: ",
	// For us, a "send" actually means fetch.
	plumbing.StatusSend: "Fetching and decrypting objects: ",
	// For us, a "fetch" actually means writing objects to
	// the local journal.
	plumbing.StatusFetch:       "Preparing and encrypting: ",
	plumbing.StatusIndexHash:   "Indexing hashes: ",
	plumbing.StatusIndexCRC:    "Indexing CRCs: ",
	plumbing.StatusIndexOffset: "Indexing offsets: ",
}
// humanizeObjects renders an object count in human-readable units
// (plain, K for thousands, M for millions). When d == 1 it formats n
// alone; otherwise it formats the pair as "n/d" with a unit chosen by
// the denominator.
func humanizeObjects(n int, d int) string {
	const (
		k = 1000
		m = k * 1000
	)
	// Counting mode: no denominator, just render n.
	if d == 1 {
		switch {
		case n < k:
			return fmt.Sprintf("%d", n)
		case n < m:
			return fmt.Sprintf("%.2fK", float64(n)/k)
		default:
			return fmt.Sprintf("%.2fM", float64(n)/m)
		}
	}
	// Progress mode: units track the denominator.
	switch {
	case d < k:
		return fmt.Sprintf("%d/%d", n, d)
	case d < m:
		return fmt.Sprintf("%.2f/%.2fK", float64(n)/k, float64(d)/k)
	default:
		return fmt.Sprintf("%.2f/%.2fM", float64(n)/m, float64(d)/m)
	}
}
// printJournalStatusUntilFlushed syncs any outstanding writes and
// then prints journal-flush progress until doneCh is closed. All
// failures are logged and swallowed, since this is a best-effort
// progress display.
func (r *runner) printJournalStatusUntilFlushed(
	ctx context.Context, doneCh <-chan struct{}) {
	rootNode, _, err := r.config.KBFSOps().GetOrCreateRootNode(
		ctx, r.h, data.MasterBranch)
	if err != nil {
		r.log.CDebugf(ctx, "GetOrCreateRootNode error: %+v", err)
		return
	}
	err = r.config.KBFSOps().SyncAll(ctx, rootNode.GetFolderBranch())
	if err != nil {
		r.log.CDebugf(ctx, "SyncAll error: %+v", err)
		return
	}
	jManager, err := libkbfs.GetJournalManager(r.config)
	if err != nil {
		// Without a journal manager there's no status to print; bail
		// out rather than handing a nil manager to
		// printJournalStatus (which would dereference it). This
		// matches waitForJournalWithPrinters' handling of the same
		// condition. (Previously this fell through after logging.)
		r.log.CDebugf(ctx, "No journal server: %+v", err)
		return
	}
	r.printJournalStatus(
		ctx, jManager, rootNode.GetFolderBranch().Tlf, doneCh,
		r.printGitJournalStart, r.printGitJournalMessage,
		r.printStageEndIfNeeded)
}
// processGogitStatus consumes go-git status updates and KBFS
// filesystem events, printing user-friendly stage and progress
// messages to errput. It returns when a StatusDone update arrives, or
// after both channels have been closed.
func (r *runner) processGogitStatus(ctx context.Context,
	statusChan <-chan plumbing.StatusUpdate, fsEvents <-chan libfs.FSEvent) {
	if r.h.Type() == tlf.Public {
		// Public TLF data is signed rather than encrypted, so adjust
		// the user-visible label. (This mutates the package-level
		// map.)
		gogitStagesToStatus[plumbing.StatusFetch] = "Preparing and signing: "
	}
	currStage := plumbing.StatusUnknown
	lastByteCount := 0
	for {
		if statusChan == nil && fsEvents == nil {
			// statusChan is never passed in as nil. So if it's nil, it's been
			// closed in the select/case below because receive failed. So
			// instead of letting select block forever, we break out of the
			// loop here.
			break
		}
		select {
		case update, ok := <-statusChan:
			if !ok {
				// Channel closed; nil it out so this case stops firing.
				statusChan = nil
				continue
			}
			if update.Stage != currStage {
				// Stage transition: end the previous stage and start
				// a new one with matching mem/CPU profile names.
				if currStage != plumbing.StatusUnknown {
					r.printStageEndIfNeeded(ctx)
				}
				r.printStageStart(ctx,
					[]byte(gogitStagesToStatus[update.Stage]),
					fmt.Sprintf("mem.%d.prof", update.Stage),
					fmt.Sprintf("cpu.%d.prof", update.Stage),
				)
				lastByteCount = 0
				if stage, ok := gogitStagesToStatus[update.Stage]; ok {
					r.log.CDebugf(ctx, "Entering stage: %s - %d total objects",
						stage, update.ObjectsTotal)
				}
			}
			eraseStr := strings.Repeat("\b", lastByteCount)
			newStr := ""
			switch update.Stage {
			case plumbing.StatusDone:
				r.log.CDebugf(ctx, "Status processing done")
				return
			case plumbing.StatusCount:
				// Only a running total is known while counting.
				newStr = fmt.Sprintf(
					"%s objects... ", humanizeObjects(update.ObjectsTotal, 1))
			case plumbing.StatusSort:
				// Sorting has no per-object progress to show.
			default:
				newStr = fmt.Sprintf(
					"(%.2f%%) %s objects... ",
					percent(int64(update.ObjectsDone), int64(update.ObjectsTotal)),
					humanizeObjects(update.ObjectsDone, update.ObjectsTotal))
			}
			lastByteCount = len(newStr)
			if r.progress {
				_, err := r.errput.Write([]byte(eraseStr + newStr))
				if err != nil {
					r.log.CDebugf(ctx, "Couldn't write: %+v", err)
				}
			}
			currStage = update.Stage
		case fsEvent, ok := <-fsEvents:
			if !ok {
				// Channel closed; nil it out so this case stops firing.
				fsEvents = nil
				continue
			}
			switch fsEvent.EventType {
			case libfs.FSEventLock, libfs.FSEventUnlock:
				r.printStageEndIfNeeded(ctx)
				// Since we flush all blocks in Lock, subsequent calls to
				// Lock/Unlock normally don't take much time. So we only print
				// journal status if it's been longer than
				// unlockPrintBytesStatusThreshold and fsEvent.Done hasn't been
				// closed.
				timer := time.NewTimer(unlockPrintBytesStatusThreshold)
				select {
				case <-timer.C:
					r.printJournalStatusUntilFlushed(ctx, fsEvent.Done)
				case <-fsEvent.Done:
					timer.Stop()
				case <-ctx.Done():
					timer.Stop()
				}
			}
		}
	}
	r.log.CDebugf(ctx, "Status channel closed")
	r.printStageEndIfNeeded(ctx)
}
// recursiveByteCount returns a sum of the size of all files under the
// directory represented by `fs`. It also returns the length of the
// last string it printed to `r.errput` as `toErase`, to aid in
// overwriting the text on the next update. `totalSoFar` is the byte
// count accumulated before entering this directory and is used only
// for the progress display.
func (r *runner) recursiveByteCount(
	ctx context.Context, fs billy.Filesystem, totalSoFar int64, toErase int) (
	bytes int64, toEraseRet int, err error) {
	fileInfos, err := fs.ReadDir("/")
	if err != nil {
		return 0, 0, err
	}
	for _, fi := range fileInfos {
		if fi.IsDir() {
			// Skip the self-reference entry.
			if fi.Name() == "." {
				continue
			}
			// Recurse into the subdirectory through a chroot'd view
			// of the filesystem.
			chrootFS, err := fs.Chroot(fi.Name())
			if err != nil {
				return 0, 0, err
			}
			var chrootBytes int64
			chrootBytes, toErase, err = r.recursiveByteCount(
				ctx, chrootFS, totalSoFar+bytes, toErase)
			if err != nil {
				return 0, 0, err
			}
			bytes += chrootBytes
		} else {
			bytes += fi.Size()
			if r.progress {
				// This function only runs if r.verbosity >= 1.
				eraseStr := strings.Repeat("\b", toErase)
				newStr := fmt.Sprintf(
					"%s... ", humanizeBytes(totalSoFar+bytes, 1))
				toErase = len(newStr)
				_, err := r.errput.Write([]byte(eraseStr + newStr))
				if err != nil {
					return 0, 0, err
				}
			}
		}
	}
	return bytes, toErase, nil
}
// statusWriter is a simple io.Writer shim that logs to `r.errput` the
// number of bytes written to `output`.
type statusWriter struct {
	r          *runner
	output     io.Writer
	soFar      int64 // bytes written so far
	totalBytes int64 // expected total; percentage denominator
	// nextToErase is the length of the last progress string printed,
	// so the next write can backspace over it.
	nextToErase int
}

// Compile-time check that statusWriter satisfies io.Writer.
var _ io.Writer = (*statusWriter)(nil)
// Write forwards p to the wrapped output, then rewrites the progress
// line on errput (backspacing over the previous one) showing the
// running percentage and byte counts.
func (sw *statusWriter) Write(p []byte) (n int, err error) {
	if n, err = sw.output.Write(p); err != nil {
		return n, err
	}
	sw.soFar += int64(len(p))
	progressStr := fmt.Sprintf("(%.2f%%) %s... ",
		percent(sw.soFar, sw.totalBytes),
		humanizeBytes(sw.soFar, sw.totalBytes))
	backspaces := strings.Repeat("\b", sw.nextToErase)
	if _, err = sw.r.errput.Write([]byte(backspaces + progressStr)); err != nil {
		return n, err
	}
	sw.nextToErase = len(progressStr)
	return n, nil
}
func (r *runner) copyFile(
ctx context.Context, from billy.Filesystem, to billy.Filesystem,
name string, sw *statusWriter) (err error) {
f, err := from.Open(name)
if err != nil {
return err
}
defer f.Close()
toF, err := to.Create(name)
if err != nil {
return err
}
defer toF.Close()
var w io.Writer = toF
// Wrap the destination file in a status shim if we are supposed
// to report progress.