[EPIC] Catalog refactor #23

Closed
8 of 14 tasks
dmetasoul01 opened this issue May 23, 2022 · 0 comments · Fixed by #45
Assignees
Labels
enhancement New feature or request epic

dmetasoul01 commented May 23, 2022

Overview

An overall refactor of the catalog implementation, to address some known issues and pave the way for future development.
The catalog interaction code is currently tightly coupled with Spark. We need a separate, 'clean' implementation that is independent of compute engines. This enables further and easier integration with query engines such as Flink and Presto.

Problems

  1. Users may not have a Cassandra deployment or may not be familiar with Cassandra.
  2. Some complex multi-partition/multi-table transaction scenarios are hard to implement on top of Cassandra's LWT. We would like to extend the catalog to support more databases, such as PostgreSQL with its dialect.
  3. More complex merge semantics and concurrency control semantics require more metadata in the catalog, and we need to encapsulate these details within an independent catalog implementation.

Proposal

Design Goals

Catalog metadata management covers the metadata of a data warehouse: tables, schemas, partitions, and data directories or files. Similar to Hive MetaStore, metadata management needs to connect to the compute engine's Catalog interface so that SQL queries against LakeSoul tables can be resolved.
LakeSoul's metadata management aims to achieve the following goals:

  • High performance and scalability. Metadata stores such as Hive have performance problems. For example, when Hive queries partitions it has to join the two tables PARTITIONS and PARTITION_KEY_VALS in MySQL, which becomes slow when one table has thousands of partitions.
    LakeSoul's metadata reads and writes can be served through the primary key index, avoiding full table scans.
  • Atomicity. LakeSoul metadata makes data commit operations atomic: a partitioned micro-batch write is either fully committed or not visible at all, so readers never see data in an intermediate state.
  • Consistency. LakeSoul implements tunable consistency through Cassandra, and the default is eventual consistency.
  • Isolation. LakeSoul separates read and write versions through a multi-version mechanism, and supports version backtracking based on timestamps (i.e., time travel).
  • Multiple concurrent writes. LakeSoul supports multiple concurrent writers and determines whether data conflicts occur by detecting write conditions. Conflict-free writes (Append/Upsert) may proceed concurrently. For conflicting writes (Update), retry logic is performed by the compute framework.
  • Other extension points. Metadata can also provide table-level special semantic control (CDC, SCD), file-level statistics, and data distribution (Bucketing, Ordering).

Meta Operations

1. Data Format Definition

1. Table information. Stores the table name, path, schema, properties, and partition names and types (Range, Hash).

CREATE TABLE table_info (
    table_id text,
    table_name text,
    table_path text,
    table_schema text,
    properties json, -- entity corresponds to JSONObject
    partitions text,
    PRIMARY KEY (table_id)
);

CREATE TABLE table_name_id (
    table_name text,
    table_id text,
    PRIMARY KEY (table_name)
);

CREATE TABLE table_path_id (
    table_path text,
    table_id text,
    PRIMARY KEY (table_path)
);

PostgreSQL: Documentation: 14: Chapter 8. Data Types
PostgreSQL: Documentation: 14: 8.15. Arrays

2. File information. Stores file names and file operations (file_op), such as add and delete.

3. Commit. A commit corresponds to a set of file information and records the type of the current commit (commit_op). Commit and file information are stored in the same table.

CREATE TYPE data_file_op AS (
    path text,
    file_op text,
    size bigint,
    file_exist_cols text
);

CREATE TABLE data_commit_info (
    table_id text,
    partition_desc text,
    commit_id UUID, -- can use timestamp
    file_ops data_file_op[],
    commit_op text,
    timestamp bigint,
    PRIMARY KEY (table_id, partition_desc, commit_id)
);

PostgreSQL: Documentation: 14: 8.16. Composite Types
https://www.postgresql.org/docs/current/datatype-uuid.html

The commit_op types are update, compaction, append, and merge. The type determines how a commit relates semantically to the other commits in a snapshot.
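As an illustration, the data_file_op type and a data_commit_info row could be mirrored in application code roughly as follows. This is only a sketch: the class names, the default values, and the choice of milliseconds for the timestamp are assumptions, not part of the proposal.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import List

# The four commit types listed in the proposal.
COMMIT_OPS = ("append", "merge", "update", "compaction")


@dataclass(frozen=True)
class DataFileOp:
    """Mirrors the data_file_op composite type."""
    path: str
    file_op: str               # "add" or "delete"
    size: int = 0
    file_exist_cols: str = ""


@dataclass
class DataCommitInfo:
    """Mirrors one row of data_commit_info."""
    table_id: str
    partition_desc: str
    file_ops: List[DataFileOp]
    commit_op: str
    # Assumption: a random UUID as commit id and epoch milliseconds as timestamp.
    commit_id: uuid.UUID = field(default_factory=uuid.uuid4)
    timestamp: int = field(default_factory=lambda: int(time.time() * 1000))

    def __post_init__(self) -> None:
        # Reject commit types that the proposal does not define.
        if self.commit_op not in COMMIT_OPS:
            raise ValueError(f"unknown commit_op: {self.commit_op}")
```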

4. Snapshots.

A snapshot contains a commit sequence organized in chronological order. From a snapshot, the reader can reconstruct which files need to be read, which need to be ignored (deleted, or invalidated after an update), which need to be merged (merge on read), and the ordering among those files.

5. Partition information.

Stores all historical snapshots in a partition, and a corresponding version number (used to control read-write isolation and multi-version, etc.). Among them, partition_desc is a combined description generated by multi-level partitions, which is used to uniquely locate a leaf partition of a table.

CREATE TABLE partition_info (
    table_id text,
    partition_desc text, -- year=2022,month=04,day=20,bucket=000
    version int, -- continuous monotonically increasing
    commit_op text,
    snapshot UUID[], -- organized by commit time order
    expression text, -- entity corresponds to JSONArray
    PRIMARY KEY (table_id, partition_desc, version)
);
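The partition_desc format shown in the comment above (year=2022,month=04,day=20,bucket=000) could be produced and parsed along these lines. The function names are hypothetical, and the sketch assumes that partition values contain neither commas nor equals signs.

```python
from typing import Dict, List, Tuple


def build_partition_desc(levels: List[Tuple[str, str]]) -> str:
    """Join (column, value) pairs, outermost level first, into one description."""
    return ",".join(f"{name}={value}" for name, value in levels)


def parse_partition_desc(desc: str) -> Dict[str, str]:
    """Split a description back into a column -> value mapping."""
    return dict(item.split("=", 1) for item in desc.split(","))
```

Because the description is part of the primary key, the order of the levels has to be stable for a given table; otherwise the same leaf partition would map to different keys.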

Write logic

Each time the data of a table partition is updated, the overall process is as follows:

  1. The compute framework generates a list of DataFileOp file operation pairs. Each pair is a tuple of (file name, operation), where the operation is add or delete.
  2. Generate a UUID as the commit id (a timestamp may be used instead), and write the file operation pair list and the current commit_op into the data_commit_info table.
  3. At the same time, generate the corresponding partition_info data from the generated data_commit_info.
  4. Get the current values from each partition: the current version (current_version) and the current snapshot (current_snapshot).
  5. Calculate the new version and snapshot of each partition:
    1. new_version = current_version + 1
    2. new_snapshot =
      1. append/merge: current_snapshot.append(commit_id)
      2. update, compaction: [commit_id]
  6. Ideally the data can be written to the table in one attempt. Because concurrent writers may cause version conflicts, a conflict detection mechanism is needed. The PostgreSQL transaction mechanism is used so that conflicting writes can be safely rolled back, and each operation type handles conflicts differently.
  7. Fine-grained concurrent conflict resolution:
    In some cases a conflict is resolvable. For example, two concurrent append operations do not conflict: the second writer can simply retry, fetch the newest commit sequence, and make a new snapshot. In other cases, such as two concurrent updates, the conflict cannot be resolved and the second operation should fail.
| OP | append | compaction | update | merge |
| --- | --- | --- | --- | --- |
| append | Retry commit | Reorder as compaction + append sequence | Fail | Fail |
| compaction | Reorder as compaction + append sequence | Discard the last one | Keep update only | Reorder as compaction + merge sequence |
| update | Fail | Keep update only | Fail | Fail |
| merge | Fail | Reorder as compaction + merge sequence | Fail | Fail |
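The version and snapshot calculation in the write steps can be sketched as follows. PartitionInfo and next_partition_info are hypothetical names, commit ids are plain strings for brevity, and the conflict-free case is assumed.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionInfo:
    """A subset of the partition_info columns, enough for the calculation."""
    table_id: str
    partition_desc: str
    version: int
    commit_op: str
    snapshot: List[str]


def next_partition_info(current: PartitionInfo, commit_id: str, commit_op: str) -> PartitionInfo:
    """Compute the partition_info row to insert for a new commit."""
    if commit_op in ("append", "merge"):
        # The new commit is added on top of the existing commit sequence.
        snapshot = current.snapshot + [commit_id]
    elif commit_op in ("update", "compaction"):
        # The new commit replaces everything that came before it.
        snapshot = [commit_id]
    else:
        raise ValueError(f"unknown commit_op: {commit_op}")
    return PartitionInfo(current.table_id, current.partition_desc,
                         current.version + 1, commit_op, snapshot)
```

The function returns a new row instead of mutating the current one, which matches the multi-version design: older versions stay in partition_info and remain readable.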

Conflict resolving steps:

  1. If there is a conflict, re-fetch the current partition information (cur_1) and, depending on the operation being submitted, proceed as follows:
    1. If this is an append operation, a conflict with a concurrent update/merge is not allowed, so the append fails directly. If it conflicts with compaction/append, then snapshot = cur_1.snapshot + partition_info.snapshot;
    2. If this is a compaction operation and it conflicts with an update, the update prevails: the compaction is abandoned and true is returned directly. If it conflicts with another compaction, the operation is dropped as invalid and true is returned. If it conflicts with append/merge, then snapshot = partition_info.snapshot.append(cur_1.snapshot - current.snapshot);
    3. If this is an update operation, a conflict with a concurrent append/merge/update is not allowed, so the update fails. If it conflicts with compaction, just overwrite the snapshot: snapshot = partition_info.snapshot;
    4. If this is a merge operation, a conflict with a concurrent append/merge/update is not allowed, so the merge fails. If it conflicts with compaction, then snapshot = cur_1.snapshot + partition_info.snapshot;
  2. The resubmitted version is cur_1.version + 1.
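The resolution rules above could be expressed as a single function, sketched here under several assumptions: new_commits stands for partition_info.snapshot (the commit ids of this submission), base_snapshot for current.snapshot, cur_snapshot for cur_1.snapshot, and the conflicting operation is taken to be cur_1.commit_op. Returning None to mean "dropped, report success" and the CommitConflict exception are illustrative choices.

```python
from typing import List, Optional


class CommitConflict(Exception):
    """Raised when the conflict cannot be resolved and the commit must fail."""


def resolve_conflict(op: str, new_commits: List[str], base_snapshot: List[str],
                     cur_snapshot: List[str], cur_op: str) -> Optional[List[str]]:
    """Return the snapshot to resubmit, or None if this commit is dropped."""
    if op == "append":
        if cur_op in ("update", "merge"):
            raise CommitConflict("append conflicts with " + cur_op)
        # Concurrent append or compaction: place this commit after the newest sequence.
        return cur_snapshot + new_commits
    if op == "compaction":
        if cur_op in ("update", "compaction"):
            return None  # the concurrent commit prevails; nothing left to do
        # Keep the commits that were added after the compaction read its input.
        added = [c for c in cur_snapshot if c not in base_snapshot]
        return new_commits + added
    if op == "update":
        if cur_op == "compaction":
            return list(new_commits)  # the update overwrites the compacted snapshot
        raise CommitConflict("update conflicts with " + cur_op)
    if op == "merge":
        if cur_op == "compaction":
            return cur_snapshot + new_commits
        raise CommitConflict("merge conflicts with " + cur_op)
    raise ValueError(f"unknown commit_op: {op}")
```

A caller would write the returned snapshot with version = cur_1.version + 1 inside a new transaction and repeat the procedure if that attempt conflicts again.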

Read logic

1. Normal reading process

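  1. Get the latest partition version number and snapshot of each partition. Conceptually:
SELECT max(version), snapshot FROM partition_info
WHERE table_id = id and partition_desc in (descs...);

PostgreSQL rejects this form because snapshot is neither grouped nor aggregated, so the per-partition maximum version is joined back to partition_info: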

select m.table_id, t.partition_desc, m.version, m.commit_op, m.snapshot, m.expression
from (
    select table_id, partition_desc, max(version) as max_version
    from partition_info
    where table_id = 'tableId' and partition_desc in ('partitionlist')
    group by table_id, partition_desc) t
left join partition_info m
on t.table_id = m.table_id and t.partition_desc = m.partition_desc and t.max_version = m.version;

Note: Because version is part of the composite primary key, PostgreSQL automatically maintains a B-tree index that covers it, so this query needs no full table scan and is fast.

  2. Create a read logical plan from the commit_id list in the snapshot.
  3. For each commit id, read the list of DataFileOp from data_commit_info.
  4. According to the commit order, decide the scan plan:
    1. If there is a merge commit, create a MergeScan.
    2. Otherwise, create a ParquetScan directly.
    There is an implicit requirement here: after an update, only one file is retained in each partition, while merge can leave any number of delta files. For example, if a partition has been updated several times, only one file remains in it. Therefore only the latest commit id needs to be kept after an update.
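The plan selection in the steps above might look like the following sketch. The plan_scan name and the tuple-based commit representation are assumptions, and so is the way a delete operation is applied (it removes a path added by an earlier commit in the same snapshot).

```python
from typing import Dict, List, Tuple

FileOp = Tuple[str, str]            # (path, file_op), file_op is "add" or "delete"
Commit = Tuple[str, List[FileOp]]   # (commit_op, file_ops)


def plan_scan(snapshot: List[str], commits: Dict[str, Commit]) -> Tuple[str, List[str]]:
    """Walk the snapshot in commit order and return (scan type, files to read)."""
    files: List[str] = []
    has_merge = False
    for commit_id in snapshot:
        commit_op, file_ops = commits[commit_id]
        has_merge = has_merge or commit_op == "merge"
        for path, file_op in file_ops:
            if file_op == "add":
                files.append(path)
            elif file_op == "delete" and path in files:
                files.remove(path)
    # Any merge commit in the sequence requires merge-on-read.
    return ("MergeScan" if has_merge else "ParquetScan", files)
```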

2. Time Travel reading process

Time Travel supports reading at a given version or at a given timestamp. When a version is given, that version is looked up directly, and the rest of the logic is the same as the normal reading process. When a timestamp is given, each partition needs to traverse the timestamps of its versions to find the most recent version whose commit timestamp is <= the given timestamp.
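A minimal sketch of the timestamp lookup for one partition follows. partition_info has no timestamp column, so the sketch assumes the caller has already paired each version with a commit timestamp (for example, the timestamp of the last commit in that version's snapshot) and that timestamps do not decrease as versions increase. The version_at name is hypothetical.

```python
from typing import List, Optional, Tuple


def version_at(versions: List[Tuple[int, int]], timestamp: int) -> Optional[int]:
    """Return the latest version committed at or before the given timestamp.

    versions holds (version, commit_timestamp) pairs for a single partition.
    None is returned when the partition did not exist yet at that time.
    """
    candidates = [version for version, committed in versions if committed <= timestamp]
    return max(candidates) if candidates else None
```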

Exception handling logic

Possible exceptions when submitting file updates:

  1. After the file is written, the writing job fails and no commit record is created.
  2. After the commit record is created, updating the partition version fails.
    Logically, both failure cases can be ignored, because the latest version number is not modified and reads are unaffected. However, they leave invalid data in storage and in the metadata that needs to be cleaned up.
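One way to find the leftover metadata from the second failure case is to look for commit records that no partition version references. This is a sketch of that idea only, not a cleanup procedure from the proposal; orphan_commits is a hypothetical name, and a real job would also need a grace period so that commits from writers still in flight are not treated as orphans.

```python
from typing import Iterable, List, Set


def orphan_commits(all_commit_ids: Iterable[str], snapshots: Iterable[List[str]]) -> Set[str]:
    """Return commit ids in data_commit_info that no snapshot refers to.

    snapshots should contain the snapshot column of every partition_info row,
    including historical versions, so that time travel keeps working.
    """
    referenced = {commit_id for snapshot in snapshots for commit_id in snapshot}
    return set(all_commit_ids) - referenced
```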

TTL processing logic

TTL is a common requirement for data warehouses.

  1. For tables with a TTL set, apply the same TTL to partition_info and to the commit records, and set the TTL of the file storage at the same time.
  2. Where the file storage does not support TTL, such as HDFS, cleanup can be performed asynchronously by listening to the TTL events of the metadata (via CDC).

Schema Evolution (DDL)

The above section does not take into account the capabilities of Schema evolution. For schema evolution, the main problem that needs to be dealt with is that when the schema is changed (adding, deleting, and modifying several columns)

Development

Branch

develop/catalog_refactor

Tasks

@dmetasoul01 dmetasoul01 added enhancement New feature or request epic labels May 23, 2022
@dmetasoul01 dmetasoul01 self-assigned this May 23, 2022
@dmetasoul01 dmetasoul01 changed the title [EPIC] Catalog refactor [EPIC][WIP] Catalog refactor May 23, 2022
@dmetasoul01 dmetasoul01 changed the title [EPIC][WIP] Catalog refactor [EPIC] Catalog refactor Jun 29, 2022