/
backend.rs
2586 lines (2386 loc) · 102 KB
/
backend.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//!
//! [`Backend`] handles the execution of queries and prepared statements. Queries and
//! statements can be executed either on ReadySet itself, or on the upstream when applicable.
//! In general if an upstream (fallback) connection is available queries and statements
//! will execute as follows:
//!
//! * `INSERT`, `DELETE`, `UPDATE` - on upstream
//! * Anything inside a transaction - on upstream
//! * Cached statements created with "always" - on ReadySet
//! * `SELECT` - on ReadySet
//! * Anything that failed on ReadySet, or while a migration is ongoing - on upstream
//!
//! # The execution flow
//!
//! ## Prepare
//!
//! When an upstream is available we will only try to prepare `SELECT` statements on ReadySet and
//! forward all other prepare requests to the upstream. For `SELECT` statements we will attempt
//! to prepare on both ReadySet and the upstream. If the ReadySet select fails we will perform a
//! fallback execution on the upstream (`execute_cascade`).
//!
//! ## Queries
//!
//! Queries are handled in a similar way to prepared statements, with the exception that additional
//! overhead is required to parse and rewrite them prior to their execution.
//!
//! ## Migrations
//!
//! When a prepared statement is not immediately available for execution on ReadySet, we will
//! perform a migration, migrations can happen in one of three ways:
//!
//! * Explicit migrations: only `CREATE CACHE` and `CREATE VIEW` will cause migrations.
//! A `CREATE PREPARED STATEMENT` will not cause a migration, and queries will go to upstream
//! fallback. Enabled with the `--query-caching=explicit` argument. However if a migration already
//! happened, we will use it.
//! * Async migration: prepared statements will be put in a [`QueryStatusCache`] and another
//! thread will perform migrations in the background. Once a statement finished migration it
//! will execute on ReadySet, while it is waiting for a migration to happen it will execute on
//! fallback. Enabled with the `--query-caching=async` flag.
//! * In request path: migrations will happen when either `CREATE CACHE` or
//! `CREATE PREPARED STATEMENT` are called. It is also the only available option when an
//! upstream fallback is not available.
//!
//! ## Caching
//!
//! Since we don't want to pay a penalty every time we execute a prepared statement, either
//! on ReadySet or on the upstream fallback, we aggressively cache all the information required
//! for immediate execution. This way a statement can be immediately forwarded to either ReadySet
//! or upstream with no additional overhead.
//!
//! ## Handling unsupported queries
//!
//! Queries are marked with MigrationState::Unsupported when they fail to prepare on ReadySet
//! with an Unsupported ReadySetError. These queries should not be tried again against ReadySet,
//! however, if a fallback database exists, may be executed against the fallback.
//!
//! ## Handling component outage
//!
//! In a distributed deployment, a component (such as a readyset-server instance) may go down,
//! causing some queries that rely on that server instance to fail. To help direct all affected
//! queries immediately to fallback when this happens, you can configure the
//! --query-max-failure-seconds flag to provide a maximum time in seconds that any given query may
//! continuously fail for before entering into a fallback only recovery period. You can configure
//! the --fallback-recovery-seconds flag to configure how long you would like this recovery period
//! to be enabled for, before allowing affected queries to be retried against noria.
//!
//! The metadata for this feature is tracked in the QueryStatusCache for each query. We currently
//! only trigger on networking related errors specifically to try to prevent this feature from
//! being too heavy handed.
use std::borrow::Cow;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::future::{self, OptionFuture};
use mysql_common::row::convert::{FromRow, FromRowError};
use nom_sql::{
CacheInner, CreateCacheStatement, DeleteStatement, Dialect, DropCacheStatement,
InsertStatement, Relation, SelectStatement, SetStatement, ShowStatement, SqlIdentifier,
SqlQuery, UpdateStatement, UseStatement,
};
use readyset_client::consensus::Authority;
use readyset_client::consistency::Timestamp;
use readyset_client::query::*;
use readyset_client::results::Results;
use readyset_client::{ColumnSchema, PlaceholderIdx, ViewCreateRequest};
pub use readyset_client_metrics::QueryDestination;
use readyset_client_metrics::{recorded, EventType, QueryExecutionEvent, SqlQueryType};
use readyset_data::DfValue;
use readyset_errors::ReadySetError::{self, PreparedStatementMissing};
use readyset_errors::{internal, internal_err, unsupported, unsupported_err, ReadySetResult};
use readyset_telemetry_reporter::{TelemetryBuilder, TelemetryEvent, TelemetrySender};
use readyset_util::redacted::Sensitive;
use readyset_version::READYSET_VERSION;
use timestamp_service::client::{TimestampClient, WriteId, WriteKey};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{error, instrument, trace, warn};
use vec1::Vec1;
use crate::backend::noria_connector::ExecuteSelectContext;
use crate::query_handler::SetBehavior;
use crate::query_status_cache::QueryStatusCache;
pub use crate::upstream_database::UpstreamPrepare;
use crate::utils::create_dummy_column;
use crate::{rewrite, QueryHandler, UpstreamDatabase, UpstreamDestination};
pub mod noria_connector;
pub use self::noria_connector::NoriaConnector;
use self::noria_connector::{MetaVariable, SelectPrepareResult};
/// Query metadata used to plan query prepare.
///
/// Computed once per incoming prepare request; the variant decides whether the statement is
/// proxied upstream, mirrored to ReadySet, or rejected.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum PrepareMeta {
    /// Query was received in a state that should unconditionally proxy upstream
    Proxy,
    /// Query could not be parsed
    FailedToParse,
    /// Query could not be rewritten for processing in noria
    FailedToRewrite(ReadySetError),
    /// ReadySet does not implement this prepared statement. The statement may also be invalid SQL
    Unimplemented(ReadySetError),
    /// A write query (Insert, Update, Delete)
    Write { stmt: SqlQuery },
    /// A read (Select; may be extended in the future)
    Select(PrepareSelectMeta),
}
/// Prepare-time metadata for a `SELECT` statement (see [`PrepareMeta::Select`]).
#[derive(Debug)]
struct PrepareSelectMeta {
    /// The statement as parsed from the client's query
    stmt: nom_sql::SelectStatement,
    /// The statement after rewriting; used to build the `ViewCreateRequest` that keys the
    /// query status cache
    rewritten: nom_sql::SelectStatement,
    /// If true, preparing on ReadySet is allowed to trigger a migration
    must_migrate: bool,
    /// If true, the statement should be prepared against ReadySet (in addition to the
    /// upstream, when one exists)
    should_do_noria: bool,
    /// Whether the query was cached with the optional ALWAYS flag (e.g. `CREATE CACHE ALWAYS`)
    always: bool,
}
/// How to behave when receiving unsupported `SET` statements.
///
/// Configured via [`BackendBuilder::unsupported_set_mode`]; the default is [`Error`].
///
/// [`Error`]: UnsupportedSetMode::Error
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum UnsupportedSetMode {
    /// Return an error to the client (the default)
    Error,
    /// Proxy all subsequent statements to the upstream
    Proxy,
    /// Allow all unsupported set statements
    Allow,
}
/// A state machine representing how statements are proxied upstream for a particular instance of a
/// backend.
///
/// The possible transitions of the state machine are modeled by the following graph:
///
/// ```dot
/// digraph ProxyState {
///     Never -> Never;
///
///     Upstream -> InTransaction;
///     InTransaction -> Upstream;
///     Upstream -> ProxyAlways;
///     InTransaction -> ProxyAlways;
/// }
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProxyState {
    /// Never proxy statements upstream. This is the behavior used when no upstream database is
    /// configured for a backend
    Never,
    /// Proxy writes upstream, and proxy reads upstream only after they fail when executed against
    /// ReadySet.
    ///
    /// This is the initial behavior used when an upstream database is configured for a backend
    Fallback,
    /// We are inside an explicit transaction (received a BEGIN or START TRANSACTION packet), so
    /// proxy all statements upstream, but return to [`ProxyState::Fallback`] when the transaction
    /// is finished. This state does not apply to transactions formed by `SET autocommit=0`.
    InTransaction,
    /// We are inside of an implicit transaction due to autocommit being turned off. This means
    /// that every time we get COMMIT or ROLLBACK, we instantly start a new transaction. All
    /// statements are proxied upstream unless we receive a `SET autocommit=1` statement, which
    /// would turn autocommit back on.
    AutocommitOff,
    /// Unconditionally proxy all statements upstream, and do not leave this state when leaving
    /// transactions. The backend enters this state when it receives an unsupported SQL `SET`
    /// statement and the [`unsupported_set_mode`] is set to [`Proxy`]
    ///
    /// [`unsupported_set_mode`]: Backend::unsupported_set_mode
    /// [`Proxy`]: UnsupportedSetMode::Proxy
    ProxyAlways,
}

