Skip to content

Databus for MySQL

Jingguo Yao edited this page Jan 9, 2015 · 22 revisions

Introduction

A frequently asked question on the Databus open source mailing list is about the possibility of capturing changes in MySQL through Databus. Below, we describe a proof-of-concept implementation of a Databus fetcher that taps into MySQL internal replication stream, the MySQL binary log (the binlog, for short).

At a high-level, the Databus MySQL adapter connects as a replication slave to the MySQL database. It converts the replication events to Databus internal event format and stores those events in the relay’s memory buffer. For parsing the incoming binlog replication stream, the fetcher uses an open-source Java library called “OpenReplicator”. The library is available here.

Quick Start

  • You need a MySQL instance running with binlog replication enabled. Please refer to How to Set Up Replication guide on how to enable the binlog replication for the MySQL instance. That MySQL instance will act as a master for the Databus relay. If your mysql version has binlog_checksum feature, it has to be disabled for the change-capture to work.
  • Compile the example

The code is available on github. It may be built with the following commands:

$ gradle -Dopen_source=true assemble : Tested with gradle-1.10 version

  • Run the MySQL example

$ cd build/databus2-example-relay-pkg/distributions

$ tar -zxvf databus2-example-relay-pkg.tar.gz

$ (cd bin && ./create_person.sh) : The script assumes that MySQL is started on port 33066; please change it appropriately for your setup. It creates a database called ‘or_test’, a table called ‘person’ within it, and it inserts 9 sample rows in that table.

$ ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json : This script starts a Databus relay and subscribes for change-capture for the table ‘or_test.person’. If the mysql is running on a port other than 33066, please change the port number and server-id present in the uri section in conf/sources-or-person.json to the correct-port. This relay uses the http port 11115 (configured in conf/relay_or_person.properties)

  • Test if the relay has successfully been started

A quick way is to issue a curl command to the relay’s HTTP port as specified in conf/relay_or_person.properties (11115). The logical source id specified for the table ‘Person’ in conf/sources-or-person.json is 40:

$ curl -s http://localhost:11115/sources
[{"name":"com.linkedin.events.example.or_test.Person","id":40}]
  • Insert an event into the table with a command like:
update person set first_name='John' where id=1;
  • Check if the relay has received the event from the database with the following command:
$ curl -s http://localhost:11115/containerStats/inbound/events/total?pretty | grep -m1 numDataEvents
"numDataEvents" : 1,

Notes on the implementation

The current implementation although fully functional should be considered a proof-of-concept. Its main goal is to demonstrate how to write a MySQL fetcher for Databus. The basic functionality of the fetcher has been tested but has not been deployed in our production environment. At LinkedIn, we run a slightly different implementation which relies on a custom-patched MySQL.

A quick question arises in the implementation of a MySQL adapter - how do we define the logical clock for the event sequence ?. The event sequence number (System Change Number or SCN for short) is used as a timeline for ordering events from various transactions occurring at the database. Please refer to Section 3.2 in our paper titled “All Aboard the Databus”. It is also used in the consumers’ checkpoints to determine where they are in the change stream.

There are several possible approaches for defining the logical clock.

The MySQL binlog offset

This is the simplest approach and it is the one used by this implementation. Each replication slave keeps track of its position in the replication stream through a pair of values: the current binlog file and the offset within that file. The binlog files share common prefix and have an increasing index number for their suffix. Therefore, the position can be uniquely identified by the tuple (file number, file offset). Both the file index and offset increase monotonically and thus the SCNs will be exactly in the commit order.

In the current implementation, SCN is represented as a long (64 bits). Of the 64 bits, the high-order 32 bits are used to represent the binlog file number, and the low-order 32 bits are used to represent the binlog file offset. Therefore, an event starting in binary log file mysql-bin.000001 at binlog offset 4 is represented as (1 << 32) | 4 = 4294967300 .

Advantages

  1. Simple to understand and easy to implement
  2. Works with Vanilla MySQL-5.5.x architecture
  3. Works well for non-clustered environments
  4. Maintains the commit order of the updates

Limitations

This approach inherits the typical limitations of the MySQL replication which stem from the fact that the binlog offset is not stable and may change.

  • Changes on the MySQL master node

The binlog files may change as a result of DBA commands like ‘reset master’. That command resets the binlog file/sequence number generation to initial values of mysql-bin.000001 and binlog offset 4, i.e. resets the logical clock. After such command, all downstream components (Databus relays, Databus bootstraps, Databus consumers) need to be reset to the new timeline. A simple workaround is to just use ‘purge logs’.

  • Changes across MySQL nodes due to clustered setups

A typical MySQL setup will have a master database and one or more slave databases. MySql replication does not guarantee that binlog file numbers and sequences match on the slave nodes with corresponding masters in the cluster. In such a case, if a Databus relay is connected to a slave storage node, the binlog co-ordinates of a transaction may be different than that on the master. A Databus client switching from a Databus relay connected to such a master node to a relay that is connected to the corresponding slave node will get no/inconsistent data due to the different clocks.

If the client has consumed from node n1 up through scn1. It would look for transactions greater than scn1 on the new node n2. But due to the nature of MySQL replication, the newly committed transaction for the database can actually be numerically smaller than scn1, which will cause the new transaction to be missed and thereby to affect consistency.

Even if all relays capture updates from a single MySQL node, we can run into the same problem with the relays need to fail-over to a different node.

