
v1.6.1 Partitioning order can now be set during data migration. Autovacuum management. Better unique check capabilities.
commit 542dd04fd7872cbddfb404047e34ae90f16a8810 (parent 4b51128), committed by @keithf4 on Feb 21, 2014
CHANGELOG.txt
@@ -1,3 +1,11 @@
+1.6.1
+-- The python partitioning script now turns off autovacuum on the entire partition set while it runs. This should help reduce load, since it keeps the autovacuum daemon from kicking in while data is being migrated. When the script finishes, the default autovacuum setting is restored on all tables in the partition set, and VACUUM ANALYZE is run on the parent table once all data has been moved. An option is available to leave autovacuum on if the ALTER TABLE statements cause more contention than the autovacuum itself. There is no such option for the plpgsql partitioning functions (the inability to COMMIT within a function loop would cause too much contention).
+-- The order in which data is migrated from the parent to the children can now be set via an option to the partition_data_id()/partition_data_time() functions or the python script. The default is the original behavior (ascending order). Thanks to bougyman from #postgresql on freenode for this idea.
+-- Removed the plpgsql function check_unique_column() and created the python script check_unique_constraint.py. It runs far more efficiently and causes less contention in the database while checking that a unique constraint is consistent across all child tables, and it now supports multi-column constraints. See the doc file for more info on the script options.
+-- Fixed a syntax error in the exception blocks of create_parent() and create_id_function(). Reported by bougyman.
+-- Added pgtap tests for additional constraints feature.
+
+
1.6.0
-- A new partitioning type has been added to allow setting almost any desired time interval (time-custom). The smallest interval supported is 1 second and the upper limit is bounded by the minimum and maximum timestamp values that PostgreSQL supports (http://www.postgresql.org/docs/current/static/datatype-datetime.html). This feature uses the range data type for internal configuration management, so it is only supported in PostgreSQL 9.2+.
-- The custom time interval is less efficient than both time-static and time-dynamic since it must use a lookup table. If your needed partitioning interval can fit in one of the pre-made intervals given in the documentation, it is highly recommended to use one of those for better performance. time-static is still the best method when performance of inserts is important. See the documentation for more details on this new partitioning type.
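The p_order option added in 1.6.1 can also be passed directly to the partitioning functions. Here is a minimal sketch, assuming the extension is installed in a schema named "partman" and a time-based partition set named "public.events" (both names are illustrative, not from this commit); it mirrors the call that partition_data.py builds further down:

    import psycopg2

    conn = psycopg2.connect("host=localhost")
    conn.autocommit = True
    cur = conn.cursor()
    # Move one batch of data out of the parent, newest rows first (DESC)
    cur.execute("SELECT partman.partition_data_time(%s, p_order := %s)",
                ['public.events', 'DESC'])
    print "Rows moved: " + str(cur.fetchone()[0])
    conn.close()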
META.json
@@ -1,7 +1,7 @@
{
"name": "pg_partman",
"abstract": "Extension to manage partitioned tables by time or ID",
- "version": "1.6.0",
+ "version": "1.6.1",
"maintainer": [
"Keith Fiske <keith@omniti.com>"
],
@@ -20,9 +20,9 @@
},
"provides": {
"pg_partman": {
- "file": "sql/pg_partman--1.6.0.sql",
+ "file": "sql/pg_partman--1.6.1.sql",
"docfile": "doc/pg_partman.md",
- "version": "1.6.0",
+ "version": "1.6.1",
"abstract": "Extension to manage partitioned tables by time or ID"
}
},
check_unique_constraint.py (new file)
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+
+import argparse, collections, psycopg2, tempfile
+
+parser = argparse.ArgumentParser(description="This script is used to check that all rows in a partition set are unique for the given columns. Since unique constraints are not applied across partition sets, this cannot be enforced within the database. This script can be used as a monitor to ensure uniquness. If any unique violations are found, the values, along with a count of each, are output.")
+parser.add_argument('-p', '--parent', required=True, help="Parent table of the partition set to be checked")
+parser.add_argument('-l', '--column_list', required=True, help="Comma separated list of columns that make up the unique constraint to be checked")
+parser.add_argument('-c','--connection', default="host=localhost", help="""Connection string for use by psycopg. Defaults to "host=localhost".""")
+parser.add_argument('-t', '--temp', help="Path to a writable folder that can be used for temp working files. Defaults to the system temp folder.")
+parser.add_argument('--psql', help="Full path to psql binary if not in current PATH")
+parser.add_argument('--simple', action="store_true", help="Output a single integer value with the total duplicate count. Use this for monitoring software that requires a simple value to be checked for.")
+parser.add_argument('-q', '--quiet', action="store_true", help="Suppress all output unless there is a constraint violation found.")
+args = parser.parse_args()
+
+if args.temp is None:
+ tmp_copy_file = tempfile.NamedTemporaryFile(prefix="partman_constraint")
+else:
+ tmp_copy_file = tempfile.NamedTemporaryFile(prefix="partman_constraint", dir=args.temp)
+
+fh = open(tmp_copy_file.name, 'w')
+conn = psycopg2.connect(args.connection)
+cur = conn.cursor()
+if not args.quiet:
+ print "Dumping out column data to temp file..."
+cur.copy_to(fh, args.parent, sep=",", columns=args.column_list.split(","))
+conn.close()
+fh.close()
+
+total_count = 0
+if not args.quiet:
+ print "Checking for dupes..."
+with open(tmp_copy_file.name) as infile:
+ counts = collections.Counter(l.strip() for l in infile)
+for line, count in counts.most_common():
+ if count > 1:
+ if not args.simple:
+ print str(line) + ": " + str(count)
+ total_count += count
+
+if args.simple:
+ if total_count > 0:
+ print total_count
+ elif not args.quiet:
+ print total_count
+else:
+ if total_count == 0 and not args.quiet:
+ print "No constraint violations found"
+
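For reference, a hypothetical invocation of the script above (the table, column, and connection values are examples only):

    python check_unique_constraint.py -p public.orders -l order_id,customer_id -c "host=localhost dbname=mydb" --simple

With --simple the script prints just the total duplicate count, which suits monitoring software; omit it to see each violating value along with its count.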
partition_data.py
@@ -2,68 +2,161 @@
import argparse, psycopg2, time, sys
-parser = argparse.ArgumentParser(description="This script calls either partition_data_time() or partition_data_id() depending on the value given for --type. A commit is done at the end of each --interval and/or fully created partition. Returns the total number of rows moved to partitions. Automatically stops when parent is empty. See docs for examples.")
+parser = argparse.ArgumentParser(description="This script calls either partition_data_time() or partition_data_id() depending on the value given for --type. A commit is done at the end of each --interval and/or fully created partition. Returns the total number of rows moved to partitions. Automatically stops when parent is empty. See docs for examples.", epilog="NOTE: To help avoid heavy load and contention during partitioning, autovacuum is turned off for the parent table and all child tables when this script is run. When partitioning is complete, autovacuum is set back to its default value and the parent table is vacuumed when it is emptied.")
parser.add_argument('-p','--parent', required=True, help="Parent table of an already created partition set. (Required)")
parser.add_argument('-t','--type', choices=["time","id",], required=True, help="""Type of partitioning. Valid values are "time" and "id". (Required)""")
parser.add_argument('-c','--connection', default="host=localhost", help="""Connection string for use by psycopg to connect to your database. Defaults to "host=localhost".""")
parser.add_argument('-i','--interval', help="Value that is passed on to the partitioning function as p_batch_interval argument. Use this to set an interval smaller than the partition interval to commit data in smaller batches. Defaults to the partition interval if not given.")
parser.add_argument('-b','--batch', default=0, type=int, help="""How many times to loop through the value given for --interval. If --interval not set, will use default partition interval and make at most -b partition(s). Script commits at the end of each individual batch. (NOT passed as p_batch_count to partitioning function). If not set, all data in the parent table will be partitioned in a single run of the script.""")
parser.add_argument('-w','--wait', default=0, type=float, help="Cause the script to pause for a given number of seconds between commits (batches) to reduce write load")
+parser.add_argument('-o', '--order', choices=["ASC", "DESC"], default="ASC", help="This option allows you to specify the order that data is migrated from the parent to the children, either ascending (ASC) or descending (DESC). Default is ASC.")
parser.add_argument('-l','--lockwait', default=0, type=float, help="Have a lock timeout of this many seconds on the data move. If a lock is not obtained, that batch will be tried again.")
parser.add_argument('--lockwait_tries', default=10, type=int, help="Number of times to allow a lockwait to time out before giving up on the partitioning. Defaults to 10")
+parser.add_argument('--autovacuum_on', action="store_true", help="Turning autovacuum off requires a brief lock to ALTER the table property. Set this option to leave autovacuum on and avoid the lock attempt.")
parser.add_argument('-q','--quiet', action="store_true", help="Switch setting to stop all output during and after partitioning for use in cron jobs")
+parser.add_argument('--debug', action="store_true", help="Show additional debugging output")
args = parser.parse_args()
-batch_count = 0
-total = 0
-lockwait_count = 0
-
-conn = psycopg2.connect(args.connection)
-cur = conn.cursor()
-sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
-cur.execute(sql)
-partman_schema = cur.fetchone()[0]
-cur.close()
-cur = conn.cursor()
+def create_conn():
+ conn = psycopg2.connect(args.connection)
+ conn.autocommit = True
+ return conn
-sql = "SELECT " + partman_schema + ".partition_data_" + args.type + "(%s"
-if args.interval != "":
- sql += ", p_batch_interval := %s"
-sql += ", p_lock_wait := %s)"
-while True:
- if args.interval != "":
- li = [args.parent, args.interval, args.lockwait]
- else:
- li = [args.parent, args.lockwait]
-# print cur.mogrify(sql, li)
- cur.execute(sql, li)
- result = cur.fetchone()
- conn.commit()
+def close_conn(conn):
+ conn.close()
+
+
+def get_partman_schema(conn):
+ cur = conn.cursor()
+ sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
+ cur.execute(sql)
+ partman_schema = cur.fetchone()[0]
+ cur.close()
+ return partman_schema
+
+
+def turn_off_autovacuum(conn, partman_schema):
+ cur = conn.cursor()
+ sql = "ALTER TABLE " + args.parent + " SET (autovacuum_enabled = false, toast.autovacuum_enabled = false)"
+ if args.debug:
+ print cur.mogrify(sql)
+ cur.execute(sql)
+ sql = "SELECT * FROM " + partman_schema + ".show_partitions(%s)"
+ if args.debug:
+ print cur.mogrify(sql, [args.parent])
+ cur.execute(sql, [args.parent])
+ result = cur.fetchall()
+ for r in result:
+ sql = "ALTER TABLE " + r[0] + " SET (autovacuum_enabled = false, toast.autovacuum_enabled = false)"
+ if args.debug:
+ print cur.mogrify(sql)
+ cur.execute(sql)
+ cur.close()
+
+
+def reset_autovacuum(conn, partman_schema):
+ cur = conn.cursor()
+ sql = "ALTER TABLE " + args.parent + " RESET (autovacuum_enabled, toast.autovacuum_enabled)"
+ if args.debug:
+ print cur.mogrify(sql)
+ cur.execute(sql)
+ sql = "SELECT * FROM " + partman_schema + ".show_partitions(%s)"
+ if args.debug:
+ print cur.mogrify(sql, [args.parent])
+ cur.execute(sql, [args.parent])
+ result = cur.fetchall()
+ for r in result:
+ sql = "ALTER TABLE " + r[0] + " RESET (autovacuum_enabled, toast.autovacuum_enabled)"
+ if args.debug:
+ print cur.mogrify(sql)
+ cur.execute(sql)
+ cur.close()
+
+
+def vacuum_parent(conn):
+ cur = conn.cursor()
+ sql = "VACUUM ANALYZE " + args.parent
+ if args.debug:
+ print cur.mogrify(sql)
if not args.quiet:
- if result[0] > 0:
- print "Rows moved: " + str(result[0])
- elif result[0] == -1:
- print "Unable to obtain lock, trying again"
- # if lock wait timeout, do not increment the counter
- if result[0] <> -1:
- batch_count += 1
- total += result[0]
- lockwait_count = 0
- else:
- lockwait_count += 1
- if lockwait_count > args.lockwait_tries:
- print "quitting due to inability to get lock on next rows to be moved"
- print "total rows moved: %d" % total
+ print "Running vacuum analyze on parent table..."
+ cur.execute(sql)
+ cur.close()
+
+
+def partition_data(conn, partman_schema):
+ batch_count = 0
+ total = 0
+ lockwait_count = 0
+
+ cur = conn.cursor()
+
+ sql = "SELECT " + partman_schema + ".partition_data_" + args.type + "(%s"
+ if args.interval is not None:
+ sql += ", p_batch_interval := %s"
+ sql += ", p_lock_wait := %s"
+ sql += ", p_order := %s)"
+
+ while True:
+ if args.interval is not None:
+ li = [args.parent, args.interval, args.lockwait, args.order]
+ else:
+ li = [args.parent, args.lockwait, args.order]
+ if args.debug:
+ print cur.mogrify(sql, li)
+ cur.execute(sql, li)
+ result = cur.fetchone()
+ if not args.quiet:
+ if result[0] > 0:
+ print "Rows moved: " + str(result[0])
+ elif result[0] == -1:
+ print "Unable to obtain lock, trying again"
+ # if lock wait timeout, do not increment the counter
+ if result[0] != -1:
+ batch_count += 1
+ total += result[0]
+ lockwait_count = 0
+ else:
+ lockwait_count += 1
+ if lockwait_count > args.lockwait_tries:
+ print "quitting due to inability to get lock on next rows to be moved"
+ print "total rows moved: %d" % total
+ break
+ # If no rows left or given batch argument limit is reached
+ if (result[0] == 0) or (args.batch > 0 and batch_count >= int(args.batch)):
break
- # If no rows left or given batch argument limit is reached
- if (result[0] == 0) or (args.batch > 0 and batch_count >= int(args.batch)):
- break
- time.sleep(args.wait)
+ time.sleep(args.wait)
+
+ return total
+
+if __name__ == "__main__":
+ conn = create_conn()
+ partman_schema = get_partman_schema(conn)
+
+ if not args.autovacuum_on:
+ turn_off_autovacuum(conn, partman_schema)
+
+ total = partition_data(conn, partman_schema)
+
+ if not args.quiet:
+ print "Total rows moved: %d" % total
+
+ vacuum_parent(conn)
-if not args.quiet:
- print "total rows moved: %d" % total
+ if not args.autovacuum_on:
+ reset_autovacuum(conn, partman_schema)
-conn.close()
+ close_conn(conn)
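A hypothetical run of the updated partitioning script showing the new flags (the parent table name is an example):

    python partition_data.py -p public.events -t time -i "1 day" -b 10 -o DESC -w 5

Add --autovacuum_on if the brief ALTER TABLE lock is more disruptive than leaving the autovacuum daemon running, and --debug to print each statement before it executes.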