Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
* upstream/main:
  Correctly handle select on multiple channels in Queues (go-gitea#22146)
  Support template for merge message description (go-gitea#22248)
  • Loading branch information
zjjhot committed Dec 30, 2022
2 parents 1217216 + a609cae commit 8b33f80
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 82 deletions.
26 changes: 0 additions & 26 deletions modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
log.Trace("ChannelQueue: %d Flush", q.qid)
paused, _ := q.IsPausedIsResumed()
for {
select {
case <-paused:
return nil
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
atomic.AddInt64(&q.numInQueue, -1)
case <-q.baseCtx.Done():
return q.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
}

// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
Expand Down
30 changes: 0 additions & 30 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"runtime/pprof"
"sync"
"sync/atomic"
"time"

"code.gitea.io/gitea/modules/container"
Expand Down Expand Up @@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
paused, _ := q.IsPausedIsResumed()
for {
select {
case <-paused:
return nil
default:
}
select {
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
atomic.AddInt64(&q.numInQueue, -1)
case <-q.baseCtx.Done():
return q.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
}

// Shutdown processing from this queue
func (q *ChannelUniqueQueue) Shutdown() {
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
Expand Down
44 changes: 43 additions & 1 deletion modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
return atomic.LoadInt64(&p.numInQueue) == 0
}

// contextError returns either ctx.Done(), the base context's error or nil
func (p *WorkerPool) contextError(ctx context.Context) error {
select {
case <-p.baseCtx.Done():
return p.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
paused, _ := p.IsPausedIsResumed()
for {
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
// Ensure that even if paused that the cancelled error is still sent
return p.contextError(ctx)
case <-p.baseCtx.Done():
return p.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
}

select {
case data := <-p.dataChan:
case <-paused:
return p.contextError(ctx)
case data, ok := <-p.dataChan:
if !ok {
return nil
}
if unhandled := p.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
}
Expand All @@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
paused, _ := p.IsPausedIsResumed()
data := make([]Data, 0, p.batchLength)
for {
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
Expand All @@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
log.Trace("Worker shutting down")
return
}
case <-ctx.Done():
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
default:
}

select {
case <-paused:
// go back around
Expand Down
2 changes: 1 addition & 1 deletion routers/api/v1/repo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func MergePullRequest(ctx *context.APIContext) {

message := strings.TrimSpace(form.MergeTitleField)
if len(message) == 0 {
message, err = pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pr, repo_model.MergeStyle(form.Do))
message, _, err = pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pr, repo_model.MergeStyle(form.Do))
if err != nil {
ctx.Error(http.StatusInternalServerError, "GetDefaultMergeMessage", err)
return
Expand Down
6 changes: 4 additions & 2 deletions routers/web/repo/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,19 +1664,21 @@ func ViewIssue(ctx *context.Context) {

ctx.Data["MergeStyle"] = mergeStyle

defaultMergeMessage, err := pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pull, mergeStyle)
defaultMergeMessage, defaultMergeBody, err := pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pull, mergeStyle)
if err != nil {
ctx.ServerError("GetDefaultMergeMessage", err)
return
}
ctx.Data["DefaultMergeMessage"] = defaultMergeMessage
ctx.Data["DefaultMergeBody"] = defaultMergeBody

defaultSquashMergeMessage, err := pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pull, repo_model.MergeStyleSquash)
defaultSquashMergeMessage, defaultSquashMergeBody, err := pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pull, repo_model.MergeStyleSquash)
if err != nil {
ctx.ServerError("GetDefaultSquashMergeMessage", err)
return
}
ctx.Data["DefaultSquashMergeMessage"] = defaultSquashMergeMessage
ctx.Data["DefaultSquashMergeBody"] = defaultSquashMergeBody

if err = pull.LoadProtectedBranch(ctx); err != nil {
ctx.ServerError("LoadProtectedBranch", err)
Expand Down
2 changes: 1 addition & 1 deletion routers/web/repo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func MergePullRequest(ctx *context.Context) {
message := strings.TrimSpace(form.MergeTitleField)
if len(message) == 0 {
var err error
message, err = pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pr, repo_model.MergeStyle(form.Do))
message, _, err = pull_service.GetDefaultMergeMessage(ctx, ctx.Repo.GitRepo, pr, repo_model.MergeStyle(form.Do))
if err != nil {
ctx.ServerError("GetDefaultMergeMessage", err)
return
Expand Down
38 changes: 23 additions & 15 deletions services/pull/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ import (
)

// GetDefaultMergeMessage returns default message used when merging pull request
func GetDefaultMergeMessage(ctx context.Context, baseGitRepo *git.Repository, pr *issues_model.PullRequest, mergeStyle repo_model.MergeStyle) (string, error) {
func GetDefaultMergeMessage(ctx context.Context, baseGitRepo *git.Repository, pr *issues_model.PullRequest, mergeStyle repo_model.MergeStyle) (message, body string, err error) {
if err := pr.LoadHeadRepo(ctx); err != nil {
return "", err
return "", "", err
}
if err := pr.LoadBaseRepo(ctx); err != nil {
return "", err
return "", "", err
}
if pr.BaseRepo == nil {
return "", repo_model.ErrRepoNotExist{ID: pr.BaseRepoID}
return "", "", repo_model.ErrRepoNotExist{ID: pr.BaseRepoID}
}

if err := pr.LoadIssue(ctx); err != nil {
return "", err
return "", "", err
}

isExternalTracker := pr.BaseRepo.UnitEnabled(ctx, unit.TypeExternalTracker)
Expand All @@ -64,12 +64,12 @@ func GetDefaultMergeMessage(ctx context.Context, baseGitRepo *git.Repository, pr
templateFilepath := fmt.Sprintf(".gitea/default_merge_message/%s_TEMPLATE.md", strings.ToUpper(string(mergeStyle)))
commit, err := baseGitRepo.GetBranchCommit(pr.BaseRepo.DefaultBranch)
if err != nil {
return "", err
return "", "", err
}
templateContent, err := commit.GetFileContent(templateFilepath, setting.Repository.PullRequest.DefaultMergeMessageSize)
if err != nil {
if !git.IsErrNotExist(err) {
return "", err
return "", "", err
}
} else {
vars := map[string]string{
Expand Down Expand Up @@ -107,27 +107,35 @@ func GetDefaultMergeMessage(ctx context.Context, baseGitRepo *git.Repository, pr
vars["ClosingIssues"] = ""
}
}

return os.Expand(templateContent, func(s string) string {
return vars[s]
}), nil
message, body = expandDefaultMergeMessage(templateContent, vars)
return message, body, nil
}
}

// Squash merge has a different from other styles.
if mergeStyle == repo_model.MergeStyleSquash {
return fmt.Sprintf("%s (%s%d)", pr.Issue.Title, issueReference, pr.Issue.Index), nil
return fmt.Sprintf("%s (%s%d)", pr.Issue.Title, issueReference, pr.Issue.Index), "", nil
}

if pr.BaseRepoID == pr.HeadRepoID {
return fmt.Sprintf("Merge pull request '%s' (%s%d) from %s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadBranch, pr.BaseBranch), nil
return fmt.Sprintf("Merge pull request '%s' (%s%d) from %s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadBranch, pr.BaseBranch), "", nil
}

if pr.HeadRepo == nil {
return fmt.Sprintf("Merge pull request '%s' (%s%d) from <deleted>:%s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadBranch, pr.BaseBranch), nil
return fmt.Sprintf("Merge pull request '%s' (%s%d) from <deleted>:%s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadBranch, pr.BaseBranch), "", nil
}

return fmt.Sprintf("Merge pull request '%s' (%s%d) from %s:%s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadRepo.FullName(), pr.HeadBranch, pr.BaseBranch), nil
return fmt.Sprintf("Merge pull request '%s' (%s%d) from %s:%s into %s", pr.Issue.Title, issueReference, pr.Issue.Index, pr.HeadRepo.FullName(), pr.HeadBranch, pr.BaseBranch), "", nil
}

func expandDefaultMergeMessage(template string, vars map[string]string) (message, body string) {
message = strings.TrimSpace(template)
if splits := strings.SplitN(message, "\n", 2); len(splits) == 2 {
message = splits[0]
body = strings.TrimSpace(splits[1])
}
mapping := func(s string) string { return vars[s] }
return os.Expand(message, mapping), os.Expand(body, mapping)
}

// Merge merges pull request to base repository.
Expand Down
67 changes: 67 additions & 0 deletions services/pull/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package pull

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_expandDefaultMergeMessage(t *testing.T) {
type args struct {
template string
vars map[string]string
}
tests := []struct {
name string
args args
want string
wantBody string
}{
{
name: "single line",
args: args{
template: "Merge ${PullRequestTitle}",
vars: map[string]string{
"PullRequestTitle": "PullRequestTitle",
"PullRequestDescription": "Pull\nRequest\nDescription\n",
},
},
want: "Merge PullRequestTitle",
wantBody: "",
},
{
name: "multiple lines",
args: args{
template: "Merge ${PullRequestTitle}\nDescription:\n\n${PullRequestDescription}\n",
vars: map[string]string{
"PullRequestTitle": "PullRequestTitle",
"PullRequestDescription": "Pull\nRequest\nDescription\n",
},
},
want: "Merge PullRequestTitle",
wantBody: "Description:\n\nPull\nRequest\nDescription\n",
},
{
name: "leading newlines",
args: args{
template: "\n\n\nMerge ${PullRequestTitle}\n\n\nDescription:\n\n${PullRequestDescription}\n",
vars: map[string]string{
"PullRequestTitle": "PullRequestTitle",
"PullRequestDescription": "Pull\nRequest\nDescription\n",
},
},
want: "Merge PullRequestTitle",
wantBody: "Description:\n\nPull\nRequest\nDescription\n",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := expandDefaultMergeMessage(tt.args.template, tt.args.vars)
assert.Equalf(t, tt.want, got, "expandDefaultMergeMessage(%v, %v)", tt.args.template, tt.args.vars)
assert.Equalf(t, tt.wantBody, got1, "expandDefaultMergeMessage(%v, %v)", tt.args.template, tt.args.vars)
})
}
}
8 changes: 4 additions & 4 deletions services/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func TestPullRequest_GetDefaultMergeMessage_InternalTracker(t *testing.T) {
assert.NoError(t, err)
defer gitRepo.Close()

mergeMessage, err := GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
mergeMessage, _, err := GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
assert.NoError(t, err)
assert.Equal(t, "Merge pull request 'issue3' (#3) from branch2 into master", mergeMessage)

pr.BaseRepoID = 1
pr.HeadRepoID = 2
mergeMessage, err = GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
mergeMessage, _, err = GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
assert.NoError(t, err)
assert.Equal(t, "Merge pull request 'issue3' (#3) from user2/repo1:branch2 into master", mergeMessage)
}
Expand All @@ -75,7 +75,7 @@ func TestPullRequest_GetDefaultMergeMessage_ExternalTracker(t *testing.T) {
assert.NoError(t, err)
defer gitRepo.Close()

mergeMessage, err := GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
mergeMessage, _, err := GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
assert.NoError(t, err)

assert.Equal(t, "Merge pull request 'issue3' (!3) from branch2 into master", mergeMessage)
Expand All @@ -84,7 +84,7 @@ func TestPullRequest_GetDefaultMergeMessage_ExternalTracker(t *testing.T) {
pr.HeadRepoID = 2
pr.BaseRepo = nil
pr.HeadRepo = nil
mergeMessage, err = GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
mergeMessage, _, err = GetDefaultMergeMessage(db.DefaultContext, gitRepo, pr, "")
assert.NoError(t, err)

assert.Equal(t, "Merge pull request 'issue3' (#3) from user2/repo2:branch2 into master", mergeMessage)
Expand Down
5 changes: 3 additions & 2 deletions templates/repo/issue/view_content/pull.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@
(() => {
const defaultMergeTitle = {{.DefaultMergeMessage}};
const defaultSquashMergeTitle = {{.DefaultSquashMergeMessage}};
const defaultMergeMessage = 'Reviewed-on: ' + {{$.Issue.HTMLURL}} + '\n' + {{$approvers}};
const defaultMergeMessage = {{if .DefaultMergeBody}}{{.DefaultMergeBody}}{{else}}'Reviewed-on: ' + {{$.Issue.HTMLURL}} + '\n' + {{$approvers}}{{end}};
const defaultSquashMergeMessage = {{if .DefaultSquashMergeBody}}{{.DefaultSquashMergeBody}}{{else}}'Reviewed-on: ' + {{$.Issue.HTMLURL}} + '\n' + {{$approvers}}{{end}};
const mergeForm = {
'baseLink': {{.Link}},
'textCancel': {{$.locale.Tr "cancel"}},
Expand Down Expand Up @@ -398,7 +399,7 @@
'allowed': {{$prUnit.PullRequestsConfig.AllowSquash}},
'textDoMerge': {{$.locale.Tr "repo.pulls.squash_merge_pull_request"}},
'mergeTitleFieldText': defaultSquashMergeTitle,
'mergeMessageFieldText': {{.GetCommitMessages}} + defaultMergeMessage,
'mergeMessageFieldText': {{.GetCommitMessages}} + defaultSquashMergeMessage,
'hideAutoMerge': generalHideAutoMerge,
},
{
Expand Down

0 comments on commit 8b33f80

Please sign in to comment.