Skip to content

Commit

Permalink
Add blocking pids termination
Browse files Browse the repository at this point in the history
  • Loading branch information
secwall committed Dec 2, 2017
1 parent 2ca64cb commit dc18315
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 26 deletions.
23 changes: 23 additions & 0 deletions doc/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,26 @@ lock timeout to 30 seconds one could do something like this:
pgmigrate -s "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE" \
-s "SET lock_timeout = '30s'" ...
```

## Terminating blocking pids

On heavily loaded production environments, running some migrations
could block queries from application backends.
Unfortunately, if the migration itself is blocked by some other query, it
could lead to really slow database queries.
For example, a lock queue like this:
```
<lots of app backends>
<pgmigrate>
<stale backend in idle in transaction>
```
makes the database almost unavailable for at least `idle_in_transaction_session_timeout`.
To mitigate such issues, pgmigrate has the `-l <interval>` option,
which starts a separate thread running `pg_terminate_backend(pid)` for
each pid blocking any of the pgmigrate connection pids every `interval` seconds.
Of course, pgmigrate must be able to terminate other pids, so the migration user
should be the app user or be granted the `pg_signal_backend` role. To terminate
superuser (e.g. `postgres`) pids, one could run pgmigrate as a superuser.

Note: this feature relies on the `pg_blocking_pids()` function, available since
PostgreSQL 9.6.
40 changes: 40 additions & 0 deletions features/conflicting_pids.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
Feature: Conflicting pids termination

Scenario: Transactional migration blocked by update passes
Given migration dir
And migrations
| file | code |
| V1__Create_test_table.sql | CREATE TABLE test (id bigint); |
| V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); |
| V3__Alter_test_table.sql | ALTER TABLE test ADD COLUMN test text; |
And database and connection
And successful pgmigrate run with "-t 2 migrate"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 3 migrate"
Then pgmigrate command "succeeded"

Scenario: Nontransactional migration blocked by update passes
Given migration dir
And migrations
| file | code |
| V1__Create_test_table.sql | CREATE TABLE test (id bigint); |
| V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); |
| V3__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test text; |
And database and connection
And successful pgmigrate run with "-t 2 migrate"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 3 migrate"
Then pgmigrate command "succeeded"

Scenario: Mixed transactional and nontransactional migrations blocked by update pass
Given migration dir
And migrations
| file | code |
| V1__Transactional_migration.sql | ALTER TABLE test ADD COLUMN test text; |
| V2__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test2 text; |
And database and connection
And query "CREATE TABLE test (id bigint)"
And query "INSERT INTO test (id) VALUES (1)"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 2 migrate"
Then pgmigrate command "succeeded"
6 changes: 6 additions & 0 deletions features/steps/query.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from behave import given, then


@given('not commited query "{query}"') # noqa
def step_impl(context, query):
    """
    Execute query on context.conn; no commit is issued by this step
    """
    context.conn.cursor().execute(query)


@given('query "{query}"') # noqa
def step_impl(context, query):
cur = context.conn.cursor()
Expand Down
7 changes: 6 additions & 1 deletion features/steps/run_pgmigrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys

import yaml
from func_timeout import FunctionTimedOut, func_timeout

from behave import given, then, when

Expand All @@ -15,7 +16,11 @@ def run_pgmigrate(migr_dir, args):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

stdout, stderr = p.communicate()
try:
stdout, stderr = func_timeout(5, p.communicate)
except FunctionTimedOut:
p.terminate()
stdout, stderr = p.communicate()
return p.returncode, str(stdout), str(stderr)

@given('successful pgmigrate run with "{args}"')
Expand Down
134 changes: 109 additions & 25 deletions pgmigrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import os
import re
import sys
import threading
import time
from builtins import str as text
from collections import OrderedDict, namedtuple
from contextlib import closing

import psycopg2
import sqlparse
Expand Down Expand Up @@ -85,13 +88,69 @@ class BaselineError(MigrateError):
pass


class ConflictTerminator(threading.Thread):
    """
    Kills conflicting pids (only on postgresql >= 9.6)

    A daemon thread that opens its own connection and, every
    ``interval`` seconds, calls pg_terminate_backend() for every backend
    reported by pg_blocking_pids() for each registered pgmigrate pid.
    """
    def __init__(self, conn_str, interval):
        """
        conn_str -- connection string used for the terminator connection
        interval -- pause (in seconds) between termination passes
        """
        threading.Thread.__init__(self, name='terminator')
        self.daemon = True
        self.log = logging.getLogger('terminator')
        self.conn_str = conn_str
        self.pids = set()
        self.interval = interval
        self.should_run = True
        self.conn = None

    def stop(self):
        """
        Ask the thread to stop iterating

        The loop in run() notices the flag after its current sleep and
        closes the terminator connection on the way out.
        """
        self.should_run = False

    def add_conn(self, conn):
        """
        Add conn pid to pgmigrate pids list
        """
        self.pids.add(conn.get_backend_pid())

    def remove_conn(self, conn):
        """
        Remove conn from pgmigrate pids list
        """
        self.pids.remove(conn.get_backend_pid())

    def _terminate_blockers(self):
        """
        Run one termination pass over the currently registered pids
        """
        with self.conn.cursor() as cursor:
            # Iterate over a snapshot: add_conn/remove_conn are called
            # from another thread, and iterating the live set could fail
            # with "Set changed size during iteration".
            for pid in tuple(self.pids):
                cursor.execute(
                    'SELECT pg_terminate_backend(pid) FROM '
                    'unnest(pg_blocking_pids(%s)) AS pid',
                    (pid,))

    def run(self):
        """
        Periodically terminate all backends blocking pgmigrate pids
        """
        self.conn = _create_raw_connection(self.conn_str, self.log)
        try:
            self.conn.autocommit = True
            while self.should_run:
                self._terminate_blockers()
                time.sleep(self.interval)
        finally:
            # Do not leave the helper connection open once the loop ends
            # (either after stop() or because of an error).
            self.conn.close()


REF_COLUMNS = ['version', 'description', 'type',
'installed_by', 'installed_on']


def _create_connection(conn_string):
def _create_raw_connection(conn_string, logger=LOG):
conn = psycopg2.connect(conn_string, connection_factory=LoggingConnection)
conn.initialize(LOG)
conn.initialize(logger)

return conn


def _create_connection(config):
    """
    Open a connection for config.conn and, when a terminator instance is
    configured, register the new connection with it
    """
    connection = _create_raw_connection(config.conn)
    terminator = config.terminator_instance
    if terminator:
        terminator.add_conn(connection)
    return connection

Expand Down Expand Up @@ -146,9 +205,10 @@ def _is_initialized(cursor):

Config = namedtuple('Config', ('target', 'baseline', 'cursor', 'dryrun',
'callbacks', 'user', 'base_dir', 'conn',
'session', 'conn_instance'))
'session', 'conn_instance',
'terminator_instance', 'termination_interval'))

CONFIG_IGNORE = ['cursor', 'conn_instance']
CONFIG_IGNORE = ['cursor', 'conn_instance', 'terminator_instance']


def _get_migrations_info_from_dir(base_dir):
Expand Down Expand Up @@ -473,6 +533,9 @@ def _finish(config):
config.cursor.execute('rollback')
else:
config.cursor.execute('commit')
if config.terminator_instance:
config.terminator_instance.stop()
config.conn_instance.close()


def info(config, stdout=True):
Expand Down Expand Up @@ -558,6 +621,21 @@ def _prepare_nontransactional_steps(state, callbacks):
return steps


def _execute_mixed_steps(config, steps, nt_conn):
    """
    Run steps in order, mixing transactional and nontransactional ones

    A step whose first migration is transactional runs on config.cursor;
    any other step runs on a cursor initialised on nt_conn. A 'commit' is
    executed on config.cursor before the step that follows a transactional
    one. No commit is issued here after the last step.
    """
    pending_commit = False
    for step in steps:
        if pending_commit:
            config.cursor.execute('commit')
            pending_commit = False
        first_migration = list(step['state'].values())[0]
        if first_migration['transactional']:
            cursor = config.cursor
            pending_commit = True
        else:
            cursor = _init_cursor(nt_conn, config.session)
        _migrate_step(step['state'], step['cbs'],
                      config.base_dir, config.user, cursor)


def migrate(config):
"""
Migrate cmdline wrapper
Expand Down Expand Up @@ -585,29 +663,23 @@ def migrate(config):
raise MigrateError('Unable to mix transactional and '
'nontransactional migrations')
config.cursor.execute('rollback')
nt_conn = _create_connection(config.conn)
nt_conn.autocommit = True
cursor = _init_cursor(nt_conn, config.session)
_migrate_step(state, _get_callbacks(''),
config.base_dir, config.user, cursor)
with closing(_create_connection(config)) as nt_conn:
nt_conn.autocommit = True
cursor = _init_cursor(nt_conn, config.session)
_migrate_step(state, _get_callbacks(''),
config.base_dir, config.user, cursor)
if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
steps = _prepare_nontransactional_steps(state, config.callbacks)

nt_conn = _create_connection(config.conn)
nt_conn.autocommit = True
with closing(_create_connection(config)) as nt_conn:
nt_conn.autocommit = True

commit_req = False
for step in steps:
if commit_req:
config.cursor.execute('commit')
commit_req = False
if not list(step['state'].values())[0]['transactional']:
cur = _init_cursor(nt_conn, config.session)
else:
cur = config.cursor
commit_req = True
_migrate_step(step['state'], step['cbs'],
config.base_dir, config.user, cur)
_execute_mixed_steps(config, steps, nt_conn)

if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
_migrate_step(state, config.callbacks, config.base_dir,
config.user, config.cursor)
Expand All @@ -627,7 +699,9 @@ def migrate(config):
session=['SET lock_timeout = 0'],
conn='dbname=postgres user=postgres '
'connect_timeout=1',
conn_instance=None)
conn_instance=None,
terminator_instance=None,
termination_interval=None)


def get_config(base_dir, args=None):
Expand Down Expand Up @@ -656,7 +730,13 @@ def get_config(base_dir, args=None):
else:
conf = conf._replace(target=int(conf.target))

conf = conf._replace(conn_instance=_create_connection(conf.conn))
if conf.termination_interval and not conf.dryrun:
conf = conf._replace(
terminator_instance=ConflictTerminator(
conf.conn, conf.termination_interval))
conf.terminator_instance.start()

conf = conf._replace(conn_instance=_create_connection(conf))
conf = conf._replace(cursor=_init_cursor(conf.conn_instance, conf.session))
conf = conf._replace(callbacks=_get_callbacks(conf.callbacks,
conf.base_dir))
Expand Down Expand Up @@ -705,6 +785,10 @@ def _main():
parser.add_argument('-n', '--dryrun',
action='store_true',
help='Say "rollback" in the end instead of "commit"')
parser.add_argument('-l', '--termination_interval',
type=float,
help='Inverval for terminating blocking pids '
'(only for transational migrations)')
parser.add_argument('-v', '--verbose',
default=0,
action='count',
Expand Down
3 changes: 3 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ commands = rm -rf htmlcov
deps = behave
importlib
coverage
func_timeout

[testenv:py35]
whitelist_externals = rm
Expand All @@ -28,6 +29,7 @@ commands = rm -rf htmlcov
coverage report --fail-under=100 pgmigrate.py
deps = behave
coverage
func_timeout

[testenv:py36]
whitelist_externals = rm
Expand All @@ -39,6 +41,7 @@ commands = rm -rf htmlcov
coverage report --fail-under=100 pgmigrate.py
deps = behave
coverage
func_timeout

[testenv:flake8]
commands = flake8 pgmigrate.py
Expand Down

0 comments on commit dc18315

Please sign in to comment.