Skip to content

Commit

Permalink
Reuse connection where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
Pliner committed Feb 3, 2019
1 parent 20c1b5a commit d9c0ef2
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 55 deletions.
23 changes: 20 additions & 3 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ func (connector *DbConnector) GetPatternMetrics(pattern string) ([]string, error
return metrics, nil
}

func (connector *DbConnector) getPatternMetrics(c redis.Conn, pattern string) ([]string, error) {
metrics, err := redis.Strings(c.Do("SMEMBERS", patternMetricsKey(pattern)))
if err != nil {
if err == redis.ErrNil {
return make([]string, 0), nil
}
return nil, fmt.Errorf("failed to get pattern metrics for pattern %s, error: %v", pattern, err)
}
return metrics, nil
}

// RemovePattern removes pattern from patterns list
func (connector *DbConnector) RemovePattern(pattern string) error {
c := connector.pool.Get()
Expand All @@ -196,12 +207,18 @@ func (connector *DbConnector) RemovePatternsMetrics(patterns []string) error {

// RemovePatternWithMetrics removes pattern metrics with data and given pattern
func (connector *DbConnector) RemovePatternWithMetrics(pattern string) error {
metrics, err := connector.GetPatternMetrics(pattern)
c := connector.pool.Get()
defer c.Close()

return connector.removePatternWithMetrics(c, pattern)
}

func (connector *DbConnector) removePatternWithMetrics(c redis.Conn, pattern string) error {
metrics, err := connector.getPatternMetrics(c, pattern)
if err != nil {
return err
}
c := connector.pool.Get()
defer c.Close()

c.Send("MULTI")
c.Send("SREM", patternsListKey, pattern)
for _, metric := range metrics {
Expand Down
64 changes: 40 additions & 24 deletions database/redis/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func (connector *DbConnector) GetSubscription(id string) (moira.SubscriptionData
c := connector.pool.Get()
defer c.Close()

return connector.getSubscription(c, id)
}

func (connector *DbConnector) getSubscription(c redis.Conn, id string) (moira.SubscriptionData, error) {
subscription, err := reply.Subscription(c.Do("GET", subscriptionKey(id)))
if err != nil {
return subscription, err
Expand All @@ -30,6 +34,10 @@ func (connector *DbConnector) GetSubscriptions(subscriptionIDs []string) ([]*moi
c := connector.pool.Get()
defer c.Close()

return connector.getSubscriptions(c, subscriptionIDs)
}

func (connector *DbConnector) getSubscriptions(c redis.Conn, subscriptionIDs []string) ([]*moira.SubscriptionData, error) {
c.Send("MULTI")
for _, id := range subscriptionIDs {
c.Send("GET", subscriptionKey(id))
Expand All @@ -48,16 +56,17 @@ func (connector *DbConnector) GetSubscriptions(subscriptionIDs []string) ([]*moi

// SaveSubscription writes subscription data, updates tags subscriptions and user subscriptions
func (connector *DbConnector) SaveSubscription(subscription *moira.SubscriptionData) error {
oldSubscription, getSubError := connector.GetSubscription(subscription.ID)
c := connector.pool.Get()
defer c.Close()

oldSubscription, getSubError := connector.getSubscription(c, subscription.ID)
if getSubError != nil && getSubError != database.ErrNil {
return getSubError
}
oldTriggers, err := connector.getSubscriptionTriggers(&oldSubscription)
oldTriggers, err := connector.getSubscriptionTriggers(c, &oldSubscription)
if err != nil {
return fmt.Errorf("failed to get triggers by subscription: %s", err.Error())
}
c := connector.pool.Get()
defer c.Close()
c.Send("MULTI")
if getSubError != database.ErrNil {
addSendSubscriptionRequest(c, subscription, &oldSubscription)
Expand All @@ -68,11 +77,11 @@ func (connector *DbConnector) SaveSubscription(subscription *moira.SubscriptionD
if err != nil {
return fmt.Errorf("failed to EXEC: %s", err.Error())
}
newTriggers, err := connector.getSubscriptionTriggers(subscription)
newTriggers, err := connector.getSubscriptionTriggers(c, subscription)
if err != nil {
return fmt.Errorf("failed to get triggers by subscription: %s", err.Error())
}
return connector.refreshUnusedTriggers(newTriggers, oldTriggers)
return connector.refreshUnusedTriggers(c, newTriggers, oldTriggers)
}

// SaveSubscriptions writes subscriptions, updates tags subscriptions and user subscriptions
Expand All @@ -81,16 +90,18 @@ func (connector *DbConnector) SaveSubscriptions(subscriptions []*moira.Subscript
for i, subscription := range subscriptions {
ids[i] = subscription.ID
}
oldSubscriptions, err := connector.GetSubscriptions(ids)

c := connector.pool.Get()
defer c.Close()

oldSubscriptions, err := connector.getSubscriptions(c, ids)
if err != nil {
return err
}
oldTriggers, err := connector.getSubscriptionsTriggers(oldSubscriptions)
oldTriggers, err := connector.getSubscriptionsTriggers(c, oldSubscriptions)
if err != nil {
return err
}
c := connector.pool.Get()
defer c.Close()
c.Send("MULTI")
for i, subscription := range subscriptions {
addSendSubscriptionRequest(c, subscription, oldSubscriptions[i])
Expand All @@ -99,11 +110,11 @@ func (connector *DbConnector) SaveSubscriptions(subscriptions []*moira.Subscript
if err != nil {
return fmt.Errorf("failed to EXEC: %s", err.Error())
}
newTriggers, err := connector.getSubscriptionsTriggers(subscriptions)
newTriggers, err := connector.getSubscriptionsTriggers(c, subscriptions)
if err != nil {
return err
}
if err := connector.refreshUnusedTriggers(newTriggers, oldTriggers); err != nil {
if err := connector.refreshUnusedTriggers(c, newTriggers, oldTriggers); err != nil {
return err
}

Expand All @@ -112,19 +123,21 @@ func (connector *DbConnector) SaveSubscriptions(subscriptions []*moira.Subscript

// RemoveSubscription deletes subscription data and removes subscriptionID from users and tags subscriptions
func (connector *DbConnector) RemoveSubscription(subscriptionID string) error {
subscription, err := connector.GetSubscription(subscriptionID)
c := connector.pool.Get()
defer c.Close()

subscription, err := connector.getSubscription(c, subscriptionID)
if err != nil {
if err == database.ErrNil {
return nil
}
return err
}
oldTriggers, err := connector.getSubscriptionTriggers(&subscription)
oldTriggers, err := connector.getSubscriptionTriggers(c, &subscription)
if err != nil {
return fmt.Errorf("failed to get triggers by subscription: %s", err.Error())
}
c := connector.pool.Get()
defer c.Close()

c.Send("MULTI")
c.Send("SREM", userSubscriptionsKey(subscription.User), subscriptionID)
for _, tag := range subscription.Tags {
Expand All @@ -135,7 +148,7 @@ func (connector *DbConnector) RemoveSubscription(subscriptionID string) error {
if err != nil {
return fmt.Errorf("failed to EXEC: %s", err.Error())
}
err = connector.refreshUnusedTriggers([]*moira.Trigger{}, oldTriggers)
err = connector.refreshUnusedTriggers(c, []*moira.Trigger{}, oldTriggers)
if err != nil {
return fmt.Errorf("failed to update triggers by subscription: %s", err.Error())
}
Expand All @@ -160,6 +173,12 @@ func (connector *DbConnector) GetTagsSubscriptions(tags []string) ([]*moira.Subs
c := connector.pool.Get()
defer c.Close()

return connector.getTagsSubscriptions(c, tags)
}

// GetTagsSubscriptions gets all subscriptionsIDs by given tag list and read subscriptions.
// Len of subscriptionIDs is equal to len of returned values array. If there is no object by current ID, then nil is returned
func (connector *DbConnector) getTagsSubscriptions(c redis.Conn, tags []string) ([]*moira.SubscriptionData, error) {
tagKeys := make([]interface{}, 0, len(tags))
for _, tag := range tags {
tagKeys = append(tagKeys, fmt.Sprintf("moira-tag-subscriptions:%s", tag))
Expand All @@ -176,7 +195,7 @@ func (connector *DbConnector) GetTagsSubscriptions(tags []string) ([]*moira.Subs
return make([]*moira.SubscriptionData, 0), nil
}

subscriptionsData, err := connector.GetSubscriptions(subscriptionsIDs)
subscriptionsData, err := connector.getSubscriptions(c, subscriptionsIDs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -204,14 +223,11 @@ func addSendSubscriptionRequest(c redis.Conn, subscription *moira.SubscriptionDa
return nil
}

func (connector *DbConnector) getSubscriptionTriggers(subscription *moira.SubscriptionData) ([]*moira.Trigger, error) {
func (connector *DbConnector) getSubscriptionTriggers(c redis.Conn, subscription *moira.SubscriptionData) ([]*moira.Trigger, error) {
if subscription == nil || len(subscription.Tags) == 0 {
return make([]*moira.Trigger, 0), nil
}

c := connector.pool.Get()
defer c.Close()

tagKeys := make([]interface{}, 0, len(subscription.Tags))
for _, tag := range subscription.Tags {
tagKeys = append(tagKeys, tagTriggersKey(tag))
Expand All @@ -232,12 +248,12 @@ func (connector *DbConnector) getSubscriptionTriggers(subscription *moira.Subscr
return connector.GetTriggers(triggerIDs)
}

func (connector *DbConnector) getSubscriptionsTriggers(subscriptions []*moira.SubscriptionData) ([]*moira.Trigger, error) {
func (connector *DbConnector) getSubscriptionsTriggers(c redis.Conn, subscriptions []*moira.SubscriptionData) ([]*moira.Trigger, error) {
triggersMap := make(map[string]*moira.Trigger)
triggers := make([]*moira.Trigger, 0)

for _, subscription := range subscriptions {
triggersBySubscription, err := connector.getSubscriptionTriggers(subscription)
triggersBySubscription, err := connector.getSubscriptionTriggers(c, subscription)
if err != nil {
return triggers, err
}
Expand Down
45 changes: 27 additions & 18 deletions database/redis/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ func (connector *DbConnector) GetTrigger(triggerID string) (moira.Trigger, error
c := connector.pool.Get()
defer c.Close()

return connector.getTrigger(c, triggerID)
}

func (connector *DbConnector) getTrigger(c redis.Conn, triggerID string) (moira.Trigger, error) {
c.Send("MULTI")
c.Send("GET", triggerKey(triggerID))
c.Send("SMEMBERS", triggerTagsKey(triggerID))
rawResponse, err := redis.Values(c.Do("EXEC"))
if err != nil {
return moira.Trigger{}, fmt.Errorf("failed to EXEC: %s", err.Error())
}

return connector.getTriggerWithTags(rawResponse[0], rawResponse[1], triggerID)
}

Expand Down Expand Up @@ -96,6 +99,10 @@ func (connector *DbConnector) GetPatternTriggerIDs(pattern string) ([]string, er
c := connector.pool.Get()
defer c.Close()

return connector.getPatternTriggerIDs(c, pattern)
}

func (connector *DbConnector) getPatternTriggerIDs(c redis.Conn, pattern string) ([]string, error) {
triggerIds, err := redis.Strings(c.Do("SMEMBERS", patternTriggersKey(pattern)))
if err != nil {
return nil, fmt.Errorf("failed to retrieve pattern triggers for pattern: %s, error: %s", pattern, err.Error())
Expand All @@ -120,7 +127,10 @@ func (connector *DbConnector) RemovePatternTriggerIDs(pattern string) error {
// If given trigger contains new tags then create it.
// If given trigger has no subscription on it, add it to triggers-without-subscriptions
func (connector *DbConnector) SaveTrigger(triggerID string, trigger *moira.Trigger) error {
existing, errGetTrigger := connector.GetTrigger(triggerID)
c := connector.pool.Get()
defer c.Close()

existing, errGetTrigger := connector.getTrigger(c, triggerID)
if errGetTrigger != nil && errGetTrigger != database.ErrNil {
return errGetTrigger
}
Expand All @@ -131,8 +141,7 @@ func (connector *DbConnector) SaveTrigger(triggerID string, trigger *moira.Trigg
if err != nil {
return err
}
c := connector.pool.Get()
defer c.Close()

c.Send("MULTI")
cleanupPatterns := make([]string, 0)
if errGetTrigger != database.ErrNil {
Expand Down Expand Up @@ -172,38 +181,38 @@ func (connector *DbConnector) SaveTrigger(triggerID string, trigger *moira.Trigg
return fmt.Errorf("failed to EXEC: %s", err.Error())
}

hasSubscriptions, err := connector.triggerHasSubscriptions(trigger)
hasSubscriptions, err := connector.triggerHasSubscriptions(c, trigger)
if err != nil {
return fmt.Errorf("failed to check trigger subscriptions: %s", err.Error())
}

if !hasSubscriptions {
err = connector.MarkTriggersAsUnused(triggerID)
err = connector.markTriggersAsUnused(c, triggerID)
} else {
err = connector.MarkTriggersAsUsed(triggerID)
err = connector.markTriggersAsUsed(c, triggerID)
}
if err != nil {
return fmt.Errorf("failed to mark trigger as (un)used: %s", err.Error())
}

return connector.cleanupPatternsOutOfUse(cleanupPatterns)
return connector.cleanupPatternsOutOfUse(c, cleanupPatterns)
}

// RemoveTrigger deletes trigger data by given triggerID, delete trigger tag list,
// Deletes triggerID from containing tags triggers list and from containing patterns triggers list
// If containing patterns doesn't used in another triggers, then delete this patterns with metrics data
func (connector *DbConnector) RemoveTrigger(triggerID string) error {
trigger, err := connector.GetTrigger(triggerID)
c := connector.pool.Get()
defer c.Close()

trigger, err := connector.getTrigger(c, triggerID)
if err != nil {
if err == database.ErrNil {
return nil
}
return err
}

c := connector.pool.Get()
defer c.Close()

c.Send("MULTI")
c.Send("DEL", triggerKey(triggerID))
c.Send("DEL", triggerTagsKey(triggerID))
Expand All @@ -223,7 +232,7 @@ func (connector *DbConnector) RemoveTrigger(triggerID string) error {
return fmt.Errorf("failed to EXEC: %s", err.Error())
}

return connector.cleanupPatternsOutOfUse(trigger.Patterns)
return connector.cleanupPatternsOutOfUse(c, trigger.Patterns)
}

// GetTriggerChecks gets triggers data with tags, lastCheck data and throttling by given triggersIDs
Expand Down Expand Up @@ -294,26 +303,26 @@ func (connector *DbConnector) getTriggerWithTags(triggerRaw interface{}, tagsRaw
return trigger, nil
}

func (connector *DbConnector) cleanupPatternsOutOfUse(pattern []string) error {
func (connector *DbConnector) cleanupPatternsOutOfUse(c redis.Conn, pattern []string) error {
for _, pattern := range pattern {
triggerIDs, err := connector.GetPatternTriggerIDs(pattern)
triggerIDs, err := connector.getPatternTriggerIDs(c, pattern)
if err != nil {
return err
}
if len(triggerIDs) == 0 {
if err := connector.RemovePatternWithMetrics(pattern); err != nil {
if err := connector.removePatternWithMetrics(c, pattern); err != nil {
return err
}
}
}
return nil
}

func (connector *DbConnector) triggerHasSubscriptions(trigger *moira.Trigger) (bool, error) {
func (connector *DbConnector) triggerHasSubscriptions(c redis.Conn, trigger *moira.Trigger) (bool, error) {
if trigger == nil || len(trigger.Tags) == 0 {
return false, nil
}
subscriptions, err := connector.GetTagsSubscriptions(trigger.Tags)
subscriptions, err := connector.getTagsSubscriptions(c, trigger.Tags)
if err != nil {
return false, err
}
Expand Down

0 comments on commit d9c0ef2

Please sign in to comment.