Skip to content

Commit

Permalink
Avoid kvset/kvget race on subscriptions update (#348)
Browse files Browse the repository at this point in the history
When adding or deleting a subscription, we save the subscriptions via `KVSet`, and previously turned around to fetch those with `KVGet` before broadcasting to the client. On installations with a database replica, the second request might not hit the `master` database, and the update just effected might not be "seen".

Skip the `KVGet` and instead just use the information about the updated susbcriptions that we already have in hand when updating the client.

Fixes: #342
  • Loading branch information
lieut-data committed Jan 10, 2023
1 parent 57e74ce commit 8106e4a
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 102 deletions.
9 changes: 5 additions & 4 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func parseTriggers(triggersCsv string) *gitlab.AddWebhookOptions {

func (p *Plugin) subscriptionDelete(info *gitlab.UserInfo, config *configuration, fullPath, channelID string) (string, error) {
normalizedPath := normalizePath(fullPath, config.GitlabURL)
deleted, err := p.Unsubscribe(channelID, normalizedPath)
deleted, updatedSubscriptions, err := p.Unsubscribe(channelID, normalizedPath)
if err != nil {
p.API.LogWarn("can't unsubscribe channel in command", "err", err.Error())
return "Encountered an error trying to unsubscribe. Please try again.", nil
Expand All @@ -492,7 +492,7 @@ func (p *Plugin) subscriptionDelete(info *gitlab.UserInfo, config *configuration
return "Subscription not found, please check repository name.", nil
}

p.sendChannelSubscriptionsUpdated(channelID)
p.sendChannelSubscriptionsUpdated(updatedSubscriptions, channelID)

return fmt.Sprintf("Successfully deleted subscription for %s.", normalizedPath), nil
}
Expand Down Expand Up @@ -535,7 +535,8 @@ func (p *Plugin) subscriptionsAddCommand(ctx context.Context, info *gitlab.UserI
return err.Error()
}

if subscribeErr := p.Subscribe(info, namespace, project, channelID, features); subscribeErr != nil {
updatedSubscriptions, subscribeErr := p.Subscribe(info, namespace, project, channelID, features)
if subscribeErr != nil {
p.API.LogWarn(
"failed to subscribe",
"namespace", namespace,
Expand Down Expand Up @@ -571,7 +572,7 @@ func (p *Plugin) subscriptionsAddCommand(ctx context.Context, info *gitlab.UserI
)
}

p.sendChannelSubscriptionsUpdated(channelID)
p.sendChannelSubscriptionsUpdated(updatedSubscriptions, channelID)

return fmt.Sprintf("Successfully subscribed to %s.%s", fullPath, hookStatusMessage)
}
Expand Down
11 changes: 2 additions & 9 deletions server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,17 +554,10 @@ func (p *Plugin) sendRefreshEvent(userID string) {
)
}

func (p *Plugin) sendChannelSubscriptionsUpdated(channelID string) {
func (p *Plugin) sendChannelSubscriptionsUpdated(subs *Subscriptions, channelID string) {
config := p.getConfiguration()

subscriptions, err := p.GetSubscriptionsByChannel(channelID)
if err != nil {
p.API.LogWarn(
"unable to fetch subscriptions by channel",
"err", err.Error(),
)
return
}
subscriptions := filterSubscriptionsByChannel(subs, channelID)

var payload struct {
ChannelID string `json:"channel_id"`
Expand Down
38 changes: 22 additions & 16 deletions server/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,37 @@ type Subscriptions struct {
Repositories map[string][]*subscription.Subscription
}

func (p *Plugin) Subscribe(info *gitlab.UserInfo, namespace, project, channelID, features string) error {
func (p *Plugin) Subscribe(info *gitlab.UserInfo, namespace, project, channelID, features string) (*Subscriptions, error) {
if err := p.isNamespaceAllowed(namespace); err != nil {
return err
return nil, err
}

fullPath := fullPathFromNamespaceAndProject(namespace, project)
sub, err := subscription.New(channelID, info.UserID, features, fullPath)
if err != nil {
return err
return nil, err
}

if err := p.AddSubscription(fullPath, sub); err != nil {
return err
subs, err := p.AddSubscription(fullPath, sub)
if err != nil {
return nil, err
}

return nil
return subs, nil
}

func (p *Plugin) GetSubscriptionsByChannel(channelID string) ([]*subscription.Subscription, error) {
var filteredSubs []*subscription.Subscription
subs, err := p.GetSubscriptions()
if err != nil {
return nil, err
}

return filterSubscriptionsByChannel(subs, channelID), nil
}

func filterSubscriptionsByChannel(subs *Subscriptions, channelID string) []*subscription.Subscription {
var filteredSubs []*subscription.Subscription

for _, v := range subs.Repositories {
for _, s := range v {
if s.ChannelID == channelID {
Expand All @@ -52,13 +58,13 @@ func (p *Plugin) GetSubscriptionsByChannel(channelID string) ([]*subscription.Su
}
}

return filteredSubs, nil
return filteredSubs
}

func (p *Plugin) AddSubscription(fullPath string, sub *subscription.Subscription) error {
func (p *Plugin) AddSubscription(fullPath string, sub *subscription.Subscription) (*Subscriptions, error) {
subs, err := p.GetSubscriptions()
if err != nil {
return err
return nil, err
}

repoSubs := subs.Repositories[fullPath]
Expand All @@ -80,7 +86,7 @@ func (p *Plugin) AddSubscription(fullPath string, sub *subscription.Subscription
}

subs.Repositories[fullPath] = repoSubs
return p.StoreSubscriptions(subs)
return subs, p.StoreSubscriptions(subs)
}

func (p *Plugin) GetSubscriptions() (*Subscriptions, error) {
Expand Down Expand Up @@ -155,14 +161,14 @@ func (p *Plugin) GetSubscribedChannelsForProject(

// Unsubscribe deletes the link between namespace/project and channelID.
// Returns true if subscription was found, false otherwise.
func (p *Plugin) Unsubscribe(channelID string, fullPath string) (bool, error) {
func (p *Plugin) Unsubscribe(channelID string, fullPath string) (bool, *Subscriptions, error) {
if fullPath == "" {
return false, errors.New("invalid repository")
return false, nil, errors.New("invalid repository")
}

subs, err := p.GetSubscriptions()
if err != nil {
return false, err
return false, nil, err
}

var removed bool
Expand Down Expand Up @@ -194,7 +200,7 @@ func (p *Plugin) Unsubscribe(channelID string, fullPath string) (bool, error) {
}

if !removed {
return false, nil
return false, subs, nil
}
return true, p.StoreSubscriptions(subs)
return true, subs, p.StoreSubscriptions(subs)
}

0 comments on commit 8106e4a

Please sign in to comment.