Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store event counters separately for each privacyIDEA node #1833

Merged
merged 5 commits into from Sep 6, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -0,0 +1,98 @@
"""Store privacyIDEA node in eventcounter table
Revision ID: d756b34061ff
Revises: 3d7f8b29cbb1
Create Date: 2019-09-02 13:59:24.244529
"""
# revision identifiers, used by Alembic.
from sqlalchemy import orm
from sqlalchemy.sql.ddl import CreateSequence

from privacyidea.lib.config import get_privacyidea_node

revision = 'd756b34061ff'
down_revision = '3d7f8b29cbb1'

from alembic import op, context
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base


Base = declarative_base()


class OldEventCounter(Base):
__tablename__ = 'eventcounter'
counter_name = sa.Column(sa.Unicode(80), nullable=False, primary_key=True)
counter_value = sa.Column(sa.Integer, default=0)
__table_args__ = {'mysql_row_format': 'DYNAMIC'}


class NewEventCounter(Base):
__tablename__ = 'eventcounter_new'
id = sa.Column(sa.Integer, sa.Sequence("eventcounter_seq"), primary_key=True)
counter_name = sa.Column(sa.Unicode(80), nullable=False)
counter_value = sa.Column(sa.Integer, default=0)
node = sa.Column(sa.Unicode(255), nullable=False)
__table_args__ = (sa.UniqueConstraint('counter_name',
'node',
name='evctr_1'),
{'mysql_row_format': 'DYNAMIC'})


def dialect_supports_sequences():
migration_context = context.get_context()
return migration_context.dialect.supports_sequences


def create_seq(seq):
if dialect_supports_sequences():
op.execute(CreateSequence(seq))


def upgrade():

try:
# Step 1: Create sequence on Postgres
seq = sa.Sequence('eventcounter_seq')
try:
create_seq(seq)
except Exception as _e:
pass
# Step 2: Create new eventcounter_new table
op.create_table('eventcounter_new',
sa.Column("id", sa.Integer, sa.Sequence("eventcounter_seq"), primary_key=True),
sa.Column("counter_name", sa.Unicode(80), nullable=False),
sa.Column("counter_value", sa.Integer, default=0),
sa.Column("node", sa.Unicode(255), nullable=False),
sa.UniqueConstraint('counter_name', 'node', name='evctr_1'),
mysql_row_format='DYNAMIC'
)
# Step 3: Migrate data from eventcounter to eventcounter_new
node = get_privacyidea_node()
bind = op.get_bind()
session = orm.Session(bind=bind)
for old_ctr in session.query(OldEventCounter).all():
new_ctr = NewEventCounter(counter_name=old_ctr.counter_name,
counter_value=old_ctr.counter_value,
node=node)
session.add(new_ctr)
print("Migrating counter {!r}={} on node={!r} ...".format(new_ctr.counter_name,
new_ctr.counter_value,
node))
session.commit()
# Step 4: Remove eventcounter
op.drop_table("eventcounter")
op.rename_table("eventcounter_new", "eventcounter")
except Exception as exx:
session.rollback()
print("Could not migrate table 'eventcounter'")
print (exx)

def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint('evctr_1', 'eventcounter', type_='unique')
op.drop_column('eventcounter', 'node')
op.drop_column('eventcounter', 'id')
# ### end Alembic commands ###
@@ -23,7 +23,10 @@
"""
This module is used to modify counters in the database
"""
from privacyidea.models import EventCounter
from sqlalchemy import func

from privacyidea.lib.config import get_privacyidea_node
from privacyidea.models import EventCounter, db


