Skip to content

Commit

Permalink
Added PostgreSQL replication checks
Browse files Browse the repository at this point in the history
  • Loading branch information
blackandred committed Nov 30, 2019
1 parent 2525d8d commit 6dac168
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
57 changes: 57 additions & 0 deletions infracheck/checks/postgres-primary-streaming-status
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python3

import inspect
import os
import sys

"""
<sphinx>
postgres-primary-streaming-status
---------------------------------
Verifies if local PostgreSQL instance is currently serving WALs to a specified replica.
The SQL command that is validated: `select * from pg_stat_replication;`
Parameters:
- pg_host (hostname or socket path, defaults to "localhost" which will use local unix socket, use IP address eg. 127.0.0.1 to connect via tcp)
- pg_port (port, defaults to 5432)
- pg_db_name (database name to connect to, defaults to "postgres")
- pg_user (username, defaults to "postgres")
- pg_password
- pg_conn_timeout (defaults to 5 which means 5 seconds)
- expected_status (defaults to "streaming")
- expected_replication_user: Expected user that will be used for replication connection (defaults to "replication")
</sphinx>
"""

# Make the repository root importable: resolve the directory holding this
# script, then step two levels up and put that location first on sys.path.
_this_file = inspect.getfile(inspect.currentframe())
_script_dir = os.path.dirname(os.path.abspath(_this_file))
path = _script_dir + '/../../'
sys.path.insert(0, path)


from infracheck.infracheck.checklib.postgres import BasePostgreSQL


class PostgresReplicaCheck(BasePostgreSQL):
    """Check that this server is streaming WAL to a replica for a given user."""

    def main(self, expected_status: str, expected_user: str) -> tuple:
        """Run the check against ``pg_stat_replication``.

        :param expected_status: value the ``state`` column must have
        :param expected_user: user name looked up in the ``usename`` column
        :return: whatever ``validate_replication_row_exists`` returns for
                 this query and these expectations
        """
        # The role-name column of pg_stat_replication is spelled "usename";
        # "username" is not a column of that view and makes the query fail.
        return self.validate_replication_row_exists('SELECT state, usename FROM pg_stat_replication;',
                                                    expected_status=expected_status, expected_user=expected_user)


if __name__ == '__main__':
    # Connection settings are read from the environment; the second argument
    # of each getenv() call is the fallback used when the variable is unset.
    connection_settings = dict(
        host=os.getenv('PG_HOST', 'localhost'),
        port=int(os.getenv('PG_PORT', 5432)),
        dbname=os.getenv('PG_DB_NAME', 'postgres'),
        username=os.getenv('PG_USER', 'postgres'),
        password=os.getenv('PG_PASSWORD', ''),
        connect_timeout=int(os.getenv('PG_CONN_TIMEOUT', 5)),
    )
    app = PostgresReplicaCheck(**connection_settings)

    wanted_status = os.getenv('EXPECTED_STATUS', 'streaming')
    wanted_user = os.getenv('EXPECTED_REPLICATION_USER', 'replication')
    status, message = app.main(expected_status=wanted_status, expected_user=wanted_user)

    # Report the result: message on stdout, exit code 0 on success and 1 otherwise.
    print(message)
    exit_code = 0 if status else 1
    sys.exit(exit_code)
57 changes: 57 additions & 0 deletions infracheck/checks/postgres-replica-status
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python3

import inspect
import os
import sys

"""
<sphinx>
postgres-replica-status
-----------------------
Checks if the local PostgreSQL server acts as a replica (standby), by validating the list of active WAL receivers.
The SQL command that is validated: `select * from pg_stat_wal_receiver;`
Parameters:
- pg_host (hostname or socket path, defaults to "localhost" which will use local unix socket, use IP address eg. 127.0.0.1 to connect via tcp)
- pg_port (port, defaults to 5432)
- pg_db_name (database name to connect to, defaults to "postgres")
- pg_user (username, defaults to "postgres")
- pg_password
- pg_conn_timeout (defaults to 5 which means 5 seconds)
- expected_status (defaults to "streaming")
- expected_replication_user: Expected user that will be used for replication connection (defaults to "replication")
</sphinx>
"""

# Make the repository root importable: resolve the directory holding this
# script, then step two levels up and put that location first on sys.path.
_this_file = inspect.getfile(inspect.currentframe())
_script_dir = os.path.dirname(os.path.abspath(_this_file))
path = _script_dir + '/../../'
sys.path.insert(0, path)


from infracheck.infracheck.checklib.postgres import BasePostgreSQL


class PostgresReplicaCheck(BasePostgreSQL):
    """Check the local WAL receiver state for a given replication user."""

    def main(self, expected_status: str, expected_user: str) -> tuple:
        """Run the check against ``pg_stat_wal_receiver``.

        :param expected_status: value the ``status`` column must have
        :param expected_user: text that must occur in the ``conninfo`` column
        :return: whatever ``validate_replication_row_exists`` returns for
                 this query and these expectations
        """
        wal_receiver_sql = 'SELECT status, conninfo FROM pg_stat_wal_receiver;'
        result = self.validate_replication_row_exists(
            wal_receiver_sql,
            expected_status=expected_status,
            expected_user=expected_user,
        )
        return result


