
Analyzing VReplication behavior #8056

Open · shlomi-noach opened this issue May 6, 2021 · 12 comments

shlomi-noach (Contributor) commented May 6, 2021

I was looking into VReplication behavior as part of ongoing improvements to VReplication for Online DDL. It led me to a better understanding of the VReplication flow, and of what I believe to be bottlenecks we can solve. I did some benchmarking in a no-traffic scenario (a traffic scenario is soon to come) to better understand the relationship between the different components. I'd like to share my findings. This will be long, so TL;DR:

  • The major bottleneck is gRPC
  • Current design is to use large thresholds, large timeouts, large queries, in order to reduce gRPC traffic
  • There are advantages to using smaller thresholds/queries, and it is possible to decouple those sizes from the gRPC traffic.
  • Consistent snapshots and accurate GTID tracking can be replaced with a more relaxed algorithm
  • I offer alternate design(s)
  • Different use cases may require different designs

Sample PR to demonstrate potential code changes: #8044

  • Yes, most VReplication tests fail on this PR. Ongoing work.

Disclaimer: some opinions expressed here are based on my personal past experience in production. I realize some may not hold true everywhere. Also, tests are failing, so obviously I haven't solved this yet.

But let's begin with a general explanation of the VReplication flow (also see Life of a Stream). I'll use an Online DDL flow, which is mostly simple: copy shared columns from source to target. Much like CopyTables.

The general VReplication flow

The current flow achieves perfect accuracy by tracking GTIDs, locking tables and getting consistent read snapshots. For simplicity, we illustrate the flow for a single-table operation. It goes like this:

  • There are two sides to the flow: the source (vstreamer) and target (vreplicator)
  • There are two data dimensions to the flow: existing table data, and incoming binlog changes on that table
  • vstreamer (rowstreamer) is responsible for reading & streaming table data from source to target
  • vreplicator has two main components:
    • vcopier: reads rows streamed from vstreamer and INSERTs them to target table
    • vplayer: tails the binary logs and applies relevant events to the target table

These building blocks are essential. What's interesting is how they interact. We identify three phases: Copy, Catch-up, Fast-forward

Copy

  • vplayer will only apply changes to rows that have already been copied (i.e. rows whose PK does not exceed the last copied PK).

  • so we begin with vplayer doing nothing

  • rowstreamer begins work

  • rowstreamer runs LOCK TABLES my_table READ (writes to the table are blocked), opens a transaction with consistent snapshot, and reads the current GTID (see the sketch after this list).
    That is, a locking & blocking operation, which gets us a GTID and a transaction that is guaranteed to read data associated with that exact GTID.

  • In that transaction, rowstreamer runs a select <all-relevant-columns> from <my_table> order by <pk-columns>.
    This is an attempt to read basically all columns, all rows from the table in a single transaction, and stream those results back.

  • rowstreamer sends down the GTID value (to be intercepted by vcopier)

  • rowstreamer reads row by row; when the data it has accumulated from the query exceeds -vstream_packet_size, it sends down accumulated rows, runs some housekeeping, and continues to accumulate further rows.

  • Meanwhile, vcopier receives the GTID and takes note.

  • vcopier now gets the first batch of rows. It writes them all at once, in a single transaction, to MySQL.
    In that same transaction, it updates _vt.copy_state table with identities of written rows (identified by last PK)

  • The flow then switches to Catchup
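
Going back to the lock/snapshot step above, the sequence on the source boils down to something like the following SQL (a sketch only; the exact statements issued by rowstreamer may differ slightly):

LOCK TABLES my_table READ;                       -- writes to the table are now blocked
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
START TRANSACTION WITH CONSISTENT SNAPSHOT;      -- the snapshot is now pinned
SELECT @@GLOBAL.gtid_executed;                   -- the GTID sent down to vcopier
UNLOCK TABLES;                                   -- writes may resume; our snapshot stays fixed

-- still inside the snapshot transaction:
SELECT <all-relevant-columns> FROM my_table ORDER BY <pk-columns>;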

Catchup

  • Vplayer processes events from the binary log. By now the binary logs have accumulated quite a few events.
  • It discards anything that is not related to our table(s)
  • It discards any event that has a larger PK than the ones we already copied
  • It applies any remaining events: insert/update/delete on our table rows
    In the same transaction where we apply the event, we update the _vt.vreplication table with the associated GTID (see the sketch after this list)
  • It goes on without a specific limit; it stops when replication lag is small enough, which means we've worked through almost all of the binlog (read: the backlog)
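
To make the "same transaction" point concrete, here is a sketch of applying a single binlog UPDATE event for an already-copied row on the target (illustrative values and placeholders only; the actual statements and the _vt.vreplication position format are generated by vplayer):

BEGIN;
-- apply the binlog event: an UPDATE on a row whose PK was already copied
UPDATE my_table SET col1 = 'new-value' WHERE pk1 = 8;
-- record the GTID position reached, in the very same transaction
UPDATE _vt.vreplication SET pos = '<gtid-position>' WHERE id = <workflow-id>;
COMMIT;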

Side notes

  • If the table is small enough, we might actually suffice with the flow thus far. Remember, vstreamer was actually selecting all rows from the table. So if all went well, we copied all rows, we caught up with binlog events, and we're good to cut-over or complete the workflow.

  • However, if the table is large enough, there are situations where we are not done.

    1. Any network issue may interrupt vstreamer from sending the rows.
    2. vreplicator has a copyTimeout of 1 hour. When that timeout expires, vreplicator sends an io.EOF to vstreamer, which aborts the operation.

    In either of these scenarios, vreplicator has the last processed GTID and last read PK, and can resume work by:

    • fast forward (follows)
    • copy again
    • catchup again
    • (repeat)

Fast forward

This is actually a merger of Copy & Catchup:

  • We call on vstreamer to prepare another snapshot
  • This time vreplicator tells vstreamer: "start with this PK"
    • vstreamer creates a new transaction with consistent snapshot. It prepares a new query: select <all-relevant-columns> from <my_table> where <pk-columns> > :lastPK order by <pk-columns>
      So it attempts to read all remaining table rows, starting from the given position (rows before that position are already handled by vcopier); see the PK-comparison sketch after this list
  • vstreamer sends down the GTID for the transaction
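
As a side note on the where <pk-columns> > :lastPK condition: with a composite PK (such as (id, c) in the benchmark table further below), the comparison cannot be a single scalar predicate. A sketch of one common expansion (the actual SQL generated by rowstreamer may differ):

SELECT <all-relevant-columns>
  FROM my_table
 WHERE (pk1 > :last_pk1)
    OR (pk1 = :last_pk1 AND pk2 > :last_pk2)
 ORDER BY pk1, pk2;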

cut scene

  • vcopier does not immediately proceed to read rows from vstreamer. In the time it took to create the new consistent snapshot, a few more events have appeared in the binary log. Because the flow keeps strict ordering of transactions, it wants to first apply all binlog transactions up to the GTID just received from vstreamer.
  • vplayer applies those rows

now that we're caught up, we return to vstreamer

  • From this point on we loop back to the start of the flow. vstreamer reads and streams more rows; it either manages to read to the end of the table or it does not. vcopier writes rows, vplayer catches up with events, and so forth.

Benchmark & analysis of impact of factors

Let's identify some factors affecting the flow:

  • vstream_packet_size: the smaller it is, the more back-and-forth gRPC traffic we will see between vstreamer and vcopier: there will be more (smaller) batches of data sent from source to target.

  • copyTimeout: the smaller it is, the more we will interrupt the mega-query rowstreamer attempts to run. Theoretically, we could set that value to near infinite, in the hope of delivering the entire table in one single sweep. But as mentioned before, network failures can happen anyway.
    So with a small value, we will:

    • Have more gRPCs (from target to source: "get me more rows please")
    • Cancel more transactions on source
    • Take more LOCK TABLES my_table READ locks
    • Create more transactions on source
  • While not tunable in the master branch, I've made it possible for the source query to have a LIMIT <number-of-rows> (see the illustrative query after this list). What if we voluntarily close the copy cycle on the vstreamer side?
    See these changes to support the new behavior:
    https://github.com/vitessio/vitess/pull/8044/files#diff-a1cffc790e352be31a3f600180d968b6d96f3bd90acf4bfa0a49e3a66611558cR227-R332
    https://github.com/vitessio/vitess/pull/8044/files#diff-862152928bc2f7cafae8ed7dd0fa49608a7f2f60615dc6131059876cd42c0087R206
    Spoiler: smaller reads lead to more gRPC communication. This time vstreamer terminates the communication; vcopier says "well, I want more rows" and initiates the next copy. Again, this means we:

    • Have more gRPCs (from target to source: "get me more rows please")
    • Cancel more transactions on source
    • Take more LOCK TABLES my_table READ locks
    • Create more transactions on source
  • To be discussed later, I've also experimented with the overhead of the consistent snapshot and of the table lock.
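
For reference, the LIMITed variant boils down to something like this (a sketch, using the same placeholder notation as above; the real query is generated with bind variables):

SELECT <all-relevant-columns>
  FROM my_table
 WHERE <pk-columns> > :lastPK
 ORDER BY <pk-columns>
 LIMIT 1000000;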

The benchmark below exercises some of these parameters. Notably, I did not include vstream_packet_size tuning in it.

  • In my benchmarks, any vstream_packet_size >= 64K seems to be good enough, with similar behavior across values
  • Performance degraded very quickly on lower values
  • It is apparent that keeping vstream_packet_size high is desirable. But, what's interesting is how the flow is coupled with that value, to be discussed later.
  • In the below benchmark I used a value of 640000 (640k)

The benchmark is to run an Online DDL on a large table. In Online DDL both source and target are the same vttablet, so obviously the same host, and there's no cross-host network latency. It's noteworthy that there's still networking involved: the source and target communicate via gRPC even though they're the same process.

The table:

CREATE TABLE `stress_test_pk` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `sig` varchar(40) NOT NULL,
  `c` char(8) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`,`c`)
) ENGINE=InnoDB;

mysql> select * from stress_test_pk limit 10;
+----+------------------------------------------+----------+
| id | sig                                      | c        |
+----+------------------------------------------+----------+
|  1 | af15f1c7691cf684244f610e3b668e9de1dd83d6 | af15f1c7 |
|  2 | 1ccffab9bfa41d8e13fe4a8efa4764a532e6a605 | 1ccffab9 |
|  3 | a45617e5d2158c1a82fb2f98fda0106e15fe4dd2 | a45617e5 |
|  4 | fcffd065bf6e44950206617ce182b59a523986cc | fcffd065 |
|  6 | 10be967cf937218cbba69fd333020e2f1fdddd4f | 10be967c |
|  7 | 13804bb5f9e5cf8e08cc9d363790f339b761907f | 13804bb5 |
|  8 | cdb426bdddfdf660344a9bf875db911f84b30ff2 | cdb426bd |
|  9 | 7ad21b2c9261a85b558b2cdaf684ff616cad002e | 7ad21b2c |
| 13 | 94a02951925b5823fc0566a9dacbda0cad2b16cf | 94a02951 |
| 14 | d6fba9f908b703604a1b428f89d9ea6648ffe0bc | d6fba9f9 |
+----+------------------------------------------+----------+

mysql> select count(*) from stress_test_pk;
+----------+
| count(*) |
+----------+
| 16777216 |
+----------+

*************************** 1. row ***************************
           Name: stress_test_pk
         Engine: InnoDB
        Version: 10
     Row_format: Dynamic
           Rows: 15974442
 Avg_row_length: 82
    Data_length: 1325400064
Max_data_length: 0
   Index_length: 0
      Data_free: 7340032
 Auto_increment: 20578168
    Create_time: 2021-05-05 08:22:03
    Update_time: NULL
     Check_time: NULL
      Collation: utf8_general_ci
       Checksum: NULL
 Create_options: 
        Comment: 

The table takes 1.4GB on disk.

So this table is quite simple, and with some text content to make it somewhat fat.

| With snapshot? | Traffic? | SELECT ... LIMIT | copyTimeout (seconds) | runtime (seconds) | comments |
|----------------|----------|------------------|-----------------------|-------------------|----------|
| FALSE | FALSE | unlimited | 3600 | 140 | |
| TRUE  | FALSE | unlimited | 3600 | 140 | |
| FALSE | FALSE | unlimited | 5    | 181 | |
| TRUE  | FALSE | unlimited | 5    | 185 | |
| FALSE | FALSE | 1000000   | 3600 | 166 | ~10sec per round |
| TRUE  | FALSE | 1000000   | 3600 | 167 | ~10sec per round |
| FALSE | FALSE | 1000000   | 60   | 165 | |
| TRUE  | FALSE | 1000000   | 60   | 165 | |
| FALSE | FALSE | 1000000   | 5    | 180 | |
| TRUE  | FALSE | 1000000   | 5    | 182 | |
| FALSE | FALSE | 100000    | 3600 | 350 | ~1.5sec per round |
| TRUE  | FALSE | 100000    | 3600 | 352 | ~1.5sec per round |
| FALSE | FALSE | 20000     | 60   | effectively infinite | ETA measured in hours |
| TRUE  | FALSE | 20000     | 60   | effectively infinite | ETA measured in hours |

For now, only consider rows With Snapshot=TRUE.

This is a subset of some more experiments I ran. The results are consistent up to 2-3 seconds across executions. What can we learn?

  • The obvious winner (fastest) is the existing flow in the master branch: no LIMIT on the SELECT query, 1-hour timeout (longer than it takes to copy the table)
  • small LIMITs are catastrophic
  • small timeouts are likewise catastrophic

Now, I began by blaming this on gRPC. However, the above is not enough information to necessarily blame gRPC. A smaller timeout means more calls to vstreamer; a smaller LIMIT means more calls to vstreamer. It makes sense to first suspect the performance of a vstreamer cycle. Is it the LOCK TABLES, perhaps? Is it the transaction WITH CONSISTENT SNAPSHOT? Anything else about vstreamer? The following indicator gives us the answer:

Further development, which I'll discuss later, actually creates smaller LIMIT queries, but streams continuously. I'll present it shortly, but the conclusion is clear: if you LIMIT without creating new gRPC calls, performance remains stable. BTW, @vmg is similarly looking into decoupling the value of vstream_packet_size from INSERTs on the vcopier side; we can batch smaller chunks of INSERTs from one large vstream_packet_size.

But, but, ... What's wrong with the current flow?

It looks like a winner. BTW, it is also 2 minutes faster than a gh-ost migration on the same table! So is there anything to fix?

A few things:

  • The current flow only runs one table at a time. That is, it can operate on multiple tables, but will only copy+catchup+ff a single table at a time. We are looking to parallelize that. Flows like Resharding or MoveTables could see massive gains.
  • I claimed earlier that a single SELECT <all-relevant-columns> FROM my_table ORDER BY <pk-columns>, in a CONSISTENT SNAPSHOT transaction, is unsustainable. Evidently (and discussed internally) quite a few users have been using this without reporting an issue. Either it executed on a replica, or their workload is somehow different, or my claim is invalid. Or it's some gray area in between. I just can't shake off my experience where a high history list length predicts an imminent outage in production.
  • Anyway, if we are to read and stream multiple tables at once, we must abandon the Grand Select approach. If only because we can't SELECT ... from multiple tables at once in a single transaction... A transaction requires us to serialize our queries.

So what I'm looking into now, is how to break down the Grand Select into multiple smaller selects, while:

  1. Not breaking the correctness of the flow, and:
  2. Not degrading performance.

(1) is easy, right? Just use LIMIT 10000. But we see how that leads to terrible run times.

Other disadvantages to the current flow:

  • It's "unbalanced". You can spend 1hour copying rows and then 30min applying binlogs. The direct incentive I had in looking into this was to support a Online DDL progress % & ETA. The current flow makes it difficult to predict an ETA. We can maybe predict how long it's going to take for rowcopy to complete (based on number of table rows and number of copied rows), but we can't predict how long it will take to ctachup with the binary logs. In gh-ost, there is frequent switch between rowcopy and binlog catchup, and so the progress becomes more predictable. I'll discuss the gh-ost logic shortly.
  • As per performance traits above, it is essential in our flow that we perform large bulks of row copy and large bulks of binlog catchup, and there is no easy way to interleave both.
  • It is wasteful for deleted rows. The Grand Select will read rows even if they're deleted seconds after the snapshot began. vcopier will copy and write down those rows, and vplayer will later delete them.
  • The logic is complex.

Of course, let's not forget about the advantages of the current flow:

  • It works! It answers multiple use cases with one single logic. It's amazing.
  • It is accurate. The way we wait for a GTID, the way we create a consistent snapshot, the way we catchup and fast-forward, is precise, and the results are predictable at every single transaction.
  • I like how the catchup does not bother to apply events > :lastPK. That is, the flow requires this behavior, but a side effect is that we only process relevant rows.

Anyway. If only for the sake of parallelism, I'm looking into eliminating the consistent snapshot and breaking down the Grand Select query into smaller, LIMITed queries.

The gh-ost flow

I'd like to explain how gh-ost runs migrations, without locking and without consistent snapshots or GTID tracking. gh-ost was written when we were not even using GTIDs, but to simplify the discussion and to make it more comparable to the current VReplication flow, I'll describe gh-ost's flow as if it were using GTIDs. It's just syntactic sugar.

I think we can apply the gh-ost flow to Vitess/VReplication, either fully or partially, to achieve parallelism.

The flow has two components: rowcopy and binlog catchup. There is no fast-forward. Binlog catchup is prioritized over rowcopy. Do notice that gh-ost rowcopy operates on a single machine, so it runs something like INSERT INTO ... SELECT rather than the VReplication two-step of SELECT into memory & INSERT the read values.

The steps:

  • Begin. Create target table etc.
  • Mark current GTID.
  • Begin tailing the binary logs starting said GTID
    • We actually begin applying binlog events at this time! But more on this shortly.
  • On source table, evaluate min PK and max PK.
    • We will only copy rows in that range.
  • Binlog catchup is prioritized. Are there binlog events to consume? If yes:
    • Apply binary logs!
    • Wait. We haven't even copied a single row. What does it mean to apply a binlog event if it's a DELETE? If it's an UPDATE?
      • If it's a DELETE, we delete the row, whether it actually exists or not.
      • If it's an INSERT, we convert it into a REPLACE INTO and execute
      • If it's an UPDATE, we convert it into a REPLACE INTO and execute.
        This means an UPDATE will actually create a new row.
      • Continue applying binlog events, one at a time, until we've consumed the binary logs.
  • Now that we've consumed the binary log (for now), we can switch to rowcopy.
    • This is our first iteration. Evaluate the PK range of the next 1,000 rows to copy
    • 1,000 is just an example; gh-ost only copies between 10 and 10,000 rows at a time, and these limits are hard-coded into gh-ost.
    • Issue an INSERT IGNORE INTO _my_table_gho SELECT <relevant-columns> FROM my_table WHERE <pk in computed range>
      Notice that we INSERT IGNORE (see the sketch after this list).
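
A sketch of one such rowcopy iteration, spelled out as SQL (illustrative only; gh-ost builds these statements dynamically, and the real range handling is more involved):

-- 1. find the upper bound of the next chunk of up to 1,000 rows
SELECT id, c
  FROM (SELECT id, c FROM my_table
         WHERE (id > :last_copied_id) OR (id = :last_copied_id AND c > :last_copied_c)
         ORDER BY id, c
         LIMIT 1000) AS next_chunk
 ORDER BY id DESC, c DESC
 LIMIT 1;

-- 2. copy that chunk, yielding to any row already written by the binlog applier
INSERT IGNORE INTO _my_table_gho (id, sig, c)
SELECT id, sig, c
  FROM my_table
 WHERE ((id > :last_copied_id) OR (id = :last_copied_id AND c > :last_copied_c))
   AND ((id < :range_end_id) OR (id = :range_end_id AND c <= :range_end_c));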

Methodic break. We prioritize binlog events over rowcopy. This is done by making the binlog/catchup writes a REPLACE INTO (always succeeds and overwrites), and making rowcopy INSERT IGNORE (always yields in case of conflict).

Consider the UPDATE we encountered in the catchup phase, back when we hadn't copied even a single row. That UPDATE was converted to a REPLACE INTO, creating a new row. Later, rowcopy will reach that row and attempt to copy it. But it uses INSERT IGNORE, so it yields to the data applied by the binlog event. If no transaction ever changed that row again, then the two (the REPLACE INTO and the INSERT IGNORE for that specific row) will have carried the same data anyway.
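
Concretely, the event conversions look roughly like this (a sketch; gh-ost builds the statements from the binlog row images, and the values here are illustrative):

-- binlog DELETE -> DELETE on the ghost table, whether or not the row exists there yet
DELETE FROM _my_table_gho WHERE id = 7 AND c = '13804bb5';

-- binlog INSERT -> REPLACE INTO (always succeeds, overwrites any previously copied row)
REPLACE INTO _my_table_gho (id, sig, c) VALUES (21, '<sig-value>', '<c-value>');

-- binlog UPDATE -> REPLACE INTO as well; if the row was never copied, this creates it
REPLACE INTO _my_table_gho (id, sig, c) VALUES (8, '<new-sig-value>', 'cdb426bd');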

It is difficult to mathematically formalize the correctness of the algorithm, and it's something I wanted to do a while back. Maybe some day. In any case, it's the same algorithm created in oak-online-schema-change, copied by pt-online-schema-change, and continued by fb-osc and of course gh-ost. It is well tested and has been in production for a decade.

Back to the flow:

  • Once done copying the range, we switch back to binlog/catchup. Are there more events? If so, apply them.
  • No more events? Copy next range.
  • Repeat, repeat, repeat...
  • ...Until we've copied the last rows (we've reached maxPK which we evaluated at the beginning of the migration)
  • We catchup with the remaining binlog events
  • Keep on doing so until replication lag is low or otherwise things look healthy
  • Attempt cut-over
    • If successful, great
    • If not (we use short timeouts), continue applying binlog events and try cut-over again at the next opportunity.

Some characteristics of the gh-ost flow:

  • It's simple(r). It does not require coordination between rowcopy and binlog/catchup
  • rowcopy does not require LOCK TABLES (only the final cut-over phase does)
  • rowcopy does not require consistent snapshot transactions
  • rowcopy can work in small chunks (in fact, our experiments in production showed little change in overall performance between 50 row chunk size and 1000 row chunk size; for some workloads it could make a difference)
  • binlog/catchup does not filter events; it just applies whatever it sees, whether the row is already copied or not.
  • It is accurate
    • Except for this particular scenario: adding a new UNIQUE KEY.
      • In this scenario, and because of the INSERT IGNORE and REPLACE INTO queries, when you add a new UNIQUE KEY, gh-ost will silently drop duplicate rows while copying the data or while applying binlog events to the new table.
      • To clarify, the operation will be successful, and the resulting table will be consistent.
      • But people expect a different thing: they expect the migration to fail if the table (or incoming traffic) does not comply with the intended UNIQUE constraint.
      • The VReplication flow, in comparison, will fail the operation.
  • The operation is more balanced
    • There is a frequent switch between rowcopy and binlog/catchup
      We prioritize binlog/catchup just because we want to avoid a binlog backlog and the risk of binlog events being purged, but logically we could switch any time we like.
    • Which means the rowcopy progress is a good predictor of overall progress
      • again, consider that by definition we keep replication lag low, and the binlog backlog low. So at all times we've got a good estimate of the remaining work.
      • We get a reliable progress/ETA for the process (see the sketch after this list).
  • The operation is more wasteful: gh-ost applies binlog events for rows we haven't copied yet. We could skip those events like VReplication does, at the cost of tracking the PK for the table.
    It is in a sense the opposite of the VReplication wasteful scenario. VReplication is wasteful for deletes, gh-ost is wasteful for INSERTs and UPDATEs.
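
As for the progress/ETA estimate, one way it can be computed (a sketch; gh-ost estimates the total row count up front, e.g. via EXPLAIN or table status, and the schema/table names below are just the benchmark example):

-- estimated total rows (an estimate, not an exact count)
SELECT TABLE_ROWS
  FROM information_schema.TABLES
 WHERE TABLE_SCHEMA = 'test' AND TABLE_NAME = 'stress_test_pk';

-- progress ~= rows_copied_so_far / TABLE_ROWS
-- ETA      ~= elapsed_time * (1 - progress) / progress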

Where/how we can merge gh-ost flow into VReplication

I'll begin by again stressing:

  • The gh-ost flow cannot solve the issue of adding a UNIQUE KEY. The resulting table is consistent, but the user might expect an error if duplicates are found.
  • I'm not sure yet about Materialization with aggregate functions.

Otherwise, the process is sound. The PR actually incorporates gh-ost logic now:

benchmark results

This works for Online DDL; as mentioned above, other tests are failing, and there's still some termination condition to handle. We'll get there, but here's the proof of concept:

| With snapshot? | Traffic? | SELECT ... LIMIT | copyTimeout (seconds) | runtime (seconds) | comments |
|----------------|----------|------------------|-----------------------|-------------------|----------|
| FALSE | FALSE | 10000 | 3600 | 140 | |

The above shows we've managed to break the source query into multiple smaller queries and remain at the exact same performance.

It is also the final proof that our bottleneck was gRPC to begin with; not the queries, not database performance.

How can the new flow help us?

  • It can help us because we can read from multiple tables, concurrently.
  • We will stream the data sequentially
  • But on the vcopier side, we could split it again, and write 1,000 rows of this table and 1,000 rows of that table, concurrently
  • Applying binlog events is indifferent to which tables have or have not been copied yet
  • If we go the full gh-ost flow, we don't even need to track PK per table, this simplifies the logic.

Different approaches?

At the cost of maintaining two different code paths, we could choose to:

  • Use the new flow for MoveTables, OnlineDDLs, Resharding, import/export
  • Keep the old flow for Materialize
    I'm still unsure about the aggregation logic/impact, and @sougou suggested Materialized Views in particular are sensitive to chunk size and prefer mass queries.
  • Keep the old flow for Online DDL that adds a new UNIQUE KEY constraint

To be continued

I'll need to fix outstanding issues with the changed flow, and then run a benchmark under load. I want to say I can predict how much better the new flow will be, but I know reality will have the final say.

Thoughts are welcome.

cc @vitessio/ps-vitess @rohit-nayak-ps @sougou @deepthi @vmg

@shlomi-noach (Contributor Author) commented:

I realize this is a terribly long read. Not sure I'd survive it myself. If there's any thoughts before I pursue the suggested path, please let me know.

@rohit-nayak-ps (Contributor) commented:

Thanks for the detailed explanation/comparison of the two approaches. Since I have only worked with the current VReplication algo, I don't have an intuitive feel for which algo will perform better in at-scale sharded Vitess deployments. Some comments/thoughts, more to clarify my understanding ...

Takeaway

The stream-first gh-ost approach is well-tested. The current copy-first approach works for all VReplication workflows. But horses-for-courses: now that we are hitting performance blocks, we may want to implement the stream-first approach. stream-first likely provides a simpler path for movetables/reshard flows, and it should be easier to concurrently copy multiple tables and to parallelize a single table copy. copy-first will still be needed for certain Materialize workflows, at least to start with.

Notes (in no particular order)

  • We haven't (yet!) had any issues with the "large select" causing an outage or delays in query serving. That said, gh-ost has been tested orders of magnitude more.

  • The current approach does support all use cases with a single approach. Adding multiple approaches will add more code complexity. We need to provide a common abstraction for logging/metrics/operation visibility so that the choice of algo is opaque to the end user as far as possible.

  • Having "insert ignore", "replace ignore" intuitively feels more expensive than a direct "insert" or "update/delete". However the current approach does have where clauses to filter out rows in catchup using PK comparisons. Maybe they cancel out.

  • One way to increase the copy throughput for large tables is to do concurrent inserts of packets. A naive POC recently yielded a 30% increase in copy performance locally (4 threads were optimal) and will presumably do better in actual environments. But there are warnings that this might hit MySQL gap-lock deadlocks. Using the stream-first approach, presumably the chunks can be spread out to avoid these deadlocks and index-page contention.

  • I assume the state of a workflow in the stream-first case will be ([table, [(min PK,max PK)]], gtid)

  • For multiple tables and a chunked single table the copy phase for the tables/chunks would be concurrent and the binlog streaming would be serialized

@shlomi-noach (Contributor Author) commented:

I like the terms "stream-first" and "copy-first".

Adding multiple approaches will add more code complexity.

The current state of #8044: it's stable and working, and supports both approaches with minimal abstraction. The difference between the two isn't that big, really, in terms of code changes.
Right now #8044 supports a query comment to flag "I want stream-first" or "I want copy-first". The abstraction is therefore already complete, even if we change the mechanism (i.e. instead of a query comment we change RPC signatures).

so the choice of algo is opaque to the enduser as far as possible.

It will be absolutely opaque to the user. According to #8044: Materialize with GROUP BY ==> copy-first. Online DDL which adds a UNIQUE constraint ==> copy-first. The rest: stream-first.

Having "insert ignore", "replace ignore" intuitively feels more expensive than a direct "insert" or "update/delete". However the current approach does have where clauses to filter out rows in catchup using PK comparisons. Maybe they cancel out.

I agree that copy-first is more efficient and does not waste double-writes. Assuming we proceed with stream-first, we can also apply PK comparisons for UPDATEs, something copy-first does not support today (for testing/mockup reasons).

Using the stream-first approach presumably the chunks can be spread out to avoid these deadlocks and index page contention.

Not sure stream-first has an advantage over copy-first in this regard.

I assume the state of a workflow in the stream-first case will be ([table, [(min PK,max PK)]], gtid)

Per #8044 the state is identical to copy-first. min-PK == lastPk anyway, and max-PK can be re-evaluated if needed (in case vreplication is restarted).

For multiple tables and a chunked single table the copy phase for the tables/chunks would be concurrent and the binlog streaming would be serialized

Right. And I wasn't even planning on running concurrent copies of chunks of the same table; I was only considering parallelizing writes of multiple tables.

@shlomi-noach (Contributor Author) commented:

Summary of the video discussion on 2021-06-01 between @deepthi, @rohit-nayak-ps and myself

We scheduled the discussion in an attempt to reach a shared understanding and consensus on how we should proceed with the VReplication parallelization/optimization efforts.

There are two use cases in two different trajectories:

  • User has many tables, can we parallelize read of tables in source and write of tables in target?
  • User has one very large table and the rest of the tables are minor. Parallelizing by table can only gain us up to a factor of 2, because the large table's copy can outlast the combined copy of all other tables. Can we parallelize writes of a single table?

Single table parallelization

First we concluded we must experiment a bit to see how much we can gain by parallelizing writes of a single table.

Single table optimization plausibility

I ran a hacky experiment on a ~120MB table:

  • Exported it into many 5,000-row chunks
  • Imported the chunks into an empty table with various numbers of parallel jobs

I noticed a major difference depending on whether or not the table has an AUTO_INCREMENT column:

  • Without AUTO_INCREMENT I saw up to a 3x speedup in wall-clock load time, when parallelized at 10-12 jobs
    • In addition, if chunks were completely shuffled, another 10% of the time was shaved off
  • With AUTO_INCREMENT, a 30% improvement going up to 2 jobs, and no further improvement at higher numbers of concurrent jobs.

The latter is a bit discouraging.

Single table optimization implementation

This is entirely on the vreplication (target) side, irrespective of any changes in the streamer (source) side.

Target side receives rows up to some max-packet-size. As @vmg noted, nothing forces us to wait for the packet to fill up before writing down rows to the target table. We can choose to flush rows every 1,000 or 5,000 rows at will. Or, we can wait for, say, 50,000 rows to be fetched, and then write them down in 10 concurrent writes, each with 5,000 rows.

That part is easy. Somewhat more complicated is how we track each such write. Today we only track the last PK written in copy_state. But with multiple concurrent writes, we may want to write down the from->to range of every distinct chunk, and possibly squash them once the entire range (of 50,000 rows in our example) is written (see the sketch below). Or, we can use an optimistic approach and, in the worst case (a failure while some writes take place), re-fetch the entire 50,000 rows and re-write them, even though some of them may have been persisted already.
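
A purely hypothetical sketch of what such per-chunk bookkeeping could look like (this table does not exist; today's _vt.copy_state only tracks a single lastpk per workflow/table):

CREATE TABLE _vt.copy_state_chunks (
  vrepl_id   INT NOT NULL,
  table_name VARBINARY(128) NOT NULL,
  from_pk    VARBINARY(2000) NOT NULL,
  to_pk      VARBINARY(2000) NOT NULL,
  done       TINYINT NOT NULL DEFAULT 0,
  PRIMARY KEY (vrepl_id, table_name, from_pk(64))
) ENGINE=InnoDB;

-- once the whole 50,000-row range is written, its chunk rows would be squashed back into
-- a single lastpk entry, as copy_state keeps today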

We are yet to conclude whether it's worth trying to parallelize writes for a single table.

Multi table parallelization

Let's consider the existing logic, which @rohit-nayak-ps coined as copy-first.

As illustrated in the original comment, this logic uses a transaction with consistent snapshot. A stream cannot read multiple tables like that and therefore we cannot parallelize tables on a single stream.

Our options are:

  1. Run multiple streams
  2. Switch to the alternate logic suggested in the original comment, which @rohit-nayak-ps coined as stream-first (or binlog-first?)

1. Multiple streams

This was the approach first considered when the idea of parallel VReplication came up. Discussion points for this approach:

  • IMO (shlomi) a transaction with consistent snapshot is bad enough; n of these can be catastrophic.
    @deepthi notes that most (all?) customers use a replica tablet as the source, and probably this is the reason why no one has reported the catastrophe I suggested.
  • We know gRPC is a significant overhead to VReplication communication (notwithstanding recent work by @vmg). Multiple streams will use n times the network/time resources.
  • If the user has 1,000 tables, that doesn't mean we can create 1,000 streams. We'll probably create some, say, 10 streams.
    The problem is that this makes things extra complicated. This is not like 10 job runners working through 1,000 jobs. A job runner normally fully completes one job and moves on to the next available one. But with VReplication this is not the case. Yes, for each table we only need to copy its rows once, but for that table we still need to catchup/fast-forward via VPlayer until all 1,000 are copied.
    • So this requires quite a bit of juggling. We'd need to go back and forth between all streams.
    • Each stream will have its own reader on the binary log. This both creates more load on the source tablet, and also wastes n times (10 in our example) the work of reading the contents of the binary logs. Each event in the binary log will be processed by n different players, which then choose to either use it or drop it (and at most one will actually use it).
    • @rohit-nayak-ps believes this will be very complex, or as he says, "@sougou level complexity"

2. Change of logic: stream-first

#8044 is a valid (relevant tests are passing) POC where we change the logic to avoid consistent snapshot transactions on the source tablet, which in turn allows us to read small chunks of tables at a time, which in turn allows us to potentially (not implemented yet in #8044) read multiple tables in parallel and stream them down (mixed/sequentially).

  • This means single stream can fetch multiple tables from source. We will limit to n tables at a time (e.g. 10) and the choice should probably be greedy: pick the largest table that isn't copied yet.
  • The target fetches data, and gets results from multiple tables. It can read into separate buffers, likely the same number n (much simpler if so), and write them to the target tables concurrently. We'd need copy_state per such table (see the sketch after this list).
  • The request (from target to streamer) for next rows will include lastPK per existing incomplete table, and the response (from streamer to target) is actually whatever the streamer chooses to deliver, but probably best-effort to provide requested tables, then EOF for tables where all rows are read, and on to the next tables.
  • Target only needs one VPlayer
  • VPlayer discards event for tables not copied yet
  • VPlayer discards rows for mid-copied tables, if those rows >= lastPK (PK value of last row copied) -- much like today's algorithm
  • VPlayer applies events of tables that are fully copied
  • and once all tables are copied, VPlayer continues to apply all events of all relevant tables, until termination (cut-over time)
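
For reference, per-table copy state is roughly what _vt.copy_state already stores today (an approximate definition; the authoritative schema lives in the vreplication code), so multi-table copy largely reuses the existing bookkeeping:

CREATE TABLE _vt.copy_state (
  vrepl_id   INT NOT NULL,
  table_name VARBINARY(128) NOT NULL,
  lastpk     VARBINARY(2000) DEFAULT NULL,
  PRIMARY KEY (vrepl_id, table_name)
) ENGINE=InnoDB;

-- which tables of a given workflow are still mid-copy, and the last PK copied for each:
SELECT table_name, lastpk FROM _vt.copy_state WHERE vrepl_id = 1;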

The downside is that this doesn't exist yet, and it is a change in paradigm (we will need to build trust).

Creating a POC for this is basically implementing full functionality. We need a solid and large test case.

Summary

@deepthi suggests to proceed with POC for (2) Change of logic: stream-first.

Thoughts welcome.

@deepthi (Member) commented Jun 3, 2021

Also, based on the discussion, it is likely that the consistent snapshot approach will simply not work for large VReplication-based online DDL (because we have to copy from primary->primary, and not from a replica/rdonly).

@rafael (Member) commented Jun 4, 2021

I’m not entirely up to speed with this discussion, but I wanted to chime in with some initial thoughts. Shlomi, thanks for the detailed write-up and all the careful considerations. 

In terms of the technical details of the gh-ost flow, I will have some follow-up questions. I'm still reasoning about the algorithm.

I’m asking myself a somewhat philosophical question: Are the performance benefits from this new approach worth a rewrite? 

My initial answer to this question is no. Here are some points on why I'm thinking this way:

  • After the latest improvements from Rohit, the performance we have been observing in our environment is that we can copy around 30GB per hour. Sure, it can be faster, but as long as this is robust, reliable, and resilient to errors, I’m not overly concerned about how fast this operation finishes. For example, we recently performed a horizontal resharding of a source of ~1TB into four new destination shards, and it took less than 8 hours. 

    Anecdotally, alters on large tables with gh-ost in our environment take multiple days. We could go faster, but that introduces reliability risks. I can provide more details about this, but my main point is that performance is not the most significant concern in this context.

    One more general point in this direction: we recommend that MySQL instances in Vitess shouldn't exceed 250GB. That would mean that even for non-resharding use cases, the upper bound of the current implementation should be around 8 hours.

    For this part of the system, it seems that we should strive for correctness and robustness. I think the current implementation is proving to excel at that. It has taken us a significant amount of investment to get to this point. I’m not convinced that the performance gains are worth risking a setback in these other dimensions.

  • In this case, although there are some fundamental limitations with the existing approach, I would try to squeeze out more performance before changing the process entirely. If we run into performance issues that are so bad that we reach the limits of the existing implementation, I would consider a rewrite. We haven't run into these issues at Slack, but perhaps I'm missing some context on the motivation behind the performance problems we are tackling here.

  • In the short term, I’m more concerned about VCopy being too fast and not having enough mechanisms to back off when copying data. A concrete example of this: currently, Materialize workflows are not usable for large keyspaces (more than 256 shards) at Slack. The moment we try to create these Workflows, the number of concurrent streams running in the destination shards during the copy phase overwhelms the primaries. 

  • The following is not a technical concern: Over the years, we have accumulated knowledge across multiple people in our community about the existing implementation. We will have to go through that process again with the new implementation. For such a core functionality, this is a considerable risk for me. 

  • One of the properties I appreciate the most about the current implementation is how robust it is. It survives almost any failure that might happen to the shard while it is copying/vreplicating. This might be true in the new approach as well, but it is not obvious from my initial pass. There are two failure modes in particular that I'm curious whether the new implementation takes into consideration:

    • Will it survive a primary failover in source/destinations? 
    • Will it survive reparents in source/destinations? 

=====

One last side note that I've been thinking about while digesting this proposal. I agree that the existing implementation is complex. Buuut I would argue that it is conceptually simpler. I found that it is straightforward to reason about and to understand why it is correct. The opposite seems true for the gh-ost flow: the implementation is simpler, but the correctness of the approach is more complex. Granted, this could be just a side effect of my familiarity with how things work today.

=====

Also, based on the discussion, it is likely that the consistent snapshot approach will simply not work for large VReplication-based online DDL (because we have to copy from primary->primary, and not from a replica/rdonly).

I think this is a good property of existent VReplication. I would love to not lose this.

@shlomi-noach (Contributor Author) commented:

Thank you for your thoughts @rafael!

Some comments on the technical side:

my main point is that performance is not the most significant concern in this context.

That's commendable! I think some use cases require better performance: importing an entire database from an external source is one example where we've projected a multi-week operation in a real use case with the current implementation.

The following is not a technical concern: Over the years, we have accumulated knowledge across multiple people in our community about the existing implementation. We will have to go through that process again with the new implementation. For such a core functionality, this is a considerable risk for me.

The new approach is not all that different from the existing approach. If you take #8044, the change of logic is rather minor.

Will it survive a primary failover in source/destinations?

It will.

Will it survive reparents in source/destinations?

It will.

I agree that the existing implementation is complex. Buuut I would argue that is conceptually is simpler. I found that it is straightforward to reason about and understand why it is correct. The opposite seems true for the gh-ost flow. Implementation is simpler, but the correctness of the approach is more complex.

I am yet to provide a mathematical proof of the gh-ost logic. I'm not sure how to formalize it. But it has been tested well. Anecdotally, my first logic for online schema changes, oak-online-alter-table, on which pt-online-schema-change is based, provided no test suite, and to my understanding there's yet no testing logic for pt-osc. We will build trust in the new logic by testing. We do have the luxury of already having a mechanism with a test suite, and can test the new logic by running it through the existing suite.

FWIW, coming from the gh-ost side, I personally found the current VReplication logic to be complex -- points of view?

Also, based on the discussion, it is likely that the consistent snapshot approach will simply not work for large VReplication-based online DDL (because we have to copy from primary->primary, and not from a replica/rdonly).

I think this is a good property of existent VReplication. I would love to not lose this.

I'm not sure what you mean. Did you mean the property of copying from a replica/rdonly? This will not change. Again, the switch of logic as presented in #8044 is basically it (though it does not introduce parallelism yet). There is no fundamental change to the flow. It's the same streamer, same vcopier, same vplayer. The streamer just reads rows in a different way and flushes them in a different way. The player runs a REPLACE INTO where it used to run an INSERT INTO. But there is no change to the identities of the source/target, no change in the creation of both, in the communication layer between both, or in the backend table tracking of the process. Really the biggest change is "no table locks, no consistent snapshot, and multiple SELECT queries as opposed to a single SELECT query".

To me the "lock/consistent snapshot/big SELECT" is intimidating. I can't imagine running a 8 hour operation that would need to maintain an open transaction with consistent snapshot on a primary. In my experience this can lead to outage. That was my main concern embarking on the task. Assuming I'm correct about the risk on a primary (and that was my clear experience at my previous job), that means you always have to rely on a replica to be handy to run some operation. I'm not sure how much of a big deal that is.

For Online DDL, the design is to run/read/write on primary. It's possible to run replica->primary, I think, but haven't explored it yet.

@rafael (Member) commented Jun 6, 2021

That's commendable! I think some use cases require better performance: importing an entire database from an external source is one example where we've projected a multi-week operation in a real use case and with current implementation.

Oh, if you already have use cases where it will take multiple weeks, I definitely agree it requires better performance. I'm still curious about the details of what kind of external databases these are. With my current understanding of the performance, it seems these would be external databases larger than 10 TB, with many tables also being resharded as part of the process. Something like that?

My point around performance here is that so far, the issues have been solvable without major changes. There are other issues (like the one I mentioned for Materialize tables) that are blockers and for which we don't have solutions.

Did you mean the property of copying from replica/rdonly?

Yes, that was the property. Ah great, this is not changing.

To me the "lock/consistent snapshot/big SELECT" is intimidating. I can't imagine running a 8 hour operation that would need to maintain an open transaction with consistent snapshot on a primary. In my experience this can lead to outage. That was my main concern embarking on the task.

The original implementation here didn't use a consistent snapshot, and it was a short lock, so nothing was held open for multiple hours. Would you have the same concern in that case? If I recall correctly, the consistent snapshot was introduced to further reduce the duration of the locks. I wonder if that optimization actually just added a reliability risk.

The previous implementation was used at Slack extensively and we never ran into issues. For context, it was not for copy, but for VDiff. We used it when we migrated our legacy systems to Vitess. This is a similar use case to the one you describe of copying an external database. The main difference in our approach is that we didn't use the copy phase, we started from a backup and used vreplication to catch up. To your point, among others, performance was a concern for this use case and we avoided copying the data altogether.

The new approach is not all that different from the existing approach.

If this is the case, I'm less concerned. I haven't actually looked at the PR yet; I was catching up with the discussion here. I still have to do my homework of reflecting more on the new algorithm.

@shlomi-noach (Contributor Author) commented:

I am not really familiar with the previous implementation, myself, so I have no insights on that. I'll ask around though.

@shlomi-noach (Contributor Author) commented:

I'm gonna pivot a bit and focus again on the current implementation, coined copy-first. I mentioned earlier that one scenario the binlog-first approach can't handle is adding a new UNIQUE KEY, or otherwise a new unique constraint (e.g. reducing the number of columns covered by a unique index introduces a new logical constraint).
I have recently met with multiple such situations and I'd like to accommodate them.

Moreover, I've come to realize that with the copy-first approach, we can work with completely different keys on source and target. One may have PRIMARY KEY (uuid) on the source and PRIMARY KEY (autoinc_id) on the target, and we can still make a schema migration consistent! The only constraint is that the source key columns are still shared between the two tables, which is a significant relaxation of Online DDL limitations (see the illustrative schemas below).
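
A hedged illustration of that relaxation (hypothetical schemas, not taken from the PR; presumably the target would still keep a unique index over the shared source key columns):

CREATE TABLE source_t (
  uuid VARCHAR(36) NOT NULL,
  val  INT NOT NULL,
  PRIMARY KEY (uuid)
) ENGINE=InnoDB;

CREATE TABLE target_t (
  autoinc_id BIGINT NOT NULL AUTO_INCREMENT,
  uuid       VARCHAR(36) NOT NULL,
  val        INT NOT NULL,
  PRIMARY KEY (autoinc_id),
  UNIQUE KEY (uuid)    -- the source key columns, shared by both tables
) ENGINE=InnoDB;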

So, gonna look more closely into how we can optimize with current logic.

@shlomi-noach (Contributor Author) commented Jul 14, 2021

I have a rough draft (in my brain) of how we might run concurrent table copy using the existing VReplication logic, on top of a single stream. It will take me a while to write it down in words, and I'm still unsure how complex it would be to implement it in code.

@shlomi-noach (Contributor Author) commented:

Some design thoughts and initial work on parallel vreplication copy: #8934
