Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from .packet import BinLogPacketWrapper
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
from .constants.FLAG import FLAG_STMT_END_F, FLAG_NO_FOREIGN_KEY_CHECKS_F, FLAG_RELAXED_UNIQUE_CHECKS_F, FLAG_COMPLETE_ROWS_F
from .event import NotImplementedEvent


MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] #2013 Connection Lost
#2006 MySQL server has gone away


class BinLogStreamReader(object):
"""Connect to replication stream and read event
"""
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, connection_settings={}, resume_stream=False,

#Store table meta information
self.table_map = {}
self.dict_table_id = {}
self.log_pos = log_pos
self.log_file = log_file

Expand Down Expand Up @@ -154,9 +156,9 @@ def fetchone(self):
self._ctl_connection,
self.__use_checksum)
if binlog_event.event_type == TABLE_MAP_EVENT:
self.table_map[binlog_event.event.table_id] = \
binlog_event.event.get_table()

if binlog_event.event.table_id not in self.table_map:
self.table_map[binlog_event.event.table_id] = binlog_event.event.get_table()
self.__manage_table_map(binlog_event.event.table_id)
if binlog_event.event_type == ROTATE_EVENT:
self.log_pos = binlog_event.event.position
self.log_file = binlog_event.event.next_binlog
Expand All @@ -166,16 +168,18 @@ def fetchone(self):
# wrong table schema.
# The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
# restarts. That means every rotation we see *could* be a sign of restart and so potentially
# invalidates all our cached table id to schema mappings. This means we have to load them all
# invalidates all our cached table id to schema mappings. This means we have to load them all
# again for each logfile which is potentially wasted effort but we can't really do much better
# without being broken in restart case
self.table_map = {}
self.dict_table_id = {}
elif binlog_event.log_pos:
self.log_pos = binlog_event.log_pos

self.__manage_if_last_event_of_statement(binlog_event)

if self.__filter_event(binlog_event.event):
continue

return binlog_event.event

def __filter_event(self, event):
Expand Down Expand Up @@ -215,5 +219,42 @@ def __get_table_information(self, schema, table):
else:
raise error

def __manage_if_last_event_of_statement(self, last_event_statement):
"""
looking for flags to FLAG_STMT_END_F
if event is the last event of a statement
"""
for row_event in last_event_statement:
if isinstance(event, row_event):
if event.flags & FLAG_STMT_END_F :
key = "%s.%s" % (self.table_map[event.table_id].schema,self.table_map[event.table_id].table)

# key exists ?
if key in self.dict_table_id:
# keep the last one
del self.dict_table_id[key]

# id exists ?
if event.table_id in self.table_map:
# del it
del self.table_map[event.table_id]
break

def __manage_table_map(self, new_table_id):
"""
Looking for a duplicate entry in self.table_map (same schema.table with old table_id)
from the new_table_id
"""
key = "%s.%s" % (self.table_map[new_table_id].schema,self.table_map[new_table_id].table)

# key exists ?
if key in self.dict_table_id:
# get old entry
if self.dict_table_id[key] in self.table_map:
# del it
del self.table_map[self.dict_table_id[key]]
# keep the last one
self.dict_table_id[key] = new_table_id

def __iter__(self):
return iter(self.fetchone, None)
11 changes: 11 additions & 0 deletions pymysqlreplication/constants/FLAG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Last event of a statement
FLAG_STMT_END_F = 1 << 0
# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options
FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1
# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options
FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2
#
# Indicates that rows in this event are complete, that is contain
# values for all columns of the table.
#
FLAG_COMPLETE_ROWS_F = 1 << 3