impl ProxyState {
    /// Returns true if a query should be proxied upstream in most cases per this [`ProxyState`].
    ///
    /// The one exception is a query that was manually migrated with the optional `ALWAYS` flag
    /// (e.g. `CREATE CACHE ALWAYS`), which executes on ReadySet even when this returns true.
    fn should_proxy(&self) -> bool {
        match self {
            Self::AutocommitOff | Self::InTransaction | Self::ProxyAlways => true,
            Self::Never | Self::Fallback => false,
        }
    }

    /// Perform the appropriate state transition for this proxy state to begin a new transaction.
    /// Only meaningful in [`ProxyState::Fallback`]; every other state is left untouched.
    fn start_transaction(&mut self) {
        if let Self::Fallback = self {
            *self = Self::InTransaction;
        }
    }

    /// Perform the appropriate state transition for this proxy state to end a transaction.
    /// `Never`, `ProxyAlways`, and `AutocommitOff` are sticky and remain unchanged.
    fn end_transaction(&mut self) {
        match self {
            Self::Fallback | Self::InTransaction => *self = Self::Fallback,
            Self::Never | Self::ProxyAlways | Self::AutocommitOff => {}
        }
    }

    /// Sets the autocommit state accordingly. If turning autocommit on, will set ProxyState to
    /// Fallback as long as the current state is AutocommitOff.
    ///
    /// If turning autocommit off, will set state to AutocommitOff as long as state is not
    /// currently ProxyAlways or Never, as these states should not be overwritten.
    fn set_autocommit(&mut self, on: bool) {
        match (on, *self) {
            // Turning autocommit back on ends the implicit transaction
            (true, Self::AutocommitOff) => *self = Self::Fallback,
            (true, _) => {}
            // Never and ProxyAlways are sticky and must not be overwritten
            (false, Self::Never | Self::ProxyAlways) => {}
            (false, _) => *self = Self::AutocommitOff,
        }
    }

