Skip to content

Commit

Permalink
Support resetting user passwords in mysql-users-init
Browse files Browse the repository at this point in the history
This adds support for resetting mysql passwords when a Kubernetes
secret is lost. This should allow clusters to recover persistently
stored data in mysql if a k8s cluster (and all the passwords stored
in Secrets) is lost.
  • Loading branch information
timothyb89 committed Aug 23, 2017
1 parent d61eaca commit 4f612f6
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 73 deletions.
4 changes: 2 additions & 2 deletions mysql-users-init/build.yml
Expand Up @@ -2,6 +2,6 @@ repository: monasca/mysql-users-init
variants:
- tag: latest
aliases:
- :1.0.0
- :1.0
- :1.1.0
- :1.1
- :1
238 changes: 167 additions & 71 deletions mysql-users-init/mysql_init.py
Expand Up @@ -21,13 +21,14 @@
import string
import time

from typing import (List, Dict, Union, Sequence, Iterable,
from typing import (List, Dict, Union, Sequence, Iterable, Set,
TypeVar, Callable, Optional, Any)

import pymysql
import yaml

from pymysql.connections import Connection
from pymysql.converters import escape_string
from pymysql.err import OperationalError
from requests import HTTPError
from requests import RequestException
Expand Down Expand Up @@ -124,22 +125,19 @@ def get_mysql_client() -> Connection:


@retry()
def missing_hosts_for_user(client: Connection,
name: str,
hosts: List[str]) -> List[str]:
def hosts_for_user(client: Connection, name: str) -> List[str]:
"""
:param client:
:param name: the user name
:param hosts: a list of expected user hosts
:return: true if user with given name exists, false if not
"""
with client.cursor() as c:
c.execute('select `Host` from mysql.user where `User` = %s',
(name,))
found_hosts = map(lambda r: r['Host'], c.fetchall())

return list(set(hosts) - set(found_hosts))
return list(found_hosts)


@retry()
Expand All @@ -148,7 +146,7 @@ def ensure_kubernetes_namespace(client: KubernetesAPIClient, namespace: str):
client.get('/api/v1/namespaces/{}', namespace)
except HTTPError as e:
if e.response.status_code == 404:
logging.info('creating namespace: %s', namespace)
logger.info('creating namespace: %s', namespace)
client.post('/api/v1/namespaces', json={
'apiVersion': 'v1',
'kind': 'Namespace',
Expand Down Expand Up @@ -181,12 +179,14 @@ def get_kubernetes_secret(name: str,

def create_kubernetes_secret(fields: Dict[str, str],
name: str,
namespace: str=None):
namespace: str=None,
replace: bool=False) -> KubernetesAPIResponse:
"""
:param fields:
:param name:
:param namespace: namespace, auto if None
:param replace:
:return:
"""
client = get_kubernetes_client()
Expand Down Expand Up @@ -214,9 +214,40 @@ def prep(value):
'data': encoded
}

logging.info('creating secret "%s" in namespace "%s"', name, namespace)
return client.post('/api/v1/namespaces/{}/secrets',
namespace, json=secret)
if replace:
logger.info('replacing secret "%s" in namespace "%s"',
name, namespace)
return client.request('PUT', '/api/v1/namespaces/{}/secrets/{}',
namespace, name, json=secret)
else:
logger.info('creating secret "%s" in namespace "%s"', name, namespace)
return client.post('/api/v1/namespaces/{}/secrets',
namespace, json=secret)


def diff_kubernetes_secret(secret: KubernetesAPIResponse,
desired_fields: Dict[str, str]) -> Set[str]:
"""
Computes a set of changed fields (either added, removed, or modified)
between the given existing secret and the set of desired fields.
:param secret: an existing secret as a KubernetesAPIResponse containing
encoded secret data
:param desired_fields: a dict of desired fields
:return: a set of fields
"""
current_keys = set(secret.data.keys())
desired_keys = set(desired_fields.keys())

differences = current_keys.symmetric_difference(desired_keys)

for field in current_keys.intersection(desired_keys):
decoded_bytes = base64.b64decode(secret.data[field])
decoded_str = decoded_bytes.decode('utf-8')
if decoded_str != desired_fields[field]:
differences.add(field)

return differences


def parse_secret(secret: Union[str, Dict[str, str]]):
Expand All @@ -230,38 +261,34 @@ def parse_secret(secret: Union[str, Dict[str, str]]):
return secret['namespace'], secret['name']


def get_or_create_password(client: Connection,
user: Dict[str, Any]) -> str:
if 'password' in user:
return user['password']
elif 'secret' in user:
s_namespace, s_name = parse_secret(user['secret'])
existing_secret = get_kubernetes_secret(s_name, s_namespace)

if existing_secret:
# the password already exists, so try to use it (else we would put
# whichever service consumes this secret in an invalid state)

if 'password' not in existing_secret.data:
# probably not recoverable, short of deleting the existing
# secret and re-creating (which would be awful in its own way)
raise MySQLInitException('existing secret %s is '
'invalid' % s_name)

logging.info('will use existing password from secret %s for user '
'%s' % (s_name, user['username']))
pass_bytes = base64.b64decode(existing_secret.data['password'])
return pass_bytes.decode('utf-8')
else:
password = generate_password()
create_kubernetes_secret({
'username': user['username'],
'password': password,
'host': client.host,
'port': str(client.port)
}, s_name, s_namespace)
def get_password(secret: KubernetesAPIResponse) -> str:
if 'password' not in secret.data:
# probably not recoverable, short of deleting the existing
# secret and re-creating (which would be awful in its own way)
raise MySQLInitException('existing secret %s is '
'invalid' % secret.metadata.name)

pass_bytes = base64.b64decode(secret.data['password'])
return pass_bytes.decode('utf-8')


return password
def ensure_kubernetes_secret(existing: KubernetesAPIResponse,
fields: Dict[str, str],
secret_cfg: Dict[str, str]):
s_namespace, s_name = parse_secret(secret_cfg)

if existing:
diff = diff_kubernetes_secret(existing, fields)
if diff:
logger.info('secret fields are outdated in %s/%s: %r',
s_namespace, s_name, diff)
create_kubernetes_secret(fields, s_name, s_namespace,
replace=True)
else:
logger.info('secret is up-to-date: %s/%s', s_namespace, s_name)
else:
logger.info('creating new secret: %s/%s', s_namespace, s_name)
create_kubernetes_secret(fields, s_name, s_namespace)


def create_user(client: Connection, username: str, host: str, password: str):
Expand All @@ -270,9 +297,22 @@ def create_user(client: Connection, username: str, host: str, password: str):
(username, host, password))


def update_password(client: Connection, username: str, host: str,
password: str):
with client.cursor() as c:
# NOTE: this syntax is deprecated but is the only syntax that works
# consistently for both mysql 5.6 and 5.7
c.execute('SET PASSWORD FOR \'%s\'@\'%s\' = '
'PASSWORD(\'%s\');' % (
escape_string(username),
escape_string(host),
escape_string(password)
))


def flush_privileges(client: Connection):
with client.cursor() as c:
logging.debug('flushing privileges...')
logger.debug('flushing privileges...')
c.execute('FLUSH PRIVILEGES;')


Expand All @@ -295,8 +335,8 @@ def create_database(client: Connection, name: str,
collation_part = ''

with client.cursor() as c:
c.execute('CREATE DATABASE %s%s%s' % (
name, charset_part, collation_part
c.execute('CREATE DATABASE `%s`%s%s' % (
escape_string(name), charset_part, collation_part
))


Expand All @@ -306,11 +346,11 @@ def grant_privileges(client: Connection,
user: str, host: str):
with client.cursor() as c:
privs = ', '.join(p.upper() for p in privileges)
c.execute('GRANT %s ON %s.* TO \'%s\'@\'%s\'' % (
c.execute('GRANT %s ON `%s`.* TO \'%s\'@\'%s\'' % (
privs, database, user, host
))

logging.info('granted %s on %s to %s@%s', privs, database, user, host)
logger.info('granted %s on %s to %s@%s', privs, database, user, host)


def load_grant(client: Connection, database: str,
Expand Down Expand Up @@ -344,32 +384,88 @@ def load_grant(client: Connection, database: str,
grant_privileges(client, database, privileges, username, host)


def load_users(client: Connection,
users: List[Dict[str, Union[List, str]]]) -> Dict[str, List]:
known_hosts = {}
def load_user(client: Connection,
user: Dict[str, Union[List, str, Dict]]) -> List[str]:
if 'host' in user:
hosts = user['host']
if isinstance(hosts, str):
hosts = [hosts]
else:
hosts = ['%', 'localhost']

for user in users:
username = user['username']
if 'host' in user:
hosts = user['host']
if isinstance(hosts, str):
hosts = [hosts]
else:
hosts = ['%', 'localhost']
username = user['username']

known_hosts[username] = hosts
# find user@host combos that already exist in mysql
found_hosts = hosts_for_user(client, username)

hosts_to_create = missing_hosts_for_user(client, username, hosts)
if hosts_to_create:
logging.info('creating user %s for hosts: %r', username,
hosts_to_create)
# a list of @hosts to create for this username that are missing
hosts_to_create = list(set(hosts) - set(found_hosts))

password = get_or_create_password(client, user)
for host in hosts_to_create:
create_user(client, username, host, password)
logging.info('created user %s@%s' % (username, host))
# a list of hosts that need passwords reset, i.e. existing mysql users from
# found_hosts when a k8s secret is lost
reset_passwords = False

# if secret is configured and already exists
if 'secret' in user:
s_namespace, s_name = parse_secret(user['secret'])
secret = get_kubernetes_secret(s_name, s_namespace)
if secret:
# TODO could we find some way to verify that the secret works?
# main issue is that users are host-dependent so we might not
# be allowed to connect from this IP
password = get_password(secret)
logger.info('will use existing password from secret %s for user '
'%s' % (s_name, username))
elif len(found_hosts) > 0:
# we found one or more existing accounts with no secret, the
# account password must be reset to guarantee we end up with a
# functional system
logger.warning('mysql user %s exists with missing secret %s/%s, '
'the password will be reset to a random value!',
username, s_name, s_namespace)
password = generate_password()
reset_passwords = True
else:
logging.info('user already exists: %s', username)
logger.info('generating random password for user %s', username)
password = generate_password()
else:
logger.info('using static password for user %s', username)
password = user['password']
secret = None

if reset_passwords:
for host in found_hosts:
logger.warning('resetting password for %s@%s', username, host)
update_password(client, username, host, password)

if hosts_to_create:
logger.info('creating user %s for hosts: %r', username,
hosts_to_create)

for host in hosts_to_create:
create_user(client, username, host, password)
logger.info('created user %s@%s', username, host)
else:
logger.info('user already exists: %s', username)

if 'secret' in user:
logger.info('ensuring secret for user %s is valid...', username)
ensure_kubernetes_secret(secret, {
'username': username,
'password': password,
'host': client.host,
'port': str(client.port)
}, user['secret'])

return hosts


def load_users(client: Connection,
users: List[Dict[str, Union[List, str]]]) -> Dict[str, List]:
known_hosts = {}

for user in users:
known_hosts[user['username']] = load_user(client, user)

return known_hosts

Expand All @@ -381,15 +477,15 @@ def load_databases(client: Connection,
for database_cfg in databases:
name = database_cfg['name']
if name in current_databases:
logging.info('database already exists: %s', name)
logger.info('database already exists: %s', name)
else:
charset = database_cfg.get('charset', None)
collation = database_cfg.get('collation', None)

create_database(client, name, charset, collation)

logging.info('created database %s: charset=%r, collation=%r',
name, charset, collation)
logger.info('created database %s: charset=%r, collation=%r',
name, charset, collation)

for grant_cfg in database_cfg.get('grants', []):
load_grant(client, name, grant_cfg, known_hosts)
Expand Down

0 comments on commit 4f612f6

Please sign in to comment.