Skip to content

Commit

Permalink
change around vacuum so that we ask ES for all the 'state' ctids (in …
Browse files Browse the repository at this point in the history
…a binary blob), validate them with PG, and then delete them.

This ensures we don't try to delete docs that don't exist in our ES index, and it removes the need for ZDB to copy in the LVRelStats struct from 3 different versions of PG.
  • Loading branch information
eeeebbbbrrrr committed Oct 21, 2016
1 parent 1584099 commit f16d705
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 106 deletions.
Expand Up @@ -46,6 +46,7 @@ public void onModule(RestModule module) {
module.addRestAction(RestTermlistAction.class);
module.addRestAction(ZombodbBulkAction.class);
module.addRestAction(ZombodbCommitXIDAction.class);
module.addRestAction(ZombodbVacuumSupport.class);
}

public void onModule(ActionModule module) {
Expand Down
@@ -0,0 +1,64 @@
/*
* Copyright 2015-2016 ZomboDB, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.tcdi.zombodb.postgres;

import com.tcdi.zombodb.query_parser.utils.Utils;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
import org.elasticsearch.search.SearchHit;

import static org.elasticsearch.rest.RestRequest.Method.GET;

/**
 * REST endpoint ({@code GET /{index}/_zdbvacsup}) that returns, as a single binary
 * blob, the ctid of every document in the index's "state" type.
 *
 * <p>Wire format (consumed by the C side's {@code elasticsearch_vacuumSupport}):
 * one status byte (0 == success), a 4-byte count, then for each ctid a 4-byte
 * block number followed by a 2-byte offset number (6 bytes per entry).
 * The document ids are assumed to be of the form {@code "<block>-<offset>"}.
 */
public class ZombodbVacuumSupport extends BaseRestHandler {

    @Inject
    protected ZombodbVacuumSupport(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);

        controller.registerHandler(GET, "/{index}/_zdbvacsup", this);
    }

    @Override
    protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
        // NOTE(review): setSize(Integer.MAX_VALUE) asks for every hit in one page;
        // confirm the index's result-window settings allow this for large indexes.
        SearchRequestBuilder search = new SearchRequestBuilder(client)
                .setIndices(request.param("index"))
                .setTypes("state")
                .setSize(Integer.MAX_VALUE);

        SearchHit[] hits = client.search(search.request()).actionGet().getHits().getHits();
        int count = hits.length;

        // 1 status byte + 4-byte count + 6 bytes (4 block + 2 offset) per ctid
        byte[] blob = new byte[1 + 4 + 6 * count];
        int pos = 0;

        blob[pos++] = 0;    // leading zero byte: "no error" marker checked by the C client

        pos += Utils.encodeInteger(count, blob, pos);
        for (SearchHit hit : hits) {
            String[] ctid = hit.id().split("[-]");

            pos += Utils.encodeInteger(Integer.parseInt(ctid[0]), blob, pos);
            pos += Utils.encodeCharacter((char) Integer.parseInt(ctid[1]), blob, pos);
        }

        channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/data", blob));
    }
}
45 changes: 34 additions & 11 deletions postgres/src/main/c/am/elasticsearch.c
Expand Up @@ -180,6 +180,7 @@ void elasticsearch_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shard
" \"mappings\": {"
" \"data\": {"
" \"_source\": { \"enabled\": false },"
" \"_routing\": { \"required\": true },"
" \"_all\": { \"enabled\": true, \"analyzer\": \"phrase\" },"
" \"_field_names\": { \"index\": \"no\", \"store\": false },"
" \"_meta\": { \"primary_key\": \"%s\", \"always_resolve_joins\": %s },"
Expand All @@ -188,13 +189,15 @@ void elasticsearch_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shard
" },"
" \"state\": {"
" \"_source\": { \"enabled\": false },"
" \"_routing\": { \"required\": true },"
" \"_all\": { \"enabled\": false },"
" \"_field_names\": { \"index\": \"no\", \"store\": false },"
" \"date_detection\": false,"
" \"properties\": { \"_ctid\":{\"type\":\"string\",\"index\":\"not_analyzed\"} }"
" },"
" \"committed\": {"
" \"_source\": { \"enabled\": false },"
" \"_routing\": { \"required\": true },"
" \"_all\": { \"enabled\": false },"
" \"_field_names\": { \"index\": \"no\", \"store\": false },"
" \"properties\": {"
Expand Down Expand Up @@ -792,11 +795,11 @@ void elasticsearch_freeSearchResponse(ZDBSearchResponse *searchResponse) {
pfree(searchResponse);
}