The MySQL fetcher implementation has been designed for a single-node MySQL setup, and it has been tested in a single-database (with multiple tables) scenario. It has not yet been deployed in our production clusters. Further, we describe the limitations in the current design and would like to note that the design/implementation may change significantly going forward. Hence subsequent versions may not be backward-compatible.

An application-generated SCN

Another approach is for the application that writes to MySQL to generate the sequence number. For example, this can be achieved using an algorithm like Twitter’s Snowflake. The number can be written in a column which can be easily extracted from the relay.

Advantages

  • Relatively easy to implement
  • Algorithms like Snowflake allow for highly available and scalable generation of globally unique numbers

Limitations

  • Ordering is not guaranteed to be in commit order

Since the SCNs are determined before the updates are written to MySQL, their order may (and probably will) be different from the MySQL commit order. This may be OK for applications that do not rely on strong consistency.

A master-generated SCN

A third approach is for the master MySQL instance to generate the SCN sequence. For example, this can be achieved through an {{auto_increment}} column in a dedicated table. As in the application case, this will create ordering that may differ from the commit order. Further, depending on the write load, there may be contention in updating the sequence.

A more sophisticated approach is to have MySQL generate the SCN sequence. This is the approach we have taken at LinkedIn. We run a modified version of MySQL 5.5.8 which generates a sequence number after the transaction commit. The sequence number is prefixed by a generation id which increases every time the mastership is handed over to a different MySQL node. This method of SCN generation preserves both the commit order within a single MySQL node and also the ordering of events when the mastership changes. The generated SCN is inserted in the binlog stream so it can be read from any slaves (including the relays).

Newer versions of MySQL (5.6.5 and later) have added support for unique global transaction ids (GTIDs). Those GTIDs are added to the replication log and are used as a replacement of the (binlog file number, binlog file offset) pair. The GTIDs are of the form (server-guid, sequence number). Unlike binlog offsets, slave preserve GTIDs. Mastership transfers still present a challenge for using GTIDs for SCNs. If a new MySQL node becomes the master, it will start using its own server-guid and sequence number which can break ordering. A possible approach is to use GTIDS as a basis for a relay-generated SCNs (see next section).

A relay-generated SCN

The last option is for a relay to generate the SCN sequence. The relay is already capturing the changes in commit order, so it can maintain the order in the generated timeline.

For generating the SCNs, a relay can use the aforementioned GTIDs. A cluster manager (say, Helix) can be used to keep track of mastership changes. Every time a mastership change occurs, this can be recorded in a map (timestamp, new-master-guid) -> generation id. Relays can use the shared map to convert the timestamp and GTID of a binlog event to a 64-bit SCN consisting of a generation id and sequence number.

If using a MySQL 5.6 and GTIDs is not possible, yet another approach is to have relays generate independent SCNs. Relays can map (binlog file number, binlog file offset) pairs from different MySQL replicas to the same SCN timeline. Even though different MySQL replicas have different binlog offsets, they still follow the same timeline determined with the master. The state of a relay can be described by (binlog file number FN, binlog file offset FO, sequence number SCN). This specifies relay the last consumed binlog event has binlog coordinates (FN, FO) that has the given SCN. Every time a relay reads a new transaction from the binlog, it updates (FN, FO) and increases SCN by 1. Since all replicas consume the same update timeline, the changes to SCN are deterministic.

Let’s say that two MySQL replicas M1 and M2 are at the same point in applying the master changes. Let’s say that M1 has its binlog stream at (10, 1000) and M2 has its binlog stream at (50, 4). If a relay R1 is to capture changes from M1, we can bootstrap it with the state (FN=10, FO=1000, SCN=100). Similarly, if a relay R2 is to capture changes from M2, we can bootstrap it with the state (FN=50, FO=4, SCN=100). 100 transactions later, R1 state will change to something like (FN=10, FO=11000, SCN=200) and R2 state will change to (FN=50, FO=10004, SCN=200).

Note that with this approach, divergence between replicas (say, a replica missed a transaction), will lead to divergence of SCNs across different relays consuming from those replicas.

Comparison of SCN generation approaches

SCN generation approach Ease of implementation Commit order Needs MySQL modification Support for MySQL internal replication Support for MySQL mastership transfers Susceptible to master/slave divergence
Binlog offset Easy Yes No No No Yes
Application-generated Medium No No Yes Yes No
Master-generated sequence Complex Yes Yes Yes (with modification to ship SCN) Yes No
Relay-generated using GTIDs Complex Yes No Yes Yes No
Relay-generated independent Complex Yes No Yes Yes Yes

Code Structure

The relevant code is available in databus2-relay/databus2-event-producer-or

  1. OpenReplicatorEventProducer.java Implements the fetcher for mysql
  2. ORListener.java Implements interface methods for processing binlog events
  3. OpenReplicatorAvroEventFactory.java Implements the logic for constructing a databus event in AVRO serialized format from the underlying binlog entry

Features

  1. Real-time change-capture from MySQL
  2. Binlog file rotation on MySQL
  3. Tested for single / multiple tables
  4. Tested for MySQL-5.5.35 (the example relay works with 5.6.14 with binlog_checksum disabled).

Future Work

  1. Automatic Avro schema file generation for a given MySQL table
  2. Support for consistent change-capture in a clustered MySQL environment with mastership transfers
  3. Support for global TXID in MySQL-5.6
  4. Multi-tenancy optimizations (w.r.t. number of fetches of binlog files from master and server-side filtering)
  5. Composite keys