-
Notifications
You must be signed in to change notification settings - Fork 43
/
git_files.go
133 lines (111 loc) · 3.6 KB
/
git_files.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package syncer
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"unicode/utf8"
"github.com/jackc/pgx/v4"
libgit2 "github.com/libgit2/git2go/v33"
"github.com/mergestat/fuse/internal/db"
uuid "github.com/satori/go.uuid"
)
// sendBatchFiles bulk-inserts batch into the git_files table through tx using
// pgx's CopyFrom. Every row is tagged with the job's repository ID.
//
// File contents are sanitized before insertion:
//   - a NULL source value is stored as NULL;
//   - contents that are not valid UTF-8 (e.g. binary files) are stored as NULL;
//   - NUL (0x00) characters are stripped from otherwise valid text.
//
// It returns an error if the job's repository ID cannot be parsed as a UUID or
// if the copy fails.
func (w *worker) sendBatchFiles(ctx context.Context, tx pgx.Tx, j *db.DequeueSyncJobRow, batch []*file) error {
	// The repository ID is identical for every row, so parse it once up front
	// instead of once per file.
	repoID, err := uuid.FromString(j.RepoID.String())
	if err != nil {
		return fmt.Errorf("uuid: %w", err)
	}

	inputs := make([][]interface{}, 0, len(batch))
	for _, c := range batch {
		// A nil interface value is written as SQL NULL.
		var contents interface{}
		if c.Contents.Valid && utf8.ValidString(c.Contents.String) {
			// PostgreSQL text values cannot contain the NUL character.
			contents = strings.ReplaceAll(c.Contents.String, "\u0000", "")
		}
		inputs = append(inputs, []interface{}{repoID, c.Path.String, c.Executable.Bool, contents})
	}

	columns := []string{"repo_id", "path", "executable", "contents"}
	if _, err := tx.CopyFrom(ctx, pgx.Identifier{"git_files"}, columns, pgx.CopyFromRows(inputs)); err != nil {
		return fmt.Errorf("tx copy from: %w", err)
	}
	return nil
}
// file holds one row of the selectFiles query result: a single file from the
// cloned repository. Each field uses a sql.Null* type, so a NULL column value
// can be scanned without error.
type file struct {
Path sql.NullString `json:"path"` // file path, written to git_files.path
Executable sql.NullBool `json:"executable"` // written to git_files.executable
Contents sql.NullString `json:"contents"` // file contents, sanitized before insert into git_files.contents
}
// selectFiles selects path, executable flag and contents for every row of the
// mergestat `files(?)` source. It has exactly one `?` placeholder, which is
// bound to the local path of the cloned repository.
const selectFiles = `
SELECT
path, executable, contents
FROM files(?);
`
// handleGitFiles refreshes the git_files table for the job's repository.
//
// It clones the repository into a temporary directory and runs selectFiles
// against the clone through the mergestat connection. Then, inside a single
// PostgreSQL transaction, it:
//   - deletes the repository's existing git_files rows;
//   - inserts the fresh snapshot via sendBatchFiles;
//   - marks the sync job DONE.
//
// The deferred rollback undoes the transaction if any step fails before the
// commit. Sync-log messages are written via sendBatchLogMessages and
// formatBatchLogMessages, which are not passed the transaction.
func (w *worker) handleGitFiles(ctx context.Context, j *db.DequeueSyncJobRow) error {
	l := w.loggerForJob(j)

	// indicate that we're starting query execution
	if err := w.formatBatchLogMessages(ctx, SyncLogTypeInfo, j, jobStatusTypeInit); err != nil {
		return fmt.Errorf("log messages: %w", err)
	}

	tmpPath, cleanup, err := w.createTempDirForGitClone(j)
	if err != nil {
		return fmt.Errorf("temp dir: %w", err)
	}
	defer cleanup()

	var ghToken string
	if ghToken, err = w.fetchGitHubTokenFromDB(ctx); err != nil {
		return err
	}

	var repo *libgit2.Repository
	if repo, err = w.cloneRepo(ctx, ghToken, j.Repo, tmpPath, true, j); err != nil {
		return fmt.Errorf("git clone: %w", err)
	}
	defer repo.Free()

	// selectFiles has a single placeholder: the path of the local clone.
	// (Previously tmpPath was passed twice; the surplus argument was ignored
	// by the driver but would be rejected by a stricter one.)
	files := make([]*file, 0)
	if err = w.mergestat.SelectContext(ctx, &files, selectFiles, tmpPath); err != nil {
		return fmt.Errorf("mergestat query files: %w", err)
	}

	var tx pgx.Tx
	if tx, err = w.pool.BeginTx(ctx, pgx.TxOptions{}); err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	// Roll back on any early return. After a successful Commit, Rollback
	// returns pgx.ErrTxClosed, which is expected and therefore not logged.
	defer func() {
		if err := tx.Rollback(ctx); err != nil && !errors.Is(err, pgx.ErrTxClosed) {
			w.logger.Err(err).Msgf("could not rollback transaction")
		}
	}()

	// Replace the repository's previous snapshot wholesale.
	r, err := tx.Exec(ctx, "DELETE FROM git_files WHERE repo_id = $1;", j.RepoID.String())
	if err != nil {
		return fmt.Errorf("exec delete: %w", err)
	}

	if err := w.sendBatchLogMessages(ctx, []*syncLog{{
		Type:            SyncLogTypeInfo,
		RepoSyncQueueID: j.ID,
		Message:         fmt.Sprintf("removed %d row(s) from git_files", r.RowsAffected()),
	}}); err != nil {
		return err
	}

	if err := w.sendBatchFiles(ctx, tx, j, files); err != nil {
		return fmt.Errorf("send batch files: %w", err)
	}
	l.Info().Msgf("sent batch of %d files", len(files))

	if err := w.sendBatchLogMessages(ctx, []*syncLog{{
		Type:            SyncLogTypeInfo,
		RepoSyncQueueID: j.ID,
		Message:         fmt.Sprintf("inserted %d row(s) into git_files", len(files)),
	}}); err != nil {
		return err
	}

	if err := w.db.WithTx(tx).SetSyncJobStatus(ctx, db.SetSyncJobStatusParams{Status: "DONE", ID: j.ID}); err != nil {
		return fmt.Errorf("update status done: %w", err)
	}

	// indicate that we're finishing query execution
	if err := w.formatBatchLogMessages(ctx, SyncLogTypeInfo, j, jobStatusTypeFinish); err != nil {
		return fmt.Errorf("log messages: %w", err)
	}

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit tx: %w", err)
	}
	return nil
}