This description of the scheduling replicator's functionality is mainly geared to CouchDB developers. It dives a bit into the internals and explains how everything is connected together.
A natural place to start is the top application supervisor: `couch_replicator_sup`. It's a `rest_for_one` restart strategy supervisor, so if a child process terminates, the rest of the children in the hierarchy following it are also terminated. This structure implies a useful constraint: children lower in the list can safely call their siblings which are higher in the list.
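A minimal sketch of such a supervisor is shown here. It is not the actual `couch_replicator_sup` code: the module name `example_replicator_sup`, the `intensity` and `period` values, and the assumption that each child module exports `start_link/0` are illustrative only, and only the children named in this section are listed.

```erlang
-module(example_replicator_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% rest_for_one: when a child dies, every child started after it
    %% is terminated and restarted as well.
    SupFlags = #{strategy => rest_for_one, intensity => 10, period => 60},
    Children = [
        %% The event bus is a plain gen_event manager (assumed start call).
        #{id => couch_replication_event,
          start => {gen_event, start_link, [{local, couch_replication_event}]}},
        worker(couch_replicator_connection),
        worker(couch_replicator_rate_limiter),
        worker(couch_replicator_scheduler),
        %% Started last, so it may safely call every sibling above it.
        worker(couch_replicator_doc_processor)
    ],
    {ok, {SupFlags, Children}}.

%% Hypothetical helper: assumes Mod exports start_link/0.
worker(Mod) ->
    #{id => Mod, start => {Mod, start_link, []}}.
```

Because children are started in list order, a child placed later in the list can rely on the earlier ones already being up.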
A description of each child:
- `couch_replication_event`: Starts a gen_event publication bus to publish replication related events. This is used to wait for replication jobs in `couch_replicator` and in tests to monitor for replication events. Notification is performed via the `couch_replicator_notifier:notify/1` function.

- `couch_replicator_connection`: Maintains a global replication connection pool. It allows reusing connections across replication tasks. The main interface is `acquire/1` and `release/1`. The general idea is that once a connection is established, it is kept around for `replicator.connection_close_interval` milliseconds in case another replication task wants to reuse it. It is worth pointing out how linking and monitoring are handled: workers are linked to the connection pool when they are created. If they crash, the connection pool receives an 'EXIT' event and cleans up after the worker. The connection pool also monitors owners (by monitoring the `Pid` from the `From` argument in the call to `acquire/1`) and cleans up if an owner dies and the pool receives a 'DOWN' message. Another interesting thing is that connection establishment (creation) happens in the owner process, so the pool is not blocked on it.

- `couch_replicator_rate_limiter`: Implements a rate limiter to handle connection throttling from sources or targets where requests return 429 error codes. Uses the Additive Increase / Multiplicative Decrease feedback control algorithm to converge on the channel capacity. Implemented using a 16-way sharded ETS table to maintain connection state. The table sharding code is split out to the `couch_replicator_rate_limiter_tables` module. The purpose of the module is to maintain and continually estimate sleep intervals for each connection, represented as a `{Method, Url}` pair. The interval is updated on each call to `failure/1` or `success/1`. For a successful request, a client should call `success/1`. Whenever a 429 response is received, the client should call `failure/1`. When no failures are happening, the code ensures the ETS tables are empty in order to have a lower impact on a running system.

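The feedback loop can be illustrated with a small sketch. This is not the rate limiter's actual code: it keeps state in a plain map instead of sharded ETS tables, the module name `example_aimd` and the two-argument `failure/2` and `success/2` are hypothetical, and all constants are made up for the example. Since the tracked value is a sleep interval rather than a rate, a failure multiplies the interval and a success subtracts a fixed step from it.

```erlang
-module(example_aimd).
-export([failure/2, success/2]).

-define(BASE_MS, 20).     % assumed first sleep interval after a 429
-define(STEP_MS, 20).     % assumed additive step applied on success
-define(FACTOR, 2).       % assumed multiplicative factor applied on failure
-define(MAX_MS, 25000).   % assumed upper bound for the sleep interval

%% A 429 response was seen for Key = {Method, Url}: grow the sleep
%% interval multiplicatively, which lowers the request rate sharply.
failure(Key, State) ->
    Interval =
        case maps:get(Key, State, 0) of
            0 -> ?BASE_MS;
            Prev -> min(?MAX_MS, Prev * ?FACTOR)
        end,
    {Interval, State#{Key => Interval}}.

%% The request succeeded: shrink the sleep interval by a fixed step,
%% which raises the request rate gently. Entries that reach zero are
%% dropped so the state stays empty while nothing is failing.
success(Key, State) ->
    case max(0, maps:get(Key, State, 0) - ?STEP_MS) of
        0 -> {0, maps:remove(Key, State)};
        Interval -> {Interval, State#{Key => Interval}}
    end.
```

Both functions return the new interval together with the updated state, so a caller can sleep for the returned number of milliseconds before its next request.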
- `couch_replicator_scheduler`: This is the core component of the scheduling replicator. Its main task is to switch between replication jobs, by stopping some and starting others to ensure all of them make progress. Replication jobs which fail are penalized using an exponential backoff. That is, each consecutive failure will double the time penalty. This frees up system resources for more useful work than just continuously trying to run the same subset of failing jobs.

The main API function is `add_job/1`. Its argument is an instance of the `#rep{}` record, which could be the result of a document update from a `_replicator` db or the result of a POST to the `_replicate` endpoint.

Each job is internally represented by the `#job{}` record. It contains the original `#rep{}` but also maintains an event history. The history is a sequence of past events for each job. These are timestamped and ordered such that the most recent event is at the head. History length is limited based on the `replicator.max_history` configuration value. The default is 20 entries. History event types are:

  - `added`: job was just added to the scheduler. This is the first event.
  - `started`: job was started. This was an attempt to run the job.
  - `stopped`: job was stopped by the scheduler.
  - `crashed`: job has crashed (instead of stopping cleanly).
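A bounded, newest-first history can be sketched in a few lines. The module and function names (`example_history`, `new/1`, `add_event/4`, `latest/2`) and the `{Type, Timestamp}` tuple shape are assumptions made for illustration, not the scheduler's actual data layout.

```erlang
-module(example_history).
-export([new/1, add_event/4, latest/2]).

%% Every history begins with an `added` event.
new(Timestamp) ->
    [{added, Timestamp}].

%% Prepend the new event so the most recent one is at the head, then
%% truncate to MaxHistory entries; the oldest events fall off the tail.
add_event(Type, Timestamp, History, MaxHistory) ->
    lists:sublist([{Type, Timestamp} | History], MaxHistory).

%% Return the most recent event of the given type, or false if none.
latest(Type, History) ->
    lists:keyfind(Type, 1, History).
```

Keeping the newest event at the head makes "most recent `started` event" lookups a simple front-to-back scan.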
The core of the scheduling algorithm is the `reschedule/1` function. This function is called every `replicator.interval` milliseconds (default is 60000, i.e. a minute). During each call the scheduler will try to stop some jobs, start some new ones, and keep the number of running jobs within the `replicator.max_jobs` limit (default 500). So the function does these operations (actual code paste):

```erlang
Running = running_job_count(),
Pending = pending_job_count(),
stop_excess_jobs(State, Running),
start_pending_jobs(State, Running, Pending),
rotate_jobs(State, Running, Pending),
update_running_jobs_stats(State#state.stats_pid)
```

  - `Running` is the total number of currently running jobs.
  - `Pending` is the total number of jobs waiting to be run.
  - `stop_excess_jobs` will stop any jobs exceeding the `replicator.max_jobs` configured limit. This code takes effect if the user reduces the `max_jobs` configuration value.
  - `start_pending_jobs` will start any jobs if there is more room available. This will take effect on startup or when the user increases the `max_jobs` configuration value.
  - `rotate_jobs` is where all the action happens. The scheduler picks `replicator.max_churn` running jobs to stop and then picks the same number of pending jobs to start. The default value of `max_churn` is 20. So by default, every minute, 20 running jobs are stopped and 20 new pending jobs are started.

Before moving on, it is worth pointing out that the scheduler treats continuous and non-continuous replications differently. Normal (non-continuous) replications, once started, will be allowed to run to completion. That behavior is to preserve their semantics of replicating a snapshot of the source database to the target. For example, if new documents are added to the source after the replication is started, those updates should not show up on the target database. Stopping and restarting a normal replication would violate that constraint. The only exception to the rule is when the user explicitly reduces the `replicator.max_jobs` configuration value. Even then the scheduler will first attempt to stop as many continuous jobs as possible, and only if it has no choice left will it stop normal jobs.

Keeping that in mind and going back to the scheduling algorithm, the next interesting part is how the scheduler picks which jobs to stop and which ones to start:

  - Stopping: When picking jobs to stop, the scheduler will pick the longest running continuous jobs first. The sorting callback function to get the longest running jobs is unsurprisingly called `longest_running/2`. To pick the longest running jobs it looks at the most recent `started` event. After it gets a list sorted by longest running, it simply picks the first few, depending on the value of `max_churn`, using `lists:sublist/2`. Then those jobs are stopped.
  - Starting: When starting, the scheduler will pick the jobs which have been waiting the longest. Surprisingly, in this case it also looks at the `started` timestamp and picks the jobs which have the oldest `started` timestamp. If there are 3 jobs, A[started=10], B[started=7], C[started=9], then B will be picked first, then C, then A. This ensures that jobs are not starved, which is a classic scheduling pitfall.
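The two selection rules can be sketched as follows. This is an illustration under stated assumptions, not the scheduler's code: a job is modeled as a hypothetical `{Id, Continuous, History}` tuple, a job that was never started is treated as having `started = 0`, and both lists are fully sorted for clarity.

```erlang
-module(example_rotation).
-export([last_started/1, pick_to_stop/2, pick_to_start/2]).

%% Timestamp of the most recent `started` event. History is newest
%% first, so the first match is the most recent one.
last_started({_Id, _Continuous, History}) ->
    case lists:keyfind(started, 1, History) of
        {started, T} -> T;
        false -> 0   % assumption: never started sorts as the oldest
    end.

%% Order jobs by ascending `started` timestamp (oldest first).
oldest_first(Jobs) ->
    lists:sort(fun(A, B) -> last_started(A) =< last_started(B) end, Jobs).

%% Stopping: only continuous jobs are candidates, and the ones that
%% have been running the longest (oldest `started`) go first.
pick_to_stop(Running, MaxChurn) ->
    Continuous = [J || {_, true, _} = J <- Running],
    lists:sublist(oldest_first(Continuous), MaxChurn).

%% Starting: the jobs whose last start is oldest have waited longest.
pick_to_start(Pending, MaxChurn) ->
    lists:sublist(oldest_first(Pending), MaxChurn).
```

With the jobs from the example above, `pick_to_start/2` returns B, then C, then A.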
In the code, the list of pending jobs is picked slightly differently than how the list of running jobs is picked. `pending_jobs/1` uses `ets:foldl` to iterate over all the pending jobs. As it iterates, it tries to keep only up to `max_churn` oldest items in the accumulator. The reason this is done is that there could be a very large number of pending jobs, and loading them all into a list (making a copy from ETS) and then sorting it can be quite expensive performance-wise. The tricky part of the iteration happens in `pending_maybe_replace/2`. A `gb_sets` ordered set is used to keep the top-N longest waiting jobs so far. The code has a comment with a helpful example of how this algorithm works.

The last part is how the scheduler treats jobs which keep crashing. If a job is started but then crashes, that job is considered unhealthy. The main idea is to penalize such jobs so that they are forced to wait an exponentially larger amount of time with each consecutive crash. A central part of this algorithm is determining what forms a sequence of consecutive crashes. If a job starts then quickly crashes, and after its next start it crashes again, then that would become a sequence of 2 consecutive crashes. The penalty would then be calculated by the `backoff_micros/1` function, where the consecutive crash count ends up as the exponent. However, for practical reasons there is also a maximum penalty, and that is the equivalent of 10 consecutive crashes. Timewise it ends up being about 8 hours. That means even a job which keeps crashing will still get a chance to retry once every 8 hours.

There is a subtlety when calculating consecutive crashes, and that is deciding when the sequence stops. That is, figuring out when a job becomes healthy again. The scheduler considers a job healthy again if it started and hasn't crashed in a while. The "in a while" part is a configuration parameter, `replicator.health_threshold`, defaulting to 2 minutes. This means that if a job has been crashing, for example 5 times in a row, but then on the 6th attempt it started and ran for more than 2 minutes, then it is considered healthy again. The next time it crashes, its sequence of consecutive crashes will restart at 1.

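The penalty calculation and the crash counting can be sketched like this. The code is a simplification under stated assumptions rather than the scheduler's implementation: the module name `example_backoff` is hypothetical, the 30 second base interval is an assumed value chosen so that the cap works out to roughly 8.5 hours, timestamps are plain seconds, and only runs shorter than the health threshold count towards the sequence.

```erlang
-module(example_backoff).
-export([backoff_micros/1, consecutive_crashes/2]).

-define(BASE_MICROS, (30 * 1000 * 1000)).  % assumed base penalty: 30 seconds
-define(MAX_EXPONENT, 10).                 % cap: 10 consecutive crashes

%% The crash count is the exponent, capped at ?MAX_EXPONENT.
%% 2^10 * 30 s = 30720 s, which is about 8.5 hours.
backoff_micros(CrashCount) when CrashCount >= 1 ->
    (1 bsl min(CrashCount, ?MAX_EXPONENT)) * ?BASE_MICROS.

%% History is newest first and is expected to begin with a crash.
%% Each {crashed, started} pair counts while the run between them was
%% shorter than the health threshold; a longer run ends the sequence.
consecutive_crashes(History, ThresholdSec) ->
    count(History, ThresholdSec, 0).

count([{crashed, CrashT}, {started, StartT} | Rest], Threshold, N)
  when CrashT - StartT < Threshold ->
    count(Rest, Threshold, N + 1);
count(_History, _Threshold, N) ->
    N.
```

A pending job would then become eligible to start again only once the interval returned by `backoff_micros/1` has passed since its last crash.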
- `couch_replicator_doc_processor`: The doc processor component is in charge of processing replication document updates, turning them into replication jobs and adding those jobs to the scheduler. Unfortunately, the only reason there is even a `couch_replicator_doc_processor` gen_server, instead of replication documents being turned into jobs and inserted into the scheduler directly, is one corner case: filtered replications using custom (mostly JavaScript) filters. More about this later. It is better to start with how updates flow through the doc processor:

Document updates come via the `db_change/3` callback from `couch_multidb_changes`, then go to the `process_change/2` function.

In
`process_change/2` a few decisions are made regarding how to proceed. The first is an "ownership" check. That is a check of whether the replication document belongs on the current node. If not, then it is ignored. In a cluster, in general there would be N copies of a document change and we only want to run the replication once. Another check is to see if the update has arrived during a time when the cluster is considered "unstable". If so, it is ignored, because soon enough a rescan will be launched and all the documents will be reprocessed anyway. Another noteworthy thing in `process_change/2` is the handling of upgrades from the previous version of the replicator, when transient states were written to the documents. Two such states were `triggered` and `error`. Both of those states are removed from the document, then the update proceeds in the regular fashion. `failed` documents are also ignored here. `failed` is a terminal state which indicates the document was somehow unsuitable to become a replication job (it was malformed or a duplicate). Otherwise the state update proceeds to `process_updated/2`.

`process_updated/2` is where replication document updates are parsed and translated to `#rep{}` records. The interesting part here is that the replication ID isn't calculated yet. Unsurprisingly, the parsing function used is called `parse_rep_doc_without_id/1`. Also note that up until now everything is still running in the context of the `db_change/3` callback. After the replication filter type is determined, the update gets passed to the `couch_replicator_doc_processor` gen_server.

The
`couch_replicator_doc_processor` gen_server's main role is to try to calculate replication IDs for each `#rep{}` record passed to it, then add that as a scheduler job. As noted before, `#rep{}` records parsed up until this point lack a replication ID. The reason is that replication ID calculation includes a hash of the filter code. And because user defined replication filters live in the source DB, which most likely involves a remote network fetch, there is a possibility of blocking and a need to handle various network failures and retries. Because of that, `couch_replicator_doc_processor` dispatches all of that blocking and retrying to a separate `worker` process (`couch_replicator_doc_processor_worker` module).

`couch_replicator_doc_processor_worker` is where replication IDs are calculated for each individual doc update. There are two separate modules which contain utilities related to replication ID calculation: `couch_replicator_ids` and `couch_replicator_filters`. The first one contains ID calculation algorithms and the second one knows how to parse and fetch user filters from a remote source DB. One interesting thing about the worker is that it is time-bounded and is guaranteed to not be stuck forever. That's why it spawns an extra process with `spawn_monitor`, just so it can use an `after` clause in receive and bound the maximum time this worker will take.

A doc processor worker will either succeed or fail but never block for too long. Success and failure are returned as exit values. Those are handled in the
`worker_returned/3` doc processor clauses. The most common pattern is that a worker is spawned to add a replication job, it does so, and returns an `{ok, ReplicationID}` value in `worker_returned`.

In case of a filtered replication with custom user code, there are two cases to consider:

  - Filter fetching code has failed. In that case the worker returns an error. But because the error could be a transient network error, another worker is started to try again. It could fail and return an error again, then another one is started, and so on. However, each consecutive worker will do an exponential backoff, not unlike the scheduler code. `error_backoff/1` is where the backoff period is calculated. Consecutive errors are held in the `errcnt` field in the ETS table.
  - Fetching filter code succeeds, the replication ID is calculated and the job is added to the scheduler. However, because this is a filtered replication, the source database could get an updated filter, which means the replication ID could change again. So a worker is spawned to periodically check the filter and see if it changed. In other words, the doc processor will do the work of checking filtered replications, getting an updated filter, and then refreshing the replication job (removing the old one and adding a new one with a different ID). The filter checking interval is determined by the `filter_backoff` function. An unusual thing about that function is that it calculates the period based on the size of the ETS table. The idea there is that for a few replications in a cluster, it's ok to check filter changes often. But when there are lots of replications running, having each one check its filter often is not a good idea.
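The shape of the two intervals can be sketched as below. Everything in this block is an assumption made for illustration: the module name `example_doc_backoff`, the function `filter_check_interval/1`, and all constants are hypothetical, and the real `error_backoff/1` and `filter_backoff` functions may use different formulas (for example, with random jitter).

```erlang
-module(example_doc_backoff).
-export([error_backoff/1, filter_check_interval/1]).

-define(BASE_SEC, 5).          % assumed delay before the first retry
-define(MAX_EXPONENT, 12).     % assumed cap on the doubling
-define(MIN_CHECK_SEC, 60).    % assumed shortest filter check period
-define(MAX_CHECK_SEC, 3600).  % assumed longest filter check period

%% Retry delay in seconds after ErrCnt consecutive worker errors:
%% doubles with every error until the exponent cap is reached.
error_backoff(ErrCnt) when ErrCnt >= 0 ->
    ?BASE_SEC * (1 bsl min(ErrCnt, ?MAX_EXPONENT)).

%% Filter re-check period in seconds: grows with the number of
%% replication documents being tracked, so a busy cluster checks each
%% filter less often than a nearly idle one.
filter_check_interval(TableSize) when TableSize >= 0 ->
    min(?MAX_CHECK_SEC, ?MIN_CHECK_SEC + TableSize).
```

Tying the check period to the table size keeps the total filter-fetching load roughly bounded as the number of filtered replications grows.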
-