Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 111 additions & 2 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pymysql
import struct

from pymysql.constants.COMMAND import COM_BINLOG_DUMP
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
from pymysql.cursors import DictCursor
from pymysql.util import int2byte

Expand All @@ -30,17 +30,105 @@
MYSQL_EXPECTED_ERROR_CODES = [2013, 2006]


class ReportSlave(object):

"""Represent the values that you may report when connecting as a slave
to a master. SHOW SLAVE HOSTS related"""

hostname = ''
username = ''
password = ''
port = 0

def __init__(self, value):
"""
Attributes:
value: string or tuple
if string, then it will be used hostname
if tuple it will be used as (hostname, user, password, port)
"""

if isinstance(value, tuple):
l = len(value)
if l > 1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my 2¢

try:
  self.hostname = value[0]
  self.username = value[1]
  …
except IndexError:
  pass

self.hostname = value[0]
if l > 2:
self.username = value[1]
if l > 3:
self.password = value[2]
if l > 4:
self.port = int(value[3])
else:
self.hostname = value

def __repr__(self):
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' %\
(self.hostname, self.username, self.password, self.port)

def encoded(self, server_id, master_id=0):
"""
server_id: the slave server-id
master_id: usually 0. Appears as "master id" in SHOW SLAVE HOSTS
on the master. Unknown what else it impacts.
"""

# 1 [15] COM_REGISTER_SLAVE
# 4 server-id
# 1 slaves hostname length
# string[$len] slaves hostname
# 1 slaves user len
# string[$len] slaves user
# 1 slaves password len
# string[$len] slaves password
# 2 slaves mysql-port
# 4 replication rank
# 4 master-id

lhostname = len(self.hostname)
lusername = len(self.username)
lpassword = len(self.password)

packet_len = (1 + # command
4 + # server-id
1 + # hostname length
lhostname +
1 + # username length
lusername +
1 + # password length
lpassword +
2 + # slave mysql port
4 + # replication rank
4) # master-id

MAX_STRING_LEN = 257 # one byte for length + 256 chars

return (struct.pack('<i', packet_len) +
int2byte(COM_REGISTER_SLAVE) +
struct.pack('<L', server_id) +
struct.pack('<%dp' % min(MAX_STRING_LEN, lhostname + 1),
self.hostname) +
struct.pack('<%dp' % min(MAX_STRING_LEN, lusername + 1),
self.username) +
struct.pack('<%dp' % min(MAX_STRING_LEN, lpassword + 1),
self.password) +
struct.pack('<H', self.port) +
struct.pack('<l', 0) +
struct.pack('<l', master_id))


class BinLogStreamReader(object):

"""Connect to replication stream and read event
"""
report_slave = None

def __init__(self, connection_settings, server_id, resume_stream=False,
blocking=False, only_events=None, log_file=None, log_pos=None,
filter_non_implemented_events=True,
ignored_events=None, auto_position=None,
only_tables=None, only_schemas=None,
freeze_schema=False, skip_to_timestamp=None):
freeze_schema=False, skip_to_timestamp=None,
report_slave=None):
"""
Attributes:
resume_stream: Start for event from position or the latest event of
Expand All @@ -55,6 +143,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
only_schemas: An array with the schemas you want to watch
freeze_schema: If true do not support ALTER TABLE. It's faster.
skip_to_timestamp: Ignore all events until reaching specified timestamp.
report_slave: Report slave in SHOW SLAVE HOSTS.
"""
self.__connection_settings = connection_settings
self.__connection_settings["charset"] = "utf8"
Expand Down Expand Up @@ -85,6 +174,9 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp

if report_slave:
self.report_slave = ReportSlave(report_slave)

def close(self):
if self.__connected_stream:
self._stream_connection.close()
Expand Down Expand Up @@ -118,6 +210,21 @@ def __checksum_enabled(self):
return False
return True

def _register_slave(self):
if not self.report_slave:
return

packet = self.report_slave.encoded(self.__server_id)

if pymysql.__version__ < "0.6":
self._stream_connection.wfile.write(packet)
self._stream_connection.wfile.flush()
self._stream_connection.read_packet()
else:
self._stream_connection._write_bytes(packet)
self._stream_connection._next_seq_id = 1
self._stream_connection._read_packet()

def __connect_to_stream(self):
# log_pos (4) -- position in the binlog-file to start the stream with
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
Expand All @@ -134,6 +241,8 @@ def __connect_to_stream(self):
cur.execute("set @master_binlog_checksum= @@global.binlog_checksum")
cur.close()

self._register_slave()

if not self.auto_position:
# only when log_file and log_pos both provided, the position info is
# valid, if not, get the current position from master
Expand Down