Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

measured rep transaction type #200

Merged
merged 3 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
## jackal - main / unreleased

* [ENHANCEMENT] Added memory ballast. #198
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
* [CHANGE] Introduced measured repository transaction type. #200
* [BUGFIX] Fix S2S db key check when nop KV is used. #199
11 changes: 6 additions & 5 deletions pkg/storage/measured/blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,34 @@ import (
)

type measuredBlockListRep struct {
rep repository.BlockList
rep repository.BlockList
inTx bool
}

func (m *measuredBlockListRep) UpsertBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) {
t0 := time.Now()
err = m.rep.UpsertBlockListItem(ctx, item)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) DeleteBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) {
t0 := time.Now()
err = m.rep.DeleteBlockListItem(ctx, item)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) FetchBlockListItems(ctx context.Context, username string) (blockList []*blocklistmodel.Item, err error) {
t0 := time.Now()
blockList, err = m.rep.FetchBlockListItems(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredBlockListRep) DeleteBlockListItems(ctx context.Context, username string) (err error) {
t0 := time.Now()
err = m.rep.DeleteBlockListItems(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredCapabilitiesRep struct {
rep repository.Capabilities
rep repository.Capabilities
inTx bool
}

func (m *measuredCapabilitiesRep) UpsertCapabilities(ctx context.Context, caps *capsmodel.Capabilities) (err error) {
t0 := time.Now()
err = m.rep.UpsertCapabilities(ctx, caps)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredCapabilitiesRep) CapabilitiesExist(ctx context.Context, node, ver string) (ok bool, err error) {
t0 := time.Now()
ok, err = m.rep.CapabilitiesExist(ctx, node, ver)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredCapabilitiesRep) FetchCapabilities(ctx context.Context, node, ver string) (caps *capsmodel.Capabilities, err error) {
t0 := time.Now()
caps, err = m.rep.FetchCapabilities(ctx, node, ver)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/last.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredLastRep struct {
rep repository.Last
rep repository.Last
inTx bool
}

func (m *measuredLastRep) UpsertLast(ctx context.Context, last *lastmodel.Last) error {
t0 := time.Now()
err := m.rep.UpsertLast(ctx, last)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredLastRep) FetchLast(ctx context.Context, username string) (last *lastmodel.Last, err error) {
t0 := time.Now()
last, err = m.rep.FetchLast(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredLastRep) DeleteLast(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteLast(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}
7 changes: 5 additions & 2 deletions pkg/storage/measured/measured.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func New(rep repository.Repository) repository.Repository {
// InTransaction generates a repository transaction and completes it after it's being used by f function.
// In case f returns no error tx transaction will be committed.
func (m *Measured) InTransaction(ctx context.Context, f func(ctx context.Context, tx repository.Transaction) error) error {
return m.rep.InTransaction(ctx, f)
return m.rep.InTransaction(ctx, func(ctx context.Context, tx repository.Transaction) error {
return f(ctx, newMeasuredTx(tx))
})
}

// Start initializes repository.
Expand All @@ -73,11 +75,12 @@ func (m *Measured) Stop(ctx context.Context) error {
return m.rep.Stop(ctx)
}

func reportOpMetric(opType string, durationInSecs float64, success bool) {
func reportOpMetric(opType string, durationInSecs float64, success bool, inTx bool) {
metricLabel := prometheus.Labels{
"instance": instance.ID(),
"type": opType,
"success": strconv.FormatBool(success),
"tx": strconv.FormatBool(inTx),
}
repOperations.With(metricLabel).Inc()
repOperationDurationBucket.With(metricLabel).Observe(durationInSecs)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/measured/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
Name: "operations_total",
Help: "The total number of repository operations.",
},
[]string{"instance", "type", "success"},
[]string{"instance", "type", "success", "tx"},
)
repOperationDurationBucket = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -34,7 +34,7 @@ var (
Help: "Bucketed histogram of repository operation duration.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 24),
},
[]string{"instance", "type", "success"},
[]string{"instance", "type", "success", "tx"},
)
)

Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/measured/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,34 @@ import (
)

type measuredOfflineRep struct {
rep repository.Offline
rep repository.Offline
inTx bool
}

func (m *measuredOfflineRep) InsertOfflineMessage(ctx context.Context, message *stravaganza.Message, username string) error {
t0 := time.Now()
err := m.rep.InsertOfflineMessage(ctx, message, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredOfflineRep) CountOfflineMessages(ctx context.Context, username string) (int, error) {
t0 := time.Now()
count, err := m.rep.CountOfflineMessages(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return count, err
}

func (m *measuredOfflineRep) FetchOfflineMessages(ctx context.Context, username string) ([]*stravaganza.Message, error) {
t0 := time.Now()
ms, err := m.rep.FetchOfflineMessages(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ms, err
}

func (m *measuredOfflineRep) DeleteOfflineMessages(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteOfflineMessages(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}
9 changes: 5 additions & 4 deletions pkg/storage/measured/private.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,27 @@ import (
)

type measuredPrivateRep struct {
rep repository.Private
rep repository.Private
inTx bool
}

func (m *measuredPrivateRep) FetchPrivate(ctx context.Context, namespace, username string) (private stravaganza.Element, err error) {
t0 := time.Now()
private, err = m.rep.FetchPrivate(ctx, namespace, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredPrivateRep) UpsertPrivate(ctx context.Context, private stravaganza.Element, namespace, username string) (err error) {
t0 := time.Now()
err = m.rep.UpsertPrivate(ctx, private, namespace, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}

func (m *measuredPrivateRep) DeletePrivates(ctx context.Context, username string) (err error) {
t0 := time.Now()
err = m.rep.DeletePrivates(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return
}
31 changes: 16 additions & 15 deletions pkg/storage/measured/roster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,103 +23,104 @@ import (
)

type measuredRosterRep struct {
rep repository.Roster
rep repository.Roster
inTx bool
}

func (m *measuredRosterRep) TouchRosterVersion(ctx context.Context, username string) (int, error) {
t0 := time.Now()
ver, err := m.rep.TouchRosterVersion(ctx, username)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ver, err
}

func (m *measuredRosterRep) FetchRosterVersion(ctx context.Context, username string) (int, error) {
t0 := time.Now()
ver, err := m.rep.FetchRosterVersion(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return ver, err
}

func (m *measuredRosterRep) UpsertRosterItem(ctx context.Context, ri *rostermodel.Item) error {
t0 := time.Now()
err := m.rep.UpsertRosterItem(ctx, ri)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterItem(ctx context.Context, username, jid string) error {
t0 := time.Now()
err := m.rep.DeleteRosterItem(ctx, username, jid)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterItems(ctx context.Context, username string) error {
t0 := time.Now()
err := m.rep.DeleteRosterItems(ctx, username)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) FetchRosterItems(ctx context.Context, username string) ([]*rostermodel.Item, error) {
t0 := time.Now()
items, err := m.rep.FetchRosterItems(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return items, err
}

func (m *measuredRosterRep) FetchRosterItemsInGroups(ctx context.Context, username string, groups []string) ([]*rostermodel.Item, error) {
t0 := time.Now()
items, err := m.rep.FetchRosterItemsInGroups(ctx, username, groups)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return items, err
}

func (m *measuredRosterRep) FetchRosterItem(ctx context.Context, username, jid string) (*rostermodel.Item, error) {
t0 := time.Now()
itm, err := m.rep.FetchRosterItem(ctx, username, jid)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return itm, err
}

func (m *measuredRosterRep) UpsertRosterNotification(ctx context.Context, rn *rostermodel.Notification) error {
t0 := time.Now()
err := m.rep.UpsertRosterNotification(ctx, rn)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterNotification(ctx context.Context, contact, jid string) error {
t0 := time.Now()
err := m.rep.DeleteRosterNotification(ctx, contact, jid)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) DeleteRosterNotifications(ctx context.Context, contact string) error {
t0 := time.Now()
err := m.rep.DeleteRosterNotifications(ctx, contact)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return err
}

func (m *measuredRosterRep) FetchRosterNotification(ctx context.Context, contact string, jid string) (*rostermodel.Notification, error) {
t0 := time.Now()
rn, err := m.rep.FetchRosterNotification(ctx, contact, jid)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return rn, err
}

func (m *measuredRosterRep) FetchRosterNotifications(ctx context.Context, contact string) ([]*rostermodel.Notification, error) {
t0 := time.Now()
rns, err := m.rep.FetchRosterNotifications(ctx, contact)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return rns, err
}

func (m *measuredRosterRep) FetchRosterGroups(ctx context.Context, username string) ([]string, error) {
t0 := time.Now()
groups, err := m.rep.FetchRosterGroups(ctx, username)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil)
reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx)
return groups, err
}
41 changes: 41 additions & 0 deletions pkg/storage/measured/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 The jackal Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package measuredrepository

import "github.com/ortuman/jackal/pkg/storage/repository"

type measuredTx struct {
repository.User
repository.Last
repository.Capabilities
repository.Offline
repository.BlockList
repository.Private
repository.Roster
repository.VCard
}

func newMeasuredTx(tx repository.Transaction) *measuredTx {
return &measuredTx{
User: &measuredUserRep{rep: tx, inTx: true},
Last: &measuredLastRep{rep: tx, inTx: true},
Capabilities: &measuredCapabilitiesRep{rep: tx, inTx: true},
Offline: &measuredOfflineRep{rep: tx, inTx: true},
BlockList: &measuredBlockListRep{rep: tx, inTx: true},
Private: &measuredPrivateRep{rep: tx, inTx: true},
Roster: &measuredRosterRep{rep: tx, inTx: true},
VCard: &measuredVCardRep{rep: tx, inTx: true},
}
}
Loading