\documentclass[DM,toc]{lsstdoc}
% lsstdoc documentation: https://lsst-texmf.lsst.io/lsstdoc.html
% Package imports go here.
% Local commands go here.
% To add a short-form title:
% \title[Short title]{Title}
\title{Data Management Database Design}
% Optional subtitle
% \setDocSubtitle{A subtitle}
\author{%
Jacek Becla,
Daniel Wang,
Serge Monkewitz,
K-T Lim,
John Gates,
Andy Salnikov,
Andrew Hanushevsky,
Douglas Smith,
Bill Chickering,
Michael Kelsey,
and
Fritz Mueller
}
\setDocRef{LDM-135}
\date{2017-07-07}
\setDocUpstreamLocation{\url{https://github.com/lsst/LDM-135}}
% Optional: name of the document's curator
\setDocCurator{Fritz Mueller}
\setDocAbstract{%
This document discusses the LSST database system architecture.
}
% Change history defined here.
% Order: oldest first.
% Fields: VERSION, DATE, DESCRIPTION, OWNER NAME.
% See LPM-51 for version number policy.
\setDocChangeRecord{%
\addtohist{1.0}{2009-06-15}{Initial version.}{Jacek Becla}
\addtohist{2.0}{2011-07-12}{Most sections rewritten, added scalability test section}{Jacek Becla}
\addtohist{2.1}{2011-08-12}{Refreshed future-plans and schedule of testing sections, added section about fault tolerance.}{Jacek Becla, Daniel Wang}
\addtohist{3.0}{2013-08-02}{Synchronized with latest changes to the requirements \citedsp{LSE-163}. Rewrote most of the “Implementation” chapter. Documented new tests, refreshed all other chapters.}{Jacek Becla, Daniel Wang, Serge Monkewitz, Kian-Tat Lim, Douglas Smith, Bill Chickering}
\addtohist{3.1}{2013-10-10}{Refreshed numbers based on latest \citeds{LDM-141}. Updated shared scans (implementation) and 300-node test sections, added section about shared scans demonstration}{Jacek Becla, Daniel Wang}
\addtohist{3.2}{2013-10-10}{TCT approved}{R Allsman}
\addtohist{}{2016-07-18}{Update with async query, shared scan, secondary index, XRootD, metadata service information.}{John Gates, Andy Salnikov, Andrew Hanushevsky, Michael Kelsey, Fritz Mueller}
\addtohist{}{2017-07-05}{Move historical investigations to separate documents: \citeds{DMTN-046,DMTN-047,DMTN-048,DMTR-21,DMTR-12}}{T.~Jenness}
\addtohist{4.0}{2017-07-07}{Bring up to date with current status, condense requirements section, re-order sections for improved readability.}{Fritz~Mueller}
}
\begin{document}
% Create the title page.
% Table of contents is added automatically with the "toc" class option.
\maketitle
\section{Executive Summary}\label{executive-summary}
Two facets of LSST database architecture and their motivating requirements
are discussed: database architecture in support of real time Alert Production,
and database architecture in support of user query access to catalog data.
Following this, Qserv is explored in depth as an implementation of the query
access architecture.
The LSST baseline database architecture for real time Alert
Production relies on horizontal time-based partitioning. To guarantee
reproducibility, no-overwrite-update techniques combined with
maintaining validity time for appropriate rows are employed. Two
database replicas are maintained to isolate live production catalogs
from user queries; the replicas are synchronized in real time using
native database replication.
The LSST baseline database architecture for user query access to catalog data
is an MPP (massively parallel processing) relational database composed of a
single-node non-parallel DBMS, a distributed communications layer, and a
master controller, all running on a shared-nothing cluster of commodity
servers with locally attached spinning disk drives, capable of incremental
scaling and recovering from hardware failures without disrupting running
queries. All large catalogs are spatially partitioned horizontally into
materialized \emph{chunks}, and the remaining catalogs are replicated on each
server; the chunks are distributed across all nodes. The Object catalog is
further partitioned into \emph{sub-chunks} with overlaps, materialized
on-the-fly when needed. Chunking is handled automatically without exposure to users.
Selected tables are also partitioned vertically to maximize the performance of
the most common analyses. The system uses a few critical indexes to speed up
spatial searches, time series analysis, and simple but interactive queries.
Shared scans are used to answer all but the interactive queries. Such an
architecture is primarily driven by the variety and complexity of anticipated
queries, ranging from single object lookups to complex \(O(n^2)\) full-sky
correlations over billions of elements.
A prototype implementation of the baseline architecture for user query access
as described above, \emph{Qserv}, was developed during the R\&D phase of LSST,
and its feasibility was demonstrated in early testing. Productization was
subsequently planned and resourced for the construction phase of LSST and is
presently underway.
Qserv leverages two mature, open-source technologies as major components of
its design: MySQL as a SQL execution engine (though any alternative SQL engine
could be substituted without undue effort if need be), and
XRootD\footnote{\url{http://xrootd.org}} \citep{Dorigo:2005:XRootd} to provide
a distributed-system framework for fault-tolerant, elastic, content-addressed
messaging.
We currently maintain three running instances of Qserv at a scale of
approximately 30 physical nodes each in continuous operation on dedicated
hardware clusters at NCSA and CC-IN2P3. The system has been demonstrated to
correctly distribute and execute both low and high volume queries, including
small-area cone and object searches, region restricted queries, and full table
scans including large-area near-neighbor searches. Analytic queries involving
SciSQL UDFs in both filter and aggregation clauses have also been
demonstrated. Scale testing has been successfully conducted on the
above-mentioned clusters with datasets of up to approximately 70 TB, and we
expect to cross the 100 TB mark as tests continue in 2017. To date the system
is on track through a series of graduated data-challenge style tests to meet
or exceed the stated performance requirements for the project.
If an equivalent open-source, community supported, off-the-shelf database
system were to become available in time, it could present significant support
cost advantages over a production-ready Qserv. The largest barrier preventing
us from using an off-the-shelf system is the lack of sufficient spherical
geometry and spherical partitioning support.
To increase the chances that such a system will become a reality in the next
few years, we closely collaborate with the MonetDB open
source columnar database team -- a successful demonstration of Qserv
based on MonetDB instead of MySQL was done in 2012. Further, to stay
current with the state-of-the-art in petascale data management and
analysis, we continue a dialog with all relevant solution providers,
both DBMS and Map/Reduce, as well as with data-intensive users, both
industrial and scientific, through the XLDB\footnote{\url{https://xldb.org}}
conference series we lead, and beyond.
\section{Introduction}\label{introduction}
This document discusses the LSST database system architecture in general,
and an implementation of part of that architecture (Qserv) in particular.
\secref{requirements} summarizes LSST database-related requirements that
motivate the architecture. \secref{baseline-architecture} discusses the
baseline architecture itself. \secref{implementation} discusses Qserv as an
implementation of the baseline architecture for user query access.
\secref{risk-analysis} covers attendant risk analysis. For some additional
background, \citeds{DMTN-046} covers in-depth analysis of off-the-shelf
potential solutions (Map/Reduce and RDBMS) as of 2013, and \citeds{DMTR-21}
and \citeds{DMTR-12} describe large scale Qserv tests from 2013. The full
Qserv test specification is described in \citeds{LDM-552}. \citeds{DMTN-048}
discusses the original design trade-offs and decision process, including small
scale tests that were run and some Qserv demonstrations.
\section{Requirements}\label{requirements}
Formal DM database requirements are called out in \citeds{LDM-555}.
For purposes of exposition, this section summarizes some of the
key requirements which drive the LSST database architecture.
\subsection{General Requirements}\label{general-requirements}
\textbf{Incremental scaling}. The system must scale to tens of petabytes and
trillions of rows. It must grow as the data grows and as the access
requirements grow. New technologies that become available during the life of
the system must be able to be incorporated easily. For quantitative storage,
disk and network bandwidth and I/O analyses, see \citeds{LDM-141}.
\textbf{Reliability}. The system must not lose data, and it must provide
at least 98\% up time in the face of hardware failures, software
failures, system maintenance, and upgrades.
\textbf{Low cost}. It is essential to not overrun the allocated budget,
thus a cost-effective, preferably open-source solution is strongly
preferred.
\subsection{Data Production Related Requirements}\label{data-production-related-requirements}
The LSST database catalogs will be generated by a small set of production
pipelines:
\begin{itemize}
\item
Data Release Production -- it produces all key catalogs. Ingest rates
are very modest, as DRP takes several months to complete and is
dominated by CPU-intensive application jobs. Ingest can be done
separately from pipeline processing, as a post-processing step.
\item
Nightly Alert Production -- it produces difference image sources, and
updates the DiaObject, SSObject, DiaSource, DiaForcedSource catalogs.
Since alerts need to be generated in under a minute after data has
been taken, data has to be ingested/updated in near-real time. The
number of rows updated/ingested is modest: \textasciitilde{}40K new
rows and updates occur every \textasciitilde{}39 sec \citep{2008ASPC..394..114B}.
\item
Calibration Pipeline -- it produces calibration information. Due to
small data volume and no stringent timing requirements, ingest
bandwidth needs are very modest.
\end{itemize}
In addition, the camera and telescope configuration is captured in the
Engineering \& Facility Database. Data volumes are very modest.
Further, the Level 1 live catalog will need to be updated with minimal
delay. This catalog should not be taken off-line for extended periods of
time.
The database system must allow for occasional schema changes for the
Level 1 data, and occasional changes that do not alter query
results\footnote{Examples of non-altering changes include
adding/removing/re-sorting indexes, adding a new column with derived
information, and changing the type of a column without losing information
(e.g., \texttt{FLOAT} to \texttt{DOUBLE} would always be allowed;
\texttt{DOUBLE} to \texttt{FLOAT} would only be allowed if all values can be
expressed using \texttt{FLOAT} without losing any information)} for
the Level 2 data after the data has been released. Schemas for different
data releases are allowed to be very different.
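As a hypothetical illustration (the table and column names here are invented
for this example, not taken from the baseline schema), a permitted
non-altering change might look like:
\begin{lstlisting}[language=SQL]
-- Widening a column never loses information, so it is always allowed:
ALTER TABLE Source MODIFY COLUMN psFlux DOUBLE;
-- Adding a new column of derived information leaves existing
-- query results unchanged:
ALTER TABLE Source ADD COLUMN psMag DOUBLE;
\end{lstlisting}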
\subsection{Query Access Related Requirements}\label{query-access-related-requirements}
The Science Data Archive Data Release query load is defined primarily in
terms of access to the large catalogs in the archive: Object, Source,
and ForcedSource. Queries to image metadata, for example, though
numerous, are expected to be fast. In general the following are required:
\textbf{Reproducibility}. Queries executed on any Level 1 and Level 2
data products must be reproducible.
\textbf{Real time}. A large fraction of ad-hoc user access will involve
so-called ``low-volume'' queries -- queries that touch a small area of
sky or request a small number of objects. These queries are required to
be answered in a few seconds. On average, we expect to see
\textasciitilde{}100 such queries running at any given time.
\textbf{Fast turnaround}. High-volume queries -- queries that involve
full-sky scans -- are expected to be answered in approximately 1 hour, while
more complex full-sky spatial and temporal correlations are expected to be
answered in \textasciitilde{}8--12 hours. \textasciitilde{}50 simultaneous
high-volume queries are expected to be running at any given time.
\textbf{Cross-matching with external/user data}. Occasionally, the LSST
database catalogs will need to be cross-matched with external catalogs,
both large, such as SDSS, SKA, or Gaia, and small, such as amateur
data sets. Users should be able to save the results of their queries and
access them during subsequent queries.
\textbf{Query complexity}. The system needs to handle complex queries,
including spatial correlations and time series comparisons. Spatial
correlations are required for the Object catalog only -- this is an
important observation, as this class of queries requires highly
specialized, 2-level partitioning with overlaps.
\textbf{Flexibility}. Sophisticated end users need to be able to access
all this data in a flexible way with as few constraints as possible.
Many end users will want to express queries directly in SQL, so support for
most of basic SQL92 will be required.
\subsection{Discussion}\label{discussion}
\subsubsection{Design Considerations}\label{design-consideration}
The above requirements have important implications on the LSST data
access architecture.
\begin{itemize}
\item
The system must allow rapid selection of a small number of rows out of
multi-billion-row tables. To achieve this, efficient data indexing in
both spatial and temporal dimensions is essential.
\item
The system must efficiently join multi-trillion-row with multi-billion-row
tables. Denormalizing these tables to avoid common joins, such as
Object with Source or Object with ForcedSource, would be prohibitively
expensive.
\item
The system must provide high data bandwidth. In order to process
terabytes of data in minutes, data bandwidths on the order of tens to
hundreds of gigabytes per second are required.
\item
To achieve high bandwidths, to enable expandability, and to provide
fault tolerance, the system will need to run on a distributed cluster
composed of multiple machines.
\item
The most effective way to provide high-bandwidth access to large
amounts of data is to partition the data, allowing multiple machines
to work against distinct partitions. Data partitioning is also
important to speed up some operations on tables, such as index
building.
\item
Multiple machines and partitioned data in turn imply that at least the
largest queries will be executed in parallel, requiring the management
and synchronization of multiple tasks.
\item
The limited budget implies the system needs to get the most out of the
available hardware, and scale it incrementally as needed. The system will be
disk I/O limited, and therefore we anticipate attaching multiple
queries to a single table scan (shared scans) will be a must.
\end{itemize}
\subsubsection{Query complexity and access
patterns}\label{query-complexity-and-access-patterns}
A compilation of representative queries provided by the LSST Science
Collaborations, the Science Council, and other surveys has been
captured \citep{CommonQueries}. These queries can be divided into
several distinct groups: analysis of a single object, analysis of
objects meeting certain criteria in a region or across the entire sky,
analysis of objects close to other objects, analyses that require
special grouping, time series analysis, and cross-matches with external
catalogs. They give hints as to the complexity required: these queries
include distance calculations, spatially localized self-joins, and time
series analysis.
Small queries are expected to exhibit substantial spatial locality
(refer to rows that contain similar spatial coordinates: right ascension
and declination). Some kinds of large queries are expected to exhibit a
slightly different form of spatial locality: joins will be among rows
that have nearby spatial coordinates. Spatial correlations will be
executed on the Object table; spatial correlations will \emph{not} be
needed on Source or ForcedSource tables.
Queries related to time series analysis are expected to need to look at
the history of observations for a given Object, so the appropriate
Source or ForcedSource rows must be easily joined and aggregate
functions operating over the list of Sources must be provided.
External data sets and user data, including results from past queries,
may have to be distributed alongside the distributed production tables
to provide adequate join performance.
The query complexity has important implications on the overall
architecture of the entire system.
\section{Baseline Architecture}\label{baseline-architecture}
This section describes the most important aspects of the LSST baseline
database architecture. The choice of the architecture is driven by the project
requirements (see \citeds{LDM-555}) as well as cost, availability, and
maturity of the off-the-shelf solutions currently available on the market (see
\citeds{DMTN-046}), and design trade-offs (see \citeds{DMTN-048}). The
architecture is periodically revisited: we continuously monitor all relevant
technologies, and accordingly fine-tune the baseline architecture.
In summary:
\begin{itemize}
\item
The LSST baseline architecture for Alert Production is a (yet to be
selected) off-the-shelf RDBMS system which uses replication for fault
tolerance and which takes advantage of horizontal (time-based)
partitioning;
\item
The baseline architecture for user access to Data Releases is an MPP
(massively parallel processing) relational database running on a
shared-nothing cluster of commodity servers with locally attached
spinning disk drives; capable of (a) incremental scaling and (b)
recovering from hardware failures without disrupting running queries.
All large catalogs are spatially partitioned into materialized
\emph{chunks}, and the remaining catalogs are replicated on each server;
the chunks are distributed across all nodes. The Object catalog is
further partitioned into \emph{sub-chunks} with overlaps,\footnote{A
chunk's overlap is implicitly contained within the overlaps of its
edge sub-chunks.} materialized on-the-fly when needed. Shared scans
are used to answer all but low-volume user queries.
\end{itemize}
\subsection{Alert Production and Up-to-date Catalog}\label{alert-production-and-up-to-date-catalog}
Alert Production involves detection and measurement of difference-image-analysis
sources (DiaSources). New DiaSources are spatially matched against
the most recent versions of existing DiaObjects, which contain summary
properties for variable and transient objects (and false positives). Unmatched
DiaSources are used to create new DiaObjects. If a DiaObject has an associated
DiaSource that is no more than a month old, then a forced measurement
(DiaForcedSource) is taken at the position of that object, whether a
corresponding DiaSource was detected in the exposure or not.
The output of Alert Production consists mainly of three large catalogs --
DiaObject, DiaSource, and DiaForcedSource -- as well as several smaller tables
that capture information about e.g.\ exposures, visits, and provenance.
These catalogs will be modified live every night.
Note that existing DiaObjects are never overwritten. Instead, new versions of
the AP-produced and DRP-produced DiaObjects are inserted, allowing users to
retrieve (for example) the properties of DiaObjects as known to the pipeline
when alerts were issued against them. To enable historical queries, each
DiaObject row is tagged with a validity start and end time. The start time of
a new DiaObject version is set to the observation time of the DiaSource or
DiaForcedSource that led to its creation, and the end time is set to infinity.
If a prior version exists, then its validity end time is updated (in place) to
equal the start time of the new version. As a result, the most recent versions
of DiaObjects can always be retrieved with:
\begin{lstlisting}[language=SQL]
SELECT * FROM DiaObject WHERE validityEnd = infinity
\end{lstlisting}
Versions as of some time \emph{t} are retrievable via:
\begin{lstlisting}[language=SQL]
SELECT * FROM DiaObject WHERE validityStart <= t AND t < validityEnd
\end{lstlisting}
Note that a DiaSource can also be re-associated to a solar-system object
during daytime processing. This will result in a new DiaObject version unless
the DiaObject no longer has any associated DiaSources. In that case, the
validity end time of the existing version is set to the time at which the re-association
occurred.
Once a DiaSource is associated with a solar system object, it is never
associated back to a DiaObject. Therefore, rather than also versioning
DiaSources, columns for the IDs of both the associated DiaObject and solar
system object, as well as a re-association time, are included. Re-association
will set the solar system object ID and re-association time, so that
DiaSources for DiaObject 123 at time \emph{t} can be obtained using:
\begin{lstlisting}[language=SQL]
SELECT *
FROM DiaSource
WHERE diaObjectId = 123
AND midPointTai <= t
AND (ssObjectId is NULL OR ssObjectReassocTime > t)
\end{lstlisting}
DiaForcedSources are never re-associated or updated in any way.
From the database point of view then, the alert production pipeline will
perform the following database operations 189 times (once per LSST CCD) per
visit (every 39 seconds):
\begin{enumerate}
\def\labelenumi{\arabic{enumi}.}
\item
Issue a point-in-region query against the DiaObject catalog, returning
the most recent versions of the objects falling inside the CCD.
\item
Use the IDs of these diaObjects to retrieve all associated diaSources
and diaForcedSources.
\item
Insert new diaSources and diaForcedSources.
\item
Update validity end times of diaObjects that will be superseded.
\item
Insert new versions of diaObjects.
\end{enumerate}
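These per-visit operations might be sketched in SQL as follows (schematic
only: column lists are abbreviated, \texttt{:id} and \texttt{:obsTime} stand
for pipeline-supplied values, and the real pipeline batches such statements
per CCD):
\begin{lstlisting}[language=SQL]
-- (3) Insert the new diaSources and diaForcedSources:
INSERT INTO DiaSource (diaSourceId, diaObjectId, midPointTai, ...)
VALUES (...);
-- (4) Close the validity interval of each superseded diaObject version:
UPDATE DiaObject
   SET validityEnd = :obsTime
 WHERE diaObjectId = :id AND validityEnd = infinity;
-- (5) Insert the new diaObject versions, valid from :obsTime onward:
INSERT INTO DiaObject (diaObjectId, validityStart, validityEnd, ...)
VALUES (:id, :obsTime, infinity, ...);
\end{lstlisting}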
All spatial joins will be performed on in-memory data by pipeline code,
rather than in the database. While Alert Production does also involve a
spatial join against the Level 2 (DRP-produced) Object catalog, this
does not require any database interaction: Level 2 Objects are never
modified, so the Object columns required for spatial matching will be
dumped to compact binary files once per Data Release. These files will
be laid out in a way that allows for very fast region queries, allowing
the database to be bypassed entirely.
The DiaSource and DiaForcedSource tables will be split into two tables,
one for historical data and one containing records inserted during the
current night. The current-night tables will be small and rely on a
transactional engine like InnoDB, allowing for speedy recovery from
failures. The historical-data tables will use the faster
non-transactional MyISAM or Aria storage engine, and will also take
advantage of partitioning. The Data Release catalogs used to seed the
live catalogs will be stored in a single initial partition, sorted
spatially (using the Hierarchical Triangular Mesh trixel IDs for their
positions). This means that the diaSources and diaForcedSources for the
diaObjects in a CCD will be located close together on disk, minimizing
seeks. Every month of new data will be stored in a fresh partition,
again sorted spatially. Such partitions will grow to contain just a few
billion rows over the course of a month, even for the largest catalog.
At the end of each night, the contents of the current-night table are
sorted and appended to the partition for the current month, and the
current-night table is then emptied. Each month, the entire current-month partition is sorted
spatially (during the day), and a partition for the next month is
created.
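The monthly partitioning scheme described above might be sketched with
MySQL's declarative range partitioning as follows (a sketch only: the integer
\texttt{obsMonth} key, the partition names, and the boundary values are
invented for illustration, since MySQL range partitioning requires an
integer-valued partitioning expression):
\begin{lstlisting}[language=SQL]
ALTER TABLE DiaSource
PARTITION BY RANGE (obsMonth) (
  -- Data Release seed data, sorted spatially:
  PARTITION p_seed VALUES LESS THAN (202301),
  -- One partition per month of new data:
  PARTITION p202301 VALUES LESS THAN (202302),
  PARTITION p202302 VALUES LESS THAN (202303)
);
\end{lstlisting}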
For DiaObject, the same approach is used. However, DiaObject validity
end-time updates can occur in any partition, and are not confined to the
current-night table. We therefore expect to use a transactional storage
engine like InnoDB for all partitions. Because InnoDB clusters tables
using the primary key, we will likely declare it to consist of a leading
HTM ID column, followed by disambiguating columns (diaObjectId,
validityStart). The validity end time column will not be part of any
index.
No user queries will be allowed on the live production catalogs. We
expect to maintain a separate replica just for user queries,
synchronized in real time using one-way master-slave native database
replication. The catalogs for user queries will be structured
identically to the live catalogs, and views will be used to hide the
splits (using a ``\texttt{UNION\ ALL}'').
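Such a view might be sketched as follows (the underlying table names are
invented for this illustration):
\begin{lstlisting}[language=SQL]
CREATE VIEW DiaSource AS
  SELECT * FROM DiaSourceHistorical
  UNION ALL
  SELECT * FROM DiaSourceCurrentNight;
\end{lstlisting}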
For additional safety, we might choose to replicate the small
current-night tables, all DiaObject partitions, and the remaining
(small) changing tables to another hot stand-by replica. In case of
disastrous master failure that cannot be fixed rapidly, the slave
serving user queries will be used as a temporary replacement, and user
queries will be disallowed until the problem is resolved.
Based on the science requirements, only short-running, relatively simple
user queries will be needed on the Level 1 catalogs. The most complex
queries, such as large-area near neighbor queries, will not be needed.
Instead, user queries will consist mainly of small-area cone searches,
light curve lookups, and historical versions of the same. Since the
catalogs are sorted spatially, we expect to be able to quickly answer
spatial queries using indexed HTM ID columns and the SciSQL UDFs, an
approach that has worked well in data-challenges to date. Furthermore,
note that the positions of diaSources/diaForcedSources associated with
the same diaObject will be very close together, so that sorting to
obtain good spatial locality also ends up placing sources belonging to
the same light curve close together. In other words, the data
organization used to provide fast pipeline query response is also
advantageous for user queries.
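For example, a small-area cone search might be sketched as below, assuming an
indexed HTM ID column (here called \texttt{htmId20}) and the SciSQL
\texttt{scisql\_s2PtInCircle} UDF; the
\texttt{:rangeBegin}/\texttt{:rangeEnd} placeholders stand for precomputed
HTM ID ranges covering the search circle:
\begin{lstlisting}[language=SQL]
SELECT diaObjectId, ra, decl
  FROM DiaObject
 WHERE htmId20 BETWEEN :rangeBegin AND :rangeEnd  -- uses the HTM index
   AND scisql_s2PtInCircle(ra, decl, 9.5, -1.1, 0.05) = 1
   AND validityEnd = infinity;  -- most recent versions only
\end{lstlisting}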
\subsection{Data Release Production}\label{data-release-production}
Data Release Production will involve the generation of significantly
larger catalogs than Alert Production. However, these are produced over
the course of several months; pipelines will not write directly to the
database, and there are no pipeline queries with very low-latency
execution-time requirements to be satisfied. While we do expect several
pipeline-related full table scans over the course of a Data Release
Production, we will need to satisfy many user queries involving such
scans on a daily basis. User query access is therefore the primary
driver of our scalable database architecture, which is described in
detail below. For a description of the data loading process, please see
\secref{data-loading}.
\subsection{User Query Access}\label{user-query-access}
User query access is the primary driver of the scalable database
architecture, which is described below.
\subsubsection{Distributed and parallel}\label{distributed-and-parallel}
The database architecture for user query access relies on a model of
distributing computation among autonomous worker nodes. Autonomous
workers have no direct knowledge of each other and can complete their
assigned work without data or management from their peers. This implies
that data must be partitioned, and the system must be capable of
dividing a single user query into sub-queries, and executing these
sub-queries in parallel -- running a high-volume query without
parallelizing it would take unacceptably long time, even if run on very
fast CPU. The parallelism and data distribution should be handled
automatically by the system and hidden from users.
\subsubsection{Shared-nothing}\label{shared-nothing}
Such an architecture provides a good foundation for incremental scaling and
fault recovery: because nodes have no direct knowledge of each other and
can complete their assigned work without data or management from their
peers, it is possible to add nodes to, or remove nodes from, such a system
with no (or minimal) disruption. However, to achieve fault
tolerance and provide recovery mechanisms, appropriate smarts have to be
built into the node-management software.
\begin{figure}[H]
\centering
\includegraphics{_static/shared_nothing_arch.pdf}
\caption{Shared-nothing database architecture.}
\end{figure}
\subsubsection{Indexing}\label{indexing}
Disk I/O bandwidth is expected to be the greatest bottleneck. Data can
be accessed either through an index, which typically translates to
random access, or through a scan, which translates to a sequential read
(unless multiple competing scans are involved).
Indexes dramatically speed up locating individual rows and avoid
expensive full table scans. They are essential for answering low-volume
queries quickly and for performing efficient table joins; spatial
indexes are essential as well. However, unlike in traditional,
small-scale systems, the advantages of indexes become questionable when
a large number of rows is to be selected from a table. In the case of
LSST, selecting even 0.01\% of a table can mean selecting millions of
rows. Since each fetch through an index may turn into a disk seek, it is
often cheaper to read sequentially from disk than to seek for particular
rows via an index, especially when the index itself does not fit in
memory. For that reason the architecture forgoes heavy indexing; only a
small number of carefully selected indexes, essential for answering
low-volume queries, enabling table joins, and speeding up spatial
searches, will be maintained. For an analytical query system, it makes
sense to make as few assumptions as possible about what will be
important to our users, and to try to provide reasonable performance for
as broad a query load as possible, i.e., to focus on scan throughput
rather than optimizing
indexes. A further benefit to this approach is that many different
queries are likely to be able to share scan I/O, boosting system
throughput, whereas caching index lookup results is likely to provide
far fewer opportunities for sharing as the query count scales (for the
amounts of cache we can afford).
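The index-versus-scan trade-off above can be illustrated with a
back-of-envelope calculation. The figures used here (10\,ms seeks,
100\,MB/s sequential reads, 1\,kB rows, a 100\,GB table) are
illustrative assumptions, not measured LSST numbers:

```python
def breakeven_selectivity(seek_ms=10.0, bandwidth_mb_s=100.0,
                          table_gb=100.0, row_bytes=1000):
    """Estimate the selectivity above which one sequential scan is
    cheaper than index-driven random access (illustrative numbers)."""
    scan_s = table_gb * 1024.0 / bandwidth_mb_s      # one sequential pass
    rows = table_gb * 1e9 / row_bytes                # total rows in table
    seeks_affordable = scan_s / (seek_ms / 1000.0)   # seeks doable in scan time
    return seeks_affordable / rows                   # fraction of rows
```

With these assumed numbers the break-even point is on the order of 0.1\%
selectivity: fetching a larger fraction of rows one seek at a time
already costs more than reading the whole table sequentially.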
\subsubsection{Shared scanning}\label{shared-scanning}
With table scanning the norm rather than the exception, and each scan
taking a significant amount of time, multiple full-scan queries would
randomize disk access if they each employed their own full-scanning read
from disk. Shared scanning (also called \emph{convoy scheduling}) shares
the I/O of each scan among multiple queries. The table is read in
pieces, and all queries concerned with that table operate on each piece
while it is in memory. In this way, results from many full-scan
queries can be returned in little more than the time for a single
full-scan query. Shared scanning also lowers the cost of data
compression by amortizing the CPU cost among the sharing queries,
tilting the trade-off of increased CPU cost versus reduced I/O cost
heavily in favor of compression.
Shared scanning will be used for all high-volume and super-high volume
queries. Shared scanning is helpful for unpredictable, ad-hoc analysis,
where it prevents the extra load from increasing the disk I/O cost --
only more CPU is needed. On average we expect to continuously run the
following scans:
\begin{itemize}
\item
one full table scan of Object table for the latest data release only,
\item
one synchronized full table scan of Object, Source and ForcedSource
tables every 12 hours for the latest data release only,
\item
one synchronized full table scan of Object and Object\_Extra every 8
hours for the latest and previous data releases.
\end{itemize}
Appropriate Level 3 user tables will be scanned as part of each shared
scan as needed to answer any in-flight user queries.
Shared scans will take advantage of table chunking explained below. In
practice, within a single node a scan will involve fetching sequentially
a chunk of data at a time and executing on this chunk all queries in the
queue. The level of parallelism will depend on the number of available
cores.
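The per-node scanning loop described above can be sketched as follows;
the row and query representations are simplified stand-ins for
illustration only:

```python
def shared_scan(chunks, queries):
    """Convoy-scheduling sketch: each chunk is read once, and every
    query in the queue is evaluated against it while it is in memory,
    so the scan I/O is amortized over all queries."""
    partials = [[] for _ in queries]
    for chunk in chunks:                      # one sequential pass over the table
        for i, query in enumerate(queries):   # CPU cost per query, I/O shared
            partials[i].extend(row for row in chunk if query(row))
    return partials

# Two queries sharing a single scan over two chunks:
chunks = [[{"mag": 19.0, "color": 0.2}], [{"mag": 25.0, "color": 1.5}]]
queries = [lambda row: row["mag"] < 20.0,    # "bright objects"
           lambda row: row["color"] > 1.0]   # "red objects"
results = shared_scan(chunks, queries)
```

Both result sets are produced with the disk I/O of a single pass; only
the per-row CPU work grows with the number of in-flight queries.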
Running multiple shared scans allows relatively fast response times for
Object-only queries while supporting complex, multi-table joins:
synchronized scans are required for two-way joins between different
tables. For self-joins a single shared scan will be sufficient; however,
each node must have sufficient memory to hold two chunks at any given
time (the chunk being processed and the next chunk). Refer to the sizing
model \citedsp{LDM-141} for further details on the cost of shared scans.
Low-volume queries will be executed ad-hoc, interleaved with the shared
scans. Given that the number of spinning disks is much larger than the number
of low-volume queries running at any given time, this will have very
limited impact on the sequential disk I/O of the scans, as shown in
\citeds{LDM-141}.
\subsubsection{Clustering}\label{clustering}
The data in the Object Catalog will be physically clustered on disk
spatially -- that is, objects collocated in space will also be
collocated on disk. All Source-type catalogs (Source, ForcedSource,
DiaSource, DiaForcedSource) will be clustered based on their
corresponding objectId -- this approach enforces spatial clustering and
collocates sources belonging to the same object, allowing sequential
read for queries that involve time series analysis.
The SSObject catalog will be unpartitioned, because there is no obvious
fixed position that could be chosen for partitioning moving objects. The
associated diaSources (which will be intermixed with diaSources
associated with static diaObjects) will be partitioned according to
their position. For that reason, SSObject-to-DiaSource join queries will
require index searches on all chunks, unlike DiaObject-to-DiaSource
queries. Since SSObject is small (low millions of rows), this should not
be an issue.
\subsubsection{Partitioning}\label{partitioning}
Data must be partitioned among nodes in a shared-nothing architecture.
While some \emph{sharding} approaches partition data based on a hash of
the primary key, this approach is unusable for LSST data since it
eliminates optimizations based on celestial objects' spatial nature.
\paragraph{Sharded data and sharded
queries}\label{sharded-data-and-sharded-queries}
All catalogs that require spatial partitioning (Object, Source,
ForcedSource, DiaSource, DiaForcedSource) as well as all the auxiliary
tables associated with them, such as Object\_Extra, will be
divided into spatial partitions of roughly the same area by partitioning
them into \emph{declination} zones, and chunking each zone into
\emph{RA} stripes. Further, to be able to perform table joins without
expensive inter-node data transfers, partitioning boundaries for each
partitioned table must be aligned, and chunks of different tables
corresponding to the same area of sky must be co-located on the same
node. To ensure chunks are appropriately sized, the two largest
catalogs, Source and ForcedSource, are expected to be partitioned into
finer-grain chunks. Since objects occur at an approximately constant
density throughout the celestial sphere, an equal-area partitioning
should evenly spread any query load that is uniformly distributed over
the sky.
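The zone-and-chunk scheme can be sketched as follows. The stripe count,
the composite numbering, and the equal-area correction via
$\cos(\delta)$ are illustrative assumptions; Qserv's actual partitioning
parameters are configurable and its numbering differs:

```python
import math

def chunk_id(ra_deg, dec_deg, num_stripes=85):
    """Toy equal-area chunker: split declination into equal-height
    stripes, then split each stripe into RA chunks whose count shrinks
    with cos(dec) so all chunks cover roughly equal area."""
    stripe_h = 180.0 / num_stripes
    stripe = min(int((dec_deg + 90.0) / stripe_h), num_stripes - 1)
    dec_mid = -90.0 + (stripe + 0.5) * stripe_h          # stripe midpoint
    n_ra = max(1, round(360.0 * math.cos(math.radians(dec_mid)) / stripe_h))
    ra_chunk = min(int(ra_deg * n_ra / 360.0), n_ra - 1)
    return stripe * 1000 + ra_chunk                      # simple composite id
```

Chunks near the poles span a much wider range of RA than chunks at the
equator, keeping the area (and hence the object count) per chunk roughly
constant.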
Smaller catalogs that can be partitioned spatially, such as the Alert
and exposure metadata tables, will also be partitioned spatially. All
remaining catalogs, such as the provenance and SDQA tables, will be
replicated on each node. The size of these catalogs is expected to be
only a few terabytes.
With data in separate physical partitions, user queries are themselves
fragmented into separate physical queries to be executed on partitions.
Each physical query's result can be combined into a single final result.
\paragraph{Two-level partitions}\label{two-level-partitions}
Determining the size and number of data partitions may not be obvious.
Queries are fragmented according to partitions so an increasing number
of partitions increases the number of physical queries to be dispatched,
managed, and aggregated. Thus a greater number of partitions increases
the potential for parallelism but also increases the overhead. For a
data-intensive and bandwidth-limited query, a parallelization width
close to the number of disk spindles should minimize seeks and
maximize bandwidth and performance.
From a management perspective, more partitions facilitate re-balancing
data among nodes when nodes are added or removed. If the number of
partitions were equal to the number of nodes, then the addition of a new
node would require the data to be re-partitioned. On the other hand, if
there were many more partitions than nodes, then a set of partitions
could be assigned to the new node without re-computing partition
boundaries.
Smaller and more numerous partitions benefit spatial joins. In an
astronomical context, we are interested in objects near other objects,
and thus a full \(O(n^2)\) join is not required--a localized spatial
join is more appropriate. With spatial data split into smaller
partitions, an SQL engine computing the join need not even consider (and
reject) all possible pairs of objects, merely all the pairs within a
region. Thus a task that is \(O(n^2)\) naively becomes \(O(kn)\) where
\(k\) is the number of objects in a partition.
In consideration of these trade-offs, two-level partitioning seems to be
a conceptually simple way to blend the advantages of both extremes.
Queries can be fragmented in terms of coarse partitions (``chunks''),
and spatial near-neighbor joins can be executed over finer partitions
(``sub-chunks'') within each chunk. To avoid the overhead
of the sub-chunks for non-join queries, the system can store chunks and
generate sub-chunks on-demand for spatial join queries. On-the-fly
generation for joins is cost-effective due to the drastic reduction of
pairs, which is true as long as there are many sub-chunks for each
chunk.
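The \(O(kn)\) claim can be illustrated with a toy localized join that
buckets points into sub-chunk-like cells and compares only pairs in the
same or adjacent cells. Planar coordinates and non-negative positions
are assumed for simplicity (the real system works on the sphere):

```python
from collections import defaultdict

def near_neighbors(points, cell_deg=0.1, radius_deg=0.01):
    """Localized spatial join sketch: only pairs within the same or
    adjacent cells are considered, so a naive O(n^2) pairing becomes
    roughly O(k*n) with k points per cell. points are (ra, dec) tuples,
    assumed non-negative; distance is planar, not great-circle."""
    cells = defaultdict(list)
    for p in points:
        cells[(int(p[0] / cell_deg), int(p[1] / cell_deg))].append(p)
    pairs = []
    for (cx, cy), members in cells.items():
        for dx in (-1, 0, 1):
            for dy in (-1, 0, 1):
                for q in cells.get((cx + dx, cy + dy), []):
                    for p in members:
                        # p < q counts each unordered pair exactly once
                        if p < q and (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2 \
                                <= radius_deg ** 2:
                            pairs.append((p, q))
    return pairs
```

Distant points never even enter the same candidate set, so they are
rejected without any pairwise distance computation.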
\paragraph{Overlap}\label{overlap}
A strict partitioning misses nearby pairs in which the two objects lie
in adjacent partitions. To produce correct results under strict
partitioning, nodes need access to objects outside their own partitions,
which means that data exchange is required. To avoid this, each
partition can be stored with a precomputed amount of overlapping data.
This overlapping data does not strictly belong to the partition but is
within a preset spatial distance from the partition's borders. Using
this data, spatial joins can be computed correctly within the preset
distance without needing data from other partitions that may be on other
nodes.
Overlap is needed only for the Object Catalog, as all spatial
correlations will be run on that catalog only. Guided by the experience
from other projects including SDSS, we expect to preset the overlap to
\textasciitilde{}1 arcmin, which results in duplicating approximately
30\% of the Object Catalog.
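The \textasciitilde{}30\% figure follows from simple geometry. The
sketch below assumes square sub-chunks in a planar approximation; the
\textasciitilde{}14 arcmin sub-chunk side used in the comment is an
assumed value chosen purely to reproduce the quoted number:

```python
def overlap_fraction(subchunk_arcmin, overlap_arcmin):
    """Fraction of extra (duplicated) rows stored when each square
    sub-chunk of side s also stores a border of width w around it:
    ((s + 2w)^2 - s^2) / s^2, in a planar approximation."""
    s, w = subchunk_arcmin, overlap_arcmin
    return ((s + 2.0 * w) ** 2 - s ** 2) / s ** 2

# A 1 arcmin border around ~14 arcmin sub-chunks duplicates ~30% of rows.
```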
\paragraph{Spherical geometry}\label{spherical-geometry}
Support for spherical geometry is not common among databases and
spherical geometry-based partitioning was non-existent in other
solutions when we decided to develop Qserv. Since spherical geometry is
the norm in recording positions of celestial objects (right-ascension
and declination), any spatial partitioning scheme for astronomical
objects must account for its complexities.
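As an example of the complexity involved, even the distance between two
positions requires a great-circle formula rather than a planar one. A
minimal sketch using the haversine formula (illustrative, not Qserv's
actual geometry code):

```python
import math

def angular_sep_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation of two (ra, dec) positions in degrees,
    computed with the haversine formula (stable at small separations)."""
    r1, d1, r2, d2 = map(math.radians, (ra1, dec1, ra2, dec2))
    a = (math.sin((d2 - d1) / 2.0) ** 2
         + math.cos(d1) * math.cos(d2) * math.sin((r2 - r1) / 2.0) ** 2)
    return math.degrees(2.0 * math.asin(math.sqrt(a)))
```

Near the poles one degree of right ascension corresponds to far less
than one degree of arc, which is exactly why naive planar partitioning
and matching schemes break down on the celestial sphere.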
\paragraph{Data immutability}\label{data-immutability}
It is important to note that user query access operates on read-only
data. Not having to deal with updates simplifies the architecture and
allows us to add extra optimizations not possible otherwise. The Level 1
data that is updated is small enough that it will not require the
scalable architecture -- we plan to handle the entire Level 1 data set
with out-of-the-box MySQL, as described in
\secref{alert-production-and-up-to-date-catalog}.
\subsubsection{Long-running queries}\label{long-running-queries}
Many typical user queries may need significant time to complete, on the
scale of hours. To avoid re-submission of those long-running queries
after various failures (networking or hardware issues), the system will
support an asynchronous query execution mode. In this mode users will
submit queries using special options or syntax; the system will dispatch
a query and immediately return an identifier for the submitted query
without blocking the user's session. This identifier can then be used to
retrieve the query's processing status, its result after completion, or
a partial result while the query is still executing.

The system should also be able to estimate the time a user query will
need to complete, and refuse to run long queries in the regular blocking
mode.
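The submit-then-poll interaction could look like the following sketch.
The class name, method names, and status strings are hypothetical
illustrations, not Qserv's actual interface:

```python
import itertools
import threading
import time

class AsyncQueryService:
    """Toy asynchronous query service: submit() returns an identifier
    immediately; status() and result() are polled later."""
    def __init__(self):
        self._ids = itertools.count(1)
        self._jobs = {}

    def submit(self, sql, run):
        qid = next(self._ids)
        self._jobs[qid] = {"status": "EXECUTING", "result": None}
        def work():
            self._jobs[qid]["result"] = run(sql)      # store result first,
            self._jobs[qid]["status"] = "COMPLETED"   # then mark done
        threading.Thread(target=work).start()
        return qid                                    # session not blocked

    def status(self, qid):
        return self._jobs[qid]["status"]

    def result(self, qid):
        return self._jobs[qid]["result"]

# The caller polls instead of blocking on query execution:
svc = AsyncQueryService()
qid = svc.submit("SELECT 1", lambda sql: [(1,)])
while svc.status(qid) != "COMPLETED":
    time.sleep(0.01)
```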
\subsubsection{Technology choice}\label{technology-choice}
As explained in \citeds{DMTN-046}, no off-the-shelf solution meets the above
requirements today, and an RDBMS seems a much better fit than a Map/Reduce-based
system primarily due to features such as indexes, schema, and speed. For
that reason, our baseline architecture consists of \emph{custom} software
built on two production components: an open source, ``simple'', single-node,
non-parallel DBMS (MySQL) and XRootD. To ease potential future DBMS
migrations, the communication with the underlying DBMS relies on \emph{basic}
DBMS functionality only, and avoids vendor-specific features and additions.
\begin{figure}[H]
\centering
\includegraphics{_static/qserve_components.png}
\caption{Component connections in Qserv.}
\end{figure}
\section{Implementation (Qserv)}\label{implementation}
A prototype implementation of the baseline architecture for user query
access described above, \emph{Qserv}, was developed during the R\&D phase of
LSST, and its feasibility was demonstrated in early testing (\citeds{DMTR-21},
\citeds{DMTR-12}). Productization was subsequently planned and resourced for
the construction phase of LSST and is presently underway. The system as
currently implemented is described here.
\subsection{Components}\label{components}
\subsubsection{MySQL}\label{mysql}
To control the scope of effort, Qserv uses an existing SQL engine, MySQL, to
perform as much query processing as possible. MySQL is a good choice because
of its active development community, mature implementation, wide client
software support, simple installation, lightweight execution, and low data
overhead. MySQL's large development and user community means that expertise is
relatively common, which could be important during Qserv's development or
long-term maintenance in the years ahead. MySQL's MyISAM storage engine is
also lightweight and well-understood, giving predictable I/O access patterns
without an advanced storage layout that may demand more capacity, bandwidth,
and IOPS from a tightly constrained hardware budget.
It is worth noting, however, that Qserv's design and implementation do
not depend on specifics of MySQL beyond glue code facilitating results
transmission. Loose coupling is maintained in order to allow the system
to leverage a more advanced or more suitable database engine in the
future.
\subsubsection{XRootD}\label{xrootd}
The XRootD distributed file system is used to provide a distributed, data-addressed,
replicated, fault-tolerant communication facility for Qserv. Re-implementing
these features would have been non-trivial, so we wanted to
leverage an existing system. XRootD has provided scalability, fault-tolerance,
performance, and efficiency for over 10 years to the high-energy physics
community. Its relatively flexible API enabled its use in our application as
more of a general communication routing system. Since it was designed to
serve large data sets, we were confident that it could mediate not only query
dispatch communication, but also bulk transfer of results.
An XRootD cluster is implemented as a set of data servers and redirectors. A
client connects to a redirector, which acts as a caching namespace lookup
service that redirects clients to appropriate data servers. In Qserv, XRootD
data servers become Qserv workers by implementing plug-ins within the XRootD
framework which advertise partitioned data chunks as addressable resources
within the XRootD cluster. The Qserv master then dispatches work and receives
results as an XRootD client by dispatching messages to these resources.
\begin{figure}[H]
\centering
\includegraphics{_static/xrootd.png}
\caption{XRootD.}
\end{figure}
\subsection{Partitioning}\label{partitioning-1}
In Qserv, large spatial tables are fragmented into spatial pieces in a two-level
partitioning scheme. The partitioning space is a spherical space defined
by two angles $\phi$ (right ascension/$\alpha$) and $\theta$
(declination/$\delta$). For example, the Object table is fragmented spatially,
using a coordinate pair specified in two columns: right-ascension and
declination. On worker nodes, these fragments are represented as tables named
\emph{Object\_CC} and \emph{Object\_CC\_SS} where \emph{CC} is the ``chunk
id'' (first-level fragment) and \emph{SS} is the ``sub-chunk id'' (second-level
fragment within the first, larger fragment). Sub-chunk tables are built on-the-fly
to optimize performance of spatial join queries. Large tables are
partitioned on the same spatial boundaries where possible to enable joining
between them.
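The worker-side naming convention described above is straightforward to
reproduce; a minimal helper, for illustration only:

```python
def chunk_table_name(base, chunk_id, subchunk_id=None):
    """Compose worker-side table names in the Object_CC and
    Object_CC_SS patterns described above."""
    name = f"{base}_{chunk_id}"
    return name if subchunk_id is None else f"{name}_{subchunk_id}"
```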
\subsection{Query Generation}\label{query-generation}
Qserv is unusual (though not unique) in processing a user query into one or
more query fragments that are subsequently distributed to and executed by
off-the-shelf single-node RDBMS software. This is done in the hopes of
providing a distributed parallel query service while avoiding a full re-implementation
of common database features. However, we have found that it is
necessary to implement a query processing framework much like one found in a
more standard database, with the exception that the resulting query plans
contain SQL statements as the intermediate language.
A significant amount of query analysis, not unlike that performed by a
database query optimizer, is required in order to generate a distributed
execution plan that accurately
and efficiently executes user queries. Incoming user queries are first parsed
into an intermediate representation using a modified SQL92-compliant grammar
(Lubos Vnuk's antlr-based SqlSQL2). The resulting query representation is
equivalent to the original user query, and does not include any stateful
interpretation, but may not completely reflect the original syntax. The
purpose of this representation is to provide a semantic representation that
may be operated upon by query analysis and transformation modules without the
complexity of a parse tree containing every node in the original EBNF grammar.
Once the intermediate representation has been created, it is processed by two
sequences of modules. The first sequence operates on the query as a single
statement. A transformation step occurs to split the single representation
into a ``plan'' involving multiple phases of execution, one to be executed
per-data-chunk, and one to be executed to combine the distributed results
into final user results. A second sequence of modules then operates on
this plan, performing the transformations necessary for an accurate
result.
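As a concrete illustration of the two-phase plan, a simple aggregate
splits into per-chunk statements plus a merge statement executed on the
frontend. The templates below are illustrative, not Qserv's actual
generated SQL:

```python
def make_plan(chunk_ids):
    """Toy plan for `SELECT COUNT(*) FROM Object WHERE mag < 20`:
    one statement per chunk table, and one merge statement that
    combines the partial counts collected from the workers."""
    per_chunk = [
        f"SELECT COUNT(*) AS cnt FROM Object_{cc} WHERE mag < 20"
        for cc in chunk_ids
    ]
    merge = "SELECT SUM(cnt) FROM result_table"
    return per_chunk, merge

per_chunk, merge = make_plan([1234, 1235])
```

Note that the merge phase must rewrite the aggregate: the per-chunk
COUNTs are summed, not counted again.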
We have found that regular expressions and parse element handlers are
insufficient to analyze and manipulate queries for anything beyond the
most basic query syntax constructions.
\subsubsection{Processing modules}\label{processing-modules}
The processing modules perform most of the work in transforming the user
query into statements that can produce a faithful result from a Qserv
cluster. These include:
\begin{itemize}
\item
Identify spatial indexing opportunities. This allows Qserv to dispatch
spatially-restricted queries on only a subset of the available chunks
constituting a table. Spatial restrictions given in Qserv-specific
syntax are rewritten as boolean SQL clauses.
\item
Identify secondary index opportunities. Qserv databases designate one
column (more are under consideration) as a key column whose values
are guaranteed to exist in exactly one spatial location. Identification
allows Qserv to convert point queries on this column into spatial
restrictions.
\item
  Identify table joins and generate the syntax needed to compute joins
  in a distributed fashion. Qserv primarily supports ``near-neighbor'' spatial joins for
limited distances defined in the partitioning coordinate space.
Arbitrary joins between distributed tables are only supported using
the key column. Queries are classified according to data coverage and table
scanning. By identifying tables scanned in a query, Qserv is able to
mark queries for execution using shared scanning, which greatly
increases efficiency.
\end{itemize}
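The secondary-index idea in the list above can be sketched as follows: a
point predicate on the key column pins the query to a single chunk, so
it is dispatched to one chunk instead of all of them. The crude string
matching here is purely illustrative (Qserv operates on the parsed query
representation, not on raw SQL text):

```python
import re

def target_chunk(sql, key_column, key_to_chunk):
    """Return the single chunk to dispatch to if the query contains a
    point predicate on the key column, else None (meaning all chunks)."""
    m = re.search(rf"{key_column}\s*=\s*(\d+)", sql)
    if m is None:
        return None                  # no restriction: scan every chunk
    return key_to_chunk.get(int(m.group(1)))

index = {90210: 1234}                # key value -> chunk id (toy data)
```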
\subsubsection{Processing module
overview}\label{processing-module-overview}
\begin{figure}[H]
\centering
\includegraphics{_static/processing_modules.png}
\caption{Processing modules.}
\end{figure}
This figure illustrates the query preparation pipeline that generates
physical queries from an input query string. User query strings are
parsed (1) into a structured query representation that is passed through
a sequence of processing modules (2) that operate on that representation
in-place. Then, it is broken up (3) into pieces that are explicitly
intended for parallel execution on table partitions and pieces intended
to merge parallel results into user results. Another processing sequence
(4) operates on this new representation, and then finally, concrete
query strings are generated (5) for execution.
The two sequences of processing modules provide an extensible means to
implement query analysis and manipulation. Earlier prototypes performed
analysis and manipulation during parsing, but this led to a practically
unmaintainable code base, so the functionality has been ported to the
processing module model. Processing is split into two sequences to
provide the flexibility to modules that manipulate the physical
structures while offering the simpler single-query representation to
modules that do not require the complexity. The clear separation between
parsing, whose only goal is to provide an intelligible and modifiable
query representation, and the Qserv-specific analysis and manipulation
is a key factor in the overall flexibility, maintainability, and
extensibility of the system and should help the system adapt to current
and future LSST needs.
\subsection{Dispatch}\label{dispatch}
Qserv uses XRootD as a distributed, highly-available communications system to
allow Qserv frontends to communicate with data workers. Up until 2015, Qserv
used a synchronous client API with named files as communication channels. The
current baseline system utilizes a general two-way named-channeling system
which eliminates explicit file abstractions in favor of generalized protocol
messages that can be flexibly streamed. The scheme is called Scalable Service
Interface (SSI) and is built on top of XRootD.
\subsubsection{Wire protocol}\label{wire-protocol}
Qserv encodes query dispatches in Google Protobuf messages, which contain SQL
statements to be executed by the worker and annotations that describe query
dependencies and characteristics. Transmitting query characteristics allows
Qserv workers to optimize query execution under changing CPU and disk loads as
well as memory considerations. The worker need not re-analyze the query to
discover these characteristics or guess at conditions that cannot be
determined by query inspection.
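Schematically, a dispatch message carries the SQL fragments to run plus
the annotations described above. The field names below are assumptions
for illustration only, not the actual Protobuf schema:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class TaskMsg:
    """Illustrative stand-in for the Protobuf dispatch message."""
    chunk_id: int                        # resource the query targets
    fragments: List[str]                 # SQL statements for the worker
    scan_tables: List[str] = field(default_factory=list)  # shared-scan tables
    interactive: bool = False            # low-volume hint for scheduling

msg = TaskMsg(chunk_id=1234,
              fragments=["SELECT COUNT(*) AS cnt FROM Object_1234"],
              scan_tables=["Object"])
```

Carrying the scan-table and interactivity annotations lets the worker's
scheduler place the task without re-parsing the SQL itself.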