-
Notifications
You must be signed in to change notification settings - Fork 42
/
git_files.go
146 lines (126 loc) · 3.81 KB
/
git_files.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package syncer
import (
"context"
"database/sql"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"unicode/utf8"
"github.com/jackc/pgx/v4"
libgit2 "github.com/libgit2/git2go/v33"
"github.com/mergestat/fuse/internal/db"
uuid "github.com/satori/go.uuid"
)
// sendBatchFiles bulk-inserts batch into the git_files table through tx using
// pgx's CopyFrom.
//
// Every row is tagged with the repo ID taken from job j. A file's contents are
// written only when they are valid UTF-8, with NUL characters removed;
// otherwise the contents column receives a nil value. The Valid flags of the
// nullable fields are not consulted: the raw String/Bool values are written.
//
// It returns a wrapped error if the job's repo ID cannot be parsed as a UUID
// or if the copy itself fails.
func (w *worker) sendBatchFiles(ctx context.Context, tx pgx.Tx, j *db.DequeueSyncJobRow, batch []*file) error {
	// The repo ID is identical for every row, so convert it once up front
	// rather than re-parsing it on each loop iteration.
	repoID, err := uuid.FromString(j.RepoID.String())
	if err != nil {
		return fmt.Errorf("uuid: %w", err)
	}

	inputs := make([][]interface{}, 0, len(batch))
	for _, f := range batch {
		// contents stays nil unless the file is valid UTF-8. NUL characters
		// are stripped — presumably because the destination text column
		// cannot store them; confirm against the git_files schema.
		var contents interface{}
		if utf8.ValidString(f.Contents.String) {
			contents = strings.ReplaceAll(f.Contents.String, "\u0000", "")
		}
		inputs = append(inputs, []interface{}{repoID, f.Path.String, f.Executable.Bool, contents})
	}

	if _, err := tx.CopyFrom(ctx, pgx.Identifier{"git_files"}, []string{"repo_id", "path", "executable", "contents"}, pgx.CopyFromRows(inputs)); err != nil {
		return fmt.Errorf("tx copy from: %w", err)
	}
	return nil
}
// file is one row produced by the selectFiles query: a path, an executable
// flag and the file contents. All three fields are nullable.
type file struct {
// Path is the value of the query's path column.
Path sql.NullString `json:"path"`
// Executable is the value of the query's executable column.
Executable sql.NullBool `json:"executable"`
// Contents is the value of the query's contents column.
Contents sql.NullString `json:"contents"`
}
// selectFiles is the query handleGitFiles runs through w.mergestat. It selects
// path, executable and contents from files(?), where the single placeholder is
// bound to the local path of the cloned repository.
const selectFiles = `
SELECT
path, executable, contents
FROM files(?);
`
// handleGitFiles runs the git files sync for job j.
//
// It clones j.Repo as a bare repository into a temporary directory (created
// under the directory named by the GIT_CLONE_PATH environment variable), runs
// the selectFiles query against that clone, and then, inside one Postgres
// transaction, deletes the repo's existing git_files rows, inserts the freshly
// queried ones and sets the job status to DONE. The temporary clone is removed
// when the function returns.
//
// Every failure is returned wrapped with the name of the step that produced
// it. If the function returns before the final commit, the deferred rollback
// abandons the transaction.
func (w *worker) handleGitFiles(ctx context.Context, j *db.DequeueSyncJobRow) error {
	l := w.loggerForJob(j)

	tmpPath, err := ioutil.TempDir(os.Getenv("GIT_CLONE_PATH"), "mergestat-repo-")
	if err != nil {
		return fmt.Errorf("temp dir: %w", err)
	}
	// Best-effort cleanup of the clone: a failure is only logged and does not
	// change the function's result.
	defer func() {
		if err := os.RemoveAll(tmpPath); err != nil {
			w.logger.Err(err).Msgf("error cleaning up repo at: %s, %v", tmpPath, err)
		}
	}()

	ghToken, err := w.fetchGitHubTokenFromDB(ctx)
	if err != nil {
		return fmt.Errorf("fetch github token: %w", err)
	}

	// The token is supplied as the username with an empty password.
	creds, err := libgit2.NewCredentialUserpassPlaintext(ghToken, "")
	if err != nil {
		return fmt.Errorf("git credentials: %w", err)
	}
	defer creds.Free()

	repo, err := libgit2.Clone(j.Repo, tmpPath, &libgit2.CloneOptions{
		Bare: true,
		FetchOptions: libgit2.FetchOptions{
			RemoteCallbacks: libgit2.RemoteCallbacks{
				// NOTE(review): the same credential object is handed back on
				// every invocation. Confirm with the git2go documentation
				// that this is safe if the callback fires more than once
				// (e.g. after a rejected authentication attempt).
				CredentialsCallback: func(_ string, _ string, _ libgit2.CredentialType) (*libgit2.Credential, error) {
					return creds, nil
				},
			},
		},
	})
	if err != nil {
		return fmt.Errorf("git clone: %w", err)
	}
	defer repo.Free()

	if err := w.sendBatchLogMessages(ctx, []*syncLog{
		{
			Type:            SyncLogTypeInfo,
			RepoSyncQueueID: j.ID,
			Message:         "starting to execute files query",
		},
	}); err != nil {
		return fmt.Errorf("log messages: %w", err)
	}

	// NOTE(review): selectFiles has a single placeholder but tmpPath is passed
	// twice. The call is kept exactly as it was; confirm whether the second
	// argument is needed by the underlying driver or can be dropped.
	var files []*file
	if err = w.mergestat.SelectContext(ctx, &files, selectFiles, tmpPath, tmpPath); err != nil {
		return fmt.Errorf("mergestat query files: %w", err)
	}

	tx, err := w.pool.BeginTx(ctx, pgx.TxOptions{})
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// Roll back on every exit path. After a successful Commit the rollback
	// reports pgx.ErrTxClosed, which is expected and therefore not logged.
	defer func() {
		if err := tx.Rollback(ctx); err != nil {
			if !errors.Is(err, pgx.ErrTxClosed) {
				w.logger.Err(err).Msgf("could not rollback transaction")
			}
		}
	}()

	// Replace the repo's rows wholesale: delete what is stored, then insert
	// the fresh result set within the same transaction.
	if _, err := tx.Exec(ctx, "DELETE FROM git_files WHERE repo_id = $1;", j.RepoID.String()); err != nil {
		return fmt.Errorf("exec delete: %w", err)
	}

	if err := w.sendBatchFiles(ctx, tx, j, files); err != nil {
		return fmt.Errorf("send batch files: %w", err)
	}
	l.Info().Msgf("sent batch of %d files", len(files))

	if err := w.db.WithTx(tx).SetSyncJobStatus(ctx, db.SetSyncJobStatusParams{Status: "DONE", ID: j.ID}); err != nil {
		return fmt.Errorf("update status done: %w", err)
	}

	// The "finished" message is sent before the commit, so a failure here
	// returns early and the deferred rollback abandons the transaction.
	if err := w.sendBatchLogMessages(ctx, []*syncLog{
		{
			Type:            SyncLogTypeInfo,
			RepoSyncQueueID: j.ID,
			Message:         "finished",
		},
	}); err != nil {
		return fmt.Errorf("log messages: %w", err)
	}

	return tx.Commit(ctx)
}