From 8f72aba887192dc8dc3fe7cdb53ebf684dfd2f64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Mon, 24 Jan 2022 11:46:38 +0100 Subject: [PATCH 1/3] measured rep transaction type --- pkg/storage/measured/blocklist.go | 11 ++++---- pkg/storage/measured/capabilities.go | 9 +++--- pkg/storage/measured/last.go | 9 +++--- pkg/storage/measured/measured.go | 7 +++-- pkg/storage/measured/metrics.go | 2 +- pkg/storage/measured/offline.go | 11 ++++---- pkg/storage/measured/private.go | 9 +++--- pkg/storage/measured/roster.go | 31 +++++++++++---------- pkg/storage/measured/tx.go | 41 ++++++++++++++++++++++++++++ pkg/storage/measured/user.go | 11 ++++---- pkg/storage/measured/vcard.go | 9 +++--- 11 files changed, 101 insertions(+), 49 deletions(-) create mode 100644 pkg/storage/measured/tx.go diff --git a/pkg/storage/measured/blocklist.go b/pkg/storage/measured/blocklist.go index f691ec294..fa01a0fa8 100644 --- a/pkg/storage/measured/blocklist.go +++ b/pkg/storage/measured/blocklist.go @@ -23,33 +23,34 @@ import ( ) type measuredBlockListRep struct { - rep repository.BlockList + rep repository.BlockList + inTx bool } func (m *measuredBlockListRep) UpsertBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) { t0 := time.Now() err = m.rep.UpsertBlockListItem(ctx, item) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredBlockListRep) DeleteBlockListItem(ctx context.Context, item *blocklistmodel.Item) (err error) { t0 := time.Now() err = m.rep.DeleteBlockListItem(ctx, item) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredBlockListRep) FetchBlockListItems(ctx context.Context, username string) (blockList []*blocklistmodel.Item, err error) { t0 := time.Now() blockList, err = m.rep.FetchBlockListItems(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredBlockListRep) DeleteBlockListItems(ctx context.Context, username string) (err error) { t0 := time.Now() err = m.rep.DeleteBlockListItems(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } diff --git a/pkg/storage/measured/capabilities.go b/pkg/storage/measured/capabilities.go index 7889bbab9..9b564bc85 100644 --- a/pkg/storage/measured/capabilities.go +++ b/pkg/storage/measured/capabilities.go @@ -23,26 +23,27 @@ import ( ) type measuredCapabilitiesRep struct { - rep repository.Capabilities + rep repository.Capabilities + inTx bool } func (m *measuredCapabilitiesRep) UpsertCapabilities(ctx context.Context, caps *capsmodel.Capabilities) (err error) { t0 := time.Now() err = m.rep.UpsertCapabilities(ctx, caps) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredCapabilitiesRep) CapabilitiesExist(ctx context.Context, node, ver string) (ok bool, err error) { t0 := time.Now() ok, err = m.rep.CapabilitiesExist(ctx, node, ver) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredCapabilitiesRep) FetchCapabilities(ctx context.Context, node, ver string) (caps *capsmodel.Capabilities, err error) { t0 := time.Now() caps, err = m.rep.FetchCapabilities(ctx, node, ver) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } diff --git a/pkg/storage/measured/last.go b/pkg/storage/measured/last.go index 1629c8426..ef66d339b 100644 --- a/pkg/storage/measured/last.go +++ b/pkg/storage/measured/last.go @@ -23,26 +23,27 @@ import ( ) type measuredLastRep struct { - rep repository.Last + rep repository.Last + inTx bool } func (m *measuredLastRep) UpsertLast(ctx context.Context, last *lastmodel.Last) error { t0 := time.Now() err := m.rep.UpsertLast(ctx, last) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredLastRep) FetchLast(ctx context.Context, username string) (last *lastmodel.Last, err error) { t0 := time.Now() last, err = m.rep.FetchLast(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredLastRep) DeleteLast(ctx context.Context, username string) error { t0 := time.Now() err := m.rep.DeleteLast(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } diff --git a/pkg/storage/measured/measured.go b/pkg/storage/measured/measured.go index 9504f0e27..5f5404085 100644 --- a/pkg/storage/measured/measured.go +++ b/pkg/storage/measured/measured.go @@ -60,7 +60,9 @@ func New(rep repository.Repository) repository.Repository { // InTransaction generates a repository transaction and completes it after it's being used by f function. // In case f returns no error tx transaction will be committed. func (m *Measured) InTransaction(ctx context.Context, f func(ctx context.Context, tx repository.Transaction) error) error { - return m.rep.InTransaction(ctx, f) + return m.rep.InTransaction(ctx, func(ctx context.Context, tx repository.Transaction) error { + return f(ctx, newMeasuredTx(tx)) + }) } // Start initializes repository. @@ -73,11 +75,12 @@ func (m *Measured) Stop(ctx context.Context) error { return m.rep.Stop(ctx) } -func reportOpMetric(opType string, durationInSecs float64, success bool) { +func reportOpMetric(opType string, durationInSecs float64, success bool, inTx bool) { metricLabel := prometheus.Labels{ "instance": instance.ID(), "type": opType, "success": strconv.FormatBool(success), + "tx": strconv.FormatBool(inTx), } repOperations.With(metricLabel).Inc() repOperationDurationBucket.With(metricLabel).Observe(durationInSecs) diff --git a/pkg/storage/measured/metrics.go b/pkg/storage/measured/metrics.go index 761947310..176fb3abf 100644 --- a/pkg/storage/measured/metrics.go +++ b/pkg/storage/measured/metrics.go @@ -34,7 +34,7 @@ var ( Help: "Bucketed histogram of repository operation duration.", Buckets: prometheus.ExponentialBuckets(0.01, 2, 24), }, - []string{"instance", "type", "success"}, + []string{"instance", "type", "success", "tx"}, ) ) diff --git a/pkg/storage/measured/offline.go b/pkg/storage/measured/offline.go index fff690f00..785a14ee4 100644 --- a/pkg/storage/measured/offline.go +++ b/pkg/storage/measured/offline.go @@ -23,33 +23,34 @@ import ( ) type measuredOfflineRep struct { - rep repository.Offline + rep repository.Offline + inTx bool } func (m *measuredOfflineRep) InsertOfflineMessage(ctx context.Context, message *stravaganza.Message, username string) error { t0 := time.Now() err := m.rep.InsertOfflineMessage(ctx, message, username) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredOfflineRep) CountOfflineMessages(ctx context.Context, username string) (int, error) { t0 := time.Now() count, err := m.rep.CountOfflineMessages(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return count, err } func (m *measuredOfflineRep) FetchOfflineMessages(ctx context.Context, username string) ([]*stravaganza.Message, error) { t0 := time.Now() ms, err := m.rep.FetchOfflineMessages(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return ms, err } func (m *measuredOfflineRep) DeleteOfflineMessages(ctx context.Context, username string) error { t0 := time.Now() err := m.rep.DeleteOfflineMessages(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } diff --git a/pkg/storage/measured/private.go b/pkg/storage/measured/private.go index 15a56d441..1bf8a1d84 100644 --- a/pkg/storage/measured/private.go +++ b/pkg/storage/measured/private.go @@ -23,26 +23,27 @@ import ( ) type measuredPrivateRep struct { - rep repository.Private + rep repository.Private + inTx bool } func (m *measuredPrivateRep) FetchPrivate(ctx context.Context, namespace, username string) (private stravaganza.Element, err error) { t0 := time.Now() private, err = m.rep.FetchPrivate(ctx, namespace, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredPrivateRep) UpsertPrivate(ctx context.Context, private stravaganza.Element, namespace, username string) (err error) { t0 := time.Now() err = m.rep.UpsertPrivate(ctx, private, namespace, username) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredPrivateRep) DeletePrivates(ctx context.Context, username string) (err error) { t0 := time.Now() err = m.rep.DeletePrivates(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } diff --git a/pkg/storage/measured/roster.go b/pkg/storage/measured/roster.go index b5fc71c67..6b7159599 100644 --- a/pkg/storage/measured/roster.go +++ b/pkg/storage/measured/roster.go @@ -23,103 +23,104 @@ import ( ) type measuredRosterRep struct { - rep repository.Roster + rep repository.Roster + inTx bool } func (m *measuredRosterRep) TouchRosterVersion(ctx context.Context, username string) (int, error) { t0 := time.Now() ver, err := m.rep.TouchRosterVersion(ctx, username) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return ver, err } func (m *measuredRosterRep) FetchRosterVersion(ctx context.Context, username string) (int, error) { t0 := time.Now() ver, err := m.rep.FetchRosterVersion(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return ver, err } func (m *measuredRosterRep) UpsertRosterItem(ctx context.Context, ri *rostermodel.Item) error { t0 := time.Now() err := m.rep.UpsertRosterItem(ctx, ri) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) DeleteRosterItem(ctx context.Context, username, jid string) error { t0 := time.Now() err := m.rep.DeleteRosterItem(ctx, username, jid) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) DeleteRosterItems(ctx context.Context, username string) error { t0 := time.Now() err := m.rep.DeleteRosterItems(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) FetchRosterItems(ctx context.Context, username string) ([]*rostermodel.Item, error) { t0 := time.Now() items, err := m.rep.FetchRosterItems(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return items, err } func (m *measuredRosterRep) FetchRosterItemsInGroups(ctx context.Context, username string, groups []string) ([]*rostermodel.Item, error) { t0 := time.Now() items, err := m.rep.FetchRosterItemsInGroups(ctx, username, groups) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return items, err } func (m *measuredRosterRep) FetchRosterItem(ctx context.Context, username, jid string) (*rostermodel.Item, error) { t0 := time.Now() itm, err := m.rep.FetchRosterItem(ctx, username, jid) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return itm, err } func (m *measuredRosterRep) UpsertRosterNotification(ctx context.Context, rn *rostermodel.Notification) error { t0 := time.Now() err := m.rep.UpsertRosterNotification(ctx, rn) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) DeleteRosterNotification(ctx context.Context, contact, jid string) error { t0 := time.Now() err := m.rep.DeleteRosterNotification(ctx, contact, jid) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) DeleteRosterNotifications(ctx context.Context, contact string) error { t0 := time.Now() err := m.rep.DeleteRosterNotifications(ctx, contact) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredRosterRep) FetchRosterNotification(ctx context.Context, contact string, jid string) (*rostermodel.Notification, error) { t0 := time.Now() rn, err := m.rep.FetchRosterNotification(ctx, contact, jid) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return rn, err } func (m *measuredRosterRep) FetchRosterNotifications(ctx context.Context, contact string) ([]*rostermodel.Notification, error) { t0 := time.Now() rns, err := m.rep.FetchRosterNotifications(ctx, contact) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return rns, err } func (m *measuredRosterRep) FetchRosterGroups(ctx context.Context, username string) ([]string, error) { t0 := time.Now() groups, err := m.rep.FetchRosterGroups(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return groups, err } diff --git a/pkg/storage/measured/tx.go b/pkg/storage/measured/tx.go new file mode 100644 index 000000000..e53571029 --- /dev/null +++ b/pkg/storage/measured/tx.go @@ -0,0 +1,41 @@ +// Copyright 2021 The jackal Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package measuredrepository + +import "github.com/ortuman/jackal/pkg/storage/repository" + +type measuredTx struct { + repository.User + repository.Last + repository.Capabilities + repository.Offline + repository.BlockList + repository.Private + repository.Roster + repository.VCard +} + +func newMeasuredTx(tx repository.Transaction) *measuredTx { + return &measuredTx{ + User: &measuredUserRep{rep: tx, inTx: true}, + Last: &measuredLastRep{rep: tx, inTx: true}, + Capabilities: &measuredCapabilitiesRep{rep: tx, inTx: true}, + Offline: &measuredOfflineRep{rep: tx, inTx: true}, + BlockList: &measuredBlockListRep{rep: tx, inTx: true}, + Private: &measuredPrivateRep{rep: tx, inTx: true}, + Roster: &measuredRosterRep{rep: tx, inTx: true}, + VCard: &measuredVCardRep{rep: tx, inTx: true}, + } +} diff --git a/pkg/storage/measured/user.go b/pkg/storage/measured/user.go index 9be0959a2..32f201ec0 100644 --- a/pkg/storage/measured/user.go +++ b/pkg/storage/measured/user.go @@ -24,33 +24,34 @@ import ( ) type measuredUserRep struct { - rep repository.User + rep repository.User + inTx bool } func (m *measuredUserRep) UpsertUser(ctx context.Context, user *usermodel.User) (err error) { t0 := time.Now() err = m.rep.UpsertUser(ctx, user) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredUserRep) DeleteUser(ctx context.Context, username string) (err error) { t0 := time.Now() err = m.rep.DeleteUser(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredUserRep) FetchUser(ctx context.Context, username string) (usr *usermodel.User, err error) { t0 := time.Now() usr, err = m.rep.FetchUser(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } func (m *measuredUserRep) UserExists(ctx context.Context, username string) (ok bool, err error) { t0 := time.Now() ok, err = m.rep.UserExists(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } diff --git a/pkg/storage/measured/vcard.go b/pkg/storage/measured/vcard.go index e0824d153..a3ad63998 100644 --- a/pkg/storage/measured/vcard.go +++ b/pkg/storage/measured/vcard.go @@ -23,26 +23,27 @@ import ( ) type measuredVCardRep struct { - rep repository.VCard + rep repository.VCard + inTx bool } func (m *measuredVCardRep) UpsertVCard(ctx context.Context, vCard stravaganza.Element, username string) error { t0 := time.Now() err := m.rep.UpsertVCard(ctx, vCard, username) - reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(upsertOp, time.Since(t0).Seconds(), err == nil, m.inTx) return err } func (m *measuredVCardRep) FetchVCard(ctx context.Context, username string) (stravaganza.Element, error) { t0 := time.Now() vc, err := m.rep.FetchVCard(ctx, username) - reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(fetchOp, time.Since(t0).Seconds(), err == nil, m.inTx) return vc, err } func (m *measuredVCardRep) DeleteVCard(ctx context.Context, username string) (err error) { t0 := time.Now() err = m.rep.DeleteVCard(ctx, username) - reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil) + reportOpMetric(deleteOp, time.Since(t0).Seconds(), err == nil, m.inTx) return } From 7d5a6a592b15fd953dc5c9e0f51333784f8efb41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Mon, 24 Jan 2022 11:51:02 +0100 Subject: [PATCH 2/3] updated CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb4f9dec..951ece074 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,4 +3,5 @@ ## jackal - main / unreleased * [ENHANCEMENT] Added memory ballast. #198 -* [BUGFIX] Fix S2S db key check when nop KV is used. #199 \ No newline at end of file +* [CHANGE] Introduced measured repository transaction type. #200 +* [BUGFIX] Fix S2S db key check when nop KV is used. #199 From e845a2f7bfa81b81cf8a2b88c8e6a75bebc60e97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Mon, 24 Jan 2022 11:52:48 +0100 Subject: [PATCH 3/3] fix prom cardinality issue --- pkg/storage/measured/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/measured/metrics.go b/pkg/storage/measured/metrics.go index 176fb3abf..8035bf52e 100644 --- a/pkg/storage/measured/metrics.go +++ b/pkg/storage/measured/metrics.go @@ -24,7 +24,7 @@ var ( Name: "operations_total", Help: "The total number of repository operations.", }, - []string{"instance", "type", "success"}, + []string{"instance", "type", "success", "tx"}, ) repOperationDurationBucket = prometheus.NewHistogramVec( prometheus.HistogramOpts{