Force-pushed from 0b6fa4c to 5f3b8bb
It is not enough to simply refragment a table inside the storage. Assigning a new fragment size changes the data assigned to already used chunk IDs. That means all data cached in buffer managers becomes stale, but it would still be used in subsequent queries. There is other data referenced by chunk ID that can be cached, e.g. chunk stats. So refragmenting needs additional cleanups, and it's not clear how to do all of that atomically. Did you consider creating a new table as the result of refragmentation? Same price, no conflicts.
@ienkovich Do we update anything above the storage layer when we append to a table whose last fragment wasn't full and it now starts to span Arrow's chunk boundaries (so its metadata is updated)? https://github.com/intel-ai/hdk/blob/ad924b50a2e909a98efba9a6363f9b5415742bf6/omniscidb/ArrowStorage/ArrowStorage.cpp#L876
Appended data was originally supported in buffer managers: the size of the chunk grows and buffer managers ask the storage for the new data. If you reduce the size of a chunk, they would simply use the old data.
mapd_unique_lock<mapd_shared_mutex> table_lock(table.mutex);
data_lock.unlock();
const size_t new_frag_count =
    table.row_count / new_frag_size + ((table.row_count % new_frag_size) != 0);
Maybe std::ceil? It would be clearer.
        ? dynamic_cast<const hdk::ir::ArrayBaseType*>(col_type)->elemType()
        : col_type;
bool compute_stats = !col_type->isString();
if (compute_stats) { |
I think it's better to move this condition out of the loop. Earlier you already checked which frag_ids have a String type.
for (auto& pr : column_infos_) {
  if (pr.second->type->isExtDictionary()) {
    dicts_to_remove.erase(pr.second->type->as<hdk::ir::ExtDictionaryType>()->dictId());
if (!is_view) { |
Please simplify it, e.g. with an early return:
if (is_view) {
return;
}
for (auto& col_info : col_infos) {
  if (col_info->type->isExtDictionary()) {
    dicts_to_remove.insert(col_info->type->as<hdk::ir::ExtDictionaryType>()->dictId());
if (!is_view) { |
Same simplification here, please.
TableInfoPtr createRefragmentedView(const std::string& table_name,
                                    const std::string& new_table_name,
                                    const size_t new_frag_size);
void refragmentTable(const std::string& table_name, const size_t new_frag_size); |
Do we need these methods to be public? As far as I can see, getRefragmentedView uses both of them; maybe exposing only that method is enough?
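A minimal sketch of the narrowed interface this question points at (and which a later comment refines by making createRefragmentedView the single entry point); the declarations are copied from above, and whether refragmentTable can really become private depends on its other callers:

class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
 public:
  // Single public entry point: builds a view of an existing table with a new fragment size.
  TableInfoPtr createRefragmentedView(const std::string& table_name,
                                      const std::string& new_table_name,
                                      const size_t new_frag_size);

 private:
  // Internal helper used only by the entry point above.
  void refragmentTable(const std::string& table_name, const size_t new_frag_size);
};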
Force-pushed from d63196e to b7a7893
@@ -128,7 +131,7 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
  void appendParquetFile(const std::string& file_name, int table_id);

  void dropTable(const std::string& table_name, bool throw_if_not_exist = false);
  void dropTable(int table_id, bool throw_if_not_exist = false);
  void dropTable(int table_id, bool is_view = false, bool throw_if_not_exist = false);
Do we guarantee the table exists if there's a view for it? What happens to views if a non-view table is dropped?
The general implementation looks fine, but there are some synchronization issues. To correctly create the new table you need to:
1. Get the data unique lock
2. Get the schema unique lock
3. Get the dict unique lock
4. Make all checks and add the new table + modify dict data
5. Release the schema and dict locks
6. Create the new table data
7. Get a table unique lock
8. Release the data lock
9. Make the rest of the manipulations with the table
10. Release the table lock
It's hard to do that with all three methods taking the table name as a parameter. I suggest you make createRefragmentedView the entry point (a better name than getRefragmentedView, because it is not a simple getter), do steps 1-8 there, and then call refragmentTable passing a TableData reference as a parameter. Then you should be thread-safe.
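A self-contained toy sketch of that lock ordering, using std::shared_mutex in place of the project's mapd_shared_mutex; all names here are illustrative stand-ins, not the actual ArrowStorage members:

#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

struct TableData {
  std::shared_mutex mutex;
  size_t row_count = 0;
};

struct Storage {
  std::shared_mutex data_mutex;    // protects tables_
  std::shared_mutex schema_mutex;  // protects schema metadata
  std::shared_mutex dict_mutex;    // protects dictionary data
  std::unordered_map<int, std::unique_ptr<TableData>> tables_;

  void createRefragmentedView(int new_table_id, size_t new_frag_size) {
    // Steps 1-3: take all unique locks up front.
    std::unique_lock<std::shared_mutex> data_lock(data_mutex);
    std::unique_lock<std::shared_mutex> schema_lock(schema_mutex);
    std::unique_lock<std::shared_mutex> dict_lock(dict_mutex);

    // Step 4: run the checks, register the new table, modify dict data.
    auto [it, inserted] = tables_.emplace(new_table_id, std::make_unique<TableData>());
    (void)inserted;

    // Step 5: schema and dict state is consistent now, release those locks.
    schema_lock.unlock();
    dict_lock.unlock();

    // Step 6: the new table data exists while we still hold the data lock.
    TableData& table = *it->second;

    // Steps 7-8: lock the new table, then release the data lock.
    std::unique_lock<std::shared_mutex> table_lock(table.mutex);
    data_lock.unlock();

    // Step 9: the rest of the manipulations happen under the table lock only.
    (void)new_frag_size;

    // Step 10: table_lock is released when it goes out of scope.
  }
};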
@@ -128,7 +131,7 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
  void appendParquetFile(const std::string& file_name, int table_id);

  void dropTable(const std::string& table_name, bool throw_if_not_exist = false);
  void dropTable(int table_id, bool throw_if_not_exist = false);
  void dropTable(int table_id, bool is_view = false, bool throw_if_not_exist = false);
I don't see any reason to care about view vs. non-view tables. I think it's better to make no difference here. The fact that two tables share arrow data shouldn't affect anything since this data is immutable and no additional control over the data lifetime is required.
TableInfoPtr ArrowStorage::createRefragmentedView(const std::string& table_name,
                                                  const std::string& new_table_name,
                                                  const size_t new_frag_size) {
if (getTableInfoNoLock(db_id_, new_table_name)) { |
You shouldn't access the schema provider data without holding a lock. Since you are going to add a new table, a unique lock should be obtained first. You can release the schema lock after the new table is created.
  }
}

auto [iter, inserted] = tables_.emplace(new_table_id, std::make_unique<TableData>());
You shouldn't modify the tables_ member without obtaining a unique data lock first. After adding the new table, you can unique-lock the table and release the data lock.
@@ -1052,10 +1212,10 @@ void ArrowStorage::dropTable(const std::string& table_name, bool throw_if_not_ex
    }
    return;
  }
  dropTable(tinfo->table_id);
  dropTable(tinfo->table_id, tinfo->is_view);
Please don't add is_view usages; this field is legacy and has to be removed.
      col_type);
  }
} else {
if (col_type->isString()) { |
I don't see how this change is related to the new feature. Can we remove it from the patch?
Force-pushed from b7a7893 to bfb856b
  throw std::runtime_error("Cannot refragment to fragment size 0");
}

if (table_name.empty()) { |
Should this be new_table_name?
void ArrowStorage::refragmentTable(TableData& table,
                                   const int table_id,
                                   const size_t new_frag_size) {
if (!new_frag_size) { |
This is a duplicated check, and it gives the impression that we create a new table and then leave it with uninitialized fragments in the case of a zero fragment size. I'd use CHECK here instead.
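A minimal sketch of that suggestion, assuming the project's glog-style CHECK macro is available in this file:

  // The zero-size case is already rejected in createRefragmentedView, so an
  // internal invariant check is sufficient here.
  CHECK(new_frag_size > 0);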
    throw std::runtime_error("Cannot refragment to fragment size 0");
  }
  const size_t new_frag_count =
      std::ceil(static_cast<float>(table.row_count) / new_frag_size);
Float precision is not good enough here. If you have 1'000'000'001 rows and the fragment size is 10'000'000, your result here would be 100 fragments, not 101. Use integer arithmetic to avoid precision issues.
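For reference, an integer-only version of the same computation (essentially the form the code had before the switch to std::ceil, as quoted earlier in this conversation):

  // Integer ceiling division: exact for any row count, no floating-point rounding.
  const size_t new_frag_count =
      table.row_count / new_frag_size + (table.row_count % new_frag_size != 0);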
@@ -131,7 +131,7 @@ cdef extern from "omniscidb/ArrowStorage/ArrowStorage.h":
    CArrowStorage(int, string, int, shared_ptr[CConfig]) except +;

    CTableInfoPtr createTable(const string&, const vector[CColumnDescription]&, const CTableOptions&) except +

    CTableInfoPtr createRefragmentedView(const string&, const string&, const size_t) except +
Our main API access point is the pyhdk.HDK class. Do you use it in your demo? I'd expect the usage scenario to be something like:
ht1 = hdk.import_csv(file, fragment_size=10)
...
ht2 = ht1.refragmented_view(fragment_size=20)
...
The demo will be redone as a separate PR.
Force-pushed from 321b258 to 6775459
Looks good!
It would be nice to have a googletest / C++ test for this. I am also wondering what happens if you enable lazy dictionary materialization and then re-fragment. Could we add something to
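A minimal sketch of what such a test could start from, covering only the error path that needs no data; the constructor argument meanings are inferred from the Cython declaration quoted earlier and the assumption that Config is default-constructible, so a real test would reuse the suite's existing setup and also import data to verify per-fragment row counts:

#include <gtest/gtest.h>

#include <memory>
#include <stdexcept>

#include "ArrowStorage/ArrowStorage.h"

TEST(ArrowStorageRefragment, ZeroFragmentSizeIsRejected) {
  auto config = std::make_shared<Config>();
  // Parameter meanings (schema id, schema name, db id, config) are assumptions here.
  ArrowStorage storage(/*schema_id=*/123, "refrag_test", /*db_id=*/1, config);

  // Per the error message added in this PR, a zero fragment size must be rejected.
  EXPECT_THROW(storage.createRefragmentedView("t", "t_refrag", 0), std::runtime_error);
}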
This PR introduces table refragmentation functionality, which makes it possible to avoid re-importing the same table just to get a new fragment size.
It is relatively cheap to refragment a table (as opposed to re-inserting it), since we don't have to modify the physical layout or location of the data in memory. The price we pay for refragmenting is recalculating the metadata (basically the fragment's offset, row_count, and small materialized aggregates per fragment) in order to change the logical view of the table.
Addresses #572.
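A toy sketch of the offset / row-count part of that recalculation (the per-fragment aggregates are recomputed separately, via computeStats); the names and structures here are illustrative only, not the actual ArrowStorage data structures:

#include <algorithm>
#include <cstddef>
#include <vector>

// Split total_rows into fragments of new_frag_size rows, recording each
// fragment's starting offset and row count.
struct FragmentMeta {
  size_t offset;     // first row of the fragment within the table
  size_t row_count;  // number of rows in the fragment
};

std::vector<FragmentMeta> recomputeFragments(size_t total_rows, size_t new_frag_size) {
  std::vector<FragmentMeta> frags;
  for (size_t offset = 0; offset < total_rows; offset += new_frag_size) {
    frags.push_back({offset, std::min(new_frag_size, total_rows - offset)});
  }
  return frags;
}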
Quite some time is spent in ArrowStorage::computeStats(); the code for FixedLengthEncoder::updateStatsEncoded() appears to be scalar, so there is a possible performance improvement opportunity: calculating min/max and detecting nulls using SIMD.
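A toy sketch of the kind of branch-light stats loop that compilers can auto-vectorize (or that could be rewritten with explicit SIMD intrinsics); this is not the actual encoder code, and the null-sentinel convention is assumed:

#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

struct ColumnStats {
  int32_t min;
  int32_t max;
  bool has_nulls;
};

// No data-dependent branches inside the loop: the ternaries compile to selects,
// and min/max map directly onto vector min/max instructions.
ColumnStats computeStatsVectorizable(const std::vector<int32_t>& values,
                                     int32_t null_sentinel) {
  int32_t min_val = std::numeric_limits<int32_t>::max();
  int32_t max_val = std::numeric_limits<int32_t>::min();
  bool null_seen = false;
  for (int32_t v : values) {
    const bool is_null = (v == null_sentinel);
    null_seen |= is_null;
    // Substitute neutral values for nulls so they don't affect min/max.
    min_val = std::min(min_val, is_null ? std::numeric_limits<int32_t>::max() : v);
    max_val = std::max(max_val, is_null ? std::numeric_limits<int32_t>::min() : v);
  }
  return {min_val, max_val, null_seen};
}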