void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems) {
int elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems) {
StringInfo endpoint = makeStringInfo();
StringInfo request = makeStringInfo();
StringInfo response;
int i;
int i, valid = 0;

appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default", indexDescriptor->url, indexDescriptor->fullyQualifiedName);
if (strcmp("-1", indexDescriptor->refreshInterval) == 0) {
Expand All @@ -806,15 +809,18 @@ void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer i
for (i=0; i<nitems; i++) {
ItemPointer item = &itemPointers[i];

appendStringInfo(request, "{\"delete\":{\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(item), ItemPointerGetOffsetNumber(item));

if (request->len >= indexDescriptor->batch_size) {
response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel);
checkForBulkError(response, "delete");

resetStringInfo(request);
freeStringInfo(response);
}
if (ItemPointerIsValid(item)) {
appendStringInfo(request, "{\"delete\":{\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(item), ItemPointerGetOffsetNumber(item));
valid++;

// if (request->len >= indexDescriptor->batch_size) {
// response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel);
// checkForBulkError(response, "delete");
//
// resetStringInfo(request);
// freeStringInfo(response);
// }
}
}

if (request->len > 0) {
Expand All @@ -824,8 +830,25 @@ void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer i

freeStringInfo(endpoint);
freeStringInfo(request);

return valid;
}

/*
 * Ask the remote Elasticsearch index, via its _zdbvacsup REST endpoint, for the
 * ctids of all "state" documents, returned as a binary blob.
 *
 * The first byte of the response is a status flag: non-zero means the payload
 * is an error and we abort via elog(ERROR).  On success the returned pointer
 * points just past that status byte; the caller (zdbbulkdelete) parses an
 * int32 count followed by 6 bytes (BlockNumber + OffsetNumber) per ctid.
 *
 * NOTE(review): the returned pointer aliases response->data, so the response
 * StringInfo is deliberately NOT freed here -- it must remain valid until the
 * caller is done with it (it is reclaimed with its memory context).
 */
char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor) {
StringInfo endpoint = makeStringInfo();
StringInfo response;

appendStringInfo(endpoint, "%s/%s/_zdbvacsup", indexDescriptor->url, indexDescriptor->fullyQualifiedName);
/* GET with no request body and no compression */
response = rest_call("GET", endpoint->data, NULL, 0);
/* status byte written by ZombodbVacuumSupport: 0 == success */
if (response->data[0] != 0)
elog(ERROR, "%s", response->data);

freeStringInfo(endpoint);

/* skip the status byte; the ctid payload starts here */
return response->data+1;
}


static void appendBatchInsertData(ZDBIndexDescriptor *indexDescriptor, ItemPointer ht_ctid, text *value, StringInfo bulk, bool isupdate, ItemPointer old_ctid, TransactionId xmin, uint64 sequence) {
/* the data */
appendStringInfo(bulk, "{\"index\":{\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(ht_ctid), ItemPointerGetOffsetNumber(ht_ctid));
Expand Down
3 changes: 2 additions & 1 deletion postgres/src/main/c/am/elasticsearch.h
Expand Up @@ -53,7 +53,8 @@ char *elasticsearch_highlight(ZDBIndexDescriptor *indexDescriptor, char *query,

void elasticsearch_freeSearchResponse(ZDBSearchResponse *searchResponse);

void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
int elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor);

void elasticsearch_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xid, CommandId commandId, uint64 sequence);
void elasticsearch_batchInsertFinish(ZDBIndexDescriptor *indexDescriptor);
Expand Down
22 changes: 19 additions & 3 deletions postgres/src/main/c/am/zdb_interface.c
Expand Up @@ -69,7 +69,8 @@ static char *wrapper_highlight(ZDBIndexDescriptor *indexDescriptor, char *query,

static void wrapper_freeSearchResponse(ZDBSearchResponse *searchResponse);

static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
static int wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor);

static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence);
static void wrapper_batchInsertFinish(ZDBIndexDescriptor *indexDescriptor);
Expand Down Expand Up @@ -247,6 +248,7 @@ ZDBIndexDescriptor *zdb_alloc_index_descriptor(Relation indexRel) {
desc->implementation->highlight = wrapper_highlight;
desc->implementation->freeSearchResponse = wrapper_freeSearchResponse;
desc->implementation->bulkDelete = wrapper_bulkDelete;
desc->implementation->vacuumSupport = wrapper_vacuumSupport;
desc->implementation->batchInsertRow = wrapper_batchInsertRow;
desc->implementation->batchInsertFinish = wrapper_batchInsertFinish;
desc->implementation->markTransactionCommitted = wrapper_markTransactionCommitted;
Expand Down Expand Up @@ -548,16 +550,30 @@ static void wrapper_freeSearchResponse(ZDBSearchResponse *searchResponse) {
MemoryContextSwitchTo(oldContext);
}

static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems) {
/*
 * Memory-context-managed wrapper around elasticsearch_bulkDelete().
 *
 * Runs the bulk delete inside a private short-lived context child of
 * TopTransactionContext so every transient allocation is released wholesale
 * afterwards, and returns the number of entries actually deleted.
 */
static int wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems) {
    MemoryContext deleteContext   = AllocSetContextCreate(TopTransactionContext, "wrapper_bulkDelete", 512, 64, 64);
    MemoryContext previousContext = MemoryContextSwitchTo(deleteContext);
    int           deleted;

    /* shadow indexes must never reach the implementation wrappers */
    Assert(!indexDescriptor->isShadow);

    deleted = elasticsearch_bulkDelete(indexDescriptor, itemPointers, nitems);

    /* restore the caller's context and discard everything we allocated */
    MemoryContextSwitchTo(previousContext);
    MemoryContextDelete(deleteContext);

    return deleted;
}

/*
 * Wrapper around elasticsearch_vacuumSupport().
 *
 * Switches into TopTransactionContext before calling down so the returned
 * ctid blob (which aliases the REST response buffer) survives after we
 * return -- the caller (zdbbulkdelete) parses it afterwards.
 */
static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor) {
    MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
    char *response;

    /* consistency with the other wrappers (e.g. wrapper_bulkDelete): shadow
     * indexes are filtered out by the caller before reaching this point */
    Assert(!indexDescriptor->isShadow);

    response = elasticsearch_vacuumSupport(indexDescriptor);

    MemoryContextSwitchTo(oldContext);

    return response;
}

static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence) {
Expand Down
4 changes: 3 additions & 1 deletion postgres/src/main/c/am/zdb_interface.h
Expand Up @@ -198,7 +198,8 @@ typedef char *(*ZDBHighlight_function)(ZDBIndexDescriptor *indexDescriptor, char

typedef void (*ZDBFreeSearchResponse_function)(ZDBSearchResponse *searchResponse);

typedef void (*ZDBBulkDelete_function)(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
typedef int (*ZDBBulkDelete_function)(ZDBIndexDescriptor *indexDescriptor, ItemPointer itemPointers, int nitems);
typedef char *(*ZDBVacuumSupport_function)(ZDBIndexDescriptor *indexDescriptor);

typedef void (*ZDBIndexBatchInsertRow_function)(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence);
typedef void (*ZDBIndexBatchInsertFinish_function)(ZDBIndexDescriptor *indexDescriptor);
Expand Down Expand Up @@ -242,6 +243,7 @@ struct ZDBIndexImplementation {
ZDBFreeSearchResponse_function freeSearchResponse;

ZDBBulkDelete_function bulkDelete;
ZDBVacuumSupport_function vacuumSupport;

ZDBIndexBatchInsertRow_function batchInsertRow;
ZDBIndexBatchInsertFinish_function batchInsertFinish;
Expand Down
124 changes: 34 additions & 90 deletions postgres/src/main/c/am/zdbam.c
Expand Up @@ -742,90 +742,6 @@ Datum zdbrestrpos(PG_FUNCTION_ARGS) {
PG_RETURN_VOID();
}

/*
* lifted from various Postgres versions of vacuumlazy.c
*
* We do this so that vacuuming our remote Elasticsearch index can
* look directly at LVRelStats.dead_tuples instead of walking through
* the index and asking about each and every tuple.
*/
#if (PG_VERSION_NUM < 90400)
typedef struct LVRelStats
{
/* hasindex = true means two-pass strategy; false means one-pass */
bool hasindex;
/* Overall statistics about rel */
BlockNumber old_rel_pages; /* previous value of pg_class.relpages */
BlockNumber rel_pages; /* total number of pages */
BlockNumber scanned_pages; /* number of pages we examined */
double scanned_tuples; /* counts only tuples on scanned pages */
double old_rel_tuples; /* previous value of pg_class.reltuples */
double new_rel_tuples; /* new estimated total # of tuples */
BlockNumber pages_removed;
double tuples_deleted;
BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
/* List of TIDs of tuples we intend to delete */
/* NB: this list is ordered by TID address */
int num_dead_tuples; /* current # of entries */
int max_dead_tuples; /* # slots allocated in array */
ItemPointer dead_tuples; /* array of ItemPointerData */
int num_index_scans;
TransactionId latestRemovedXid;
bool lock_waiter_detected;
} LVRelStats;
#elif (PG_VERSION_NUM < 90500)
typedef struct LVRelStats
{
/* hasindex = true means two-pass strategy; false means one-pass */
bool hasindex;
/* Overall statistics about rel */
BlockNumber old_rel_pages; /* previous value of pg_class.relpages */
BlockNumber rel_pages; /* total number of pages */
BlockNumber scanned_pages; /* number of pages we examined */
double scanned_tuples; /* counts only tuples on scanned pages */
double old_rel_tuples; /* previous value of pg_class.reltuples */
double new_rel_tuples; /* new estimated total # of tuples */
double new_dead_tuples; /* new estimated total # of dead tuples */
BlockNumber pages_removed;
double tuples_deleted;
BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
/* List of TIDs of tuples we intend to delete */
/* NB: this list is ordered by TID address */
int num_dead_tuples; /* current # of entries */
int max_dead_tuples; /* # slots allocated in array */
ItemPointer dead_tuples; /* array of ItemPointerData */
int num_index_scans;
TransactionId latestRemovedXid;
bool lock_waiter_detected;
} LVRelStats;
#elif (PG_VERSION_NUM < 90600)
typedef struct LVRelStats
{
/* hasindex = true means two-pass strategy; false means one-pass */
bool hasindex;
/* Overall statistics about rel */
BlockNumber old_rel_pages; /* previous value of pg_class.relpages */
BlockNumber rel_pages; /* total number of pages */
BlockNumber scanned_pages; /* number of pages we examined */
BlockNumber pinskipped_pages; /* # of pages we skipped due to a pin */
double scanned_tuples; /* counts only tuples on scanned pages */
double old_rel_tuples; /* previous value of pg_class.reltuples */
double new_rel_tuples; /* new estimated total # of tuples */
double new_dead_tuples; /* new estimated total # of dead tuples */
BlockNumber pages_removed;
double tuples_deleted;
BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
/* List of TIDs of tuples we intend to delete */
/* NB: this list is ordered by TID address */
int num_dead_tuples; /* current # of entries */
int max_dead_tuples; /* # slots allocated in array */
ItemPointer dead_tuples; /* array of ItemPointerData */
int num_index_scans;
TransactionId latestRemovedXid;
bool lock_waiter_detected;
} LVRelStats;
#endif

/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
Expand All @@ -836,12 +752,14 @@ typedef struct LVRelStats
Datum zdbbulkdelete(PG_FUNCTION_ARGS) {
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
// IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation indexRel = info->index;
ZDBIndexDescriptor *desc;
LVRelStats *relstats;
struct timeval tv1, tv2;
char *ctidsToVacuum;
int manyCtidsToVacuum;
int removed = 0;

gettimeofday(&tv1, NULL);

Expand All @@ -853,17 +771,43 @@ Datum zdbbulkdelete(PG_FUNCTION_ARGS) {
if (desc->isShadow)
PG_RETURN_POINTER(stats);

relstats = (LVRelStats *) callback_state; /* XXX: cast to our copy/paste of LVRelStats! */
ctidsToVacuum = desc->implementation->vacuumSupport(desc);
memcpy(&manyCtidsToVacuum, ctidsToVacuum, sizeof(int32));
ctidsToVacuum += sizeof(int32);

if (manyCtidsToVacuum > 0) {
ItemPointer dead_tuples = palloc0(manyCtidsToVacuum * sizeof(ItemPointerData));
int i;

for (i=0; i<manyCtidsToVacuum; i++) {
BlockNumber blockno;
OffsetNumber offno;
ItemPointerData ctid;

memcpy(&blockno, ctidsToVacuum, sizeof(BlockNumber));
ctidsToVacuum += sizeof(BlockNumber);
memcpy(&offno, ctidsToVacuum, sizeof(OffsetNumber));
ctidsToVacuum += sizeof(OffsetNumber);

ItemPointerSet(&ctid, blockno, offno);

if (!callback(&ctid, callback_state)) {
ItemPointerSet(&ctid, InvalidBlockNumber, InvalidOffsetNumber);
}

memcpy(&dead_tuples[i], &ctid, sizeof(ItemPointerData));
}

desc->implementation->bulkDelete(desc, relstats->dead_tuples, relstats->num_dead_tuples);
removed = desc->implementation->bulkDelete(desc, dead_tuples, manyCtidsToVacuum);
}

stats->num_pages = 1;
stats->num_index_tuples = desc->implementation->estimateSelectivity(desc, "");
stats->tuples_removed = relstats->num_dead_tuples;
stats->tuples_removed = removed;

gettimeofday(&tv2, NULL);

elog(LOG, "[zombodb vacuum status] index=%s, num_removed=%d, num_index_tuples=%lu, ttl=%fs", RelationGetRelationName(indexRel), relstats->num_dead_tuples, (uint64) stats->num_index_tuples, TO_SECONDS(tv1, tv2));
elog(LOG, "[zombodb vacuum status] index=%s, num_removed=%d, num_index_tuples=%lu, ttl=%fs", RelationGetRelationName(indexRel), removed, (uint64) stats->num_index_tuples, TO_SECONDS(tv1, tv2));

PG_RETURN_POINTER(stats);
}
Expand Down

0 comments on commit f16d705

Please sign in to comment.