    /// Returns `true` if the proxy state is [`Fallback`].
    ///
    /// [`Fallback`]: ProxyState::Fallback
    #[must_use]
    fn is_fallback(&self) -> bool {
        *self == Self::Fallback
    }
}
/// Builder for a [`Backend`]
#[must_use]
#[derive(Clone)]
pub struct BackendBuilder {
    /// Whether to enable the slow-query log (threaded through to `BackendSettings::slowlog`)
    slowlog: bool,
    /// SQL dialect to use when parsing queries from clients
    dialect: Dialect,
    /// Map from username to password for all users allowed to connect to the db
    users: HashMap<String, String>,
    /// Whether clients must authenticate before issuing queries
    require_authentication: bool,
    /// Initial RYW ticket; `Some` only when RYW is enabled via [`BackendBuilder::enable_ryw`]
    ticket: Option<Timestamp>,
    /// RYW timestamp service client; `Some` only when RYW is enabled
    timestamp_client: Option<TimestampClient>,
    /// Channel on which to report query execution events, if query logging is enabled
    query_log_sender: Option<UnboundedSender<QueryExecutionEvent>>,
    /// Whether to log ad-hoc queries by full query text in the query logger
    query_log_ad_hoc_queries: bool,
    /// How to behave when receiving unsupported `SET` statements
    unsupported_set_mode: UnsupportedSetMode,
    /// How the backend handles migrations, see [`MigrationMode`]
    migration_mode: MigrationMode,
    /// Maximum time (seconds) a query may continuously fail before entering fallback recovery
    query_max_failure_seconds: u64,
    /// Duration (seconds) of the fallback-only recovery period
    fallback_recovery_seconds: u64,
    /// Optional sender for telemetry events
    telemetry_sender: Option<TelemetrySender>,
    /// Whether to automatically create inlined migrations for queries with unsupported
    /// placeholders
    enable_experimental_placeholder_inlining: bool,
}
impl Default for BackendBuilder {
    /// The conservative defaults: MySQL dialect, authentication required, RYW and
    /// query logging off, and an effectively-infinite failure window so the
    /// fallback-recovery feature is disabled unless explicitly configured.
    fn default() -> Self {
        Self {
            slowlog: false,
            dialect: Dialect::MySQL,
            users: Default::default(),
            require_authentication: true,
            ticket: None,
            timestamp_client: None,
            query_log_sender: None,
            query_log_ad_hoc_queries: false,
            unsupported_set_mode: UnsupportedSetMode::Error,
            migration_mode: MigrationMode::InRequestPath,
            // Effectively "never": large enough that the recovery feature never triggers
            query_max_failure_seconds: (i64::MAX / 1000) as u64,
            fallback_recovery_seconds: 0,
            telemetry_sender: None,
            enable_experimental_placeholder_inlining: false,
        }
    }
}
impl BackendBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Consume the builder and assemble a [`Backend`] around the given connectors.
    ///
    /// Registers the new client connection in the `CONNECTED_CLIENTS` gauge. The initial
    /// proxying behavior is derived from whether an upstream connection was supplied.
    pub fn build<DB: UpstreamDatabase, Handler>(
        self,
        noria: NoriaConnector,
        upstream: Option<DB>,
        query_status_cache: &'static QueryStatusCache,
        authority: Arc<Authority>,
    ) -> Backend<DB, Handler> {
        metrics::increment_gauge!(recorded::CONNECTED_CLIENTS, 1.0);

        // Without an upstream we can never proxy; with one, we start out in fallback mode.
        let proxy_state = upstream
            .as_ref()
            .map_or(ProxyState::Never, |_| ProxyState::Fallback);

        let state = BackendState {
            proxy_state,
            parsed_query_cache: HashMap::new(),
            prepared_statements: Vec::new(),
            query_status_cache,
            ticket: self.ticket,
            timestamp_client: self.timestamp_client,
        };

        let settings = BackendSettings {
            slowlog: self.slowlog,
            dialect: self.dialect,
            require_authentication: self.require_authentication,
            unsupported_set_mode: self.unsupported_set_mode,
            migration_mode: self.migration_mode,
            query_max_failure_duration: Duration::from_secs(self.query_max_failure_seconds),
            query_log_ad_hoc_queries: self.query_log_ad_hoc_queries,
            fallback_recovery_duration: Duration::from_secs(self.fallback_recovery_seconds),
            enable_experimental_placeholder_inlining: self
                .enable_experimental_placeholder_inlining,
        };

        Backend {
            noria,
            upstream,
            users: self.users,
            query_log_sender: self.query_log_sender,
            last_query: None,
            state,
            settings,
            telemetry_sender: self.telemetry_sender,
            authority,
            _query_handler: PhantomData,
        }
    }

    pub fn slowlog(self, slowlog: bool) -> Self {
        Self { slowlog, ..self }
    }

    pub fn dialect(self, dialect: Dialect) -> Self {
        Self { dialect, ..self }
    }

    pub fn query_log(
        self,
        query_log_sender: Option<UnboundedSender<QueryExecutionEvent>>,
        ad_hoc_queries: bool,
    ) -> Self {
        Self {
            query_log_sender,
            query_log_ad_hoc_queries: ad_hoc_queries,
            ..self
        }
    }

    pub fn users(self, users: HashMap<String, String>) -> Self {
        Self { users, ..self }
    }

    pub fn require_authentication(self, require_authentication: bool) -> Self {
        Self {
            require_authentication,
            ..self
        }
    }

    /// Specifies whether RYW consistency should be enabled. If true, RYW consistency
    /// constraints will be enforced on all reads.
    pub fn enable_ryw(self, enable_ryw: bool) -> Self {
        if enable_ryw {
            Self {
                // Initialize with an empty timestamp, which is satisfied by any data version
                ticket: Some(Timestamp::default()),
                timestamp_client: Some(TimestampClient::default()),
                ..self
            }
        } else {
            self
        }
    }

    pub fn unsupported_set_mode(self, unsupported_set_mode: UnsupportedSetMode) -> Self {
        Self {
            unsupported_set_mode,
            ..self
        }
    }

    pub fn migration_mode(self, q: MigrationMode) -> Self {
        Self {
            migration_mode: q,
            ..self
        }
    }

    pub fn query_max_failure_seconds(self, secs: u64) -> Self {
        Self {
            query_max_failure_seconds: secs,
            ..self
        }
    }

    pub fn fallback_recovery_seconds(self, secs: u64) -> Self {
        Self {
            fallback_recovery_seconds: secs,
            ..self
        }
    }

    pub fn telemetry_sender(self, telemetry_sender: TelemetrySender) -> Self {
        Self {
            telemetry_sender: Some(telemetry_sender),
            ..self
        }
    }

    pub fn enable_experimental_placeholder_inlining(
        self,
        enable_experimental_placeholder_inlining: bool,
    ) -> Self {
        Self {
            enable_experimental_placeholder_inlining,
            ..self
        }
    }
}
/// A [`CachedPreparedStatement`] stores the data needed for an immediate
/// execution of a prepared statement on either noria or the upstream
/// connection.
struct CachedPreparedStatement<DB>
where
    DB: UpstreamDatabase,
{
    /// Indicates if the statement was prepared for ReadySet, Fallback, or Both
    prep: PrepareResult<DB>,
    /// The current ReadySet migration state
    migration_state: MigrationState,
    /// Indicates whether the prepared statement was already migrated manually with the optional
    /// ALWAYS flag, such as a CREATE CACHE ALWAYS FROM command.
    ///
    /// This is imperfect, but leans on performance over correctness. It requires a user to
    /// re-prepare queries if they decide to change between ALWAYS and not ALWAYS.
    always: bool,
    /// Holds information about if executes have been succeeding, or failing, along with a state
    /// transition timestamp. `None` if the prepared statement has never been executed.
    execution_info: Option<ExecutionInfo>,
    /// If the query was successfully parsed, will store the parsed query
    parsed_query: Option<Arc<SqlQuery>>,
    /// If we were able to hash the query, will store the generated hash
    query_id: Option<QueryId>,
    /// If the statement was successfully rewritten, will store all information necessary to
    /// install the view in readyset
    view_request: Option<ViewCreateRequest>,
}
impl<DB> CachedPreparedStatement<DB>
where
    DB: UpstreamDatabase,
{
    /// Returns whether we are currently in fallback recovery mode for the given prepared statement
    /// we are attempting to execute.
    ///
    /// WARNING: This will also mutate the execution info timestamp if we have exceeded the
    /// supplied recovery period.
    pub(crate) fn in_fallback_recovery(
        &mut self,
        query_max_failure_duration: Duration,
        fallback_recovery_duration: Duration,
    ) -> bool {
        // A statement that has never been executed has no failure history to recover from.
        match self.execution_info.as_mut() {
            None => false,
            Some(info) => {
                info.reset_if_exceeded_recovery(
                    query_max_failure_duration,
                    fallback_recovery_duration,
                );
                info.execute_network_failure_exceeded(query_max_failure_duration)
            }
        }
    }

    /// Returns whether the most recent execution attempt marked this statement unsupported.
    pub(crate) fn is_unsupported_execute(&self) -> bool {
        self.execution_info
            .as_ref()
            .map_or(false, |info| {
                matches!(info.state, ExecutionState::Unsupported)
            })
    }

    /// Get a reference to the `ViewRequest` or return an error
    fn as_view_request(&self) -> ReadySetResult<&ViewCreateRequest> {
        match &self.view_request {
            Some(request) => Ok(request),
            None => Err(internal_err!(
                "Expected ViewRequest for CachedPreparedStatement"
            )),
        }
    }
}
/// The adapter-side handle for a single client connection: routes queries and prepared
/// statements between ReadySet and the optional upstream database (see module docs).
pub struct Backend<DB, Handler>
where
    DB: UpstreamDatabase,
{
    /// ReadySet connector used for reads, and writes when no upstream DB is present
    noria: NoriaConnector,
    /// Optional connector to the upstream DB. Used for fallback reads and all writes if it exists
    upstream: Option<DB>,
    /// Map from username to password for all users allowed to connect to the db
    pub users: HashMap<String, String>,
    /// Channel on which query execution events are reported, if query logging is enabled
    query_log_sender: Option<UnboundedSender<QueryExecutionEvent>>,
    /// Information regarding the last query sent over this connection. If None, then no queries
    /// have been handled using this connection (Backend) yet.
    last_query: Option<QueryInfo>,
    /// Encapsulates the inner state of this [`Backend`]
    state: BackendState<DB>,
    /// The settings with which the [`Backend`] was started
    settings: BackendSettings,
    /// Provides the ability to send [`TelemetryEvent`]s to Segment
    telemetry_sender: Option<TelemetrySender>,
    /// Handle to the Authority. A handle is also stored in Self::noria where it is used to find
    /// the Controller.
    authority: Arc<Authority>,
    // Zero-sized marker tying the backend to its `QueryHandler` implementation
    _query_handler: PhantomData<Handler>,
}
/// Variables that keep track of the [`Backend`] state
struct BackendState<DB>
where
    DB: UpstreamDatabase,
{
    /// Tracks whether statements should currently be proxied upstream (see [`ProxyState`])
    proxy_state: ProxyState,
    /// A cache of queries that we've seen, and their current state, used for processing
    query_status_cache: &'static QueryStatusCache,
    /// A cache of all previously parsed queries, keyed by their raw query text
    parsed_query_cache: HashMap<String, SqlQuery>,
    /// All queries previously prepared on noria or upstream, indexed by their statement ID
    prepared_statements: Vec<CachedPreparedStatement<DB>>,
    /// Current RYW ticket. `None` if RYW is not enabled. This `ticket` will
    /// be updated as the client makes writes so as to be an accurate low watermark timestamp
    /// required to make RYW-consistent reads. On reads, the client will pass in this ticket to be
    /// checked by noria view nodes.
    ticket: Option<Timestamp>,
    /// `timestamp_client` is the Backends connection to the TimestampService. The TimestampService
    /// is responsible for creating accurate RYW timestamps/tickets based on writes made by the
    /// Backend client.
    timestamp_client: Option<TimestampClient>,
}
/// Settings that have no state and are constant for a given [`Backend`]
struct BackendSettings {
    /// SQL dialect to use when parsing queries from clients
    dialect: Dialect,
    /// Whether to log slow queries — assumed from the name; the flag's use sites are
    /// outside this view (TODO confirm)
    slowlog: bool,
    /// Whether clients must authenticate before issuing queries
    require_authentication: bool,
    /// Whether to log ad-hoc queries by full query text in the query logger.
    query_log_ad_hoc_queries: bool,
    /// How to behave when receiving unsupported `SET` statements
    unsupported_set_mode: UnsupportedSetMode,
    /// How this backend handles migrations, See MigrationMode.
    migration_mode: MigrationMode,
    /// The maximum duration that a query can continuously fail for before we enter into a recovery
    /// period.
    query_max_failure_duration: Duration,
    /// The recovery period that we enter into for a given query, when that query has
    /// repeatedly failed for query_max_failure_duration.
    fallback_recovery_duration: Duration,
    /// Whether to automatically create inlined migrations for queries with unsupported
    /// placeholders.
    enable_experimental_placeholder_inlining: bool,
}
/// QueryInfo holds information regarding the last query that was sent along this connection
/// (Backend).
#[derive(Debug, Default)]
pub struct QueryInfo {
    /// Where the last query was executed (ReadySet, upstream, or both)
    pub destination: QueryDestination,
    /// The ReadySet error string for the last query, if any; empty when there was no error
    pub noria_error: String,
}
impl FromRow for QueryInfo {
    /// Builds a [`QueryInfo`] from a MySQL result row containing `Query_destination` and/or
    /// `ReadySet_error` columns.
    ///
    /// Returns `FromRowError` if a byte column is not valid UTF-8, is not one of the two
    /// recognized column names, or holds an unparseable destination.
    fn from_row_opt(row: mysql_common::row::Row) -> Result<Self, FromRowError> {
        let mut res = QueryInfo::default();

        // Parse each column into its respective QueryInfo field.
        for (i, c) in row.columns_ref().iter().enumerate() {
            // NOTE(review): assumes every column index has a value; `row.as_ref(i)` is unwrapped.
            if let mysql_common::value::Value::Bytes(d) = row.as_ref(i).unwrap() {
                let dest = std::str::from_utf8(d).map_err(|_| FromRowError(row.clone()))?;
                if c.name_str() == "Query_destination" {
                    res.destination =
                        QueryDestination::try_from(dest).map_err(|_| FromRowError(row.clone()))?;
                } else if c.name_str() == "ReadySet_error" {
                    // `dest` already holds the decoded UTF-8 for these bytes; no need to
                    // re-validate the same slice a second time.
                    res.noria_error = dest.to_string();
                } else {
                    return Err(FromRowError(row.clone()));
                }
            }
        }
        Ok(res)
    }
}
/// How to handle a migration in the adapter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationMode {
    /// Handle migrations as part of the query process: if a query has not been
    /// successfully migrated when we are processing the query, attempt to
    /// perform the migration as part of the query.
    InRequestPath,
    /// Never perform migrations in the query path. If a query has not been
    /// migrated yet, send it to fallback if fallback exists, otherwise reject
    /// the query.
    ///
    /// This mode is used when some other operation is performing the
    /// migrations and updating a query's migration status. Either
    /// `--query-caching=async`, which runs migrations in a separate thread,
    /// or `--query-caching=explicit`, which enables special syntax to perform
    /// migrations ("CREATE CACHE ..."), may be used.
    OutOfBand,
}
/// The result-set schema for a `SELECT`: column definitions plus the projected column names.
/// Both halves are `Cow`s so they can be borrowed from a cache or owned
/// (see [`SelectSchema::into_owned`]).
#[derive(Debug, Clone)]
pub struct SelectSchema<'a> {
    /// Schemas of the columns in the result set
    pub schema: Cow<'a, [ColumnSchema]>,
    /// Names of the columns in the result set
    pub columns: Cow<'a, [SqlIdentifier]>,
}
impl<'a> SelectSchema<'a> {
    /// Convert this schema into one that owns all of its data, dropping the borrow lifetime.
    pub fn into_owned(self) -> SelectSchema<'static> {
        let Self { schema, columns } = self;
        SelectSchema {
            schema: Cow::Owned(schema.into_owned()),
            columns: Cow::Owned(columns.into_owned()),
        }
    }
}
/// Adapter clients need only one of the prepare results returned from prepare().
/// PrepareResult provides noria_biased() and upstream_biased() to get
/// the single relevant prepare result from `PrepareResult` which may return
/// PrepareResult::Both.
pub enum SinglePrepareResult<'a, DB: UpstreamDatabase> {
    /// The prepare result from ReadySet
    Noria(&'a noria_connector::PrepareResult),
    /// The prepare result from the upstream database
    Upstream(&'a UpstreamPrepare<DB>),
}
/// The type returned when a query is prepared by `Backend` through the `prepare` function.
pub enum PrepareResult<DB: UpstreamDatabase> {
    /// The statement was prepared on ReadySet only
    Noria(noria_connector::PrepareResult),
    /// The statement was prepared on the upstream database only
    Upstream(UpstreamPrepare<DB>),
    /// The statement was prepared on both ReadySet and the upstream database
    Both(noria_connector::PrepareResult, UpstreamPrepare<DB>),
}
impl<DB: UpstreamDatabase> Debug for PrepareResult<DB> {
    /// Renders the variant name followed by its prepare result(s), matching the output a
    /// derived `Debug` would produce.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (name, fields): (&str, Vec<&dyn Debug>) = match self {
            Self::Noria(noria) => ("Noria", vec![noria]),
            Self::Upstream(upstream) => ("Upstream", vec![upstream]),
            Self::Both(noria, upstream) => ("Both", vec![noria, upstream]),
        };
        let mut tuple = f.debug_tuple(name);
        for field in fields {
            tuple.field(field);
        }
        tuple.finish()
    }
}
// Sadly rustc is very confused when trying to derive Clone for UpstreamPrepare, so we have to
// implement it manually here.
impl<DB: UpstreamDatabase> Clone for PrepareResult<DB> {
    fn clone(&self) -> Self {
        match self {
            Self::Noria(noria) => Self::Noria(noria.clone()),
            Self::Upstream(upstream) => Self::Upstream(upstream.clone()),
            Self::Both(noria, upstream) => Self::Both(noria.clone(), upstream.clone()),
        }
    }
}
impl<DB: UpstreamDatabase> PrepareResult<DB> {
    /// Return the ReadySet half when one exists, falling back to the upstream half otherwise.
    pub fn noria_biased(&self) -> SinglePrepareResult<'_, DB> {
        match self {
            Self::Upstream(res) => SinglePrepareResult::Upstream(res),
            Self::Noria(res) | Self::Both(res, _) => SinglePrepareResult::Noria(res),
        }
    }

    /// Return the upstream half when one exists, falling back to the ReadySet half otherwise.
    pub fn upstream_biased(&self) -> SinglePrepareResult<'_, DB> {
        match self {
            Self::Noria(res) => SinglePrepareResult::Noria(res),
            Self::Upstream(res) | Self::Both(_, res) => SinglePrepareResult::Upstream(res),
        }
    }

    /// If this [`PrepareResult`] is a [`PrepareResult::Both`], convert it into only a
    /// [`PrepareResult::Upstream`]
    pub fn make_upstream_only(&mut self) {
        match self {
            Self::Both(_, u) => *self = Self::Upstream(u.clone()),
            // Already single-sided; nothing to drop
            Self::Noria(_) | Self::Upstream(_) => {}
        }
    }
}
/// The type returned when a query is carried out by `Backend`, through either the `query` or
/// `execute` functions.
pub enum QueryResult<'a, DB: UpstreamDatabase>
where
    DB: 'a,
{
    /// Results from noria
    Noria(noria_connector::QueryResult<'a>),
    /// Results from upstream
    Upstream(DB::QueryResult<'a>),
}
impl<'a, DB: UpstreamDatabase> From<noria_connector::QueryResult<'a>> for QueryResult<'a, DB> {
    /// Wraps a ReadySet-native query result in the adapter-level [`QueryResult`].
    fn from(r: noria_connector::QueryResult<'a>) -> Self {
        Self::Noria(r)
    }
}
impl<'a, DB> Debug for QueryResult<'a, DB>
where
    DB: UpstreamDatabase,
{
    /// Renders the variant name followed by its inner result, matching the output a derived
    /// `Debug` would produce.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (name, field): (&str, &dyn Debug) = match self {
            Self::Noria(result) => ("Noria", result),
            Self::Upstream(result) => ("Upstream", result),
        };
        f.debug_tuple(name).field(field).finish()
    }
}
/// TODO: The ideal approach for query handling is as follows:
///
/// 1. If we know we can't support a query, send it to fallback.
/// 2. If we think we can support a query, try to send it to ReadySet. If that hits an error that
///    should be retried, retry. If not, try fallback without dropping the connection in between.
/// 3. If that fails and we got a MySQL error code, send that back to the client and keep the
///    connection open. This is a real correctness bug.
/// 4. If we got another kind of error that is retryable from fallback, retry.
/// 5. If we got a non-retry related error that's not a MySQL error code already, convert it to the
///    most appropriate MySQL error code and write that back to the caller without dropping the
///    connection.
impl<DB, Handler> Backend<DB, Handler>
where
DB: 'static + UpstreamDatabase,
Handler: 'static + QueryHandler,
{
pub fn version(&self) -> String {
self.upstream
.as_ref()
.map(|upstream| upstream.version())
.unwrap_or_else(|| DB::DEFAULT_DB_VERSION.to_string())
}
/// The identifier of the last prepared statement (which is always the last in the vector)
pub fn last_prepared_id(&self) -> u32 {
(self.state.prepared_statements.len() - 1)
.try_into()
.expect("Too many prepared statements")
}
/// The identifier we can reserve for the next prepared statement
pub fn next_prepared_id(&self) -> u32 {
(self.state.prepared_statements.len())
.try_into()
.expect("Too many prepared statements")
}
/// Switch the active database for this backend to the given named database.
///
/// Internally, this will set the schema search path to a single-element vector with the
/// database, and send a `USE` command to the upstream, if any.
pub async fn set_database(&mut self, db: &str) -> Result<(), DB::Error> {
if let Some(upstream) = &mut self.upstream {
upstream
.query(
&UseStatement {
database: db.into(),
}
.to_string(),
)
.await?;
}
self.noria.set_schema_search_path(vec![db.into()]);
Ok(())
}
/// Executes query on the upstream database, for when it cannot be parsed or executed by noria.
/// Returns the query result, or an error if fallback is not configured
#[instrument(skip_all)]
pub async fn query_fallback<'a>(
upstream: Option<&'a mut DB>,
query: &'a str,
event: &mut QueryExecutionEvent,
) -> Result<QueryResult<'a, DB>, DB::Error> {
let upstream = upstream.ok_or_else(|| {
ReadySetError::Internal("This case requires an upstream connector".to_string())
})?;
let _t = event.start_upstream_timer();
let result = upstream.query(query).await;
drop(_t);
event.destination = Some(match &result {
Ok(qr) => qr.destination(),
Err(_) => QueryDestination::Upstream,
});
result.map(QueryResult::Upstream)
}
/// Prepares query on the mysql_backend, if present, when it cannot be parsed or prepared by
/// noria.
pub async fn prepare_fallback(
&mut self,
query: &str,
data: DB::PrepareData<'_>,
) -> Result<UpstreamPrepare<DB>, DB::Error> {
let upstream = self.upstream.as_mut().ok_or_else(|| {
ReadySetError::Internal("This case requires an upstream connector".to_string())
})?;
upstream.prepare(query, data).await
}
/// Prepares query against ReadySet. If an upstream database exists, the prepare is mirrored to
/// the upstream database.
///
/// This function may perform a migration, and update a query's migration state, if
/// InRequestPath mode is enabled or if no upstream is set
async fn mirror_prepare(
    &mut self,
    select_meta: &PrepareSelectMeta,
    query: &str,
    data: DB::PrepareData<'_>,
    event: &mut QueryExecutionEvent,
) -> Result<PrepareResult<DB>, DB::Error> {
    let prep_idx = self.next_prepared_id();
    let do_noria = select_meta.should_do_noria;
    let do_migrate = select_meta.must_migrate;
    // Run both prepares concurrently. Either side may be absent: no upstream
    // configured, or the noria prepare skipped because `should_do_noria` is false.
    let up_prep: OptionFuture<_> = self
        .upstream
        .as_mut()
        .map(|u| u.prepare(query, data))
        .into();
    let noria_prep: OptionFuture<_> = do_noria
        .then_some(self.noria.prepare_select(
            select_meta.stmt.clone(),
            prep_idx,
            do_migrate,
            None,
        ))
        .into();
    let (upstream_res, noria_res) = future::join(up_prep, noria_prep).await;
    // Record which side(s) the prepare was sent to, based purely on which
    // futures ran (not on whether they succeeded).
    let destination = match (upstream_res.is_some(), noria_res.is_some()) {
        (true, true) => Some(QueryDestination::Both),
        (false, true) => Some(QueryDestination::Readyset),
        (true, false) => Some(QueryDestination::Upstream),
        (false, false) => None,
    };
    self.last_query = destination.map(|d| QueryInfo {
        destination: d,
        noria_error: String::new(),
    });
    // Update noria migration state for query
    match &noria_res {
        // A successful SELECT prepare means the view exists: mark the
        // (rewritten query, search path) pair as migrated.
        Some(Ok(noria_connector::PrepareResult::Select(_))) => {
            self.state.query_status_cache.update_query_migration_state(
                &ViewCreateRequest::new(
                    select_meta.rewritten.clone(),
                    self.noria.schema_search_path().to_owned(),
                ),
                MigrationState::Successful,
            );
        }
        Some(Err(e)) => {
            if e.caused_by_view_not_found() {
                // The cached view disappeared out from under us; let the
                // status cache know so it can be re-migrated later.
                warn!(error = %e, "View not found during mirror_prepare()");
                self.state.query_status_cache.view_not_found_for_query(
                    &ViewCreateRequest::new(
                        select_meta.rewritten.clone(),
                        self.noria.schema_search_path().to_owned(),
                    ),
                );
            } else if e.caused_by_unsupported() {
                // Permanently unsupported by noria: remember that so we stop
                // trying to migrate this query.
                self.state.query_status_cache.update_query_migration_state(
                    &ViewCreateRequest::new(
                        select_meta.rewritten.clone(),
                        self.noria.schema_search_path().to_owned(),
                    ),
                    MigrationState::Unsupported,
                );
            } else {
                error!(
                    error = %e,
                    "Error received from noria during mirror_prepare()"
                );
            }
            event.set_noria_error(e);
        }
        // Noria prepare was skipped entirely; nothing to record.
        None => {}
        // Some(Ok(_)) with a non-Select result should be impossible here.
        _ => internal!("Can only return SELECT result or error"),
    }
    // Combine the two results. Note that upstream errors are only propagated
    // (via `?`) in the arms below, after noria state has been recorded above.
    let prep_result = match (upstream_res, noria_res) {
        (Some(upstream_res), Some(Ok(noria_res))) => {
            PrepareResult::Both(noria_res, upstream_res?)
        }
        (None, Some(Ok(noria_res))) => {
            if let noria_connector::PrepareResult::Select(SelectPrepareResult::NoSchema(_)) =
                noria_res
            {
                // We fail when attempting to borrow a cache without an upstream here in case
                // the connection to the upstream is temporarily down.
                internal!(
                    "Cannot create PrepareResult for borrowed cache without an upstream result"
                );
            }
            PrepareResult::Noria(noria_res)
        }
        // Noria failed and there is no upstream to fall back to.
        (None, Some(Err(noria_err))) => return Err(noria_err.into()),
        // Upstream ran; use its result regardless of any noria error.
        (Some(upstream_res), _) => PrepareResult::Upstream(upstream_res?),
        (None, None) => return Err(ReadySetError::Unsupported(query.to_string()).into()),
    };
    Ok(prep_result)
}
/// Prepares Insert, Delete, and Update statements
async fn prepare_write(
    &mut self,
    query: &str,
    stmt: &SqlQuery,
    data: DB::PrepareData<'_>,
    event: &mut QueryExecutionEvent,
) -> Result<PrepareResult<DB>, DB::Error> {
    let prep_idx = self.next_prepared_id();
    event.sql_type = SqlQueryType::Write;
    match self.upstream {
        // With an upstream configured, writes are always prepared there,
        // never against ReadySet.
        Some(ref mut upstream) => {
            let _timer = event.start_upstream_timer();
            let result = upstream
                .prepare(query, data)
                .await
                .map(PrepareResult::Upstream);
            self.last_query = Some(QueryInfo {
                destination: QueryDestination::Upstream,
                noria_error: String::new(),
            });
            result
        }
        // No upstream: prepare the write directly against ReadySet.
        None => {
            let _timer = event.start_noria_timer();
            let prepared = match stmt {
                SqlQuery::Insert(s) => self.noria.prepare_insert(s.clone(), prep_idx).await?,
                SqlQuery::Delete(s) => self.noria.prepare_delete(s.clone(), prep_idx).await?,
                SqlQuery::Update(s) => self.noria.prepare_update(s.clone(), prep_idx).await?,
                // prepare_write does not support other statements
                _ => internal!(),
            };
            self.last_query = Some(QueryInfo {
                destination: QueryDestination::Readyset,
                noria_error: String::new(),
            });
            Ok(PrepareResult::Noria(prepared))
        }
    }
}
/// Provides metadata required to prepare a select query
fn plan_prepare_select(&mut self, stmt: nom_sql::SelectStatement) -> PrepareMeta {
    // Guard clause: if the statement cannot be rewritten, it can only be proxied.
    let (rewritten, should_do_readyset) = match self.rewrite_select_and_check_readyset(&stmt) {
        Ok(res) => res,
        Err(e) => {
            warn!(
                // FIXME(ENG-2499): Use correct dialect.
                statement = %Sensitive(&stmt.display(nom_sql::Dialect::MySQL)),
                "This statement could not be rewritten by ReadySet"
            );
            return PrepareMeta::FailedToRewrite(e);
        }
    };
    let status = self
        .state
        .query_status_cache
        .query_status(&ViewCreateRequest::new(
            rewritten.clone(),
            self.noria.schema_search_path().to_owned(),
        ));
    // In always-proxy mode only queries explicitly pinned to ReadySet
    // (`status.always`) bypass the proxy.
    if self.state.proxy_state == ProxyState::ProxyAlways && !status.always {
        return PrepareMeta::Proxy;
    }
    PrepareMeta::Select(PrepareSelectMeta {
        stmt,
        rewritten,
        should_do_noria: should_do_readyset,
        // For select statements only InRequestPath should trigger migrations
        // synchronously, or if no upstream is present.
        must_migrate: self.settings.migration_mode == MigrationMode::InRequestPath
            || !self.has_fallback(),
        always: status.always,
    })
}
/// Rewrites the provided select, and checks if the select statement should be
/// handled by readyset. If so, the second tuple member will be true. If the select should be
/// handled by upstream, the second tuple member will be false.
///
/// If the rewrite fails, the option will be None.