Skip to content

Commit

Permalink
active shards 1 crate (#384)
Browse files Browse the repository at this point in the history
* enable write.wait_for_active_shards, by enabling "write.wait_for_active_shards = 1" writes are possible also when table health is not 100%
* use variable to set values for wait_for_active_shards
  • Loading branch information
chicco785 committed Dec 1, 2020
1 parent 07a9408 commit 970898e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/manuals/admin/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ To configure QuantumLeap you can use the following environment variables:
| `DEFAULT_CACHE_TTL`| Time to live of metadata cache, default: 60 (seconds) | |
| `QL_CONFIG` | Pathname for tenant configuration |
| `QL_DEFAULT_DB` | Default backend: `timescale` or `crate` |
| `CRATE_WAIT_ACTIVE_SHARDS` | Specifies the number of shard copies that need to be active for write operations to proceed. Default `1`. See related [crate documentation](https://crate.io/docs/crate/reference/en/4.3/sql/statements/create-table.html#write-wait-for-active-shards). |
| `USE_FLASK` | `True` or `False` to use flask server (only for Dev) or gunicorn. Default to `False` |
| `LOGLEVEL` | Define the log level for all services (`DEBUG`, `INFO`, `WARNING` , `ERROR`) |

Expand Down
12 changes: 9 additions & 3 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def setup(self):
# we need to think if we want to cache this information
# and save few msec for evey API call
self.db_version = self.get_db_version()
self.active_shards = EnvReader(log=logging.getLogger(__name__).debug)\
.read(StrVar('CRATE_WAIT_ACTIVE_SHARDS', '1'))

major = int(self.db_version.split('.')[0])
if major <= 2:
Expand Down Expand Up @@ -170,8 +172,10 @@ def _create_data_table(self, table_name, table, fiware_service):
columns = ', '.join('"{}" {}'.format(cn.lower(), ct)
for cn, ct in table.items())
stmt = "create table if not exists {} ({}) with " \
"(number_of_replicas = '2-all', " \
"column_policy = 'strict')".format(table_name, columns)
"(\"number_of_replicas\" = '2-all', " \
"\"column_policy\" = 'strict', " \
"\"write.wait_for_active_shards\" = '{}'" \
")".format(table_name, columns, self.active_shards)
self.cursor.execute(stmt)

def _update_data_table(self, table_name, new_columns, fiware_service):
Expand All @@ -188,7 +192,9 @@ def _should_insert_original_entities(self,
def _create_metadata_table(self):
stmt = "create table if not exists {} " \
"(table_name string primary key, entity_attrs object) " \
"with (number_of_replicas = '2-all', column_policy = 'dynamic')"
"with (" \
"number_of_replicas = '2-all', " \
"column_policy = 'dynamic')"
op = stmt.format(METADATA_TABLE_NAME)
self.cursor.execute(op)

Expand Down

0 comments on commit 970898e

Please sign in to comment.