forked from pachyderm/pachyderm
-
Notifications
You must be signed in to change notification settings - Fork 1
/
githook.go
167 lines (156 loc) · 5.17 KB
/
githook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package githook
import (
"bytes"
"encoding/json"
"fmt"
"path"
"strconv"
"github.com/pachyderm/pachyderm/src/client"
"github.com/pachyderm/pachyderm/src/client/pps"
col "github.com/pachyderm/pachyderm/src/server/pkg/collection"
"github.com/pachyderm/pachyderm/src/server/pkg/ppsdb"
"github.com/pachyderm/pachyderm/src/server/pkg/util"
etcd "github.com/coreos/etcd/clientv3"
logrus "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"gopkg.in/go-playground/webhooks.v3"
"gopkg.in/go-playground/webhooks.v3/github"
)
// Constants describing where the git webhook endpoint is served.
const (
	// GitHookPort specifies the port the server will listen on
	GitHookPort = 999
	// apiVersion is the leading path segment of the webhook route
	// ("/<apiVersion>/handle/push").
	apiVersion = "v1"
)
// gitHookServer receives GitHub push webhooks over HTTP and records each
// push as a commit in the input repos of pipelines whose git input matches
// the pushed repository and branch.
//
// NOTE: RunGitHookServer constructs this struct with a positional literal,
// so the field order below must not change without updating that literal.
type gitHookServer struct {
// hook is the GitHub webhook that push events are registered on.
hook *github.Webhook
// client is used to list pipelines and to start, write and finish commits.
client *client.APIClient
// etcdClient is passed to util.FailPipeline when a pipeline must be failed.
etcdClient *etcd.Client
// pipelines is the pipeline collection passed to util.FailPipeline.
pipelines col.Collection
}
// RunGitHookServer starts the webhook server and blocks for as long as
// webhooks.Run does.
//
// address is handed to client.NewFromAddress, etcdAddress is used as the
// single etcd endpoint, and etcdPrefix is passed to ppsdb.Pipelines. The
// server listens on GitHookPort and handles GitHub push events posted to
// "/<apiVersion>/handle/push". It returns an error if either client cannot
// be constructed, otherwise whatever webhooks.Run returns.
func RunGitHookServer(address string, etcdAddress string, etcdPrefix string) error {
	pachClient, err := client.NewFromAddress(address)
	if err != nil {
		return err
	}
	etcdConfig := etcd.Config{
		Endpoints:   []string{etcdAddress},
		DialOptions: client.EtcdDialOptions(),
	}
	etcdClient, err := etcd.New(etcdConfig)
	if err != nil {
		return err
	}

	hook := github.New(&github.Config{})
	server := &gitHookServer{
		hook:       hook,
		client:     pachClient,
		etcdClient: etcdClient,
		pipelines:  ppsdb.Pipelines(etcdClient, etcdPrefix),
	}

	// onPush delegates to HandlePush and only logs failures; the webhook
	// callback has no way to return an error to its caller.
	onPush := func(payload interface{}, header webhooks.Header) {
		handleErr := server.HandlePush(payload, header)
		if handleErr == nil {
			return
		}
		if push, isPush := payload.(github.PushPayload); isPush {
			logrus.Infof("github webhook failed to handle push for repo (%v) on branch (%v) with error %v", push.Repository.Name, path.Base(push.Ref), handleErr)
			return
		}
		// Without a typed payload there is no repo/branch to report.
		logrus.Infof("github webhook failed to cast payload, this is likely a bug")
		logrus.Infof("github webhook failed to handle push with error %v", handleErr)
	}
	hook.RegisterEvents(onPush, github.PushEvent)

	listenAddr := ":" + strconv.Itoa(GitHookPort)
	pushRoute := fmt.Sprintf("/%v/handle/push", apiVersion)
	return webhooks.Run(hook, listenAddr, pushRoute)
}
// matchingBranch reports whether a push to payloadBranch should trigger a
// git input configured with inputBranch. The branches match when they are
// equal, or when the input leaves its branch unset ("") and the push was to
// "master".
func matchingBranch(inputBranch string, payloadBranch string) bool {
	unsetMeansMaster := inputBranch == "" && payloadBranch == "master"
	return inputBranch == payloadBranch || unsetMeansMaster
}
// findMatchingPipelineInputs lists all pipelines and returns those that have
// at least one git input whose URL equals the payload's clone URL and whose
// branch matches the pushed branch (see matchingBranch), together with the
// matching git inputs themselves.
//
// Only pipelines that actually contain a matching git input are returned;
// previously the full ListPipeline result was returned, which caused callers
// to act on unrelated pipelines. Each pipeline appears at most once even if
// several of its inputs match, while every matching input is reported.
//
// An error is returned if listing pipelines fails or if no git input
// matches the push.
func (s *gitHookServer) findMatchingPipelineInputs(payload github.PushPayload) (pipelines []*pps.PipelineInfo, inputs []*pps.GitInput, err error) {
	// NOTE: path.Base keeps only the final slash-separated element of the
	// ref, so "refs/heads/feature/x" is compared as "x".
	payloadBranch := path.Base(payload.Ref)
	allPipelines, err := s.client.ListPipeline()
	if err != nil {
		return nil, nil, err
	}
	for _, pipelineInfo := range allPipelines {
		// matched is set by the visitor below; this relies on VisitInput
		// invoking the callback before it returns, as the original
		// accumulation of inputs already did.
		matched := false
		pps.VisitInput(pipelineInfo.Input, func(input *pps.Input) {
			if input.Git == nil {
				return
			}
			if input.Git.URL == payload.Repository.CloneURL && matchingBranch(input.Git.Branch, payloadBranch) {
				inputs = append(inputs, input.Git)
				matched = true
			}
		})
		if matched {
			pipelines = append(pipelines, pipelineInfo)
		}
	}
	if len(inputs) == 0 {
		return nil, nil, fmt.Errorf("no pipeline inputs corresponding to git URL (%v) on branch (%v) found, perhaps the git input is not set yet on a pipeline", payload.Repository.CloneURL, payloadBranch)
	}
	return pipelines, inputs, nil
}
// HandlePush processes a GitHub push webhook payload.
//
// payload must be a github.PushPayload; any other type yields an error. The
// payload is marshalled to JSON and written as a commit to the repo of every
// matching git input found by findMatchingPipelineInputs, once per distinct
// input name.
//
// If the pushed repository is private, no commit is made; instead every
// pipeline returned by findMatchingPipelineInputs is marked as failed via
// util.FailPipeline.
//
// Failures on individual pipelines or repos are logged and do not stop the
// remaining ones from being processed; the last such error is returned.
func (s *gitHookServer) HandlePush(payload interface{}, _ webhooks.Header) (retErr error) {
	pl, ok := payload.(github.PushPayload)
	if !ok {
		return fmt.Errorf("received invalid github.PushPayload")
	}
	logrus.Infof("received github push payload for repo (%v) on branch (%v)", pl.Repository.Name, path.Base(pl.Ref))

	raw, err := json.Marshal(pl)
	if err != nil {
		return fmt.Errorf("error marshalling payload (%v): %v", pl, err)
	}
	pipelines, gitInputs, err := s.findMatchingPipelineInputs(pl)
	if err != nil {
		return err
	}
	if pl.Repository.Private {
		for _, pipelineInfo := range pipelines {
			if err := util.FailPipeline(context.Background(), s.etcdClient, s.pipelines, pipelineInfo.Pipeline.Name, fmt.Sprintf("unable to clone private github repo (%v)", pl.Repository.CloneURL)); err != nil {
				// err will be handled but first we want to
				// try and fail all relevant pipelines
				logrus.Errorf("error marking pipeline %v as failed %v", pipelineInfo.Pipeline.Name, err)
				retErr = err
			}
		}
		return retErr
	}

	triggeredRepos := make(map[string]bool)
	for _, input := range gitInputs {
		if triggeredRepos[input.Name] {
			// This input is used on multiple pipelines, and we've already
			// committed to this input repo
			continue
		}
		if err := s.commitPayload(input.Name, input.Branch, raw); err != nil {
			logrus.Errorf("github webhook failed to commit payload to repo (%v) push with error: %v\n", input.Name, err)
			retErr = err
			// Not recorded in triggeredRepos, so a later input with the
			// same name gets another attempt.
			continue
		}
		triggeredRepos[input.Name] = true
	}
	// Return the recorded error: a plain `return nil` here would overwrite
	// the named result and hide every commit failure from the caller.
	return retErr
}
// commitPayload writes rawPayload as the file "commit.json" in a new commit
// on branchName of repoName, replacing any existing file of that name.
//
// The commit is finished when both file operations succeed, and the result
// of FinishCommit becomes the return value. If either file operation fails,
// the partial commit is deleted (a failure to delete is only logged) and the
// original error is returned.
func (s *gitHookServer) commitPayload(repoName string, branchName string, rawPayload []byte) (retErr error) {
	const payloadFile = "commit.json"

	commit, err := s.client.StartCommit(repoName, branchName)
	if err != nil {
		return err
	}
	// Decide between finishing and discarding the commit based on the
	// error the function is about to return.
	defer func() {
		if retErr == nil {
			retErr = s.client.FinishCommit(repoName, commit.ID)
			return
		}
		if delErr := s.client.DeleteCommit(repoName, commit.ID); delErr != nil {
			logrus.Errorf("git webhook failed to delete partial commit (%v) on repo (%v) with error %v", commit.ID, repoName, delErr)
		}
	}()

	if delErr := s.client.DeleteFile(repoName, commit.ID, payloadFile); delErr != nil {
		return delErr
	}
	_, putErr := s.client.PutFile(repoName, commit.ID, payloadFile, bytes.NewReader(rawPayload))
	return putErr
}