Skip to content

Commit

Permalink
Just replace on optimize and skip the pop
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeroen van der Heijden committed Jun 30, 2017
1 parent beaefa8 commit bd12931
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 56 deletions.
84 changes: 37 additions & 47 deletions src/siri/db/shard.c
Expand Up @@ -773,17 +773,20 @@ int siridb_shard_get_points_log64(
int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
{
int rc = 0;
siridb_shard_t * tmp_shard;
siridb_shard_t * new_shard = NULL;
uint64_t duration = (shard->tp == SIRIDB_SHARD_TP_NUMBER) ?
siridb->duration_num : siridb->duration_log;
siridb_series_t * series;

uv_mutex_lock(&siridb->shards_mutex);

tmp_shard = (siridb_shard_t *) imap_pop(siridb->shards, shard->id);
if (tmp_shard == shard)
/* In case the shard is not removed, it must be the shard inside the imap
* because we check and replace the shard within the shards_mutex lock.
* If the shard is marked as removed we can simply skip the optimize.
*/
if (~shard->flags & SIRIDB_SHARD_IS_REMOVED)
{
if ((tmp_shard = siridb_shard_create(
if ((new_shard = siridb_shard_create(
siridb,
shard->id,
duration,
Expand All @@ -798,7 +801,7 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
}
else
{
siridb_shard_incref(tmp_shard);
siridb_shard_incref(new_shard);
}
}
else
Expand All @@ -807,24 +810,11 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
"Skip optimizing shard id '%" PRIu64 "' "
"because the shard is probably dropped.",
shard->id);

if (tmp_shard != NULL && imap_add(
siridb->shards,
shard->id,
tmp_shard))
{
log_critical(
"Cannot restore old shard with id '%" PRIu64 "'",
shard->id);
siridb_shard_decref(tmp_shard);
}

tmp_shard = NULL;
}

uv_mutex_unlock(&siridb->shards_mutex);

if (tmp_shard == NULL)
if (new_shard == NULL)
{
/*
* Creating the new shard has failed or the shard is dropped.
Expand All @@ -837,8 +827,8 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
/* at this point the references should be as following (unless dropped):
* shard->ref (=>2)
* - simple list
* - tmp_shard->replacing
* tmp_shard->ref (=>2)
* - new_shard->replacing
* new_shard->ref (=>2)
* - siridb->shards
* - this method
*/
Expand Down Expand Up @@ -872,15 +862,15 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
siri.optimize->status != SIRI_OPTIMIZE_CANCELLED &&
shard->id % siridb->duration_num == series->mask &&
(~series->flags & SIRIDB_SERIES_IS_DROPPED) &&
(~tmp_shard->flags & SIRIDB_SHARD_IS_REMOVED))
(~new_shard->flags & SIRIDB_SHARD_IS_REMOVED))
{
uv_mutex_lock(&siridb->series_mutex);

if ( (~tmp_shard->flags & SIRIDB_SHARD_IS_REMOVED) &&
if ( (~new_shard->flags & SIRIDB_SHARD_IS_REMOVED) &&
siridb_series_optimize_shard(
siridb,
series,
tmp_shard))
new_shard))
{
log_critical(
"Optimizing shard '%s' has failed due to a critical "
Expand All @@ -899,23 +889,23 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)

slist_free(slist);

if (tmp_shard->flags & SIRIDB_SHARD_IS_REMOVED)
if (new_shard->flags & SIRIDB_SHARD_IS_REMOVED)
{
log_warning(
"Cancel optimizing shard '%s' because the shard is dropped",
tmp_shard->fn);
siridb_shard_decref(tmp_shard);
new_shard->fn);
siridb_shard_decref(new_shard);
return siri_err;
}

if (siri_err || siri.optimize->status == SIRI_OPTIMIZE_CANCELLED)
{
/*
* Error occurred or the optimize task is cancelled. By decrementing
* only the reference counter for the tmp_shard we keep this shard as
* only the reference counter for the new_shard we keep this shard as
* if it is still optimizing so remaining points can still be written.
*/
siridb_shard_decref(tmp_shard);
siridb_shard_decref(new_shard);
return siri_err;
}

Expand All @@ -924,51 +914,51 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
uv_mutex_lock(&siridb->series_mutex);

/* make sure both shards files are closed */
siri_fp_close(tmp_shard->replacing->fp);
siri_fp_close(tmp_shard->fp);
siri_fp_close(new_shard->replacing->fp);
siri_fp_close(new_shard->fp);

/*
* Closing files or writing to the new shard might have produced
* critical errors. This seems to be a good point to check for errors.
*/
if (siri_err || (tmp_shard->flags & SIRIDB_SHARD_IS_REMOVED))
if (siri_err || (new_shard->flags & SIRIDB_SHARD_IS_REMOVED))
{
if (tmp_shard->flags & SIRIDB_SHARD_IS_REMOVED)
if (new_shard->flags & SIRIDB_SHARD_IS_REMOVED)
{
log_warning(
"Cancel optimizing shard '%s' because the shard is dropped",
tmp_shard->fn);
new_shard->fn);
}
}
else
{
/* remove the old shard file, this is not critical */
unlink(tmp_shard->replacing->fn);
unlink(new_shard->replacing->fn);

/* rename the temporary files to the correct file names */
if (rename(tmp_shard->fn, tmp_shard->replacing->fn) ||
if (rename(new_shard->fn, new_shard->replacing->fn) ||
siri_optimize_finish_idx(
tmp_shard->replacing->fn,
tmp_shard->replacing->flags & SIRIDB_SHARD_HAS_INDEX))
new_shard->replacing->fn,
new_shard->replacing->flags & SIRIDB_SHARD_HAS_INDEX))
{
log_critical(
"Could not rename file '%s' to '%s'",
tmp_shard->fn,
tmp_shard->replacing->fn);
new_shard->fn,
new_shard->replacing->fn);
ERR_FILE
}
else
{
/* free the original allocated memory and set the new filename */
free(tmp_shard->fn);
tmp_shard->fn = tmp_shard->replacing->fn;
tmp_shard->replacing->fn = NULL;
free(new_shard->fn);
new_shard->fn = new_shard->replacing->fn;
new_shard->replacing->fn = NULL;

/* decrement reference to old shard and set
* tmp_shard->replacing to NULL
* new_shard->replacing to NULL
*/
siridb_shard_decref(tmp_shard->replacing);
tmp_shard->replacing = NULL;
siridb_shard_decref(new_shard->replacing);
new_shard->replacing = NULL;
}
}

Expand All @@ -977,7 +967,7 @@ int siridb_shard_optimize(siridb_shard_t * shard, siridb_t * siridb)
/* can raise an error only if the shard is dropped, in any other case we
* still have a reference left and an error cannot be raised.
*/
siridb_shard_decref(tmp_shard);
siridb_shard_decref(new_shard);

sleep(1);

Expand Down
16 changes: 7 additions & 9 deletions test/test_cluster.py
Expand Up @@ -22,22 +22,22 @@
class TestCluster(TestBase):
title = 'Test siridb-cluster'

@default_test_setup(2, time_precision='s')
@default_test_setup(4, time_precision='s')
async def run(self):
await self.client0.connect()

await self.db.add_pool(self.server1)
await self.assertIsRunning(self.db, self.client0, timeout=12)

# await asyncio.sleep(35)
await asyncio.sleep(45)

# await self.db.add_replica(self.server2, 0)
# await self.assertIsRunning(self.db, self.client0, timeout=12)
await self.db.add_replica(self.server2, 0)
await self.assertIsRunning(self.db, self.client0, timeout=12)

# await asyncio.sleep(35)
await asyncio.sleep(45)

# await self.db.add_replica(self.server3, 1)
# await self.assertIsRunning(self.db, self.client0, timeout=12)
await self.db.add_replica(self.server3, 1)
await self.assertIsRunning(self.db, self.client0, timeout=12)

# await asyncio.sleep(35)

Expand All @@ -49,8 +49,6 @@ async def run(self):
# await self.db.add_pool(self.server5)
# await self.assertIsRunning(self.db, self.client0, timeout=12)



# await self.db.add_replica(self.server1, 0)
# await asyncio.sleep(5)

Expand Down

0 comments on commit bd12931

Please sign in to comment.