-- vshard.replicaset
--
-- <replicaset> = {
-- replicas = {
-- [replica_id] = {
-- uri = URI,
-- name = string,
-- uuid = string,
-- id = <name or uuid>,
-- conn = <netbox> + .replica + .replicaset,
-- zone = number,
-- next_by_priority = <replica object of the same type>,
-- weight = number,
-- down_ts = <timestamp of disconnect from the
-- replica>,
-- backoff_ts = <timestamp when the replica was sent into the backoff state>,
-- activity_ts = <timestamp when the replica was used last time>,
-- backoff_err = <error object which caused the backoff>,
-- net_timeout = <current network timeout for calls,
-- doubled on each network fail until
-- max value, and reset to minimal
-- value on each success>,
-- net_sequential_ok = <count of sequential success
-- requests to the replica>,
-- net_sequential_fail = <count of sequential failed
-- requests to the replica>,
-- is_outdated = nil/true,
-- }
-- },
-- master = <master server from the array above>,
-- master_cond = <condition variable signaled when the replicaset finds or
-- changes its master>,
-- is_master_auto = <true when the replicaset is configured to find the
-- master on its own>,
-- on_master_required = <trigger invoked every time a master call
-- stumbles upon an unknown master location>,
-- master_wait_count = <number of fibers right now waiting for the master to be
-- discovered>,
-- replica = <nearest available replica object>,
-- balance_i = <index of a next replica in priority_list to
-- use for a load-balanced request>,
-- replica_up_ts = <timestamp updated on each attempt to
-- connect to the nearest replica, and on
-- each connect event>,
-- name = <replicaset_name>,
-- uuid = <replicaset_uuid>,
-- id = <replicaset_name or replicaset_uuid>,
-- weight = number,
-- priority_list = <list of replicas, sorted by weight asc>,
-- etalon_bucket_count = <bucket count that must be stored
-- on this replicaset to reach the
-- balance in a cluster>,
-- is_outdated = nil/true,
-- }
--
-- replicasets = {
-- [replicaset_id] = <replicaset>
-- }
--
local log = require('log')
local netbox = require('net.box')
local consts = require('vshard.consts')
local lerror = require('vshard.error')
local fiber = require('fiber')
local luri = require('uri')
local luuid = require('uuid')
local ffi = require('ffi')
local lcfg = require('vshard.cfg')
local util = require('vshard.util')
local fiber_clock = fiber.clock
local fiber_yield = fiber.yield
local fiber_cond_wait = util.fiber_cond_wait
local future_wait = util.future_wait
local gsc = util.generate_self_checker
--
-- vconnect is an asynchronous vshard greeting, saved inside a netbox
-- connection. It stores a future object and additional info needed for its
-- work. The future is initialized when the connection is established (inside
-- netbox_on_connect). The connection cannot be considered "connected"
-- until vconnect is properly validated.
--
local function conn_vconnect_set(conn)
assert(conn.replica ~= nil)
local is_named = conn.replica.id == conn.replica.name
if not is_named then
-- Nothing to do. Check is not needed.
return
end
-- Update the existing vconnect. The fiber condition cannot be dropped,
-- somebody may already be waiting on it.
if conn.vconnect then
-- Connections are preserved during reconfiguration,
-- but the identification may have changed.
conn.vconnect.is_named = is_named
if not conn.vconnect.future then
return
end
-- Future object must be updated. Old result is irrelevant.
conn.vconnect.future:discard()
conn.vconnect.future = nil
return
end
-- Create new vconnect.
conn.vconnect = {
-- Whether the connection is done to the named replica.
is_named = is_named,
-- Used to wait for the appearance of the vconnect.future object,
-- if a call is done before the connection is established.
future_cond = fiber.cond(),
}
end
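--
-- Illustrative sketch (not executed; the shape is an assumption derived
-- from the code above): after conn_vconnect_set() on a connection with
-- named identification, the connection carries roughly this table:
--
--     conn.vconnect = {
--         is_named = true,              -- named identification is used
--         future_cond = fiber.cond(),   -- waiters for .future to appear
--         -- .future is filled later, in conn_vconnect_start(), once the
--         -- connection is established.
--     }
--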
--
-- Initialize the future object. Should be done when the connection is
-- established (inside netbox_on_connect).
--
local function conn_vconnect_start(conn)
local vconn = conn.vconnect
if not vconn then
-- Nothing to do. Check is not needed.
return
end
local opts = {is_async = true}
vconn.future = conn:call('vshard.storage._call', {'info'}, opts)
vconn.future_cond:broadcast()
end
--
-- Check that the future is ready and that its result is the expected one.
-- The function doesn't yield.
-- @retval true; The correct response is received.
-- @retval nil, ...; Response is not received or validation error happened.
--
local function conn_vconnect_check(conn)
local vconn = conn.vconnect
local replica = conn.replica
-- conn.vconnect may be nil, if the connection was created on an old
-- version and the storage was reloaded to a new one. It's also nil when
-- all checks were already done.
if not vconn then
return true
end
-- Nothing to do but wait in such a case.
if not vconn.future or not vconn.future:is_ready() then
return nil, lerror.vshard(lerror.code.VHANDSHAKE_NOT_COMPLETE,
replica.id)
end
-- Critical errors. Connection should be closed after these ones.
local result, err = vconn.future:result()
if not result then
-- Failed to get response. E.g. access error.
return nil, lerror.make(err)
end
-- If the name is nil, it means the name was not set yet. If the uuid is
-- specified, then we allow a mismatch between the config name and nil.
local is_name_set = result[1].name ~= nil or replica.uuid == nil
if vconn.is_named and is_name_set and result[1].name ~= replica.name then
return nil, lerror.vshard(lerror.code.INSTANCE_NAME_MISMATCH,
replica.name, result[1].name)
end
-- Don't validate until reconnect happens.
conn.vconnect = nil
return true
end
local function conn_vconnect_check_or_close(conn)
local ok, err = conn_vconnect_check(conn)
-- Close the connection, if an error happened, but it is not
-- VHANDSHAKE_NOT_COMPLETE.
if not ok and err and not (err.type == 'ShardingError' and
err.code == lerror.code.VHANDSHAKE_NOT_COMPLETE) then
conn:close()
end
return ok, err
end
--
-- Wait until the future object is ready. Returns remaining timeout.
-- @retval timeout; Future object is ready.
-- @retval nil, err; Timeout passed.
--
local function conn_vconnect_wait(conn, timeout)
local vconn = conn.vconnect
-- Fast path. In most cases no validation should be done at all.
-- conn.vconnect may be nil, if the connection was created on an old
-- version and the storage was reloaded to a new one. It's also nil when
-- all checks were already done.
if not vconn or (vconn.future and vconn.future:is_ready()) then
return timeout
end
local deadline = fiber_clock() + timeout
-- Wait for connection to be established.
if not vconn.future and
not fiber_cond_wait(vconn.future_cond, timeout) then
return nil, lerror.timeout()
end
timeout = deadline - fiber_clock()
-- Wait for async call to return.
local res, err = future_wait(vconn.future, timeout)
if res == nil then
-- Either a timeout error or any other. If it's not a timeout error,
-- then conn must be recreated and the handshake must be retried.
return nil, lerror.make(err)
end
return deadline - fiber_clock()
end
local function conn_vconnect_wait_or_close(conn, timeout)
local ok, err = conn_vconnect_wait(conn, timeout)
if not ok and not lerror.is_timeout(err) then
conn:close()
end
return ok, err
end
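--
-- Usage sketch (illustrative only; it mirrors what replica_call() below
-- does for synchronous requests):
--
--     local timeout, err = conn_vconnect_wait_or_close(conn, 5)
--     if not timeout then
--         return nil, err -- Timed out, or the conn was closed.
--     end
--     local ok, verr = conn_vconnect_check_or_close(conn)
--     if not ok then
--         return nil, verr -- Handshake failed or is not complete yet.
--     end
--     -- Safe to use conn:call() now with the remaining 'timeout'.
--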
--
-- on_connect() trigger for net.box
--
local function netbox_on_connect(conn)
log.info("connected to %s:%s", conn.host, conn.port)
local rs = conn.replicaset
local replica = conn.replica
assert(replica ~= nil)
-- If a replica's connection has revived, then unset
-- replica.down_ts - it is not down anymore.
replica.down_ts = nil
-- conn.vconnect is set on every connect, as it may be nil,
-- if the previous check passed successfully. Also, the needed
-- checks may change on reconnect.
conn_vconnect_set(conn)
conn_vconnect_start(conn)
if replica.uuid and conn.peer_uuid ~= replica.uuid and
-- XXX: Zero UUID means not a real Tarantool instance. It
-- is likely to be a cartridge.remote-control server,
-- which is started before the actual storage. Let it
-- work, anyway it will be shut down, and reconnect to the
-- real storage will happen. Otherwise the connection will
-- be left broken in the 'closed' state until a request
-- comes specifically for this instance, or a reconfiguration
-- happens. That would prevent reconnect to the real
-- storage.
conn.peer_uuid ~= luuid.NULL:str() then
log.info('Mismatch server UUID on replica %s: expected "%s", but got '..
'"%s"', replica, replica.uuid, conn.peer_uuid)
conn:close()
return
end
if replica == rs.replica and replica == rs.priority_list[1] then
-- Update replica_up_ts, if the current replica has the
-- biggest priority. Really, it is not necessary to
-- increase replica connection priority, if the current
-- one already has the biggest priority. (See failover_f).
rs.replica_up_ts = fiber_clock()
end
end
--
-- on_disconnect() trigger for net.box
--
local function netbox_on_disconnect(conn)
log.info("disconnected from %s:%s", conn.host, conn.port)
assert(conn.replica)
-- Replica is down - remember this time to decrease replica
-- priority after FAILOVER_DOWN_TIMEOUT seconds.
conn.replica.down_ts = fiber_clock()
end
--
-- Wait until the connection is established. This is necessary at least for
-- async requests because they fail immediately if the connection is not done.
-- Returns the remaining timeout because it is expected to be used to connect
-- to many instances in a loop, where such a return saves one clock get in the
-- caller code and is just cleaner.
--
local function netbox_wait_connected(conn, timeout)
-- Fast path. Usually everything is connected.
if conn:is_connected() then
return timeout
end
local deadline = fiber_clock() + timeout
-- Loop against spurious wakeups.
repeat
-- Netbox uses fiber_cond inside, which throws an irrelevant usage error
-- at negative timeout. Need to check the case manually.
if timeout < 0 then
return nil, lerror.timeout()
end
local ok, res = pcall(conn.wait_connected, conn, timeout)
if not ok then
return nil, lerror.make(res)
end
if not res then
return nil, lerror.timeout()
end
timeout = deadline - fiber_clock()
until conn:is_connected()
return timeout
end
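--
-- Usage sketch (illustrative; 'conns' is a hypothetical array of netbox
-- connections): the remaining-timeout convention lets a caller connect to
-- several instances against one shared deadline without re-reading the
-- clock:
--
--     local timeout, err = 10, nil
--     for _, conn in ipairs(conns) do
--         timeout, err = netbox_wait_connected(conn, timeout)
--         if not timeout then
--             return nil, err
--         end
--     end
--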
--
-- Check if the connection is dead and won't be restored automatically.
-- Even though the replicaset's connections are initialized with the option
-- 'reconnect_after', there are cases when no reconnect will be done (e.g.
-- if the conn was explicitly cancelled or the connection's worker fiber
-- was killed). In these situations we need to reestablish it manually, as
-- a missing connection to some replicaset is a critical problem which
-- makes the user unable to access some part of the data via the
-- router.
--
-- Note: the function expects the conn to be non-nil.
--
local function netbox_is_conn_dead(conn)
-- Fast path - conn is almost always 'active'.
if conn.state == 'active' then
return false
end
if conn.state == 'error' or conn.state == 'closed' then
return true
end
if conn.state ~= 'error_reconnect' then
-- The connection is fetching the schema or doing auth, which
-- means it is not dead and shouldn't be reinitialized.
return false
end
if not conn._fiber then
-- The fiber field is not present in old netbox, which correctly
-- reports the state as 'error' when its fiber gets killed. Such a
-- connection would have been filtered out above.
return false
end
-- A dead worker fiber means nobody will reconnect automatically,
-- so only then the connection is considered dead.
return conn._fiber:status() == "dead"
end
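--
-- Summary of the logic above (for reference):
--     'active'                  -> alive;
--     'error', 'closed'         -> dead, must be recreated;
--     any other transient state -> alive, connecting is in progress;
--     'error_reconnect'         -> dead only when the worker fiber is
--                                  dead, i.e. nobody will reconnect.
--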
--
-- Check if the replica is not in backoff. It also serves as an update - if the
-- replica still has an old backoff timestamp, it is cleared. This way of
-- backoff update does not require any fibers to perform background updates.
-- Hence it works not only on the router.
--
local function replica_check_backoff(replica, now)
if not replica.backoff_ts then
return true
end
if replica.backoff_ts + consts.REPLICA_BACKOFF_INTERVAL > now then
return false
end
log.warn('Replica %s returns from backoff', replica.id)
replica.backoff_ts = nil
replica.backoff_err = nil
return true
end
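--
-- Worked example (illustrative; assume consts.REPLICA_BACKOFF_INTERVAL is
-- 5 seconds - the real value lives in vshard.consts): a replica sent into
-- backoff at now = 100 is skipped by this check until now >= 105. The
-- first check at now >= 105 clears backoff_ts and backoff_err and returns
-- true again, with no background fiber involved.
--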
--
-- Connect to a specified replica and remember a new connection
-- in the replica object. Note, that the function does not wait
-- until a connection is established.
--
local function replicaset_connect_to_replica(replicaset, replica)
replica.activity_ts = fiber_clock()
local conn = replica.conn
if not conn or netbox_is_conn_dead(conn) then
conn = netbox.connect(replica.uri, {
reconnect_after = consts.RECONNECT_TIMEOUT,
wait_connected = false
})
conn.replica = replica
conn.replicaset = replicaset
-- vconnect must be set before the connection is established,
-- as we must know that the connection cannot be used yet. If vconnect
-- is nil, it means all checks passed, so we may make a call and
-- only after that 'require' the checks.
conn_vconnect_set(conn)
conn.on_connect_ref = netbox_on_connect
conn:on_connect(netbox_on_connect)
conn.on_disconnect_ref = netbox_on_disconnect
conn:on_disconnect(netbox_on_disconnect)
replica.conn = conn
end
return conn
end
local function replicaset_wait_master(replicaset, timeout)
local master = replicaset.master
-- Fast path - master is almost always known.
if master then
return master, timeout
end
-- Slow path.
local deadline = fiber_clock() + timeout
replicaset.master_wait_count = replicaset.master_wait_count + 1
if replicaset.on_master_required then
pcall(replicaset.on_master_required)
end
while true do
master = replicaset.master
if master then
break
end
if not replicaset.is_master_auto or
not fiber_cond_wait(replicaset.master_cond, timeout) then
timeout = lerror.vshard(lerror.code.MISSING_MASTER,
replicaset.id)
break
end
timeout = deadline - fiber_clock()
end
assert(replicaset.master_wait_count > 0)
replicaset.master_wait_count = replicaset.master_wait_count - 1
return master, timeout
end
--
-- Create net.box connection to master.
--
local function replicaset_connect_master(replicaset)
local master = replicaset.master
if master == nil then
return nil, lerror.vshard(lerror.code.MISSING_MASTER,
replicaset.id)
end
return replicaset_connect_to_replica(replicaset, master)
end
--
-- Wait until the master instance is connected.
--
local function replicaset_wait_connected(replicaset, timeout)
local master
master, timeout = replicaset_wait_master(replicaset, timeout)
if not master then
return nil, timeout
end
local conn = replicaset_connect_to_replica(replicaset, master)
return netbox_wait_connected(conn, timeout)
end
--
-- Wait until all instances are connected (with an optional exception).
--
local function replicaset_wait_connected_all(replicaset, opts)
local timeout = opts.timeout
local except = opts.except
local are_all_connected, err
repeat
are_all_connected = true
for replica_id, replica in pairs(replicaset.replicas) do
if replica_id == except then
goto next_check
end
local conn = replicaset_connect_to_replica(replicaset, replica)
if not conn:is_connected() then
timeout, err = netbox_wait_connected(conn, timeout)
if not timeout then
return nil, err, replica_id
end
-- While we were waiting for this connection, another one could break.
-- Need to re-check all of them.
are_all_connected = false
end
::next_check::
end
until are_all_connected
return timeout
end
--
-- Create net.box connections to all replicas and master.
--
local function replicaset_connect_all(replicaset)
for _, replica in pairs(replicaset.replicas) do
replicaset_connect_to_replica(replicaset, replica)
end
end
--
-- Connect to the next replica with lower priority than the current
-- one. It is needed if the current replica's connection has been down
-- for too long.
--
local function replicaset_down_replica_priority(replicaset)
local old_replica = replicaset.replica
assert(old_replica and old_replica.down_ts and
not old_replica:is_connected())
local new_replica = old_replica.next_by_priority
if new_replica then
assert(new_replica ~= old_replica)
replicaset_connect_to_replica(replicaset, new_replica)
replicaset.replica = new_replica
end
-- Else the current replica already has the lowest priority.
-- Cannot down it any further.
end
--
-- Search for a replica with higher priority than the current replica
-- has.
--
local function replicaset_up_replica_priority(replicaset)
local old_replica = replicaset.replica
if old_replica == replicaset.priority_list[1] and
old_replica:is_connected() then
replicaset.replica_up_ts = fiber_clock()
return
end
for _, replica in pairs(replicaset.priority_list) do
if replica == old_replica then
-- Failed to up priority.
return
end
if replica:is_connected() then
replicaset.replica = replica
assert(not old_replica or
old_replica.weight >= replicaset.replica.weight)
return
end
end
end
--
-- Handler for a failed request to a replica. It increments the count
-- of sequentially failed requests. When it reaches 2, the
-- network timeout is doubled.
--
local function replica_on_failed_request(replica)
replica.net_sequential_ok = 0
local val = replica.net_sequential_fail + 1
if val >= 2 then
local new_timeout = replica.net_timeout * 2
if new_timeout <= consts.CALL_TIMEOUT_MAX then
replica.net_timeout = new_timeout
end
replica.net_sequential_fail = 1
else
replica.net_sequential_fail = val
end
end
--
-- Same as above, but for a successful request. And when the count of
-- successful requests reaches 10, the network timeout is reset
-- to the minimal timeout.
--
local function replica_on_success_request(replica)
replica.net_sequential_fail = 0
local val = replica.net_sequential_ok + 1
if val >= 10 then
replica.net_timeout = consts.CALL_TIMEOUT_MIN
replica.net_sequential_ok = 1
else
replica.net_sequential_ok = val
end
end
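--
-- Worked example (illustrative; assume consts.CALL_TIMEOUT_MIN = 0.5 and
-- consts.CALL_TIMEOUT_MAX = 16 - the real values live in vshard.consts):
--
--     net_timeout = 0.5
--     2 sequential fails -> 1.0  (doubled on every 2nd fail in a row)
--     2 more fails       -> 2.0
--     ...                -> capped once doubling would exceed 16
--     10 sequential oks  -> reset back to 0.5
--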
--
-- Call a function on a replica using its connection. The typical
-- usage is calls under storage.call, because of which there
-- are no more than 3 return values. It is because storage.call
-- returns:
-- * true/nil for storage.call();
-- * error object, if storage.call() was not ok, or the called
-- function's retval;
-- * error object, if the called function has failed, or nil
-- otherwise.
-- @retval true, ... The correct response is received.
-- @retval false, ... Response is not received. It can be a timeout
-- or an unexpectedly closed connection.
--
local function replica_call(replica, func, args, opts)
assert(opts and opts.timeout)
replica.activity_ts = fiber_clock()
local conn = replica.conn
if not opts.is_async then
-- Async call cannot yield. So, we cannot wait for the connection
-- to be established and validate vconnect. Immediately fail below,
-- in conn_vconnect_check_or_close, if something is wrong.
local timeout, err = conn_vconnect_wait_or_close(conn, opts.timeout)
if not timeout then
return false, nil, lerror.make(err)
end
opts.timeout = timeout
end
local ok, err = conn_vconnect_check_or_close(conn)
if not ok then
return false, nil, lerror.make(err)
end
assert(conn.vconnect == nil)
local net_status, storage_status, retval, error_object =
pcall(conn.call, conn, func, args, opts)
if not net_status then
-- Do not increase the replica's network timeout, if the
-- requested one was less than the network one. For
-- example, if the replica's timeout was 30s, but a user
-- specified 1s and it expired, then there is no
-- reason to increase the network timeout.
if opts.timeout >= replica.net_timeout then
replica_on_failed_request(replica)
end
local err = storage_status
-- VShard functions can throw exceptions using the error() function. When
-- such an error reaches the network layer, it is wrapped into LuajitError.
-- Try to extract the original error if this is the case. It is not always
-- possible - the string representation could be truncated.
--
-- In old Tarantool versions LuajitError turned into ClientError on the
-- client. Check both types.
if func:startswith('vshard.') and (err.type == 'LuajitError' or
err.type == 'ClientError') then
err = lerror.from_string(err.message) or err
end
log.error("Exception during calling '%s' on '%s': %s", func, replica,
err)
return false, nil, lerror.make(err)
else
replica_on_success_request(replica)
end
if storage_status == nil then
-- Workaround for `not msgpack.NULL` magic.
storage_status = nil
end
return true, storage_status, retval, error_object
end
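--
-- Usage sketch (illustrative; 'replica' is a hypothetical replica object
-- with an established connection, 'my_func' a hypothetical stored
-- function):
--
--     local net_ok, storage_ok, ret, err = replica_call(
--         replica, 'vshard.storage.call',
--         {bucket_id, 'read', 'my_func', {}},
--         {timeout = replica.net_timeout})
--     if not net_ok then
--         -- 'ret' holds the network or handshake error object here.
--     end
--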
--
-- Detach the connection object from its replica object.
-- Detachment means that the connection is not closed, but all its
-- links with the replica are torn. All current requests are
-- finished, but the next calls on this replica are processed by
-- another connection.
-- Initially this function is intended for failover, which should
-- not close the old connection in case it receives a huge
-- response and because of that ignores pings.
--
local function replica_detach_conn(replica)
local c = replica.conn
if c ~= nil then
-- The connection now has nothing to do with the replica
-- object. In particular, it shall not touch up and down
-- ts.
c:on_connect(nil, c.on_connect_ref)
c.on_connect_ref = nil
c:on_disconnect(nil, c.on_disconnect_ref)
c.on_disconnect_ref = nil
-- Detach looks like disconnect for an observer.
netbox_on_disconnect(c)
c.replica = nil
c.replicaset = nil
replica.conn = nil
end
end
--
-- Call a function on remote storage
-- Note: this function uses pcall-style error handling
-- @retval false, err on error
-- @retval true, ... on success
--
local function replicaset_master_call(replicaset, func, args, opts)
assert(opts == nil or type(opts) == 'table')
local master = replicaset.master
if not master then
opts = opts and table.copy(opts) or {}
if opts.is_async then
return nil, lerror.vshard(lerror.code.MISSING_MASTER,
replicaset.id)
end
local timeout = opts.timeout or consts.MASTER_SEARCH_TIMEOUT
master, timeout = replicaset_wait_master(replicaset, timeout)
if not master then
return nil, timeout
end
opts.timeout = master.net_timeout
else
if not opts then
opts = {timeout = master.net_timeout}
elseif not opts.timeout then
opts = table.copy(opts)
opts.timeout = master.net_timeout
end
end
if not master.conn or not master.conn:is_connected() then
replicaset_connect_to_replica(replicaset, master)
-- It could be that the master was disconnected due to a critical
-- failure and now a new master is assigned. The owner of the connector
-- must try to find it.
if replicaset.on_master_required then
pcall(replicaset.on_master_required)
end
end
-- luacheck: ignore 211/net_status
local net_status, storage_status, retval, error_object =
replica_call(master, func, args, opts)
-- Ignore net_status - master does not retry requests.
return storage_status, retval, error_object
end
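--
-- Usage sketch (illustrative; 'rs' is a hypothetical replicaset object,
-- and using vshard.storage.bucket_stat as an example of a remote
-- function):
--
--     local storage_ok, ret, err = replicaset_master_call(
--         rs, 'vshard.storage.bucket_stat', {bucket_id}, {timeout = 1})
--     if storage_ok == nil then
--         -- 'ret' is the error object (e.g. MISSING_MASTER).
--     end
--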
--
-- True, if after error @a e a read request can be retried.
--
local function can_retry_after_error(e)
if not e or (type(e) ~= 'table' and
(type(e) ~= 'cdata' or not ffi.istype('struct error', e))) then
return false
end
if e.type == 'ShardingError' and
(e.code == lerror.code.WRONG_BUCKET or
e.code == lerror.code.TRANSFER_IS_IN_PROGRESS) then
return true
end
return e.type == 'ClientError' and e.code == box.error.TIMEOUT
end
--
-- True if after the given error on call of the given function the connection
-- must go into backoff.
--
local function can_backoff_after_error(e, func)
if not e then
return false
end
if type(e) ~= 'table' and
(type(e) ~= 'cdata' or not ffi.istype('struct error', e)) then
return false
end
-- So far it is enabled only for vshard's own functions, including
-- vshard.storage.call(). Otherwise it is not possible to retry safely -
-- the user's function could have side effects before raising that error.
-- For instance, 'access denied' could be raised by the user's function
-- internally after it has already changed some data on the storage.
if not func:startswith('vshard.') then
return false
end
-- ClientError is sent for all errors by old Tarantool versions which didn't
-- keep error type. New versions preserve the original error type.
if e.type == 'ClientError' or e.type == 'AccessDeniedError' then
if e.code == box.error.ACCESS_DENIED then
return e.message:startswith("Execute access to function 'vshard.")
end
if e.code == box.error.NO_SUCH_PROC then
return e.message:startswith("Procedure 'vshard.")
end
end
if e.type == 'ShardingError' then
return e.code == lerror.code.STORAGE_IS_DISABLED
end
return false
end
--
-- Pick a next replica according to round-robin load balancing
-- policy.
--
local function replicaset_balance_replica(replicaset)
local i = replicaset.balance_i
local pl = replicaset.priority_list
local size = #pl
replicaset.balance_i = i % size + 1
assert(i <= size)
return pl[i]
end
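--
-- Worked example (illustrative): with priority_list = {r1, r2, r3} and
-- balance_i = 1, successive calls return r1, r2, r3, r1, ... - balance_i
-- advances as i % size + 1, i.e. 1 -> 2 -> 3 -> 1.
--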
--
-- Template to implement a function able to visit multiple
-- replicas with certain details. One of its applications is a function
-- making a call on the nearest available replica. That is possible
-- for 'read' requests only. And if the nearest replica is not
-- available now, then the master's connection is used - we cannot wait
-- until the failover fiber repairs the nearest connection.
--
local function replicaset_template_multicallro(prefer_replica, balance)
local function pick_next_replica(replicaset, now)
local r
local master = replicaset.master
if balance then
local i = #replicaset.priority_list
while i > 0 do
r = replicaset_balance_replica(replicaset)
i = i - 1
if r:is_connected() and (not prefer_replica or r ~= master) and
replica_check_backoff(r, now) then
return r
end
end
else
local start_r = replicaset.replica
r = start_r
while r do
if r:is_connected() and (not prefer_replica or r ~= master) and
replica_check_backoff(r, now) then
return r
end
r = r.next_by_priority
end
-- The iteration above might not have started from the best priority replica.
-- Check the beginning of the list too.
for _, r in ipairs(replicaset.priority_list) do
if r == start_r then
-- Reached already checked part.
break
end
if r:is_connected() and (not prefer_replica or r ~= master) and
replica_check_backoff(r, now) then
return r
end
end
end
end
return function(replicaset, func, args, opts)
assert(opts == nil or type(opts) == 'table')
opts = opts and table.copy(opts) or {}
local timeout = opts.timeout or consts.CALL_TIMEOUT_MAX
local net_status, storage_status, retval, err, replica
if timeout <= 0 then
return nil, lerror.timeout()
end
local now = fiber_clock()
local end_time = now + timeout
while not net_status and timeout > 0 do
replica = pick_next_replica(replicaset, now)
if not replica then
replica, timeout = replicaset_wait_master(replicaset, timeout)
if not replica then
return nil, timeout
end
replicaset_connect_to_replica(replicaset, replica)
if replica.backoff_ts then
return nil, lerror.vshard(
lerror.code.REPLICASET_IN_BACKOFF, replicaset.id,
replica.backoff_err)
end
end
opts.timeout = timeout
net_status, storage_status, retval, err =
replica_call(replica, func, args, opts)
now = fiber_clock()
timeout = end_time - now
if not net_status and not storage_status and
not can_retry_after_error(retval) then
if can_backoff_after_error(retval, func) then
if not replica.backoff_ts then
log.warn('Replica %s goes into backoff for %s sec '..
'after error %s', replica.id,
consts.REPLICA_BACKOFF_INTERVAL, retval)
replica.backoff_ts = now
replica.backoff_err = retval
end
else
-- There is no sense in retrying LuaJIT errors, such as
-- assertions, undefined variables etc.
net_status = true
break
end
end
end
if not net_status then
return nil, lerror.make(retval)
else
return storage_status, retval, err
end
end
end
--
-- Parallel call on all instances in the replicaset. Fails if couldn't be done
-- on at least one instance.
--
-- @return In case of success - a map with replica IDs (UUID or name) as keys
-- and values being what the function returned from each replica.
--
-- @return In case of an error - nil, error object, UUID or name of the replica
-- where the error happened.
--
local function replicaset_map_call(replicaset, func, args, opts)
local timeout = opts.timeout or consts.CALL_TIMEOUT_MIN
local except = opts.except
local deadline = fiber_clock() + timeout
local _, res, err, err_id, map
local replica_count = 0
local futures = {}
local opts_call = {is_async = true, timeout = 0}
--
-- Wait for all connections. Sending any request while at least one connection
-- is completely dead would only produce an unnecessary workload.
--
timeout, err, err_id = replicaset_wait_connected_all(replicaset, {
timeout = timeout,
except = except,
})
if not timeout then
goto fail
end
--
-- Send requests.
--
for replica_id, replica in pairs(replicaset.replicas) do
if replica_id == except then
goto next_call
end
_, res, err = replica_call(replica, func, args, opts_call)
if res == nil then
err_id = replica_id
goto fail
end
futures[replica_id] = res
replica_count = replica_count + 1
::next_call::
end
--
-- Collect results
--
map = table.new(0, replica_count)
for replica_id, future in pairs(futures) do
res, err = future_wait(future, timeout)
if res == nil then
err_id = replica_id
goto fail
end
map[replica_id] = res
timeout = deadline - fiber_clock()
end
do return map end
::fail::
for _, f in pairs(futures) do
f:discard()
end
return nil, lerror.make(err), err_id
end
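--
-- Usage sketch (illustrative; 'rs' is a hypothetical replicaset object):
--
--     local map, err, err_id = replicaset_map_call(
--         rs, 'vshard.storage._call', {'info'}, {timeout = 5})
--     if not map then
--         -- 'err_id' names the replica where the call failed.
--         return nil, err
--     end
--     for replica_id, res in pairs(map) do
--         -- 'res' is what the function returned on that replica.
--     end
--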
--
-- Nice formatter for replicaset
--
local function replicaset_tostring(replicaset)
local master
if replicaset.master then
master = replicaset.master
else
master = 'missing'
end
return string.format('replicaset(id="%s", master=%s)', replicaset.id,
master)
end
--
-- Copy netbox connections from old replica objects to new ones
-- and outdate old objects.
-- @param replicasets New replicasets
-- @param old_replicasets Replicasets and replicas to be outdated.
--
local function rebind_replicasets(replicasets, old_replicasets)
for replicaset_id, replicaset in pairs(replicasets) do
local old_replicaset = old_replicasets and
old_replicasets[replicaset_id]
for replica_id, replica in pairs(replicaset.replicas) do
local old_replica = old_replicaset and
old_replicaset.replicas[replica_id]
if old_replica and util.uri_eq(old_replica.uri, replica.uri) then
local conn = old_replica.conn
replica.conn = conn
replica.down_ts = old_replica.down_ts
replica.backoff_ts = old_replica.backoff_ts
replica.backoff_err = old_replica.backoff_err
replica.net_timeout = old_replica.net_timeout
replica.net_sequential_ok = old_replica.net_sequential_ok
replica.net_sequential_fail = old_replica.net_sequential_fail
if conn then
conn.replica = replica
conn.replicaset = replicaset
end
end
end
if old_replicaset then
-- Take a hint from the old replicaset about who the master is now.
if replicaset.is_master_auto then
local master = old_replicaset.master
if master then
replicaset.master = replicaset.replicas[master.id]
end
end
-- Stop waiting for the master in the old replicaset. Its running
-- requests won't find it anyway. Auto search works only for the
-- newest replicaset objects.
if old_replicaset.is_master_auto then
old_replicaset.is_master_auto = false
old_replicaset.master_cond:broadcast()
end
end
end
end
--
-- Let the replicaset know @a old_master_id is not a master anymore and that
-- it should use @a candidate_id instead.
-- Returns whether the request which brought this information can be retried.
--
local function replicaset_update_master(replicaset, old_master_id, candidate_id)