def increase(counter_name):
@@ -32,14 +35,25 @@ def increase(counter_name):
If the counter does not exist yet, create the counter.
:param counter_name: The name/identifier of the counter
:return: the new integer value of the counter
:return: None
"""
counter = EventCounter.query.filter_by(counter_name=counter_name).first()
# If there is no table row for the current node, create one.
node = get_privacyidea_node()
counter = EventCounter.query.filter_by(counter_name=counter_name, node=node).first()
if not counter:
counter = EventCounter(counter_name, 0)
counter = EventCounter(counter_name, 0, node=node)
counter.save()
counter.increase()
return counter.counter_value


def _reset_counter_on_all_nodes(counter_name):
"""
Reset all EventCounter rows that set a value for ``counter_name`` to zero,
regardless of the node column.
:param counter_name: The name/identifier of the counter
"""
EventCounter.query.filter_by(counter_name=counter_name).update({'counter_value': 0})
db.session.commit()


def decrease(counter_name, allow_negative=False):
@@ -49,15 +63,22 @@ def decrease(counter_name, allow_negative=False):
Also checks whether the counter is allowed to become negative.
:param counter_name: The name/identifier of the counter
:param allow_negative: Whether the counter can become negative
:return: the new integer value of the counter
:param allow_negative: Whether the counter can become negative. Note that even if this flag is not set,
the counter may become negative due to concurrent queries.
:return: None
"""
counter = EventCounter.query.filter_by(counter_name=counter_name).first()
node = get_privacyidea_node()
counter = EventCounter.query.filter_by(counter_name=counter_name, node=node).first()
if not counter:
counter = EventCounter(counter_name, 0)
counter = EventCounter(counter_name, 0, node=node)
counter.save()
counter.decrease(allow_negative)
return counter.counter_value
# We are allowed to decrease the current counter object only if the overall
# counter value is positive (because individual rows may be negative then),
# or if we allow negative values. Otherwise, we need to reset all rows of all nodes.
if read(counter_name) > 0 or allow_negative:
counter.decrease()
else:
_reset_counter_on_all_nodes(counter_name)


def reset(counter_name):
@@ -67,11 +88,13 @@ def reset(counter_name):
:param counter_name: The name/identifier of the counter
:return:
"""
counter = EventCounter.query.filter_by(counter_name=counter_name).first()
if not counter:
counter = EventCounter(counter_name, 0)
node = get_privacyidea_node()
counters = EventCounter.query.filter_by(counter_name=counter_name).count()
if not counters:
counter = EventCounter(counter_name, 0, node=node)
counter.save()
counter.reset()
else:
_reset_counter_on_all_nodes(counter_name)


def read(counter_name):
@@ -82,8 +105,5 @@ def read(counter_name):
:param counter_name: The name of the counter
:return: The value of the counter
"""
counter = EventCounter.query.filter_by(counter_name=counter_name).first()
if not counter:
return None
else:
return counter.counter_value
return db.session.query(func.sum(EventCounter.counter_value))\
.filter(EventCounter.counter_name == counter_name).one()[0]

This comment has been minimized.

Copy link
@cornelinux

cornelinux Sep 6, 2019

Member

Thats a lot of logic in one line! ;-)

@@ -99,14 +99,12 @@ def do(self, action, options=None):
counter_name = handler_options.get("counter_name")

if action == "increase_counter":
r = increase(counter_name)
log.debug(u"Increased the counter {0!s} to {1!s}.".format(counter_name,
r))
increase(counter_name)
log.debug(u"Increased the counter {0!s}.".format(counter_name))

This comment has been minimized.

Copy link
@cornelinux

cornelinux Sep 6, 2019

Member

We could have also returned the overall new counter value (over all nodes). This would have kept the log entry the same. But it is not that important to me.

This comment has been minimized.

Copy link
@fredreichbier

fredreichbier Sep 6, 2019

Author Contributor

Yes, I agree that would be nice. But this results in another SELECT query -- which, I think, would be totally fine if the result is used for something important. But as it was only written to the debug log, I thought we could save one query and speed up things a bit :)

