From 49c65762d5758db68ea43b70d70db9769b2815f5 Mon Sep 17 00:00:00 2001
From: zhouxiao
Date: Thu, 10 Nov 2016 21:28:44 +0800
Subject: [PATCH] first commit: prepared

---
 cs_settings_CRM0.py  |  23 ++
 py_table_checksum.py | 786 +++++++++++++++++++++++++++++++++++++++++++
 settings_checksum.py |  33 ++
 3 files changed, 842 insertions(+)
 create mode 100644 cs_settings_CRM0.py
 create mode 100644 py_table_checksum.py
 create mode 100644 settings_checksum.py

diff --git a/cs_settings_CRM0.py b/cs_settings_CRM0.py
new file mode 100644
index 0000000..af4d71a
--- /dev/null
+++ b/cs_settings_CRM0.py
@@ -0,0 +1,23 @@
+
+db_host_src = '10.0.200.195'
+db_host_dst = '10.0.200.195'
+
+TABLES_CHECK = {"d_name": ["t_1", "t_2"],
+                "d_name2": ["t_3"]}
+
+DB_SOURCE = {'db_host': db_host_src,
+             'db_port': 3306,
+             'db_user': 'ecuser',
+             'db_pass': 'ecuser',
+             'db_charset': 'latin1',
+             'result_charset': 'latin-1'}
+
+DB_TARGET = {'db_host': db_host_dst,
+             'db_port': 3307,
+             'db_user': 'ecuser',
+             'db_pass': 'ecuser',
+             'db_charset': 'utf8mb4',
+             'result_charset': 'utf-8'}
+
+
+DB_ID_CS = 'qa-crm0'
diff --git a/py_table_checksum.py b/py_table_checksum.py
new file mode 100644
index 0000000..f1cd8ed
--- /dev/null
+++ b/py_table_checksum.py
@@ -0,0 +1,786 @@
+# -*- coding=utf-8 -*-
+
+import MySQLdb
+from MySQLdb.constants import FIELD_TYPE
+from MySQLdb.converters import conversions
+from zlib import crc32
+from cs_settings_BASE import DB_SOURCE, DB_TARGET, TABLES_CHECK, DB_ID_CS
+import threading
+import sys
+import time
+import random
+from hotqueue import HotQueue
+from settings_checksum import REDIS_INFO, DB_CHECKSUM, CALC_CRC32_DB, REDIS_QUEUE_CNT, REDIS_POOL_CNT, CHUNK_SIZE
+from settings_checksum import GEN_DATAFIX, RUN_DATAFIX, DO_COMPARE
+
+CHARSET_MAPPING = {"latin1": "latin-1",
+                   "utf8": "utf-8",
+                   "utf8mb4": "utf-8"}
+
+TABLES_CHECK_COUNT = 0
+for t in TABLES_CHECK.values():
+    TABLES_CHECK_COUNT += len(t)
+
+
+class QueueHelper(object):
+    def __init__(self, in_cnt, out_cnt):
+        self.in_cnt = in_cnt
+        self.out_cnt = out_cnt
+
+        time_uniq = int(time.time())
+        checksum_queue_key_prefix = "checksum-%d-" % time_uniq
+        redis_host, redis_port = REDIS_INFO.split(":")
+
+        # producer-side client pool, sized by the number of tables compared concurrently
+        self.in_queues = [HotQueue(checksum_queue_key_prefix + str(i),
+                                   host=redis_host, port=redis_port, db=11) for i in range(in_cnt)]
+        # consumer-side queues, one per backend consumer thread
+        self.out_queues = [HotQueue(checksum_queue_key_prefix + str(i),
+                                    host=redis_host, port=redis_port, db=11) for i in range(out_cnt)]
+
+    # pick one of the redis connections at random
+    def get_in_client(self):
+        in_clients = self.in_cnt - 1
+        return self.in_queues[random.randint(0, in_clients)]
+
+    def get_out_client(self):
+        return self.out_queues
+
+    def destroy_queue(self, queue):
+        queue.clear()
+
+queues = QueueHelper(REDIS_POOL_CNT, REDIS_QUEUE_CNT)
+
+
+def get_dbconn(**dbconn_info):
+    db_host = dbconn_info['db_host']
+    db_user = dbconn_info['db_user']
+    db_pass = dbconn_info['db_pass']
+    db_port = dbconn_info['db_port']
+    db_name = dbconn_info.get('db_name', None)
+    db_charset = dbconn_info.get('db_charset', 'latin1')
+
+    # drop the TIMESTAMP/DATETIME converters so those columns come back as plain strings
+    myconv = conversions.copy()
+    del myconv[FIELD_TYPE.TIMESTAMP]
+    del myconv[FIELD_TYPE.DATETIME]
+
+    ### print "Connect to [%s:%d] using %s" % (db_host, db_port, db_charset)
+
+    conn = None
+    try:
+        conn = MySQLdb.connect(host=db_host, user=db_user, passwd=db_pass, port=db_port, charset=db_charset,
+                               connect_timeout=5, conv=myconv)  # use_unicode=False,
+    except MySQLdb.Error, e:
+        print "Error %d: %s" % (e.args[0], e.args[1])
+        sys.exit(-1)
+
+    if db_name is not None:
+        conn.select_db(db_name)
+        # conn.autocommit(False)
+
+    return conn
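+
+
+# Reviewer's note (not part of the original commit): a minimal illustration of
+# why the two converters are removed in get_dbconn above. With the default
+# conversions, MySQLdb returns datetime.datetime objects, whose str() form is
+# produced by Python rather than by the server; with the entries deleted, the
+# server's own column text is what gets hashed, e.g.
+#
+#   >>> from zlib import crc32
+#   >>> crc32("2016-11-10 21:28:44")   # same bytes on source and target
+#
+# so the crc32 depends only on the stored value, not on Python's datetime repr.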
+
+
+class CalcTbl(object):
+    def __init__(self, db_conn, db_conn_checksum=None, dbconn_info={}):
+        self.db_conn = db_conn
+        self.db_conn_cs = db_conn_checksum
+        if len(dbconn_info) != 0:
+            self.result_charset = CHARSET_MAPPING[dbconn_info['db_charset']]
+
+    def close_conn(self):
+        try:
+            self.db_conn.close()
+            self.db_conn_cs.close()
+        finally:
+            print "db connection closed."
+
+    def get_uniq_key(self, table_name):
+        cur = self.db_conn.cursor()
+        sql_keys = "show index from " + table_name + " where Key_name='PRIMARY'"
+
+        # print sql_keys
+
+        cur.execute(sql_keys)
+        res = cur.fetchall()
+
+        # print res
+
+        cur.close()
+
+        if len(res) == 0:
+            sql_keys = "show index from " + table_name + " where Non_unique=0"
+            cur = self.db_conn.cursor()
+            cur.execute(sql_keys)
+            res = cur.fetchall()
+            cur.close()
+
+            if len(res) == 0:
+                print "Warning: No PRIMARY or UNIQUE key found in ", table_name
+                sys.exit(-1)
+
+        t_uniq = [col[4] for col in res]
+
+        ### print "Using [", t_uniq, "] as the unique column for ", table_name
+
+        return t_uniq
+
+    def get_cols(self, table_name):
+        global TABLES_CHECK_COUNT
+        ts, tn = table_name.split(".")
+        cur = self.db_conn.cursor()
+        sql_cols = "select GROUP_CONCAT(COLUMN_NAME) from information_schema.COLUMNS where table_name =%s" + \
+                   " and table_schema=%s"
+        param = (tn, ts)
+
+        cur.execute(sql_cols, param)
+        res = cur.fetchall()
+        if res[0][0] is not None:
+            return ",".join(res[0])
+        else:
+            print "Error: table %s does not exist" % table_name
+            TABLES_CHECK_COUNT -= 1
+            sys.exit(-1)
+
+    # used to locate the differing rows; structured like make_chunk_sql and select_chunk
+    def get_chunk_rows(self, table_name, start_key, end_key):
+        t_cols = self.get_cols(table_name)
+        t_uniq_keys = self.get_uniq_key(table_name)
+
+        t_uniq_key_com = ",".join(t_uniq_keys)
+        t_uniq_key_order = " asc,".join(t_uniq_keys) + " asc"
+
+        t_uniq_filter_list_min = []
+        for wf_cnt in range(0, len(t_uniq_keys)):
+            t_uniq_filter_or = ("(" + t_uniq_keys[wf_cnt] + " > %s ")
+            for wf_cnt2 in range(0, wf_cnt):
+                t_uniq_filter_or += "and " + t_uniq_keys[wf_cnt2] + " = %s "
+            t_uniq_filter_or += ")"
+
+            t_uniq_filter_list_min.append(t_uniq_filter_or)
+
+        t_uniq_filter_min = " OR ".join(t_uniq_filter_list_min)
+
+        t_uniq_filter_list_max = []
+        len_uniq_keys = len(t_uniq_keys)
+        for wf_cnt in range(0, len_uniq_keys):
+            if wf_cnt == len_uniq_keys - 1:  # half-open interval (min, max]: start_key excluded, end_key included
+                t_uniq_filter_or = ("(" + t_uniq_keys[wf_cnt] + " <= %s ")
+            else:
+                t_uniq_filter_or = ("(" + t_uniq_keys[wf_cnt] + " < %s ")
+            for wf_cnt2 in range(0, wf_cnt):
+                t_uniq_filter_or += "and " + t_uniq_keys[wf_cnt2] + " = %s "
+            t_uniq_filter_or += ")"
+
+            t_uniq_filter_list_max.append(t_uniq_filter_or)
+
+        t_uniq_filter_max = " OR ".join(t_uniq_filter_list_max)
+
+        t_uniq_filter = "(%s) AND (%s)" % (t_uniq_filter_min, t_uniq_filter_max)
+
+        sql_plain_rows = "select concat_ws('-'," + t_uniq_key_com + "), " + t_cols + " from " + table_name + \
+                         " where " + t_uniq_filter + " order by " + t_uniq_key_order
+        t_start_value = start_key.split('-')
+        start_values = ()
+        for wf_v in range(len(t_start_value)):
+            start_values += (t_start_value[wf_v],) + tuple(t_start_value[0:wf_v])
+
+        t_end_value = end_key.split('-')
+        end_values = ()
+        for wf_v in range(len(t_end_value)):
+            end_values += (t_end_value[wf_v],) + tuple(t_end_value[0:wf_v])
+
+        # print "diff sql: ", sql_plain_rows
+        # print "diff param: ", start_values + end_values
+        chunk_crc32_rows = []
+
+        cur = self.db_conn.cursor()
+        cur.execute(sql_plain_rows, start_values + end_values)
+
+        res = cur.fetchall()
+        for row in res:
+            # print row[1:]
+            row_str = self.conv_tuple_encode(0, row[1:])
+
+            # row_str = "#".join([r.encode(self.result_charset) for r in row[1:]])
+            row_crc32 = crc32(row_str)
+
+            row_list = [row[0]]
+            row_list.append(str(row_crc32))
+            row_list.append(row_str)
+            # print row_list
+            chunk_crc32_rows.append(row_list)
+
+        return chunk_crc32_rows
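+
+    # Reviewer's sketch (not part of the original commit): for a composite
+    # unique key the nested loops above emulate MySQL's row-value comparison
+    #     (k1, k2) > (%s, %s)
+    # as a sargable OR-chain. For t_uniq_keys = ['k1', 'k2'] the generated
+    # lower bound is
+    #     (k1 > %s ) OR (k2 > %s and k1 = %s )
+    # and the parameter tuple built from start_key 'a-b' is ('a', 'b', 'a'),
+    # which matches the placeholder order exactly.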
+
+    def conv_tuple_encode(self, tuple_str=0, *row):
+        row_str = ""
+        for rst in row[0]:
+            # print "unicodetype:", type(rst)
+            if isinstance(rst, unicode):
+                rst = rst.encode(self.result_charset)
+                # print rst
+            else:
+                rst = str(rst)
+            row_str += rst + "#"
+
+        if tuple_str == 1:
+            row_list = []
+            for rst in row[0]:
+                if isinstance(rst, unicode):
+                    rst = rst.encode(self.result_charset)
+                elif rst is None:
+                    rst = "[[NULL]]"  # handle NULL columns
+                else:
+                    rst = str(rst)
+                row_list.append(rst)
+            row_str = tuple(row_list)
+
+        return row_str
+
+    def make_chunk_sql(self, table_name, chunk_size=2000):
+        t_cols = self.get_cols(table_name)
+        t_uniq_keys = self.get_uniq_key(table_name)
+
+        t_uniq_key_com = ",".join(t_uniq_keys)
+        t_uniq_key_order = " asc,".join(t_uniq_keys) + " asc"
+        # t_uniq_start_pair = dict(zip(t_uniq_keys, start_key))
+
+        t_uniq_filter_list = []
+        for wf_cnt in range(0, len(t_uniq_keys)):
+            t_uniq_filter_or = ("(" + t_uniq_keys[wf_cnt] + " > %s ")
+            for wf_cnt2 in range(0, wf_cnt):
+                t_uniq_filter_or += "and " + t_uniq_keys[wf_cnt2] + " = %s "
+            t_uniq_filter_or += ")"
+
+            t_uniq_filter_list.append(t_uniq_filter_or)
+
+        t_uniq_filter = " OR ".join(t_uniq_filter_list)
+
+        if CALC_CRC32_DB:
+            print "Calculating crc32 in the database instead of in this program (saves network traffic, but adds database load)."
+            sql_plain_rows = "select concat_ws('-'," + t_uniq_key_com + "), CRC32( concat_ws('#', " + t_cols + ") ) from " + table_name + \
+                             " where " + t_uniq_filter + " order by " + t_uniq_key_order + " limit %d" % CHUNK_SIZE
+        else:
+            print "Calculating crc32 in this program instead of in the database (needs more memory and network traffic, but can convert charsets)."
+            sql_plain_rows = "select concat_ws('-'," + t_uniq_key_com + "), concat_ws('#', " + t_cols + ") from " + table_name + \
+                             " where " + t_uniq_filter + " order by " + t_uniq_key_order + " limit %d" % CHUNK_SIZE
+        # print sql_plain_rows
+
+        return len(t_uniq_keys), sql_plain_rows
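+
+    # Reviewer's sketch (not part of the original commit): the two query shapes
+    # make_chunk_sql can emit, shown for a table d.t with unique key (id) and
+    # CHUNK_SIZE = 2000:
+    #
+    #   -- CALC_CRC32_DB = True: the server hashes each row
+    #   select concat_ws('-',id), CRC32( concat_ws('#', id,col_a,col_b) )
+    #   from d.t where (id > %s ) order by id asc limit 2000
+    #
+    #   -- CALC_CRC32_DB = False: raw concatenated rows come back and crc32()
+    #   -- runs here, after re-encoding to result_charset (needed when the
+    #   -- source and target charsets differ)
+    #   select concat_ws('-',id), concat_ws('#', id,col_a,col_b)
+    #   from d.t where (id > %s ) order by id asc limit 2000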
+
+    # compute the crc32 value of one chunk
+    # takes the prepared sql plus the parameters bounding the chunk; out_rows is used later to locate the differing rows
+    def select_chunk(self, sql_chunkraw, start_key, out_rows=0):
+        cur = self.db_conn.cursor()
+
+        t_start_value = start_key.split('-')
+        wf_values = ()
+        for wf_v in range(len(t_start_value)):
+            wf_values += (t_start_value[wf_v],) + tuple(t_start_value[0:wf_v])
+
+        param = wf_values
+        rows_count = 0
+        # print sql_chunkraw
+        # print param
+
+        try:
+            cur.execute(sql_chunkraw, tuple(param))
+            rows_count = cur.rowcount
+        except MySQLdb.Error, e:
+            print "Error %d: %s !" % (e.args[0], e.args[1])
+
+        if rows_count > 0:
+            res = cur.fetchall()
+
+            max_id = res[-1][0]
+            rows_crc32 = ""
+            rows_id = ""
+
+            for row in res:
+                rows_id += str(row[0]) + ","  # row_id
+
+                if CALC_CRC32_DB:
+                    rows_crc32 += str(row[1]) + ","
+                else:
+                    # print row
+                    # print self.result_charset
+                    if isinstance(row[1], unicode):
+                        row_str = row[1].encode(self.result_charset)
+                    else:
+                        row_str = row[1]
+                    # print rows_id, row_str
+                    rows_crc32 += str(crc32(row_str)) + ","
+
+            # print max_id, rows_crc32
+            if out_rows == 1:
+                return rows_id.split(","), rows_crc32.split(",")
+            else:
+                return max_id, crc32(rows_crc32)  # chunk_crc32
+        else:
+            # all chunks are done, or an error occurred
+            return -1, -1
+
+    def crc32_test(self, table_name):
+        cur = self.db_conn.cursor()
+        sql_row = "select f_log_id,f_content from " + table_name + " where f_log_id=772713468"
+        cur.execute(sql_row)
+        res = cur.fetchall()
+        # print res
+
+        cur.close()
+
+        return res[0]
+
+    def del_old_checksum(self, table_name):
+        cur = self.db_conn_cs.cursor()
+        print tuple(table_name.split('.'))
+        sql_del = "delete from t_checksum where f_schema_name='%s' and f_table_name='%s'" % \
+                  tuple(table_name.split('.'))
+
+        print sql_del
+        cur.execute(sql_del)
+
+        cur.close()
+
+        return 1
+
+    def write_output(self, *res_cs_cols):
+        conn = self.db_conn_cs
+
+        try:
+            cur = conn.cursor()
+
+            sqlstr = "insert into t_checksum(f_dbid,f_table_name,f_chunk_no,f_schema_name,f_min_id,f_max_id,f_chunk_crc32) " + \
+                     "values(%s,%s,%s,%s,%s,%s,%s)"
+            param = res_cs_cols  # (dbid, table_name, chunk_no, schema_name, min_id, max_id, chunk_crc32)
+
+            cur.execute(sqlstr, param)
+            conn.commit()
+
+        except MySQLdb.Error, e:
+            print "Error %d: %s" % (e.args[0], e.args[1])
+            print "write checksum data to t_checksum FAIL."
+            conn.rollback()
+        finally:
+            cur.close()
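+
+
+# Reviewer's sketch (not part of the original commit): select_chunk above
+# fingerprints a chunk by concatenating the per-row crc32 values (comma
+# separated, in key order) and hashing that string again, so one 32-bit value
+# stands in for up to CHUNK_SIZE rows:
+#
+#   >>> from zlib import crc32
+#   >>> rows_crc32 = "%d,%d," % (crc32("1#a#"), crc32("2#b#"))
+#   >>> chunk_crc32 = crc32(rows_crc32)   # compared between source and target
+#
+# Any changed, missing or extra row alters the concatenation and therefore the
+# chunk value.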
+
+
+class CheckSum(object):
+    def __init__(self, st_name, dbconn_cs_info, dbconn_info=None):
+        try:
+            self.dbconn_checksum = get_dbconn(**dbconn_cs_info)
+        except MySQLdb.Error, e:
+            try:
+                print "MySQL Error [%d]: %s" % (e.args[0], e.args[1])
+            except IndexError:
+                print "MySQL Error: %s" % str(e)
+            finally:
+                sys.exit(-1)
+
+        if dbconn_info is not None:
+            self.dbconn_info = dbconn_info
+            self.dbconn = get_dbconn(**dbconn_info)
+            self.dbid = dbconn_info['db_host'] + ":" + str(dbconn_info['db_port'])
+        if st_name != '':
+            self.st_name = st_name
+
+    def close_conn(self):
+        try:
+            self.dbconn_checksum.close()
+            self.dbconn.close()
+        finally:
+            print "db connection closed."
+
+    def before_checksum(self):
+        cur = self.dbconn_checksum.cursor()
+
+        print "Before checksum: create table if not exists t_checksum"
+        sqlstr = """
+            CREATE TABLE IF NOT EXISTS t_checksum (
+              f_dbid varchar(80) NOT NULL,
+              f_table_name varchar(50) NOT NULL,
+              f_chunk_no int(11) NOT NULL,
+              f_create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+              f_schema_name varchar(30) DEFAULT NULL,
+              f_min_id varchar(50) NOT NULL,
+              f_max_id varchar(50) NOT NULL,
+              f_chunk_crc32 varchar(20) DEFAULT NULL,
+              PRIMARY KEY (f_dbid,f_table_name,f_chunk_no,f_create_time),
+              KEY idx_tbname_maxid (f_table_name,f_max_id),
+              KEY idx_chunkno (f_chunk_no)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8
+            """
+        cur.execute(sqlstr)
+
+        print "Before checksum: delete old data from t_checksum if it exists for table: ", self.st_name
+        sqlstr = "DELETE FROM t_checksum WHERE f_table_name=%s AND f_schema_name=%s"
+        schema_name, table_name = self.st_name.split(".")
+        param = (table_name, schema_name)
+
+        cur.execute(sqlstr, param)
+        cur.close()
+
+    # run the check on the source instance
+    def do_checksum(self):
+
+        self.before_checksum()
+
+        schema_name, table_name = self.st_name.split('.')
+        dt_name = self.st_name
+
+        mycheck = CalcTbl(self.dbconn, self.dbconn_checksum, self.dbconn_info)
+
+        param_num, sql_chunkraw = mycheck.make_chunk_sql(dt_name)
+
+        # mycheck.del_old_checksum(dt_name)
+
+        print "Calculating checksums: ", self.dbid, dt_name
+
+        start_key = "-".join(['0'] * param_num)
+        chunk_no = 1
+
+        while True:
+            # compute crc32 one chunk at a time
+            max_id, chunk_crc32 = mycheck.select_chunk(sql_chunkraw, start_key)  # , self.dbconn_info['result_charset'])
+
+            if max_id != -1:
+                res_cs_cols = (self.dbid, table_name, chunk_no, schema_name, start_key, max_id, chunk_crc32)
+                # print "SOURCE: ", sql_chunkraw
+                # print "SOURCE:", res_cs_cols
+                mycheck.write_output(*res_cs_cols)
+
+                # after writing to t_checksum, put the select-chunk sql that was just executed onto the queue
+                queues.get_in_client().put([dt_name, sql_chunkraw, start_key, chunk_no])
+
+                start_key = max_id
+                chunk_no += 1
+            else:
+                print "Source instance", self.dbid, dt_name, " checksum calculation finished!"
+                queues.get_in_client().put([-1, -1, -1, -1])  # chunk_no=-1 signals end of queue
+                break
+
+        self.close_conn()
+
+    # check the target instance, taking sql from the queue
+    def do_checksum_target(self, sql_queue):
+        global TABLES_CHECK_COUNT
+        dbid = DB_TARGET['db_host'] + ":" + str(DB_TARGET['db_port'])
+
+        dbconn = get_dbconn(**DB_TARGET)
+        dbconn_checksum = get_dbconn(**DB_CHECKSUM)
+
+        tbl_check = CalcTbl(dbconn, dbconn_checksum, DB_TARGET)
+
+        for item in sql_queue.consume():
+
+            chunk_no = item[3]
+            if chunk_no == -1:
+                TABLES_CHECK_COUNT -= 1  # global counter that lets the consumer threads exit
+
+                if TABLES_CHECK_COUNT <= 0:  # tell the other queues to finish
+                    print "sql consumer exiting, %d tables remaining!!" % TABLES_CHECK_COUNT
+                    print
+                    for sql_queue_other in queues.get_out_client():
+                        sql_queue_other.put([-1, -1, -1, -1])
+                    sql_queue.clear()
+                    sys.exit(0)
+            else:
+                schema_name, table_name = item[0].split(".")
+                start_key = item[2]
+                try:
+                    max_id, chunk_crc32 = tbl_check.select_chunk(item[1], start_key)
+
+                    res_cs_cols = (dbid, table_name, chunk_no, schema_name, start_key, max_id, chunk_crc32)
+                    print "TARGET:", res_cs_cols
+                    tbl_check.write_output(*res_cs_cols)
+
+                except MySQLdb.Error, e:
+                    print "Error %d: %s" % (e.args[0], e.args[1])
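+
+
+# Reviewer's sketch (not part of the original commit): the source thread and
+# the target consumers are decoupled by a Redis list (hotqueue). Each message
+# carries everything needed to replay one chunk on the target:
+#
+#   ["d_name.t_1",                      # schema.table
+#    "select concat_ws('-',id), ...",   # the exact chunk sql run on the source
+#    "12345",                           # start_key of the chunk
+#    42]                                # chunk_no; -1 marks the end of a table
+#
+# Replaying the same sql with the same start_key on the target covers the same
+# key range, which is what makes the two f_chunk_crc32 values comparable.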
+
+
+class Compare(object):
+    def __init__(self, st_name, **dbconn_cs_info):
+        self.dbconn_checksum = get_dbconn(**dbconn_cs_info)
+        self.st_name = st_name
+
+    def close_conn(self):
+        try:
+            self.dbconn_checksum.close()
+        finally:
+            print "db connection closed."
+
+    def do_compare(self):
+        schema_name, table_name = self.st_name.split(".")
+
+        conn = self.dbconn_checksum
+        cur = conn.cursor()
+
+        # ">" instead of "!=" on the crc32 picks exactly one row per differing (t1, t2) pair
+        sql_vs_raw = "SELECT t1.f_chunk_no,t1.f_min_id,t1.f_max_id,t1.f_chunk_crc32,t2.f_min_id,t2.f_max_id,t2.f_chunk_crc32 " + \
+                     "FROM t_checksum t1, t_checksum t2 WHERE " + \
+                     "t1.f_table_name = t2.f_table_name AND " + \
+                     "t1.f_chunk_no = t2.f_chunk_no AND " + \
+                     "t1.f_schema_name = t2.f_schema_name AND " + \
+                     "t1.f_dbid != t2.f_dbid AND " + \
+                     "t1.f_chunk_crc32 > t2.f_chunk_crc32 AND " + \
+                     "t1.f_schema_name = %s and t1.f_table_name = %s "
+        # print sql_vs_raw
+
+        param = (schema_name, table_name)
+        cur.execute(sql_vs_raw, param)
+        res = cur.fetchall()
+
+        cur.close()
+        # conn.close()
+
+        if len(res) > 0:
+            print "Table %s.%s: %d chunks with inconsistent data" % (schema_name, table_name, len(res))
+            print "-" * 80
+            chunk_rows_fix = {"DELETE": [], "INSERT_UPDATE": []}
+            for row in res:
+                # f_min_id is guaranteed to be the same on both sides
+                # chunk_no, min_id, max_id_src, max_id_tgt \
+                diff_chunk = (row[0], row[1], row[2], row[5])
+                print
+                print "Chunk [%s] has inconsistent rows, CRC32: src(%s) tgt(%s)" % (row[0], row[3], row[6])
+
+                data_fix = self.get_diffs(self.st_name, *diff_chunk)
+                chunk_rows_fix['DELETE'].extend(data_fix[0])
+                chunk_rows_fix['INSERT_UPDATE'].extend(data_fix[1])
+
+            if GEN_DATAFIX:
+                print
+                print "-" * 80
+                print "Generating fix SQL for the target instance..."
+                fixfile = self.generate_fixsql(self.st_name, **chunk_rows_fix)
+
+                if RUN_DATAFIX:
+                    print "-" * 80
+                    print "[WARNING] Running data fix on the target instance (%s:%s) for table (%s)" % (DB_TARGET['db_host'], DB_TARGET['db_port'], self.st_name)
+                    self.run_fixsql(fixfile)
+
+        else:
+            cur = conn.cursor()
+            sql_table_exists = "select 1 from t_checksum t1 where t1.f_schema_name = %s and t1.f_table_name = %s"
+            cur.execute(sql_table_exists, param)
+            if cur.rowcount == 0:
+                print "No checksum data found for table %s.%s" % (schema_name, table_name)
+            else:
+                print "Table %s.%s data is consistent" % (schema_name, table_name)
+            cur.close()
+
+        conn.close()
+
+    def get_diffs(self, schema_table_name, *diff_chunk):
+        # my_conv = {MySQLdb.constants.FIELD_TYPE.LONG: str}
+        print "Fetching the inconsistent rows of chunk[%d] from the source and target instances:" % diff_chunk[0]
+        dbconn_source = get_dbconn(**DB_SOURCE)
+        dbconn_target = get_dbconn(**DB_TARGET)
+
+        mycalc_src = CalcTbl(dbconn_source, dbconn_info=DB_SOURCE)
+        mycalc_tgt = CalcTbl(dbconn_target, dbconn_info=DB_TARGET)
+
+        chunk_no, min_id, max_id_src, max_id_tgt = diff_chunk
+        # max_id_real = max(max_id_src, max_id_tgt)
+        max_id_real = self.get_realmax_id(max_id_src, max_id_tgt)
+
+        # print "max_id_real:", max_id_real
+
+        chunk_crc32_rows_src = mycalc_src.get_chunk_rows(schema_table_name, min_id, max_id_real)
+        chunk_crc32_rows_tgt = mycalc_tgt.get_chunk_rows(schema_table_name, min_id, max_id_real)
+
+        chunk_crc32_src = set([row[0] + ":" + row[1] for row in chunk_crc32_rows_src])
+        chunk_crc32_tgt = set([row[0] + ":" + row[1] for row in chunk_crc32_rows_tgt])
+
+        rows_inserted = [rowid.split(":")[0] for rowid in (chunk_crc32_src - chunk_crc32_tgt)]  # inserted or updated
+        rows_deleted2 = [rowid.split(":")[0] for rowid in (chunk_crc32_tgt - chunk_crc32_src)]  # deleted or updated
+
+        # print "XXXXXX", chunk_crc32_tgt
+
+        # an updated row shows up in both lists (same key, different crc32); only the rest are true deletes
+        rows_updated = set(rows_inserted) & set(rows_deleted2)
+        rows_deleted = set(rows_deleted2) - rows_updated
+
+        # fix_dict = {"DELETE": rows_deleted, "INSERT_UPDATE": list(rows_inserted)}
+        data_fix = list(rows_deleted), rows_inserted
+
+        if len(rows_inserted) + len(rows_deleted) == 0:
+            print "Table %s chunk %d data is already consistent" % (schema_table_name, chunk_no)
+        else:
+            print " TO insert or update: ", data_fix[1]
+            print " TO delete: ", data_fix[0]
+
+        # print "data_fix: ", data_fix
+        return data_fix
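+
+    # Reviewer's sketch (not part of the original commit): how get_diffs
+    # classifies differences with plain set arithmetic on "key:crc32" strings:
+    #
+    #   >>> src = set(["1:111", "2:222", "3:333"])           # key:crc32 on the source
+    #   >>> tgt = set(["1:111", "2:999"])                    # row 2 changed, row 3 missing
+    #   >>> ins = set(r.split(":")[0] for r in src - tgt)    # set(['2', '3']) -> INSERT_UPDATE
+    #   >>> dl2 = set(r.split(":")[0] for r in tgt - src)    # set(['2'])
+    #   >>> dl2 - (ins & dl2)                                # set() -> nothing to DELETE
+    #
+    # A key present in both difference sets was updated; the REPLACE statements
+    # generated below cover both the insert and the update case on the target.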
print "data_fix: ", data_fix + return data_fix + + def get_realmax_id(self, max_id1, max_id2): + + if max_id1 == max_id2: + return max_id1 + + maxid1_list = max_id1.split('-') + maxid2_list = max_id2.split('-') + pos_max = 0 + + for id1, id2 in zip(maxid1_list, maxid2_list): + try: + id1_long = long(id1) + id2_long = long(id2) + if id1_long > id2_long: + pos_max = max_id1 + elif id1_long < id2_long: + pos_max = max_id2 + else: + pos_max = 0 + except ValueError: + if id1 > id2: + pos_max = max_id1 + elif id1 < id2: + pos_max = max_id2 + else: + pos_max = 0 + if pos_max != 0: + break + + return pos_max + + # 根据找出的不同行的主键,生成从源库到目标库的修复sql + def generate_fixsql(self, schema_table_name, **tbl_key): + time_unique = int(time.time()) + datafix_file = "datafix-%s-%d.sql" % (schema_table_name.split(".")[1], time_unique) + data_fix_sql = open(datafix_file, "a") + + tbl_ops = CalcTbl(get_dbconn(**DB_SOURCE), dbconn_info=DB_SOURCE) + wf_keys = tbl_ops.get_uniq_key(schema_table_name) + # t_cols = tbl_ops.get_cols(schema_table_name) + + wf_keys_str = "='%s' and ".join(wf_keys) + "='%s'" + + for dml, params in tbl_key.items(): + + if dml == 'DELETE': + for k in params: + wf_values = k.split('-') + sql_str_raw = "delete from " + schema_table_name + " where " + wf_keys_str + ";" + sql_str = sql_str_raw % tuple(wf_values) + #print + #print "DELETE ", sql_str, tuple(wf_values) + data_fix_sql.write(sql_str + "\n") + if dml == "INSERT_UPDATE": + dbconn = get_dbconn(**DB_SOURCE) + + for k in params: + wf_values = k.split('-') + sql_str_src = "select * from " + schema_table_name + " where " + wf_keys_str % tuple(wf_values) + + cur = dbconn.cursor() + cur.execute(sql_str_src) + if cur.rowcount == 1: + res = cur.fetchone() + res2 = tbl_ops.conv_tuple_encode(1, res) + # print "fix field: ", res + # print "fix field2: ", res2 + else: + print + print "Error: exactly one line expected." + print sql_str_src + cur.close() + + # sql_str_raw = "insert into " + schema_table_name + "(" + t_cols + ") values('%s') on duplicate key update;" + sql_str_raw = "REPLACE into " + schema_table_name + " values('%s');" + sql_str = sql_str_raw % "', '".join(res2) + sql_str = sql_str.replace("\'[[NULL]]\'", "NULL") + #print sql_str + data_fix_sql.write(sql_str + "\n") + + dbconn.close() + + data_fix_sql.close() + print "修复数据SQL文件:%s" % datafix_file + + return datafix_file + + def run_fixsql(self, fixfile): + dbconn = get_dbconn(**DB_TARGET) + cur = dbconn.cursor() + + f = open(fixfile, 'r') + line_sql = f.readline() + while line_sql: + cur.execute(line_sql) + line_sql = f.readline() + dbconn.commit() + + f.close() + + +class outThread(threading.Thread): + def __init__(self, sql_queue): + threading.Thread.__init__(self) + self.sql_queue = sql_queue + + def run(self): + # 消费线程不关心队列里是哪个表的sql + outcheck = CheckSum(st_name='', dbconn_cs_info=DB_CHECKSUM, dbconn_info=DB_TARGET) + outcheck.do_checksum_target(self.sql_queue) + + +class myThread(threading.Thread): + def __init__(self, threadID, schema_name, table_name, **dbconn_info): + threading.Thread.__init__(self) + # self.threadID = threadID + self.name = threadID + self.schema_name = schema_name + self.table_name = table_name + self.dbconn_info = dbconn_info + + def run(self): + dbid = self.dbconn_info['db_host'] + ":" + str(self.dbconn_info['db_port']) + st_name = self.schema_name + "." 
+
+
+if __name__ == '__main__':
+
+    # compute checksums in parallel, one thread per table
+    if DO_COMPARE in (1, 0):
+        # the executed sql is stored in the redis queues, each consumed by its own thread
+        for out_queue in queues.get_out_client():
+            thread_outcheck = outThread(out_queue)
+            thread_outcheck.start()
+
+        TID_no = 0
+        for s_name, t_names in TABLES_CHECK.items():
+            for t_name in t_names:
+                thread_checksum = myThread(DB_ID_CS + "-" + str(TID_no), s_name, t_name, **DB_SOURCE)
+                thread_checksum.start()
+                TID_no += 1
+                time.sleep(0.6)
+
+    # compare serially
+    if DO_COMPARE in (1, 2):
+        if DO_COMPARE == 1:
+            while TABLES_CHECK_COUNT >= 0:
+                time.sleep(2)
+
+        for s_names, t_namess in TABLES_CHECK.items():
+            for t_n in t_namess:
+                print "#" * 80
+                print "Start compare chunk's crc32 for table: [ %s.%s ]" % (s_names, t_n)
+                tb_cmp = Compare(s_names + "." + t_n, **DB_CHECKSUM)
+                tb_cmp.do_compare()
+
+                print ""
+                print "#" * 80
+
+
+    # do_checksum(st_name['schema'], st_name['tables'][0])
\ No newline at end of file
diff --git a/settings_checksum.py b/settings_checksum.py
new file mode 100644
index 0000000..aabe855
--- /dev/null
+++ b/settings_checksum.py
@@ -0,0 +1,33 @@
+#coding:utf-8
+from cs_settings_BASE import DB_SOURCE, DB_TARGET, TABLES_CHECK
+
+# redis address for the sql queues
+REDIS_INFO = "10.0.200.168:6379"
+
+# where the checksum result data is stored
+DB_CHECKSUM = {'db_host': '192.168.1.125',
+               'db_port': 3307,
+               'db_user': 'ecuser',
+               'db_pass': 'ecuser',
+               'db_charset': 'utf8',
+               'db_name': 'd_00200'}
+
+# whether to compute CRC32 directly in the database
+# (only safe when source and target use the same charset)
+CALC_CRC32_DB = (DB_SOURCE['db_charset'] == DB_TARGET['db_charset'])
+
+# number of producer-side redis pool connections, and number of redis sql queues
+# tune to the number of tables compared per run; (2, 2) suits 3-5 tables at a time
+REDIS_POOL_CNT, REDIS_QUEUE_CNT = (2, 2)
+
+# number of rows fetched from the source per computation chunk
+CHUNK_SIZE = 2000
+
+# compare from the checksum table only (no recomputation against source/target)
+# 0: compute only, no compare; 1: compute and compare; 2: compare only, no compute
+DO_COMPARE = 1
+
+# generate fix SQL
+GEN_DATAFIX = True
+
+# automatically run the fix SQL on the target instance (warning!)
+RUN_DATAFIX = False
\ No newline at end of file
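
Usage note (reviewer addition, not part of the patch): py_table_checksum.py and
settings_checksum.py import their per-environment settings from a module named
cs_settings_BASE, while this commit only adds cs_settings_CRM0.py. The assumed
setup is that cs_settings_BASE.py is a copy (or symlink) of the concrete
settings file for the environment being checked, e.g.:

    cp cs_settings_CRM0.py cs_settings_BASE.py
    python py_table_checksum.py

This requires Python 2 with the MySQLdb and hotqueue packages installed, and a
reachable Redis at the address configured in REDIS_INFO.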