Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container details #3951

Merged
merged 11 commits into from
May 29, 2024
78 changes: 78 additions & 0 deletions migrations/versions/cd51b7fe9d03_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""v3.10: Add new tables tokencontainerstates, tokencontainerinfo and new columns (states, info_list, last_seen and
last_updated) to tokencontainer

Revision ID: cd51b7fe9d03
Revises: db6b2ef8100f
Create Date: 2024-05-21 13:46:28.577683

"""
from datetime import datetime

from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.schema import Sequence

# revision identifiers, used by Alembic.
revision = 'cd51b7fe9d03'
down_revision = 'db6b2ef8100f'

from alembic import op
import sqlalchemy as sa


def upgrade():
try:
op.create_table('tokencontainerstates',
sa.Column("id", sa.Integer, sa.Identity(), primary_key=True),
sa.Column("container_id", sa.Integer(), sa.ForeignKey("tokencontainer.id")),
sa.Column("state", sa.Unicode(100), default='active', nullable=False),
mysql_row_format='DYNAMIC'
)
except Exception as exx:
print("Could not add table 'tokencontainerstates' - probably already exists!")
print(exx)

try:
op.create_table('tokencontainerinfo',
sa.Column("id", sa.Integer, Sequence("containerinfo_seq"), primary_key=True),
sa.Column("key", sa.Unicode(255), nullable=False),
sa.Column("value", sa.UnicodeText(), default=''),
sa.Column("type", sa.Unicode(100), default=''),
sa.Column("description", sa.Unicode(2000), default=''),
sa.Column("container_id", sa.Integer(),
sa.ForeignKey('tokencontainer.id'), index=True),
mysql_row_format='DYNAMIC'
)
except Exception as exx:
print("Could not add table 'tokencontainerstates' - probably already exists!")
print(exx)

try:
op.add_column('tokencontainer',
sa.Column('last_seen', sa.DateTime, default=datetime.now()))
except (OperationalError, ProgrammingError) as exx:
if "already exists" in str(exx.orig).lower():
print("Ok, column 'last_seen' already exists.")
else:
print(exx)
except Exception as exx:
print("Could not add column 'last_seen' to table 'tokencontainer'")
print(exx)

try:
op.add_column('tokencontainer',
sa.Column('last_updated', sa.DateTime, default=datetime.now()))
except (OperationalError, ProgrammingError) as exx:
if "already exists" in str(exx.orig).lower():
print("Ok, column 'last_updated' already exists.")
else:
print(exx)
except Exception as exx:
print("Could not add column 'last_updated' to table 'tokencontainer'")
print(exx)


def downgrade():
op.drop_table('tokencontainerstates')
op.drop_table('tokencontainerinfo')
op.drop_column('tokencontainer', 'last_seen')
op.drop_column('tokencontainer', 'last_updated')
105 changes: 90 additions & 15 deletions privacyidea/api/container.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging

from flask import Blueprint, jsonify, request
from flask import Blueprint, jsonify, request, g

from privacyidea.api.lib.utils import send_result, getParam, required
from privacyidea.lib.container import get_container_classes, create_container_template, \
find_container_by_serial, init_container, get_container_classes_descriptions, \
get_container_token_types, get_all_containers, add_tokens_to_container
get_container_token_types, get_all_containers, remove_tokens_from_container, add_tokens_to_container
from privacyidea.lib.containerclass import TokenContainerClass
from privacyidea.lib.error import ParameterError
from privacyidea.lib.event import event
from privacyidea.lib.log import log_with
from privacyidea.lib.token import get_one_token, get_tokens, \
convert_token_objects_to_dicts
Expand Down Expand Up @@ -54,11 +56,15 @@ def list_containers():

res: list = []
for container in result["containers"]:
tmp: dict = {"type": container.type, "serial": container.serial, "description": container.description}
tmp: dict = {"type": container.type,
"serial": container.serial,
"description": container.description,
"last_seen": container.last_seen,
"last_updated": container.last_updated}
tmp_users: dict = {}
users: list = []
for user in container.get_users():
tmp_users["user_name"] = get_username(user.login, user.resolver)
tmp_users["user_name"] = get_username(user.uid, user.resolver)
tmp_users["user_realm"] = user.realm
tmp_users["user_resolver"] = user.resolver
tmp_users["user_id"] = user.login
Expand All @@ -73,6 +79,15 @@ def list_containers():
tokens_dict_list = convert_token_objects_to_dicts(tokens)
tmp["tokens"] = tokens_dict_list

tmp["states"] = [token_container_states.state for token_container_states in container.get_states()]

infos: dict = {}
for info in container.get_containerinfo():
if info.type:
infos[info.key + ".type"] = info.type
infos[info.key] = info.value
tmp["info"] = infos

res.append(tmp)
result["containers"] = res
return send_result(result)
Expand Down Expand Up @@ -108,7 +123,7 @@ def unassign():
user = get_user_from_param(request.all_data, required)
serial = getParam(request.all_data, "serial", required, allow_empty=False)
container = find_container_by_serial(serial)
res = container.remove(user)
res = container.remove_user(user)
return send_result(res)


Expand Down Expand Up @@ -142,6 +157,7 @@ def delete(container_serial):

@container_blueprint.route('<string:container_serial>/add', methods=['POST'])
@log_with(log)
# @event("container_add", request, g)
def add_token(container_serial):
"""
Add a token to a container
Expand Down Expand Up @@ -169,19 +185,17 @@ def remove_token(container_serial):
:jsonparam: serial: Serial of the token to remove
:jsonparam: serial_list: List of serials of the tokens to remove. Comma separated.
"""
container = find_container_by_serial(container_serial)
serial = getParam(request.all_data, "serial", required, allow_empty=True)
serials = request.args.getlist("serial_list")
serial = getParam(request.all_data, "serial", optional=True, allow_empty=True)
serials = getParam(request.all_data, "serial_list", optional=True, allow_empty=True)
if not serial and not serials:
raise ParameterError("Either serial or serial_list is required")
token_serials = []
if serials:
token_serials = serials
if serial:
token_serials.append(serial)

serials.append(serial)

token = get_one_token(serial=serial)
res = False
if token:
container.remove_token(token.get_serial())
res = True
res = remove_tokens_from_container(container_serial, token_serials)
return send_result(res)


Expand All @@ -201,6 +215,67 @@ def get_types():
return send_result(res)


@container_blueprint.route('/description/<serial>', methods=['POST'])
@log_with(log)
def set_description(serial):
"""
Set the description of a container
:jsonparam: serial: Serial of the container
:jsonparam: description: New description to be set
"""
container = find_container_by_serial(serial)
new_description = getParam(request.all_data, "description", optional=required, allow_empty=False)
res = False
if new_description:
container.description = new_description
res = True
return send_result(res)


@container_blueprint.route('/states', methods=['POST'])
@log_with(log)
def set_states():
"""
Set the states of a container
:jsonparam: serial: Serial of the container
:jsonparam: states: string list
"""
serial = getParam(request.all_data, "serial", required, allow_empty=False)
states = getParam(request.all_data, "states", required, allow_empty=False)
container = find_container_by_serial(serial)

res = False
if states:
container.set_states(states)
res = True

return send_result(res)


@container_blueprint.route('statetypes', methods=['GET'])
nilsbehlen marked this conversation as resolved.
Show resolved Hide resolved
@log_with(log)
def get_state_types():
"""
Get the supported state types as dictionary
The types are the keys and the value is a list containing all states that are excluded when the key state is
selected
"""
state_types_exclusions = TokenContainerClass.get_state_types()
return send_result(state_types_exclusions)


@container_blueprint.route('/lastseen/<serial>', methods=['POST'])
@log_with(log)
def update_last_seen(serial):
"""
Updates the date and time for the last_seen property
:jsonparam: serial: Serial of the container
"""
container = find_container_by_serial(serial)
container.update_last_seen()
return send_result(True)


# @container_blueprint.route('tokentypes', methods=['GET'])
# @log_with(log)
# def get_token_types():
Expand Down
41 changes: 35 additions & 6 deletions privacyidea/api/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ def enable_api(serial=None):

:jsonparam basestring serial: the serial number of the single token to
enable
:jsonparam basestring serial_list: a list of serial numbers of the tokens to enable
:jsonparam basestring user: The login name of the user
:jsonparam basestring realm: the realm name of the user
:return: In case of success it returns the number of enabled
Expand All @@ -589,10 +590,23 @@ def enable_api(serial=None):
user = request.User
if not serial:
serial = getParam(request.all_data, "serial", optional)
g.audit_object.log({"serial": serial})
serial_list = getParam(request.all_data, "serial_list", optional)

res = enable_token(serial, enable=True, user=user)
g.audit_object.log({"success": True})
if not serial and not serial_list:
raise ParameterError("Either serial or serial_list is required")

token_serials = []
if serial_list:
token_serials = serial_list
if serial:
token_serials.append(serial)

res = 0
for serial in token_serials:
g.audit_object.log({"serial": serial})

res += enable_token(serial, enable=True, user=user)
g.audit_object.log({"success": True})
return send_result(res)


Expand All @@ -610,6 +624,7 @@ def disable_api(serial=None):

:jsonparam basestring serial: the serial number of the single token to
disable
:jsonparam basestring serial_list: a list of serial numbers of the tokens to enable
:jsonparam basestring user: The login name of the user
:jsonparam basestring realm: the realm name of the user
:return: In case of success it returns the number of disabled
Expand All @@ -619,13 +634,27 @@ def disable_api(serial=None):
user = request.User
if not serial:
serial = getParam(request.all_data, "serial", optional)
g.audit_object.log({"serial": serial})
serial_list = getParam(request.all_data, "serial_list", optional)

res = enable_token(serial, enable=False, user=user)
g.audit_object.log({"success": True})
if not serial and not serial_list:
raise ParameterError("Either serial or serial_list is required")

token_serials = []
if serial_list:
token_serials = serial_list
if serial:
token_serials.append(serial)

res = 0
for serial in token_serials:
g.audit_object.log({"serial": serial})

res += enable_token(serial, enable=False, user=user)
g.audit_object.log({"success": True})
return send_result(res)



@token_blueprint.route('/<serial>', methods=['DELETE'])
@prepolicy(check_base_action, request, action=ACTION.DELETE)
@event("token_delete", request, g)
Expand Down
16 changes: 15 additions & 1 deletion privacyidea/lib/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,16 @@ def init_container(params):
db_container = TokenContainer(serial=serial, container_type=ctype.lower(), description=desc)
db_container.save()

container = create_container_from_db_object(db_container)
user = params.get("user")
realm = params.get("realm")
if user and not realm or realm and not user:
log.error(f"Assigning a container to user on creation requires both user and realm parameters!")
elif user and realm:
container = create_container_from_db_object(db_container)
container.add_user(User(login=user, realm=realm))

container.set_states(['active'])

return serial


Expand Down Expand Up @@ -327,3 +329,15 @@ def get_container_token_types():
for container_type, container_class in classes.items():
ret[container_type] = container_class.get_supported_token_types()
return ret


def remove_tokens_from_container(container_serial, token_serials):
"""
Remove the given tokens from the container with the given serial
"""
container = find_container_by_serial(container_serial)
res = False
for token_serial in token_serials:
container.remove_token(token_serial)
res = True
return res
Loading
Loading