elif action == "decrease_counter":
allow_negative = handler_options.get("allow_negative_values")
r = decrease(counter_name, allow_negative)
log.debug(u"Decreased the counter {0!s} to {1!s}.".format(counter_name,
r))
decrease(counter_name, allow_negative)
log.debug(u"Decreased the counter {0!s}.".format(counter_name))
elif action == "reset_counter":
reset(counter_name)
log.debug(u"Reset the counter {0!s} to 0.".format(counter_name))
@@ -2499,21 +2499,34 @@ def get(self):
class EventCounter(db.Model):
"""
This table stores counters of the event handler "Counter".
Note that an event counter name does *not* correspond to just one,
but rather *several* table rows, because we store event counters
for each privacyIDEA node separately.
This is intended to improve the performance of replicated setups,
because each privacyIDEA node then only writes to its own "private"
table row. This way, we avoid locking issues that would occur
if all nodes write to the same table row.
"""
__tablename__ = 'eventcounter'
__table_args__ = {'mysql_row_format': 'DYNAMIC'}
counter_name = db.Column(db.Unicode(80), nullable=False, primary_key=True)
id = db.Column(db.Integer, Sequence("eventcounter_seq"), primary_key=True)
counter_name = db.Column(db.Unicode(80), nullable=False)
counter_value = db.Column(db.Integer, default=0)
node = db.Column(db.Unicode(255), nullable=False)
__table_args__ = (db.UniqueConstraint('counter_name',
'node',
name='evctr_1'),
{'mysql_row_format': 'DYNAMIC'})

def __init__(self, name, value=0):
def __init__(self, name, value=0, node=""):
self.counter_value = value
self.counter_name = name
self.node = node
self.save()

def save(self):
db.session.add(self)
db.session.commit()
return self.counter_name

def delete(self):
ret = self.counter_name
@@ -2529,25 +2542,12 @@ def increase(self):
self.counter_value = self.counter_value + 1
self.save()

def decrease(self, allow_negative=False):
"""
Decrease the value of a counter, stop at zero if allow_negative not given
:param allow_negative:
:return:
"""
if self.counter_value <= 0 and not allow_negative:
# set counter to zero
self.counter_value = 0
else:
self.counter_value = self.counter_value - 1
self.save()

def reset(self):
def decrease(self):
"""
Reset the value of a counter
Decrease the value of a counter.
:return:
"""
self.counter_value = 0
self.counter_value = self.counter_value - 1
self.save()


@@ -1,6 +1,7 @@
# coding: utf-8
from mock import mock
import os
from sqlalchemy import func

from privacyidea.models import (Token,
Resolver,
@@ -18,7 +19,7 @@
EventHandlerCondition, PrivacyIDEAServer,
ClientApplication, Subscription, UserCache,
EventCounter, PeriodicTask, PeriodicTaskLastRun,
PeriodicTaskOption, MonitoringStats, PolicyCondition)
PeriodicTaskOption, MonitoringStats, PolicyCondition, db)
from .base import MyTestCase
from dateutil.tz import tzutc
from datetime import datetime
@@ -775,6 +776,7 @@ def test_25_eventcounter(self):
counter.save()
counter2 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter2.counter_value, 10)
self.assertEqual(counter2.node, "")

counter2.increase()
counter2.increase()
@@ -787,30 +789,31 @@ def test_25_eventcounter(self):
counter4 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter4.counter_value, 11)

counter4.decrease(allow_negative=True)
counter4.decrease()

counter5 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter5.counter_value, 10)

counter5.reset()
counter6 = EventCounter("test_counter", 4, "othernode")
self.assertEqual(counter6.counter_value, 4)
self.assertEqual(counter6.node, "othernode")

counter6 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter6.counter_value, 0)
counter_value = db.session.query(func.sum(EventCounter.counter_value))\
.filter(EventCounter.counter_name == "test_counter").one()[0]
self.assertEqual(counter_value, 14)

counter6.decrease(allow_negative=True)
counter6.decrease(allow_negative=True)
counters7 = EventCounter.query.filter_by(counter_name="test_counter").all()
self.assertEqual(len(counters7), 2)

counter7 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter7.counter_value, -2)

counter7.decrease(allow_negative=False)
counter8 = EventCounter.query.filter_by(counter_name="test_counter", node="othernode")
counter8.delete()

counter8 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter8.counter_value, 0)
counters9 = EventCounter.query.filter_by(counter_name="test_counter").all()
self.assertEqual(len(counters9), 1)
counters9[0].delete()

counter8.delete()
counter9 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter9, None)
counter10 = EventCounter.query.filter_by(counter_name="test_counter").first()
self.assertEqual(counter10, None)

def test_26_periodictask(self):
current_utc_time = datetime(2018, 3, 4, 5, 6, 8)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.