if __name__ == '__main__':
    # Connection settings are read from the environment; the second argument
    # of each getenv() call is the fallback used when the variable is unset.
    connection_settings = dict(
        host=os.getenv('PG_HOST', 'localhost'),
        port=int(os.getenv('PG_PORT', 5432)),
        dbname=os.getenv('PG_DB_NAME', 'postgres'),
        username=os.getenv('PG_USER', 'postgres'),
        password=os.getenv('PG_PASSWORD', ''),
        connect_timeout=int(os.getenv('PG_CONN_TIMEOUT', 5)),
    )
    app = PostgresReplicaCheck(**connection_settings)

    wanted_status = os.getenv('EXPECTED_STATUS', 'streaming')
    wanted_user = os.getenv('EXPECTED_REPLICATION_USER', 'replication')
    status, message = app.main(expected_status=wanted_status, expected_user=wanted_user)

    # Report the result: message on stdout, exit code 0 on success and 1 otherwise.
    print(message)
    exit_code = 0 if status else 1
    sys.exit(exit_code)
55 changes: 55 additions & 0 deletions infracheck/infracheck/checklib/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

import psycopg2
import psycopg2.extras


class BasePostgreSQL:
    """Base class for PostgreSQL checks: opens a connection and offers query helpers."""

    # psycopg2 connection; replaced with a live connection in __init__
    conn = None

    def __init__(self, host: str, dbname: str, port: int, username: str, password: str, connect_timeout: int):
        """Connect to PostgreSQL with the given parameters.

        :param host: passed to psycopg2.connect as ``host``
        :param dbname: passed to psycopg2.connect as ``database``
        :param port: passed to psycopg2.connect as ``port``
        :param username: passed to psycopg2.connect as ``user``
        :param password: passed to psycopg2.connect as ``password``
        :param connect_timeout: passed to psycopg2.connect as ``connect_timeout``
        """
        self.conn = psycopg2.connect(
            database=dbname,
            host=host,
            password=password,
            user=username,
            port=port,
            connect_timeout=connect_timeout
        )

    @staticmethod
    def create_instance(cls):
        """Instantiate ``cls`` with settings taken from ``DB_*`` environment variables.

        Stays a static method that receives the class explicitly, so the
        existing call form ``BasePostgreSQL.create_instance(SomeCheck)`` keeps working.

        :param cls: class (or any callable) accepting the keyword arguments of ``__init__``
        :return: the object returned by ``cls(...)``
        """
        # Imported locally: "os" is not among this module's file-level imports.
        import os

        return cls(
            host=os.getenv('DB_HOST', 'localhost'),
            dbname=os.getenv('DB_NAME', ''),
            # __init__ requires a port; without it this factory raised TypeError.
            port=int(os.getenv('DB_PORT', 5432)),
            username=os.getenv('DB_USER', 'root'),
            password=os.getenv('DB_PASSWORD', ''),
            connect_timeout=int(os.getenv('DB_CONNECT_TIMEOUT', 10))
        )

    def query(self, qstr: str):
        """Execute ``qstr`` and return all rows.

        :param qstr: SQL text to execute as-is
        :return: result of ``fetchall()`` on a ``DictCursor``
        """
        # The context manager closes the cursor even when execute/fetch raises.
        with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            cur.execute(qstr)

            return cur.fetchall()

    def validate_replication_row_exists(self, sql: str, expected_status: str, expected_user: str):
        """Check that a replication row for ``expected_user`` has ``expected_status``.

        ``sql`` must yield exactly two columns per row: a status and a text
        value in which ``expected_user`` is searched (substring match). The
        first row mentioning the user decides the outcome.

        :param sql: query returning ``(status, conn_info)`` rows
        :param expected_status: status required for the user's row
        :param expected_user: text that must occur in the second column
        :return: ``(ok, message)`` tuple; ``ok`` is True only for a matching row
        """
        rows = self.query(sql)
        active_replications = len(rows)

        for status, conn_info in rows:
            # A NULL second column arrives as None and cannot mention the user;
            # skip it rather than fail on "in None".
            if conn_info is None or expected_user not in conn_info:
                continue

            if status == expected_status:
                return True, "%i replications active. The replication for '%s' user looks healthy" % (
                    active_replications, expected_user
                )

            return False, 'Expected "%s" status for conn_info="%s", got "%s"' % (
                expected_status, conn_info, status
            )

        if active_replications == 0:
            return False, "no replications active"

        return False, "%i replications active, but none found for user '%s'" % (active_replications, expected_user)

0 comments on commit 6dac168

Please sign in to comment.