diff --git a/.gitignore b/.gitignore index 4dcb5cce0..035b4e819 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ target/ .DS_store foo.* -postgres/src/main/sql/zombodb--3.1.15.sql +postgres/src/main/sql/zombodb--3.2.0.sql postgres/src/test/sql/setup.sql postgres/src/test/sql/so_comments.sql postgres/src/test/sql/TUTORIAL.sql diff --git a/docker/elasticsearch/Dockerfile b/docker/elasticsearch/Dockerfile index 9164efc58..2cb7b01fe 100644 --- a/docker/elasticsearch/Dockerfile +++ b/docker/elasticsearch/Dockerfile @@ -9,7 +9,7 @@ FROM java:7 ENV ES_PKG_NAME elasticsearch-1.7.5 -ENV ZOMBODB_VER 3.1.15 +ENV ZOMBODB_VER 3.2.0 # Install Elasticsearch. RUN \ diff --git a/docker/postgres/Dockerfile b/docker/postgres/Dockerfile index 3b2f763e7..680f6c4f8 100644 --- a/docker/postgres/Dockerfile +++ b/docker/postgres/Dockerfile @@ -4,7 +4,7 @@ # Pull base image. FROM postgres:9.5 -ENV ZOMBODB_VER 3.1.15 +ENV ZOMBODB_VER 3.2.0 # Fetch wget RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget && rm -rf /var/lib/apt/lists/* diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index e2ed73064..5430d947d 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -7,7 +7,7 @@ com.tcdi.elasticsearch zombodb-parent - 3.1.15 + 3.2.0 zombodb-plugin diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/ZombodbPlugin.java b/elasticsearch/src/main/java/com/tcdi/zombodb/ZombodbPlugin.java index d7a9f3fa7..e6ef48b9b 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/ZombodbPlugin.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/ZombodbPlugin.java @@ -46,7 +46,10 @@ public void onModule(RestModule module) { module.addRestAction(RestTermlistAction.class); module.addRestAction(ZombodbBulkAction.class); module.addRestAction(ZombodbCommitXIDAction.class); + module.addRestAction(ZombodbDeleteTuplesAction.class); + module.addRestAction(ZombodbGetXidVacuumCandidatesAction.class); module.addRestAction(ZombodbVacuumSupportAction.class); + module.addRestAction(ZombodbVacuumCleanupAction.class); } public void onModule(ActionModule module) { diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresAggregationAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresAggregationAction.java index 6626e7c3c..fa8c6b9d0 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresAggregationAction.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresAggregationAction.java @@ -54,7 +54,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl SearchRequestBuilder builder = new SearchRequestBuilder(client); String input = request.content().toUtf8(); final QueryRewriter rewriter = QueryRewriter.Factory.create(client, request.param("index"), request.param("preference"), input, true, true, true); - QueryBuilder qb = rewriter.rewriteQuery(true); + QueryBuilder qb = rewriter.rewriteQuery(); AbstractAggregationBuilder ab = rewriter.rewriteAggregations(); SuggestBuilder.SuggestionBuilder tsb = rewriter.rewriteSuggestions(); diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresCountAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresCountAction.java index 6bdbee993..57f678d4c 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresCountAction.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresCountAction.java @@ -16,7 +16,6 @@ */ package com.tcdi.zombodb.postgres; -import 
org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -47,7 +46,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl BytesRestResponse response; QueryAndIndexPair query; - query = PostgresTIDResponseAction.buildJsonQueryFromRequestContent(client, request, !isSelectivityQuery, true, true, true); + query = PostgresTIDResponseAction.buildJsonQueryFromRequestContent(client, request, !isSelectivityQuery, true, true); if (query.hasLimit() && isSelectivityQuery) { count = query.getLimit().getLimit(); @@ -58,7 +57,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl builder.setSize(0); builder.setSearchType(SearchType.COUNT); builder.setPreference(request.param("preference")); - builder.setQueryCache(false); + builder.setQueryCache(true); builder.setFetchSource(false); builder.setTrackScores(false); builder.setNoFields(); diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresMappingAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresMappingAction.java index 8285c53b4..9626cd8f4 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresMappingAction.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresMappingAction.java @@ -42,7 +42,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl BytesRestResponse response; QueryRewriter rewriter = QueryRewriter.Factory.create(client, request.param("index"), request.param("preference"), request.content().toUtf8(), true, false, false); - rewriter.rewriteQuery(false); + rewriter.rewriteQuery(); Map properties = rewriter.describedNestedObject(request.param("fieldname")); response = new BytesRestResponse(RestStatus.OK, "application/json", JsonXContent.contentBuilder().map(properties).bytes()); diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresTIDResponseAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresTIDResponseAction.java index 110395d30..46684ee46 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresTIDResponseAction.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/PostgresTIDResponseAction.java @@ -107,7 +107,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl try { parseStart = System.nanoTime(); - query = buildJsonQueryFromRequestContent(client, request, true, false, false, false); + query = buildJsonQueryFromRequestContent(client, request, true, false, false); parseEnd = System.nanoTime(); SearchRequestBuilder builder = new SearchRequestBuilder(client); @@ -153,7 +153,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl } } - public static QueryAndIndexPair buildJsonQueryFromRequestContent(Client client, RestRequest request, boolean doFullFieldDataLookups, boolean canDoSingleIndex, boolean needVisibilityOnTopLevel, boolean all) { + public static QueryAndIndexPair buildJsonQueryFromRequestContent(Client client, RestRequest request, boolean doFullFieldDataLookups, boolean canDoSingleIndex, boolean needVisibilityOnTopLevel) { String queryString = request.content().toUtf8(); String indexName = request.param("index"); @@ -163,7 +163,7 @@ public static QueryAndIndexPair buildJsonQueryFromRequestContent(Client client, if (queryString != null && 
queryString.trim().length() > 0) { QueryRewriter qr = QueryRewriter.Factory.create(client, indexName, request.param("preference"), queryString, doFullFieldDataLookups, canDoSingleIndex, needVisibilityOnTopLevel); - query = qr.rewriteQuery(all); + query = qr.rewriteQuery(); indexName = qr.getSearchIndexName(); limit = qr.getLimit(); } else { diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/RoutingHelper.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/RoutingHelper.java new file mode 100644 index 000000000..a5bba556c --- /dev/null +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/RoutingHelper.java @@ -0,0 +1,54 @@ +/* + * Copyright 2017 ZomboDB, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tcdi.zombodb.postgres; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.operation.OperationRouting; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RoutingHelper { + private static Map ROUTING_TABLE = new ConcurrentHashMap<>(); + + public static String[] getRoutingTable(Client client, ClusterService clusterService, String index, int shards) { + String key = index+"."+shards; + + String[] routingTable = ROUTING_TABLE.get(key); + + if (routingTable != null) + return routingTable; + + ClusterState clusterState = clusterService.state(); + OperationRouting operationRouting = clusterService.operationRouting(); + + routingTable = new String[shards]; + for (int i=0; i trackingRequests = new ArrayList<>(); + List xmaxRequests = new ArrayList<>(); + List abortedRequests = new ArrayList<>(); if (!bulkRequest.requests().isEmpty()) { isdelete = bulkRequest.requests().get(0) instanceof DeleteRequest; if (isdelete) { - trackingRequests = handleDeleteRequests(client, bulkRequest.requests(), defaultIndex, defaultType); + handleDeleteRequests(client, bulkRequest.requests(), defaultIndex, xmaxRequests); } else { - if (pkeyFieldname == null) { - trackingRequests = handleIndexRequests(client, bulkRequest.requests(), defaultIndex, defaultType); - } else { - trackingRequests = handleIndexRequestsUsingPkey(client, bulkRequest.requests(), defaultIndex, pkeyFieldname); - if (trackingRequests == null) { - // couldn't do it by primary key, so do it the slow way - trackingRequests = handleIndexRequests(client, bulkRequest.requests(), defaultIndex, defaultType); - } - } + handleIndexRequests(client, bulkRequest.requests(), defaultIndex, requestNumber, xmaxRequests, abortedRequests); } } - BulkResponse response; if (isdelete) { - bulkRequest.refresh(false); + // when deleting, we need to delete the "data" docs first + // otherwise VisibilityQueryHelper might think "data" docs don't have an "xmax" when they really do response = client.bulk(bulkRequest).actionGet(); - if (!response.hasFailures()) - response = processTrackingRequests(request, client, trackingRequests); + + if 
(!response.hasFailures()) { + // then we can delete from "xmax" + response = processTrackingRequests(request, client, xmaxRequests); + } } else { - response = processTrackingRequests(request, client, trackingRequests); - if (!response.hasFailures()) - response = client.bulk(bulkRequest).actionGet(); + // when inserting, we first need to add the "aborted" docs + response = processTrackingRequests(request, client, abortedRequests); + + if (!response.hasFailures()) { + // then we need to add the "xmax" docs + // otherwise VisibilityQueryHelper might think "data" docs don't have an "xmax" when they really do + response = processTrackingRequests(request, client, xmaxRequests); + + if (!response.hasFailures()) { + // then we can insert into "data" + response = client.bulk(bulkRequest).actionGet(); + } + } } channel.sendResponse(buildResponse(response, JsonXContent.contentBuilder())); @@ -114,257 +121,219 @@ private BulkResponse processTrackingRequests(RestRequest request, Client client, bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.refresh(request.paramAsBoolean("refresh", false)); bulkRequest.requests().addAll(trackingRequests); - return client.bulk(bulkRequest).actionGet(); } - private RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception { + static RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception { + int errorCnt = 0; builder.startObject(); if (response.hasFailures()) { - builder.field(Fields.TOOK, response.getTookInMillis()); - builder.field(Fields.ERRORS, response.hasFailures()); builder.startArray(Fields.ITEMS); - for (BulkItemResponse itemResponse : response) { - builder.startObject(); - builder.startObject(itemResponse.getOpType()); - builder.field(Fields._INDEX, itemResponse.getIndex()); - builder.field(Fields._TYPE, itemResponse.getType()); - builder.field(Fields._ID, itemResponse.getId()); - long version = itemResponse.getVersion(); - if (version != -1) { - builder.field(Fields._VERSION, itemResponse.getVersion()); - } + main_loop: for (BulkItemResponse itemResponse : response) { if (itemResponse.isFailed()) { - builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); - builder.field(Fields.ERROR, itemResponse.getFailure().getMessage()); - } else { - if (itemResponse.getResponse() instanceof DeleteResponse) { - DeleteResponse deleteResponse = itemResponse.getResponse(); - if (deleteResponse.isFound()) { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); - } else { - builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); - } - builder.field(Fields.FOUND, deleteResponse.isFound()); - } else if (itemResponse.getResponse() instanceof IndexResponse) { - IndexResponse indexResponse = itemResponse.getResponse(); - if (indexResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); - } - } else if (itemResponse.getResponse() instanceof UpdateResponse) { - UpdateResponse updateResponse = itemResponse.getResponse(); - if (updateResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + + // handle failure conditions that we know are + // okay/expected as if they never happened + BulkItemResponse.Failure failure = itemResponse.getFailure(); + + switch (failure.getStatus()) { + case CONFLICT: + if 
(failure.getMessage().contains("VersionConflictEngineException")) { + if ("xmax".equals(itemResponse.getType())) { + if ("delete".equals(itemResponse.getOpType())) { + // this is a version conflict error where we tried to delete + // an old xmax doc, which is perfectly acceptable + continue main_loop; + } + } + } + break; + + default: + errorCnt++; + break; + } + + builder.startObject(); + builder.startObject(itemResponse.getOpType()); + builder.field(Fields._INDEX, itemResponse.getIndex()); + builder.field(Fields._TYPE, itemResponse.getType()); + builder.field(Fields._ID, itemResponse.getId()); + long version = itemResponse.getVersion(); + if (version != -1) { + builder.field(Fields._VERSION, itemResponse.getVersion()); + } + if (itemResponse.isFailed()) { + builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); + builder.field(Fields.ERROR, itemResponse.getFailure().getMessage()); + } else { + if (itemResponse.getResponse() instanceof DeleteResponse) { + DeleteResponse deleteResponse = itemResponse.getResponse(); + if (deleteResponse.isFound()) { + builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + } else { + builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); + } + builder.field(Fields.FOUND, deleteResponse.isFound()); + } else if (itemResponse.getResponse() instanceof IndexResponse) { + IndexResponse indexResponse = itemResponse.getResponse(); + if (indexResponse.isCreated()) { + builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); + } else { + builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + } + } else if (itemResponse.getResponse() instanceof UpdateResponse) { + UpdateResponse updateResponse = itemResponse.getResponse(); + if (updateResponse.isCreated()) { + builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); + } else { + builder.field(Fields.STATUS, RestStatus.OK.getStatus()); + } } } + builder.endObject(); + builder.endObject(); } - builder.endObject(); - builder.endObject(); } builder.endArray(); + builder.field(Fields.TOOK, response.getTookInMillis()); + if (errorCnt > 0) { + builder.field(Fields.ERRORS, true); + } } builder.endObject(); return new BytesRestResponse(OK, builder); } - private List handleDeleteRequests(Client client, List requests, String defaultIndex, String defaultType) { - List trackingRequests = new ArrayList<>(); - IdsFilterBuilder ids = idsFilter(defaultType); - Map lookup = new HashMap<>(requests.size()); + private void handleDeleteRequests(Client client, List requests, String defaultIndex, List xmaxRequests) { for (ActionRequest ar : requests) { DeleteRequest doc = (DeleteRequest) ar; - ids.addIds(doc.id()); - - lookup.put(doc.id(), doc); - } - - SearchResponse response = client.search( - new SearchRequestBuilder(client) - .setIndices(defaultIndex) - .setTypes(defaultType) - .setPreference("_primary") - .setQuery(filteredQuery(null, ids)) - .setSize(requests.size()) - .addField("_prev_ctid") - .request() - ).actionGet(); - - for (SearchHit hit : response.getHits()) { - DeleteRequest doc = lookup.get(hit.id()); - String prevCtid = hit.field("_prev_ctid").getValue(); - - if (prevCtid == null) - throw new RuntimeException("Found null _prev_ctid for " + hit.getId()); - if (doc != null) { - doc.routing(prevCtid); - - trackingRequests.add( - new DeleteRequestBuilder(client) - .setId(doc.id()) - .setIndex(defaultIndex) - .setType("state") - .setRouting(prevCtid) - .request() - ); - } - } - - if (trackingRequests.size() != response.getHits().getHits().length) - throw new 
RuntimeException("didn't create enough tracking requests"); - - return trackingRequests; - } - - private List handleIndexRequests(Client client, List requests, String defaultIndex, String defaultType) { - List trackingRequests = new ArrayList<>(); - IdsFilterBuilder ids = idsFilter(defaultType); - Map lookup = new HashMap<>(requests.size()); - - for (ActionRequest ar : requests) { - IndexRequest doc = (IndexRequest) ar; - Map data = doc.sourceAsMap(); - final String prevCtid = (String) data.get("_prev_ctid"); - - if (prevCtid == null) { - // this IndexRequest represents an INSERT - // and as such its rerouting needs to reference itself+xid - Number xid = (Number) data.get("_xid"); - String routing = doc.id() + ":" + xid.longValue(); - - data.put("_prev_ctid", routing); - doc.source(data); - doc.routing(routing); - doc.opType(IndexRequest.OpType.CREATE); - doc.versionType(VersionType.FORCE); - doc.version(xid.longValue()); - } else { - // this IndexRequest represents an UPDATE - // so we'll look up its routing value in batch below - ids.addIds(prevCtid); - lookup.put(prevCtid, doc); - } - } - - if (lookup.isEmpty()) - return Collections.emptyList(); - - SearchResponse response = null; - int retries = 0; - while (retries <= 1) { - - response = client.search( - new SearchRequestBuilder(client) - .setIndices(defaultIndex) - .setTypes(defaultType) - .setPreference("_primary") - .setQuery(filteredQuery(null, ids)) - .setQueryCache(retries == 0) - .setSize(lookup.size()) - .addField("_prev_ctid") - .request() - ).actionGet(); - - if (response.getHits().getHits().length != lookup.size()) { - // didn't find everything, maybe it's because the index needs to be refreshed - // so lets do that and try one more time - client.admin().indices().refresh(Requests.refreshRequest(defaultIndex)).actionGet(); - retries++; - continue; - } - - break; - } - - if (response.getHits().getHits().length != lookup.size()) - throw new RuntimeException("Did not find all previous ctids an UPDATE"); - - for (SearchHit hit : response.getHits()) { - String prevCtid = hit.field("_prev_ctid").getValue(); - IndexRequest doc = lookup.get(hit.id()); - - if (doc == null) - continue; - - Map data = doc.sourceAsMap(); - Number xid = (Number) data.get("_xid"); - - data.put("_prev_ctid", prevCtid); - doc.source(data); - doc.routing(prevCtid); - doc.opType(IndexRequest.OpType.CREATE); - doc.versionType(VersionType.FORCE); - doc.version(xid.longValue()); - - trackingRequests.add( - new IndexRequestBuilder(client) - .setId(hit.id()) + xmaxRequests.add( + new DeleteRequestBuilder(client) .setIndex(defaultIndex) - .setType("state") - .setRouting(prevCtid) - .setOpType(IndexRequest.OpType.INDEX) - .setVersionType(VersionType.FORCE) - .setVersion(xid.longValue()) - .setSource("_ctid", prevCtid) + .setType("xmax") + .setRouting(doc.id()) + .setId(doc.id()) .request() ); + doc.routing(doc.id()); } - - return trackingRequests; } - private List handleIndexRequestsUsingPkey(Client client, List requests, String defaultIndex, String pkeyFieldname) { - List trackingRequests = new ArrayList<>(); + private void handleIndexRequests(Client client, List requests, String defaultIndex, int requestNumber, List xmaxRequests, List abortedRequests) { + + int cnt = 0; for (ActionRequest ar : requests) { IndexRequest doc = (IndexRequest) ar; Map data = doc.sourceAsMap(); - Object pkey = data.get(pkeyFieldname); - Object prevCtid = data.get("_prev_ctid"); - Number xid = (Number) data.get("_xid"); - - if (pkey == null) - return null; // can't use this at all + 
String prev_ctid = (String) data.get("_prev_ctid");
+            Number xmin = (Number) data.get("_xmin");
+            Number cmin = (Number) data.get("_cmin");
+            Number sequence = (Number) data.get("_zdb_seq");    // -1 means an index build (CREATE INDEX)
+
+            {
+                // encode a few things into one binary field
+                // and then add that field to the json source of this document
+                String[] parts = doc.id().split("-");
+                int blockno = Integer.parseInt(parts[0]);
+                int offno = Integer.parseInt(parts[1]);
+                BytesRef encodedTuple = encodeTuple(xmin.longValue(), cmin.intValue(), blockno, offno);
+
+                // re-create the source as a byte array.
+                // this is much faster than using doc.source(Map)
+                byte[] source = doc.source().toBytes();
+                int start = 0;
+                int len = source.length;
+
+                // back up to the last closing bracket
+                while (source[start + --len] != '}') ;
+
+                byte[] valueBytes = (",\"_zdb_encoded_tuple\":\"" + Base64.encodeBytes(encodedTuple.bytes) + "\"}").getBytes();
+                byte[] dest = new byte[len + valueBytes.length];
+                System.arraycopy(source, start, dest, 0, len);
+                System.arraycopy(valueBytes, 0, dest, len, valueBytes.length);
+                doc.source(dest);
+            }
-            data.put("_prev_ctid", String.valueOf(pkey));
-            doc.routing(String.valueOf(pkey));
-            doc.opType(IndexRequest.OpType.CREATE);
-            doc.versionType(VersionType.FORCE);
-            doc.version(xid.longValue());
-            doc.source(data);
+            if (prev_ctid != null) {
+                // we are inserting a new doc that replaces a previous doc (an UPDATE)
+                String[] parts = prev_ctid.split("-");
+                int blockno = Integer.parseInt(parts[0]);
+                int offno = Integer.parseInt(parts[1]);
+                BytesRef encodedTuple = encodeTuple(xmin.longValue(), cmin.intValue(), blockno, offno);
-            if (prevCtid != null) {
-                trackingRequests.add(
+                xmaxRequests.add(
                         new IndexRequestBuilder(client)
-                                .setId(String.valueOf(prevCtid))
                                 .setIndex(defaultIndex)
-                                .setType("state")
-                                .setRouting(String.valueOf(pkey))
-                                .setOpType(IndexRequest.OpType.INDEX)
+                                .setType("xmax")
                                 .setVersionType(VersionType.FORCE)
-                                .setVersion(xid.longValue())
-                                .setSource("_ctid", String.valueOf(pkey))
+                                .setVersion(xmin.longValue())
+                                .setRouting(prev_ctid)
+                                .setId(prev_ctid)
+                                .setSource("_xmax", xmin, "_cmax", cmin, "_replacement_ctid", doc.id(), "_zdb_encoded_tuple", encodedTuple, "_zdb_reason", "U")
                                 .request()
                 );
             }
-        }
+            if (sequence.intValue() > -1) {
+                // delete a possible existing xmax value for this doc,
+                // but only if we're not in an index build (i.e., CREATE INDEX)
+                xmaxRequests.add(
+                        new DeleteRequestBuilder(client)
+                                .setIndex(defaultIndex)
+                                .setType("xmax")
+                                .setRouting(doc.id())
+                                .setId(doc.id())
+                                .request()
+                );
+            }
-        return trackingRequests;
-    }
+            // only add the "aborted" xid entry if this is the first
+            // record in what might be a batch of inserts from one statement
+            if (requestNumber == 0 && cnt == 0 && sequence.intValue() > -1) {
+                GetSettingsResponse indexSettings = client.admin().indices().getSettings(client.admin().indices().prepareGetSettings(defaultIndex).request()).actionGet();
+                int shards = Integer.parseInt(indexSettings.getSetting(defaultIndex, "index.number_of_shards"));
+                String[] routingTable = RoutingHelper.getRoutingTable(client, clusterService, defaultIndex, shards);
+
+                for (String routing : routingTable) {
+                    abortedRequests.add(
+                            new IndexRequestBuilder(client)
+                                    .setIndex(defaultIndex)
+                                    .setType("aborted")
+                                    .setRouting(routing)
+                                    .setId(String.valueOf(xmin))
+                                    .setSource("_zdb_xid", xmin)
+                                    .request()
+                    );
+                }
+            }
-    private String lookupPkeyFieldname(Client client, String index) {
-        GetMappingsResponse mappings = 
client.admin().indices().getMappings(new GetMappingsRequest().indices(index).types("data")).actionGet();
-        MappingMetaData mmd = mappings.getMappings().get(index).get("data");
+            // every doc with an "_id" that is a ctid needs a version,
+            // and that version must be *larger* than that of the document that might
+            // have previously occupied this "_id" value -- the Postgres transaction id (xid)
+            // works just fine for this as it's always increasing
+            doc.opType(IndexRequest.OpType.CREATE);
+            doc.version(xmin.longValue());
+            doc.versionType(VersionType.FORCE);
+            doc.routing(doc.id());
+
+            cnt++;
+        }
+    }
 
+    static BytesRef encodeTuple(long xid, int cmin, int blockno, int offno) {
         try {
-            return (String) ((Map) mmd.getSourceAsMap().get("_meta")).get("primary_key");
+            byte[] tuple = new byte[4 + 2 + 8 + 4];   // blockno + offno + xid + cmin
+            ByteArrayDataOutput out = new ByteArrayDataOutput(tuple);
+            out.writeVInt(blockno);
+            out.writeVInt(offno);
+            out.writeVLong(xid);
+            out.writeVInt(cmin);
+            return new BytesRef(tuple, 0, out.getPosition());
         } catch (IOException ioe) {
             throw new RuntimeException(ioe);
         }
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbCommitXIDAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbCommitXIDAction.java
index f3b89a8f7..768938728 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbCommitXIDAction.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbCommitXIDAction.java
@@ -3,28 +3,23 @@
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.routing.operation.OperationRouting;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.netty.util.internal.ConcurrentHashMap;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.*;
 
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
-import java.util.Map;
 
 import static org.elasticsearch.rest.RestRequest.Method.POST;
 
 public class ZombodbCommitXIDAction extends BaseRestHandler {
 
     private ClusterService clusterService;
-    private Map<String, String[]> routingTablesByIndex = new ConcurrentHashMap<>();
 
     @Inject
     public ZombodbCommitXIDAction(Settings settings, RestController controller, Client client, ClusterService clusterService) {
@@ -41,7 +36,7 @@ protected void handleRequest(RestRequest rest, RestChannel channel, Client clien
         boolean refresh = rest.paramAsBoolean("refresh", false);
         GetSettingsResponse indexSettings = client.admin().indices().getSettings(client.admin().indices().prepareGetSettings(index).request()).actionGet();
         int shards = Integer.parseInt(indexSettings.getSetting(index, "index.number_of_shards"));
-        String[] routingTable = bruteForceRoutingValuesForShards(index, shards);
+        String[] routingTable = RoutingHelper.getRoutingTable(client, clusterService, index, shards);
 
         BulkRequest bulkRequest = Requests.bulkRequest();
         bulkRequest.refresh(refresh);
@@ -51,14 +46,13 @@ protected void handleRequest(RestRequest rest, RestChannel channel, Client clien
         while ((line = reader.readLine()) != null) { 
Long xid = Long.valueOf(line); - for (int i=0; i xmaxRequests = new ArrayList<>(); + List abortedRequests = new ArrayList<>(); + + BufferedReader reader = new BufferedReader(new InputStreamReader(request.content().streamInput())); + String line; + int cnt = 0; + + while ((line = reader.readLine()) != null) { + String[] split = line.split(":"); + String ctid = split[0]; + long xid = Long.valueOf(split[1]); + int cmax = Integer.valueOf(split[2]); + split = ctid.split("-"); + int blockno = Integer.parseInt(split[0]); + int offno = Integer.parseInt(split[1]); + BytesRef encodedTuple = ZombodbBulkAction.encodeTuple(xid, cmax, blockno, offno); + + if (cnt == 0) { + GetSettingsResponse indexSettings = client.admin().indices().getSettings(client.admin().indices().prepareGetSettings(index).request()).actionGet(); + int shards = Integer.parseInt(indexSettings.getSetting(index, "index.number_of_shards")); + String[] routingTable = RoutingHelper.getRoutingTable(client, clusterService, index, shards); + + for (String routing : routingTable) { + abortedRequests.add( + new IndexRequestBuilder(client) + .setIndex(index) + .setType("aborted") + .setRouting(routing) + .setId(String.valueOf(xid)) + .setSource("_zdb_xid", xid) + .request() + ); + } + } + + xmaxRequests.add( + new IndexRequestBuilder(client) + .setIndex(index) + .setType("xmax") + .setVersionType(VersionType.FORCE) + .setVersion(xid) + .setRouting(ctid) + .setId(ctid) + .setSource("_xmax", xid, "_cmax", cmax, "_replacement_ctid", ctid, "_zdb_encoded_tuple", encodedTuple, "_zdb_reason", "D") + .request() + ); + + cnt++; + } + + BulkResponse response; + for (List requests : new List[] { abortedRequests, xmaxRequests }){ + BulkRequest bulkRequest = Requests.bulkRequest(); + bulkRequest.refresh(refresh); + bulkRequest.add(requests); + + response = client.bulk(bulkRequest).actionGet(); + if (response.hasFailures()) + throw new RuntimeException(response.buildFailureMessage()); + } + + channel.sendResponse(new BytesRestResponse(RestStatus.OK, String.valueOf("ok"))); + } +} diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbGetXidVacuumCandidatesAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbGetXidVacuumCandidatesAction.java new file mode 100644 index 000000000..525c14b1a --- /dev/null +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbGetXidVacuumCandidatesAction.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017 ZomboDB, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.tcdi.zombodb.postgres;
+
+import com.tcdi.zombodb.query_parser.utils.Utils;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.*;
+import org.elasticsearch.search.SearchHit;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public class ZombodbGetXidVacuumCandidatesAction extends BaseRestHandler {
+
+    @Inject
+    public ZombodbGetXidVacuumCandidatesAction(Settings settings, RestController controller, Client client) {
+        super(settings, controller, client);
+
+        controller.registerHandler(GET, "/{index}/_zdbxidvacuumcandidates", this);
+    }
+
+    @Override
+    protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
+        String index = request.param("index");
+
+        // the transaction ids we can consider for vacuuming are simply
+        // all the _zdb_xid values in the "aborted" type
+        // Some of these xids may still be in-progress, but that's okay
+        // because Postgres will decide for us which ones are really aborted
+        SearchRequestBuilder search = new SearchRequestBuilder(client)
+                .setIndices(index)
+                .setTypes("aborted")
+                .setSearchType(SearchType.SCAN)
+                .setScroll(TimeValue.timeValueMinutes(10))
+                .setSize(10000)
+                .addFieldDataField("_zdb_xid");
+
+        Set<Long> xids = new HashSet<>();
+        int total = 0, cnt = 0;
+        SearchResponse response = null;
+        while (true) {
+            if (response == null) {
+                response = client.execute(SearchAction.INSTANCE, search.request()).actionGet();
+                total = (int) response.getHits().getTotalHits();
+            } else {
+                response = client.execute(SearchScrollAction.INSTANCE,
+                        new SearchScrollRequestBuilder(client)
+                                .setScrollId(response.getScrollId())
+                                .setScroll(TimeValue.timeValueMinutes(10))
+                                .request()).actionGet();
+            }
+
+            for (SearchHit hit : response.getHits()) {
+                Number xid = hit.field("_zdb_xid").value();
+                xids.add(xid.longValue());
+
+                cnt++;
+            }
+
+            if (cnt == total)
+                break;
+        }
+
+        byte[] bytes = new byte[1 + 8 + 8 * xids.size()];
+        int offset = 1;    // first byte is null to indicate binary response
+        offset += Utils.encodeLong(xids.size(), bytes, offset);
+        for (Long xid : xids) {
+            offset += Utils.encodeLong(xid, bytes, offset);
+        }
+
+        channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/data", bytes));
+    }
+}
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbMultiSearchAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbMultiSearchAction.java
index 7de16ad0b..093eff78e 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbMultiSearchAction.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbMultiSearchAction.java
@@ -93,7 +93,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, final Cli
             srb.setIndices(md.getIndexName());
             srb.setTypes("data");
             if (md.getPkey() != null) srb.addFieldDataField(md.getPkey());
-            srb.setQuery(QueryRewriter.Factory.create(client, md.getIndexName(), md.getPreference(), md.getQuery(), true, false, true).rewriteQuery(true));
+            srb.setQuery(QueryRewriter.Factory.create(client, md.getIndexName(), md.getPreference(), md.getQuery(), true, false, true).rewriteQuery());
 
             msearchBuilder.add(srb);
         }
diff --git 
a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbQueryAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbQueryAction.java index b5d276e24..ec81e3c80 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbQueryAction.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbQueryAction.java @@ -48,7 +48,7 @@ protected void handleRequest(RestRequest request, RestChannel channel, Client cl String json; qr = QueryRewriter.Factory.create(client, request.param("index"), request.param("preference"), query, true, false, false); - json = qr.rewriteQuery(true).toString(); + json = qr.rewriteQuery().toString(); response = new BytesRestResponse(RestStatus.OK, "application/json", json); } catch (Exception e) { diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumCleanupAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumCleanupAction.java new file mode 100644 index 000000000..180b0ee17 --- /dev/null +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumCleanupAction.java @@ -0,0 +1,219 @@ +/* + * Copyright 2017 ZomboDB, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tcdi.zombodb.postgres; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.search.*; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.search.SearchHit; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.tcdi.zombodb.postgres.ZombodbBulkAction.buildResponse; +import static org.elasticsearch.index.query.QueryBuilders.termsQuery; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class ZombodbVacuumCleanupAction extends BaseRestHandler { + + private final ClusterService clusterService; + + @Inject + public ZombodbVacuumCleanupAction(Settings settings, RestController controller, Client client, ClusterService clusterService) { + super(settings, controller, client); + + this.clusterService = clusterService; + + controller.registerHandler(POST, "/{index}/_zdbvacuumcleanup", this); + } + + @Override + protected void 
handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
+        String index = request.param("index");
+        Set<Long> xids = new HashSet<>();
+        BufferedReader br = new BufferedReader(new InputStreamReader(request.content().streamInput()));
+        String line;
+
+        while ((line = br.readLine()) != null) {
+            xids.add(Long.valueOf(line));
+        }
+
+        if (!xids.isEmpty()) {
+            List<ActionRequest> xmaxRequests = new ArrayList<>();
+            List<ActionRequest> abortedRequests = new ArrayList<>();
+
+            filterXidsByDataXmin(client, index, xids);
+            cleanupXmax(client, index, xids, xmaxRequests, abortedRequests);
+
+            for (List requests : new List[]{xmaxRequests, abortedRequests}) {
+                if (!requests.isEmpty()) {
+                    BulkRequest bulkRequest = new BulkRequest();
+                    BulkResponse response;
+
+                    bulkRequest.refresh(request.paramAsBoolean("refresh", false));
+                    bulkRequest.requests().addAll(requests);
+
+                    response = client.bulk(bulkRequest).actionGet();
+                    if (response.hasFailures()) {
+                        channel.sendResponse(buildResponse(response, JsonXContent.contentBuilder()));
+                        return;
+                    }
+                }
+            }
+        }
+
+        channel.sendResponse(buildResponse(new BulkResponse(new BulkItemResponse[0], 0), JsonXContent.contentBuilder()));
+    }
+
+    private void cleanupXmax(Client client, String index, Set<Long> xids, List<ActionRequest> xmaxRequests, List<ActionRequest> abortedRequests) {
+        GetSettingsResponse indexSettings = client.admin().indices().getSettings(client.admin().indices().prepareGetSettings(index).request()).actionGet();
+        int shards = Integer.parseInt(indexSettings.getSetting(index, "index.number_of_shards"));
+        String[] routingTable = RoutingHelper.getRoutingTable(client, clusterService, index, shards);
+
+        SearchRequestBuilder search = new SearchRequestBuilder(client)
+                .setIndices(index)
+                .setTypes("xmax")
+                .setSearchType(SearchType.SCAN)
+                .setScroll(TimeValue.timeValueMinutes(10))
+                .setSize(10000)
+                .addFieldDataField("_xmax")
+                .setQuery(termsQuery("_xmax", xids));
+
+        if (!xids.isEmpty()) {
+            int total = 0, cnt = 0;
+            SearchResponse response = null;
+            while (true) {
+                if (response == null) {
+                    response = client.execute(SearchAction.INSTANCE, search.request()).actionGet();
+                    total = (int) response.getHits().getTotalHits();
+                } else {
+                    response = client.execute(SearchScrollAction.INSTANCE,
+                            new SearchScrollRequestBuilder(client)
+                                    .setScrollId(response.getScrollId())
+                                    .setScroll(TimeValue.timeValueMinutes(10))
+                                    .request()).actionGet();
+                }
+
+                for (SearchHit hit : response.getHits()) {
+                    Number xmax = hit.field("_xmax").value();
+
+                    // we can delete this "xmax" entry because its _xmax transaction
+                    // is known to be aborted by Postgres.  As such, its corresponding
+                    // "data" doc is going to be visible to all transactions
+                    xmaxRequests.add(
+                            new DeleteRequestBuilder(client)
+                                    .setIndex(index)
+                                    .setType("xmax")
+                                    // it's imperative we set the version here to the same version
+                                    // we used when we created this doc (the xid that created it)
+                                    // because it's possible that between right here and when
+                                    // the cluster performs this individual DeleteRequest, Postgres
+                                    // will have decided to re-use this tuple (hit.id) and that
+                                    // that tuple was also modified (DELETEd or UPDATEd), which gives
+                                    // it a new xmax value with a higher version number.  So then
+                                    // having this DeleteRequest actually complete would be deleting something
+                                    // that isn't what we currently think it is.
+                                    .setVersion(xmax.longValue())
+                                    .setRouting(hit.id())
+                                    .setId(hit.id())
+                                    .request()
+                    );
+
+                    cnt++;
+                }
+
+                if (cnt == total)
+                    break;
+            }
+
+            // whatever xids we have remaining are guaranteed aborted
+            // and unreferenced so we can remove them
+            for (Long xid : xids) {
+                for (String routing : routingTable) {
+                    abortedRequests.add(
+                            new DeleteRequestBuilder(client)
+                                    .setIndex(index)
+                                    .setType("aborted")
+                                    .setRouting(routing)
+                                    .setId(String.valueOf(xid))
+                                    .request()
+                    );
+                }
+            }
+        }
+
+    }
+
+    private void filterXidsByDataXmin(Client client, String index, Set<Long> xids) {
+        SearchRequestBuilder search = new SearchRequestBuilder(client)
+                .setIndices(index)
+                .setTypes("data")
+                .setSearchType(SearchType.SCAN)
+                .setScroll(TimeValue.timeValueMinutes(10))
+                .setSize(10000)
+                .addFieldDataField("_xmin")
+                .setQuery(termsQuery("_xmin", xids));
+
+        int total = 0, cnt = 0;
+        SearchResponse response = null;
+        while (true) {
+            if (response == null) {
+                response = client.execute(SearchAction.INSTANCE, search.request()).actionGet();
+                total = (int) response.getHits().getTotalHits();
+            } else {
+                response = client.execute(SearchScrollAction.INSTANCE,
+                        new SearchScrollRequestBuilder(client)
+                                .setScrollId(response.getScrollId())
+                                .setScroll(TimeValue.timeValueMinutes(10))
+                                .request()).actionGet();
+            }
+
+            // any xid that is referenced as an _xmin in "data"
+            // can't be cleaned up yet.  (auto)VACUUM will eventually
+            // delete both the "data" doc and its corresponding "xmax" doc
+            for (SearchHit hit : response.getHits()) {
+                long xmin = ((Number) hit.field("_xmin").value()).longValue();
+
+                // so remove the xid from our set
+                xids.remove(xmin);
+
+                cnt++;
+            }
+
+            if (cnt == total)
+                break;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumSupportAction.java b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumSupportAction.java
index 9267cf9ce..5eadb4653 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumSupportAction.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/postgres/ZombodbVacuumSupportAction.java
@@ -1,117 +1,116 @@
-/*
- * Copyright 2017 ZomboDB, LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package com.tcdi.zombodb.postgres; - -import com.tcdi.zombodb.query_parser.utils.Utils; -import org.elasticsearch.action.search.*; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.rest.*; -import org.elasticsearch.search.SearchHit; - -import static com.tcdi.zombodb.postgres.PostgresTIDResponseAction.INVALID_BLOCK_NUMBER; -import static com.tcdi.zombodb.query.ZomboDBQueryBuilders.visibility; -import static org.elasticsearch.index.query.QueryBuilders.*; -import static org.elasticsearch.rest.RestRequest.Method.GET; -import static org.elasticsearch.rest.RestRequest.Method.POST; - -public class ZombodbVacuumSupportAction extends BaseRestHandler { - - @Inject - public ZombodbVacuumSupportAction(Settings settings, RestController controller, Client client) { - super(settings, controller, client); - controller.registerHandler(GET, "/{index}/_zdbvacuum", this); - controller.registerHandler(POST, "/{index}/_zdbvacuum", this); - } - - @Override - protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { - String index = request.param("index"); - String type = request.param("type"); - - SearchRequestBuilder search = new SearchRequestBuilder(client) - .setIndices(index) - .setTypes(type) - .setSearchType(SearchType.SCAN) - .setScroll(TimeValue.timeValueMinutes(10)) - .setSize(10000) - .setNoFields(); - - if ("data".equals(type)) { - long xmin = request.paramAsLong("xmin", 0); - long xmax = request.paramAsLong("xmax", 0); - - search.setQuery( - constantScoreQuery( - boolQuery() - .should( - visibility("_prev_ctid").query(matchAllQuery()).myXid(-1).xmin(xmin).xmax(xmax).all(true) - ) - .should(termQuery("_type", "state")) - ) - ); - search.setTypes(type, "state"); - } - - byte[] bytes = null; - int total = 0, cnt = 0, offset = 0; - SearchResponse response = null; - while (true) { - if (response == null) { - response = client.execute(SearchAction.INSTANCE, search.request()).actionGet(); - total = (int) response.getHits().getTotalHits(); - - bytes = new byte[8 + 6 * total]; - offset += Utils.encodeLong(total, bytes, offset); - } else { - response = client.execute(SearchScrollAction.INSTANCE, - new SearchScrollRequestBuilder(client) - .setScrollId(response.getScrollId()) - .setScroll(TimeValue.timeValueMinutes(10)) - .request()).actionGet(); - } - - for (SearchHit hit : response.getHits()) { - String id; - int blockno; - char rowno; - - try { - id = hit.id(); - - int dash = id.indexOf('-', 1); - blockno = Integer.parseInt(id.substring(0, dash), 10); - rowno = (char) Integer.parseInt(id.substring(dash + 1), 10); - } catch (Exception nfe) { - logger.warn("hit.id()=/" + hit.id() + "/ is not in the proper format. Defaulting to INVALID_BLOCK_NUMBER"); - blockno = INVALID_BLOCK_NUMBER; - rowno = 0; - } - - offset += Utils.encodeInteger(blockno, bytes, offset); - offset += Utils.encodeCharacter(rowno, bytes, offset); - cnt++; - } - - if (cnt == total) - break; - } - - channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/data", bytes)); - } -} +/* + * Copyright 2017 ZomboDB, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tcdi.zombodb.postgres; + +import com.tcdi.zombodb.query_parser.utils.Utils; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.*; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.*; +import org.elasticsearch.search.SearchHit; + +import static com.tcdi.zombodb.postgres.PostgresTIDResponseAction.INVALID_BLOCK_NUMBER; +import static com.tcdi.zombodb.query.ZomboDBQueryBuilders.visibility; +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class ZombodbVacuumSupportAction extends BaseRestHandler { + + @Inject + public ZombodbVacuumSupportAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, "/{index}/_zdbvacuum", this); + controller.registerHandler(POST, "/{index}/_zdbvacuum", this); + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String index = request.param("index"); + long xmin = request.paramAsLong("xmin", 0); + long xmax = request.paramAsLong("xmax", 0); + int commandid = request.paramAsInt("commandid", -1); + String[] tmp = request.paramAsStringArray("active", new String[]{"0"}); + long[] active = new long[tmp.length]; + for (int i = 0; i < tmp.length; i++) + active[i] = Long.valueOf(tmp[i]); + + // return the ctid (_id) of every document we think might be invisible to us + // based on the current state of the underlying index + SearchRequestBuilder search = new SearchRequestBuilder(client) + .setIndices(index) + .setTypes("data") + .setSearchType(SearchType.SCAN) + .setScroll(TimeValue.timeValueMinutes(10)) + .setSize(10000) + .setNoFields() + .setQuery( + visibility() + .xmin(xmin) + .xmax(xmax) + .commandId(commandid) + .activeXids(active) + ); + + byte[] bytes = null; + int total = 0, cnt = 0, offset = 0; + SearchResponse response = null; + while (true) { + if (response == null) { + response = client.execute(SearchAction.INSTANCE, search.request()).actionGet(); + total = (int) response.getHits().getTotalHits(); + bytes = new byte[8 + 6 * total]; + offset += Utils.encodeLong(total, bytes, offset); + } else { + response = client.execute(SearchScrollAction.INSTANCE, + new SearchScrollRequestBuilder(client) + .setScrollId(response.getScrollId()) + .setScroll(TimeValue.timeValueMinutes(10)) + .request()).actionGet(); + } + + for (SearchHit hit : response.getHits()) { + String id; + int blockno; + char rowno; + + try { + id = hit.id(); + + int dash = id.indexOf('-', 1); + blockno = Integer.parseInt(id.substring(0, dash), 10); + rowno = (char) Integer.parseInt(id.substring(dash + 1), 10); + } catch (Exception nfe) { + logger.warn("hit.id()=/" + hit.id() + "/ is not in the proper format. 
Defaulting to INVALID_BLOCK_NUMBER");
+                    blockno = INVALID_BLOCK_NUMBER;
+                    rowno = 0;
+                }
+
+                offset += Utils.encodeInteger(blockno, bytes, offset);
+                offset += Utils.encodeCharacter(rowno, bytes, offset);
+                cnt++;
+            }
+
+            if (cnt == total)
+                break;
+        }
+
+        channel.sendResponse(new BytesRestResponse(RestStatus.OK, "application/data", bytes));
+    }
+}
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/HeapTuple.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/HeapTuple.java
new file mode 100644
index 000000000..361d1fc2a
--- /dev/null
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/HeapTuple.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017 ZomboDB, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tcdi.zombodb.query;
+
+import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.util.BytesRef;
+
+class HeapTuple implements Comparable<HeapTuple> {
+    final int blockno, offno;
+    long xmin;
+    int cmin;
+    long xmax;
+    int cmax;
+
+    private final int hash;
+
+    /* for unit testing */
+    HeapTuple(int blockno, int offno) {
+        this.blockno = blockno;
+        this.offno = offno;
+
+        this.hash = blockno + (31 * offno);
+    }
+
+    HeapTuple(BytesRef bytes, boolean isxmin, ByteArrayDataInput in) {
+        // lucene prefixes binary terms with a header of two variable length ints.
+        // because we know how our binary data is constructed (it could never be
+        // more than 18 bytes) we can blindly assume that the header length is 2 bytes.
+        // 1 byte for the number of items and 1 byte for the first/only item's byte
+        // length, neither of which we need
+        in.reset(bytes.bytes, 2, bytes.length-2);
+
+        blockno = in.readVInt();
+        offno = in.readVInt();
+        if (in.getPosition() < bytes.length) {
+            // more bytes, so we also have xmin/cmin or xmax/cmax to read
+            if (isxmin) {
+                xmin = in.readVLong();
+                cmin = in.readVInt();
+            } else {
+                xmax = in.readVLong();
+                cmax = in.readVInt();
+            }
+        }
+
+        hash = blockno + (31 * offno);
+    }
+
+    @Override
+    public int hashCode() {
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        assert(obj instanceof HeapTuple);
+        HeapTuple other = (HeapTuple) obj;
+        return this.blockno == other.blockno && this.offno == other.offno;
+    }
+
+    @Override
+    public int compareTo(HeapTuple other) {
+        return this.blockno < other.blockno ? -1 : this.blockno > other.blockno ? 1 : this.offno - other.offno;
+    }
+}
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityInfo.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityInfo.java
deleted file mode 100644
index 5c86b0d09..000000000
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityInfo.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2017 ZomboDB, LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.tcdi.zombodb.query; - -class VisibilityInfo { - final int readerOrd; - final int maxdoc; - final int docid; - public final long xid; - public final long sequence; - - VisibilityInfo(int readerOrd, int maxdoc, int docid, long xid, long sequence) { - this.readerOrd = readerOrd; - this.maxdoc = maxdoc; - this.docid = docid; - this.xid = xid; - this.sequence = sequence; - } -} diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityQueryHelper.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityQueryHelper.java index d5cd8b677..f4d978198 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityQueryHelper.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/VisibilityQueryHelper.java @@ -18,167 +18,294 @@ import org.apache.lucene.index.*; import org.apache.lucene.queries.TermFilter; import org.apache.lucene.queries.TermsFilter; -import org.apache.lucene.search.FieldCache; +import org.apache.lucene.search.Filter; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.MatchAllDocsQuery; -import org.apache.lucene.search.Query; +import org.apache.lucene.search.NumericRangeFilter; import org.apache.lucene.search.join.ZomboDBTermsCollector; -import org.apache.lucene.util.*; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.hppc.IntOpenHashSet; +import org.elasticsearch.common.hppc.IntSet; +import org.elasticsearch.common.hppc.cursors.IntCursor; +import org.elasticsearch.common.lucene.search.AndFilter; +import org.elasticsearch.common.lucene.search.MatchAllDocsFilter; +import org.elasticsearch.common.lucene.search.OrFilter; import org.elasticsearch.common.lucene.search.XConstantScoreQuery; -import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentSkipListSet; final class VisibilityQueryHelper { - private static final ConcurrentSkipListSet KNOWN_COMMITTED_XIDS = new ConcurrentSkipListSet<>(); + /** + * helper class for figuring out how many _xmin and _xmax values + * our shard has + */ + private static class XminXmaxCounts { + private IndexSearcher searcher; + private int doccnt; + private int xmaxcnt; - static List findUpdatedCtids(IndexSearcher searcher) throws IOException { - final List updatedCtids = new ArrayList<>(); + XminXmaxCounts(IndexSearcher searcher) { + this.searcher = searcher; + } - // - // search the "state" type and collect a distinct set of all the _ctids - // these represent the records in the index that have been updated - // used below to determine visibility - // - // We use XConstantScoreQuery here so that we exclude deleted docs - // + int getDoccnt() { + return doccnt; + } + + int getXmaxcnt() { + return xmaxcnt; + } + + XminXmaxCounts invoke() throws IOException { + doccnt = 0; + xmaxcnt = 0; + for (AtomicReaderContext context : searcher.getIndexReader().leaves()) 
+                Terms terms;
 
-        searcher.search(new XConstantScoreQuery(SearchContext.current().filterCache().cache(new TermFilter(new Term("_type", "state")))),
-                new ZomboDBTermsCollector("_ctid") {
-                    SortedDocValues ctids;
+                terms = context.reader().terms("_xmin");
+                if (terms != null)
+                    doccnt += terms.getDocCount();
+
+                terms = context.reader().terms("_xmax");
+                if (terms != null)
+                    xmaxcnt += terms.getDocCount();
+            }
+            return this;
+        }
+    }
+
+    /**
+     * collect all the _zdb_xid values in the "aborted" type as both a set of Longs and as a list of BytesRef
+     * for filtering in #determineVisibility
+     */
+    private static void collectAbortedXids(IndexSearcher searcher, final Set<Long> abortedXids, final List<BytesRef> abortedXidsAsBytes) throws IOException {
+        searcher.search(new XConstantScoreQuery(new TermFilter(new Term("_type", "aborted"))),
+                new ZomboDBTermsCollector() {
+                    SortedNumericDocValues _zdb_xid;
 
                     @Override
                     public void collect(int doc) throws IOException {
-                        updatedCtids.add(BytesRef.deepCopyOf(ctids.get(doc)));
+                        _zdb_xid.setDocument(doc);
+
+                        long xid = _zdb_xid.valueAt(0);
+                        BytesRefBuilder builder = new BytesRefBuilder();
+                        NumericUtils.longToPrefixCoded(xid, 0, builder);
+
+                        abortedXids.add(xid);
+                        abortedXidsAsBytes.add(builder.get());
                     }
 
                     @Override
                     public void setNextReader(AtomicReaderContext context) throws IOException {
-                        ctids = FieldCache.DEFAULT.getTermsIndex(context.reader(), "_ctid");
+                        _zdb_xid = context.reader().getSortedNumericDocValues("_zdb_xid");
                    }
                 }
         );
+    }
 
-        Collections.sort(updatedCtids);
-        return updatedCtids;
+    /**
+     * Collect all the "xmax" docs that exist in the shard we're running on.
+     *
+     * Depending on the state of the table, there can potentially be thousands or even millions
+     * of these that we have to process, so we try really hard to limit the amount of work we
+     * need to do for each one
+     */
+    private static void collectMaxes(IndexSearcher searcher, final Map<HeapTuple, HeapTuple> tuples, final IntSet dirtyBlocks) throws IOException {
+        abstract class Collector extends ZomboDBTermsCollector {
+            ByteArrayDataInput in = new ByteArrayDataInput();
+            BinaryDocValues _zdb_encoded_tuple;
+
+            @Override
+            public void setNextReader(AtomicReaderContext context) throws IOException {
+                _zdb_encoded_tuple = context.reader().getBinaryDocValues("_zdb_encoded_tuple");
+            }
+        }
+
+        if (dirtyBlocks != null) {
+            searcher.search(new XConstantScoreQuery(new TermFilter(new Term("_type", "xmax"))),
+                    new Collector() {
+                        @Override
+                        public void collect(int doc) throws IOException {
+                            HeapTuple ctid = new HeapTuple(_zdb_encoded_tuple.get(doc), false, in);
+                            tuples.put(ctid, ctid);
+                            dirtyBlocks.add(ctid.blockno);
+                        }
+                    }
+            );
+        } else {
+            searcher.search(new XConstantScoreQuery(new TermFilter(new Term("_type", "xmax"))),
+                    new Collector() {
+                        @Override
+                        public void collect(int doc) throws IOException {
+                            HeapTuple ctid = new HeapTuple(_zdb_encoded_tuple.get(doc), false, in);
+                            tuples.put(ctid, ctid);
+                        }
+                    }
+            );
+        }
     }
 
-    static Map<Integer, FixedBitSet> determineVisibility(final Query query, final String field, final long myXid, final long xmin, final long xmax, final boolean all, final Set<Long> activeXids, IndexSearcher searcher, List<BytesRef> updatedCtids) throws IOException {
-        final Map<Integer, FixedBitSet> visibilityBitSets = new HashMap<>();
+    static Map<Integer, FixedBitSet> determineVisibility(final long myXid, final long myXmin, final long myXmax, final int myCommand, final Set<Long> activeXids, IndexSearcher searcher) throws IOException {
+        XminXmaxCounts xminXmaxCounts = new XminXmaxCounts(searcher).invoke();
+        int xmaxcnt = xminXmaxCounts.getXmaxcnt();
+        int doccnt = xminXmaxCounts.getDoccnt();
 
-        if (!all && updatedCtids.size() == 0)
-            return visibilityBitSets;
+        final boolean just_get_everything = xmaxcnt >= doccnt/3;
+
+        final IntSet dirtyBlocks = just_get_everything ? null : new IntOpenHashSet();
+        final Map<HeapTuple, HeapTuple> modifiedTuples = new HashMap<>(xmaxcnt);
+
+        collectMaxes(searcher, modifiedTuples, dirtyBlocks);
+
+        final Set<Long> abortedXids = new HashSet<>();
+        final List<BytesRef> abortedXidsAsBytes = new ArrayList<>();
+
+        collectAbortedXids(searcher, abortedXids, abortedXidsAsBytes);
+
+        final List<Filter> filters = new ArrayList<>();
+        if (just_get_everything) {
+            // if the number of docs with xmax values is at least 1/3 of the total docs
+            // just go ahead and ask for everything.  This is much faster than asking
+            // lucene to parse and lookup tens of thousands (or millions!) of individual
+            // _uid values
+            filters.add(new MatchAllDocsFilter());
+        } else {
+            // just look at all the docs on the blocks we've identified as dirty
+            if (!dirtyBlocks.isEmpty()) {
+                BytesRefBuilder builder = new BytesRefBuilder();
+                List<BytesRef> tmp = new ArrayList<>();
+                for (IntCursor blockNumber : dirtyBlocks) {
+                    NumericUtils.intToPrefixCoded(blockNumber.value, 0, builder);
+                    tmp.add(builder.toBytesRef());
+                }
+                filters.add(new TermsFilter("_zdb_blockno", tmp));
+            }
+
+            // we also need to examine docs that might be aborted or inflight on non-dirty pages
+
+            final List<BytesRef> activeXidsAsBytes = new ArrayList<>(activeXids.size());
+            for (Long xid : activeXids) {
+                BytesRefBuilder builder = new BytesRefBuilder();
+                NumericUtils.longToPrefixCoded(xid, 0, builder);
+                activeXidsAsBytes.add(builder.toBytesRef());
+            }
+
+            if (!activeXids.isEmpty())
+                filters.add(new TermsFilter("_xmin", activeXidsAsBytes));
+            if (!abortedXids.isEmpty())
+                filters.add(new TermsFilter("_xmin", abortedXidsAsBytes));
+            filters.add(NumericRangeFilter.newLongRange("_xmin", myXmin, null, true, true));
+        }
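As the comments above explain, the helper switches strategies on a simple ratio. Below is a self-contained restatement of that cutoff with made-up counts; the class and method names are illustrative and not part of ZomboDB:

```java
// Illustrative restatement of the cutoff above: once docs carrying an _xmax
// reach a third of the shard's data docs, scanning everything beats asking
// Lucene to parse and look up huge lists of individual terms.
final class VacuumScanChoice {
    static boolean justGetEverything(int doccnt, int xmaxcnt) {
        return xmaxcnt >= doccnt / 3;
    }

    public static void main(String[] args) {
        System.out.println(justGetEverything(90000, 29999));  // false -> targeted filters
        System.out.println(justGetEverything(90000, 30000));  // true  -> match-all
    }
}
```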
 
         //
-        // build a map of {@link VisibilityInfo} objects by each _prev_ctid
-        //
-        // We use XConstantScoreQuery here so that we exclude deleted docs
+        // find all "data" docs that we think we might need to examine for visibility
+        // given the set of filters above, this is likely to be over-inclusive
+        // but that's okay because it's cheaper to find and examine more docs
+        // than it is to use TermsFilters with very long lists of _ids
         //
-
-        final Map<BytesRef, List<VisibilityInfo>> map = new HashMap<>();
-        searcher.search(
-                all ?
-                        new MatchAllDocsQuery() :
-                        new XConstantScoreQuery(SearchContext.current().filterCache().cache(new TermsFilter(field, updatedCtids))),
-                new ZomboDBTermsCollector(field) {
-                    private SortedDocValues prevCtids;
-                    private SortedNumericDocValues xids;
-                    private SortedNumericDocValues sequence;
-                    private int ord;
+        final Map<Integer, FixedBitSet> visibilityBitSets = new HashMap<>();
+        searcher.search(new XConstantScoreQuery(
+                new AndFilter(
+                        Arrays.asList(
+                                new TermFilter(new Term("_type", "data")),
+                                new OrFilter(filters)
+                        )
+                )
+        ),
+                new ZomboDBTermsCollector() {
+                    private final ByteArrayDataInput in = new ByteArrayDataInput();
+                    private BinaryDocValues _zdb_encoded_tuple;
+                    private int contextOrd;
                     private int maxdoc;
 
                     @Override
                     public void collect(int doc) throws IOException {
-                        if (xids == null)
-                            return;
-
-                        xids.setDocument(doc);
-                        sequence.setDocument(doc);
+                        HeapTuple ctid = new HeapTuple(_zdb_encoded_tuple.get(doc), true, in);    // from "data"
+                        HeapTuple ctidWithXmax = modifiedTuples.get(ctid);                        // from "xmax"
+
+                        // get all the xmin/xmax, cmin/cmax values we need to determine visibility below
+                        long xmin = ctid.xmin;
+                        int cmin = ctid.cmin;
+                        boolean xmax_is_null = ctidWithXmax == null;
+                        long xmax = -1;
+                        int cmax = -1;
+
+                        if (!xmax_is_null) {
+                            xmax = ctidWithXmax.xmax;
+                            cmax = ctidWithXmax.cmax;
+                        }
+
+                        // we can only consider transactions as committed or aborted if they're not outside
+                        // our current snapshot's xmax (myXmax) and aren't otherwise considered active or aborted in some way
 
-                        long xid = xids.valueAt(0);
-                        long seq = sequence.valueAt(0);
-                        BytesRef prevCtid = prevCtids.get(doc);
+                        boolean xmin_is_committed = !(xmin >= myXmax) && !activeXids.contains(xmin) && !abortedXids.contains(xmin);
+                        boolean xmax_is_committed = !xmax_is_null && !(xmax >= myXmax) && !activeXids.contains(xmax) && !abortedXids.contains(xmax);
 
-                        List<VisibilityInfo> matchingDocs = map.get(prevCtid);
-                        if (matchingDocs == null)
-                            map.put(BytesRef.deepCopyOf(prevCtid), matchingDocs = new ArrayList<>());
-                        matchingDocs.add(new VisibilityInfo(ord, maxdoc, doc, xid, seq));
+                        //
+                        // the logic below is taken from Postgres 9.3's "tqual.c#HeapTupleSatisfiesNow()"
+                        //
+
+                        /*
+                         * HeapTupleSatisfiesNow
+                         *      True iff heap tuple is valid "now".
+                         *
+                         * Here, we consider the effects of:
+                         *      all committed transactions (as of the current instant)
+                         *      previous commands of this transaction
+                         *
+                         * Note we do _not_ include changes made by the current command.  This
+                         * solves the "Halloween problem" wherein an UPDATE might try to re-update
+                         * its own output tuples, http://en.wikipedia.org/wiki/Halloween_Problem.
+                         *
+                         * Note:
+                         *      Assumes heap tuple is valid.
+                         *
+                         * The satisfaction of "now" requires the following:
+                         *
+                         * ((Xmin == my-transaction &&              inserted by the current transaction
+                         *     Cmin < my-command &&                 before this command, and
+                         *     (Xmax is null ||                     the row has not been deleted, or
+                         *      (Xmax == my-transaction &&          it was deleted by the current transaction
+                         *       Cmax >= my-command)))              but not before this command,
+                         * ||                                       or
+                         *  (Xmin is committed &&                   the row was inserted by a committed transaction, and
+                         *      (Xmax is null ||                    the row has not been deleted, or
+                         *       (Xmax == my-transaction &&         the row is being deleted by this transaction
+                         *        Cmax >= my-command) ||            but it's not deleted "yet", or
+                         *       (Xmax != my-transaction &&         the row was deleted by another transaction
+                         *        Xmax is not committed))))         that has not been committed
+                         *
+                         */
+                        if (
+                                !(
+                                        (xmin == myXid && cmin < myCommand && (xmax_is_null || (xmax == myXid && cmax >= myCommand)))
+                                                ||
+                                        (xmin_is_committed && (xmax_is_null || (xmax == myXid && cmax >= myCommand) || (xmax != myXid && !xmax_is_committed)))
+                                )
+                           ) {
+                            // it's not visible to us
+                            FixedBitSet visibilityBitset = visibilityBitSets.get(contextOrd);
+                            if (visibilityBitset == null)
+                                visibilityBitSets.put(contextOrd, visibilityBitset = new FixedBitSet(maxdoc));
+                            visibilityBitset.set(doc);
+                        }
                     }
 
                     @Override
                     public void setNextReader(AtomicReaderContext context) throws IOException {
-                        prevCtids = FieldCache.DEFAULT.getTermsIndex(context.reader(), field);
-                        xids = context.reader().getSortedNumericDocValues("_xid");
-                        sequence = context.reader().getSortedNumericDocValues("_zdb_seq");
-                        ord = context.ord;
+                        _zdb_encoded_tuple = context.reader().getBinaryDocValues("_zdb_encoded_tuple");
+                        contextOrd = context.ord;
                         maxdoc = context.reader().maxDoc();
                     }
                 }
         );
 
-        if (map.isEmpty())
-            return visibilityBitSets;
-
-        //
-        // pick out the first VisibilityInfo for each document that is visible & committed
-        // and build a FixedBitSet for each reader 'ord' that contains visible
-        // documents.  A map of these (key'd on reader ord) is what we return.
-        //
-
-        BytesRefBuilder bytesRefBuilder = new BytesRefBuilder() {
-            /* overloaded to avoid making a copy of the byte array */
-            @Override
-            public BytesRef toBytesRef() {
-                return new BytesRef(this.bytes(), 0, this.length());
-            }
-        };
-
-        Terms committedXidsTerms = MultiFields.getFields(searcher.getIndexReader()).terms("_zdb_committed_xid");
-        TermsEnum committedXidsEnum = committedXidsTerms == null ? null : committedXidsTerms.iterator(null);
-        for (List<VisibilityInfo> visibility : map.values()) {
-            CollectionUtil.introSort(visibility, new Comparator<VisibilityInfo>() {
-                @Override
-                public int compare(VisibilityInfo o1, VisibilityInfo o2) {
-                    int cmp = Long.compare(o2.xid, o1.xid);
-                    return cmp == 0 ? Long.compare(o2.sequence, o1.sequence) : cmp;
-                }
-            });
-
-            boolean foundVisible = false;
-            for (VisibilityInfo mapping : visibility) {
-
-                if (foundVisible || mapping.xid > xmax || activeXids.contains(mapping.xid) || (mapping.xid != myXid && !isCommitted(committedXidsEnum, mapping.xid, bytesRefBuilder))) {
-                    // document is not visible to us
-                    FixedBitSet visibilityBitset = visibilityBitSets.get(mapping.readerOrd);
-                    if (visibilityBitset == null)
-                        visibilityBitSets.put(mapping.readerOrd, visibilityBitset = new FixedBitSet(mapping.maxdoc));
-                    visibilityBitset.set(mapping.docid);
-                } else {
-                    foundVisible = true;
-                }
-            }
-        }
-
         return visibilityBitSets;
     }
-
-    private static boolean isCommitted(TermsEnum termsEnum, long xid, BytesRefBuilder builder) throws IOException {
-        if (KNOWN_COMMITTED_XIDS.contains(xid))
-            return true;
-
-        if (termsEnum == null)
-            return false;
-
-        NumericUtils.longToPrefixCoded(xid, 0, builder);
-        boolean isCommitted = termsEnum.seekExact(builder.toBytesRef());
-
-        if (isCommitted)
-            KNOWN_COMMITTED_XIDS.add(xid);
-
-        builder.clear();
-        return isCommitted;
-    }
 }
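The single boolean expression in `collect()` above packs the whole `HeapTupleSatisfiesNow` table into one condition. Unpacked into named clauses it reads as follows; this is an illustrative, self-contained restatement, and the class and method names are not part of ZomboDB:

```java
// Mirror of the predicate above: a tuple is visible "now" iff it was inserted
// by an earlier command of our own transaction, or by a committed transaction,
// and is not (visibly) deleted.
final class TupleVisibility {
    static boolean isVisible(long xmin, int cmin, boolean xmaxIsNull, long xmax, int cmax,
                             long myXid, int myCommand,
                             boolean xminIsCommitted, boolean xmaxIsCommitted) {
        // inserted by us, before this command, and not deleted
        // (or only deleted by a later command of ours)
        boolean insertedByMe =
                xmin == myXid && cmin < myCommand
                        && (xmaxIsNull || (xmax == myXid && cmax >= myCommand));

        // inserted by a committed transaction, and either never deleted,
        // deleted by a later command of ours ("not deleted yet"), or
        // deleted by another transaction that never committed
        boolean insertedByCommitted =
                xminIsCommitted
                        && (xmaxIsNull
                            || (xmax == myXid && cmax >= myCommand)
                            || (xmax != myXid && !xmaxIsCommitted));

        return insertedByMe || insertedByCommitted;
    }
}
```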
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBQueryBuilders.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBQueryBuilders.java
index ab82fefb5..bb7ec8b2f 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBQueryBuilders.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBQueryBuilders.java
@@ -17,7 +17,7 @@
 
 public class ZomboDBQueryBuilders {
 
-    public static ZomboDBVisibilityQueryBuilder visibility(String field) {
-        return new ZomboDBVisibilityQueryBuilder(field);
+    public static ZomboDBVisibilityQueryBuilder visibility() {
+        return new ZomboDBVisibilityQueryBuilder();
     }
 }
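With the field argument gone, callers obtain the filter purely from snapshot state. The sketch below mirrors how `QueryRewriter.applyVisibility()` (later in this diff) chains the builder; the literal xid and command values are made up for illustration:

```java
import org.elasticsearch.index.query.QueryBuilder;

import static com.tcdi.zombodb.query.ZomboDBQueryBuilders.visibility;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;

class VisibilityClauseSketch {
    // Require the user's query, then exclude whatever the visibility filter
    // marks as invisible to this snapshot/command.
    static QueryBuilder withVisibility(QueryBuilder query) {
        return boolQuery()
                .must(query)
                .mustNot(constantScoreQuery(
                        visibility()
                                .myXid(1000L)                       // our transaction id (made up)
                                .xmin(990L)                         // snapshot xmin (made up)
                                .xmax(1010L)                        // snapshot xmax (made up)
                                .commandId(2)                       // current command id (made up)
                                .activeXids(new long[]{995L, 997L}) // in-flight xids (made up)
                ));
    }
}
```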
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQuery.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQuery.java
index e4e89c756..451d27d82 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQuery.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQuery.java
@@ -23,35 +23,27 @@
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.Bits;
-import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.common.lang3.ArrayUtils;
-import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.search.XConstantScoreQuery;
-import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 class ZomboDBVisibilityQuery extends Query {
 
-    private final Query query;
-    private final String fieldname;
     private final long myXid;
     private final long xmin;
     private final long xmax;
-    private final boolean all;
+    private final int commandid;
     private final Set<Long> activeXids;
 
-    ZomboDBVisibilityQuery(Query query, String fieldname, long myXid, long xmin, long xmax, boolean all, Set<Long> activeXids) {
-        this.query = query;
-        this.fieldname = fieldname;
+    ZomboDBVisibilityQuery(long myXid, long xmin, long xmax, int commandid, Set<Long> activeXids) {
         this.myXid = myXid;
         this.xmin = xmin;
         this.xmax = xmax;
-        this.all = all;
+        this.commandid = commandid;
         this.activeXids = activeXids;
     }
 
@@ -60,23 +52,21 @@ public Query rewrite(final IndexReader reader) throws IOException {
         class VisFilter extends Filter {
             private Map<Integer, FixedBitSet> visibilityBitSets = null;
             private final IndexSearcher searcher;
-            private final List<BytesRef> updatedCtids;
 
-            private VisFilter(IndexSearcher searcher, List<BytesRef> updatedCtids) {
+            private VisFilter(IndexSearcher searcher) {
                 this.searcher = searcher;
-                this.updatedCtids = updatedCtids;
             }
 
             @Override
             public DocIdSet getDocIdSet(AtomicReaderContext context, Bits acceptDocs) throws IOException {
                 if (visibilityBitSets == null)
-                    visibilityBitSets = VisibilityQueryHelper.determineVisibility(query, fieldname, myXid, xmin, xmax, all, activeXids, searcher, updatedCtids);
+                    visibilityBitSets = VisibilityQueryHelper.determineVisibility(myXid, xmin, xmax, commandid, activeXids, searcher);
                 return visibilityBitSets.get(context.ord);
             }
         }
 
         IndexSearcher searcher = new IndexSearcher(reader);
-        return new XConstantScoreQuery(new VisFilter(searcher, VisibilityQueryHelper.findUpdatedCtids(searcher)));
+        return new XConstantScoreQuery(new VisFilter(searcher));
     }
 
     @Override
@@ -86,18 +76,16 @@ public void extractTerms(Set<Term> terms) {
 
     @Override
     public String toString(String field) {
-        return "visibility(" + fieldname + ", query=" + query + ", myXid=" + myXid + ", xmin=" + xmin + ", xmax=" + xmax + ", all=" + all + ", active=" + ArrayUtils.toString(activeXids) + ")";
+        return "visibility(myXid=" + myXid + ", xmin=" + xmin + ", xmax=" + xmax + ", commandid=" + commandid + ", active=" + ArrayUtils.toString(activeXids) + ")";
     }
 
     @Override
     public int hashCode() {
         int hash = super.hashCode();
-        hash = hash * 31 + query.hashCode();
-        hash = hash * 31 + fieldname.hashCode();
         hash = hash * 31 + (int)(myXid ^ (myXid >>> 32));
         hash = hash * 31 + (int)(xmin ^ (xmin >>> 32));
         hash = hash * 31 + (int)(xmax ^ (xmax >>> 32));
-        hash = hash * 31 + (all?1:0);
+        hash = hash * 31 + (commandid);
         hash = hash * 31 + ArrayUtils.toString(activeXids).hashCode();
         return hash;
     }
@@ -110,12 +98,11 @@ public boolean equals(Object obj) {
         assert obj instanceof ZomboDBVisibilityQuery;
         ZomboDBVisibilityQuery eq = (ZomboDBVisibilityQuery) obj;
 
-        return this.query.equals(eq.query) &&
-                this.fieldname.equals(eq.fieldname) &&
+        return this.myXid == eq.myXid &&
                 this.xmin == eq.xmin &&
                 this.xmax == eq.xmax &&
-                this.all == eq.all &&
+                this.commandid == eq.commandid &&
                 ArrayUtils.isEquals(this.activeXids, eq.activeXids);
     }
 }
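On the wire, the builder below serializes (and the parser consumes) a body whose field names come from `doXContent()`. A hedged sketch that prints that body; the outer object key is `ZomboDBVisibilityQueryParser.NAME`, whose value isn't shown in this diff, and the numbers are made up:

```java
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;

class VisibilityWireFormat {
    public static void main(String[] args) throws Exception {
        XContentBuilder builder = JsonXContent.contentBuilder();
        new ZomboDBVisibilityQueryBuilder()
                .myXid(1000L)
                .xmin(990L)
                .xmax(1010L)
                .commandId(2)
                .activeXids(new long[]{995L, 997L})
                .toXContent(builder, ToXContent.EMPTY_PARAMS);

        // Expected shape (NAME elided):
        // {"<NAME>":{"myxid":1000,"xmin":990,"xmax":1010,"commandid":2,"active_xids":[995,997]}}
        System.out.println(builder.string());
    }
}
```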
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryBuilder.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryBuilder.java
index ce5e3ad36..0f0734af2 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryBuilder.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryBuilder.java
@@ -17,26 +17,22 @@
 
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.BaseQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
 
 import java.io.IOException;
-import java.util.Collection;
 
 public class ZomboDBVisibilityQueryBuilder extends BaseQueryBuilder {
 
-    private QueryBuilder query;
-    private final String fieldname;
     private long myXid;
     private boolean haveMyXid;
     private long xmin;
+    private int commandid;
     private boolean haveXmin;
     private boolean haveXmax;
+    private boolean haveCommandid;
     private long xmax;
-    private boolean all = false;
     private long[] activeXids;
 
-    public ZomboDBVisibilityQueryBuilder(String name) {
-        fieldname = name;
+    public ZomboDBVisibilityQueryBuilder() {
     }
 
     public ZomboDBVisibilityQueryBuilder myXid(long myXid) {
@@ -57,25 +53,20 @@ public ZomboDBVisibilityQueryBuilder xmax(long xmax) {
         return this;
     }
 
-    public ZomboDBVisibilityQueryBuilder activeXids(long[] xids) {
-        activeXids = xids;
+    public ZomboDBVisibilityQueryBuilder commandId(int commandid) {
+        this.commandid = commandid;
+        haveCommandid = true;
         return this;
     }
 
-    public ZomboDBVisibilityQueryBuilder all(boolean all) {
-        this.all = all;
-        return this;
-    }
-
-    public ZomboDBVisibilityQueryBuilder query(QueryBuilder query) {
-        this.query = query;
+    public ZomboDBVisibilityQueryBuilder activeXids(long[] xids) {
+        activeXids = xids;
         return this;
     }
 
     @Override
     protected void doXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(ZomboDBVisibilityQueryParser.NAME);
-        builder.field("name", fieldname);
 
         if (haveMyXid)
             builder.field("myxid", myXid);
@@ -83,14 +74,10 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep
             builder.field("xmin", xmin);
         if (haveXmax)
             builder.field("xmax", xmax);
+        if (haveCommandid)
+            builder.field("commandid", commandid);
         if (activeXids != null && activeXids.length > 0)
             builder.field("active_xids", activeXids);
-        if (all)
-            builder.field("all", true);
-        if (query != null) {
-            builder.field("query");
-            query.toXContent(builder, params);
-        }
 
         builder.endObject();
     }
 }
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryParser.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryParser.java
index c255561cc..38b41178f 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryParser.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query/ZomboDBVisibilityQueryParser.java
@@ -17,15 +17,12 @@
 
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.mapper.core.CompletionFieldMapper;
 import org.elasticsearch.index.query.QueryParseContext;
 import org.elasticsearch.index.query.QueryParser;
 import org.elasticsearch.index.query.QueryParsingException;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 public class ZomboDBVisibilityQueryParser implements QueryParser {
@@ -40,12 +37,10 @@ public String[] names() {
     public Query parse(QueryParseContext parseContext) throws IOException, QueryParsingException {
         XContentParser parser = parseContext.parser();
 
-        Query query = null;
-        String fieldname = null;
         long myXid = -1;
         long xmin = -1;
         long xmax = -1;
-        boolean all = false;
+        int commandid = -1;
         Set<Long> activeXids = new HashSet<>();
 
         String currentFieldName = null;
@@ -54,12 +49,6 @@
         while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
             if (token == XContentParser.Token.FIELD_NAME) {
                 currentFieldName = parser.currentName();
-            } else if (token == XContentParser.Token.START_OBJECT) {
-                if ("query".equals(currentFieldName)) {
-                    query = parseContext.parseInnerQuery();
-                } else {
-                    throw new QueryParsingException(parseContext.index(), "[zdb visibility] query does not support [" + currentFieldName + "]");
-                }
             } else if (token == XContentParser.Token.START_ARRAY) {
                 if ("active_xids".equals(currentFieldName)) {
                     while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
@@ -67,29 +56,25 @@
                     }
                 }
             } else if (token.isValue()) {
-                if ("name".equals(currentFieldName)) {
-                    fieldname = parser.text();
-                } else if ("myxid".equals(currentFieldName)) {
+                if
("myxid".equals(currentFieldName)) { myXid = parser.longValue(); } else if ("xmin".equals(currentFieldName)) { xmin = parser.longValue(); } else if ("xmax".equals(currentFieldName)) { xmax = parser.longValue(); - } else if ("all".equals(currentFieldName)) { - all = parser.booleanValue(); + } else if ("commandid".equals(currentFieldName)) { + commandid = parser.intValue(); } else { throw new QueryParsingException(parseContext.index(), "[zdb visibility] query does not support [" + currentFieldName + "]"); } } } - if (query == null) - throw new QueryParsingException(parseContext.index(), "[zdb visibility] missing [query]"); - else if (fieldname == null) - throw new QueryParsingException(parseContext.index(), "[zdb visibility] missing [name]"); - else if (xmin == -1) + if (xmin == -1) throw new QueryParsingException(parseContext.index(), "[zdb visibility] missing [xmin]"); + else if (xmax == -1) + throw new QueryParsingException(parseContext.index(), "[zdb visibility] missing [xmax]"); - return new ZomboDBVisibilityQuery(query, fieldname, myXid, xmin, xmax, all, activeXids); + return new ZomboDBVisibilityQuery(myXid, xmin, xmax, commandid, activeXids); } } diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/ASTVisibility.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/ASTVisibility.java index c15291fd5..4cff17aea 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/ASTVisibility.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/ASTVisibility.java @@ -26,8 +26,11 @@ public long getXmax() { return Long.valueOf(getChild(2).getEscapedValue()); } + public int getCommandId() { + return Integer.valueOf(getChild(3).getEscapedValue()); + } public long[] getActiveXids() { - ASTArray array = (ASTArray) getChild(3); + ASTArray array = (ASTArray) getChild(4); if (array == null) return new long[0]; long[] xids = new long[array.jjtGetNumChildren()]; diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/optimizers/ExpansionOptimizer.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/optimizers/ExpansionOptimizer.java index 3801ed6e6..ef2360da0 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/optimizers/ExpansionOptimizer.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/optimizers/ExpansionOptimizer.java @@ -159,7 +159,7 @@ private QueryParserNode loadFielddata(ASTExpansion node, String leftFieldname, S .shardSize(!doFullFieldDataLookup ? 1024 : 0) .size(!doFullFieldDataLookup ? 
1024 : 0); - QueryBuilder query = rewriter.applyVisibility(rewriter.build(nodeQuery), link.getIndexName(), true); + QueryBuilder query = rewriter.applyVisibility(rewriter.build(nodeQuery)); SearchRequestBuilder builder = new SearchRequestBuilder(client) .setSize(0) diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/QueryRewriter.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/QueryRewriter.java index 5748461f0..f018e537f 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/QueryRewriter.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/QueryRewriter.java @@ -23,7 +23,6 @@ import com.tcdi.zombodb.query_parser.metadata.IndexMetadataManager; import com.tcdi.zombodb.query_parser.optimizers.ArrayDataOptimizer; import com.tcdi.zombodb.query_parser.optimizers.IndexLinkOptimizer; -import com.tcdi.zombodb.query_parser.optimizers.QueryTreeOptimizer; import com.tcdi.zombodb.query_parser.optimizers.TermAnalyzerOptimizer; import com.tcdi.zombodb.query_parser.utils.EscapingStringTokenizer; import com.tcdi.zombodb.query_parser.utils.Utils; @@ -223,14 +222,14 @@ public ASTLimit getLimit() { return tree.getLimit(); } - public QueryBuilder rewriteQuery(boolean all) { + public QueryBuilder rewriteQuery() { QueryBuilder qb = build(tree); queryRewritten = true; try { - return applyVisibility(qb, getAggregateIndexName(), all); + return applyVisibility(qb); } catch (Exception e) { - return needVisibilityOnTopLevel ? applyVisibility(qb, getSearchIndexName(), all) : qb; + return needVisibilityOnTopLevel ? applyVisibility(qb) : qb; } } @@ -735,7 +734,7 @@ protected QueryBuilder build(ASTExpansion node) { BoolQueryBuilder bqb = boolQuery(); bqb.must(expansionBuilder); bqb.must(build(filterQuery)); - expansionBuilder = applyVisibility(bqb, node.getIndexLink().getIndexName(), true); + expansionBuilder = applyVisibility(bqb); } return expansionBuilder; } @@ -1266,7 +1265,7 @@ private boolean shouldJoinNestedFilter() { return !_isBuildingAggregate || !tree.getAggregate().isNested(); } - public QueryBuilder applyVisibility(QueryBuilder query, final String indexName, boolean all) { + public QueryBuilder applyVisibility(QueryBuilder query) { ASTVisibility visibility = tree.getVisibility(); if (visibility == null) @@ -1275,14 +1274,14 @@ public QueryBuilder applyVisibility(QueryBuilder query, final String indexName, return boolQuery() .must(query) - .mustNot( - visibility("_prev_ctid") + .mustNot(constantScoreQuery( + visibility() .myXid(visibility.getMyXid()) .xmin(visibility.getXmin()) .xmax(visibility.getXmax()) - .all(all) + .commandId(visibility.getCommandId()) .activeXids(visibility.getActiveXids()) - .query(query) + ) ); } } diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/SirenQueryRewriter.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/SirenQueryRewriter.java index b0df36c94..6197509f1 100644 --- a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/SirenQueryRewriter.java +++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/rewriters/SirenQueryRewriter.java @@ -59,7 +59,7 @@ protected QueryBuilder build(ASTExpansion node) { FilterJoinBuilder fjb = new FilterJoinBuilder(link.getLeftFieldname()) .path(link.getRightFieldname()) .indices(link.getIndexName()) - .query(applyVisibility(build(node.getQuery()), link.getIndexName(), true)) + .query(applyVisibility(build(node.getQuery()))) .types("data"); if 
(!doFullFieldDataLookup)
diff --git a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/utils/Utils.java b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/utils/Utils.java
index 2906c7671..296abe991 100644
--- a/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/utils/Utils.java
+++ b/elasticsearch/src/main/java/com/tcdi/zombodb/query_parser/utils/Utils.java
@@ -359,10 +359,10 @@ public static Map extractArrayData(String input, StringBuilder o
             case '[':
                 if (nextChar == '[' && !inArrayData) {
                     output.append(input.substring(blockStart, i));
-                    blockStart=-1;
+                    blockStart = -1;
                     inArrayData = true;
                     i++;
-                    arrStart = i+1;   // plus one to skip the double brackets at start of array: [[
+                    arrStart = i + 1; // plus one to skip the double brackets at start of array: [[
                 }
                 break;
             case ']':
@@ -569,10 +569,28 @@ public static int encodeCharacter(char value, byte[] buffer, int offset) {
     }
 
     public static int decodeInteger(byte[] buffer, int offset) {
-        return ((buffer[offset+3]) << 24) +
+        return ((buffer[offset + 3]) << 24) +
                 ((buffer[offset + 2] & 0xFF) << 16) +
                 ((buffer[offset + 1] & 0xFF) << 8) +
                 ((buffer[offset + 0] & 0xFF));
     }
 
+    public static char decodeCharacter(byte[] buffer, int offset) {
+        return (char) (((buffer[offset + 1] & 0xFF) << 8) +
+                ((buffer[offset + 0] & 0xFF)));
+    }
+
+
+    public static long decodeLong(byte[] buffer, int offset) {
+        return ((long) buffer[offset] & 0xFF) |
+                (((long) buffer[offset + 1] & 0xFF) << 8) |
+                (((long) buffer[offset + 2] & 0xFF) << 16) |
+                (((long) buffer[offset + 3] & 0xFF) << 24) |
+                (((long) buffer[offset + 4] & 0xFF) << 32) |
+                (((long) buffer[offset + 5] & 0xFF) << 40) |
+                (((long) buffer[offset + 6] & 0xFF) << 48) |
+                (((long) buffer[offset + 7] & 0xFF) << 56);
+    }
+
+
 }
diff --git a/elasticsearch/src/main/java/org/apache/lucene/search/join/ZomboDBTermsCollector.java b/elasticsearch/src/main/java/org/apache/lucene/search/join/ZomboDBTermsCollector.java
index 080e45284..b661ce5af 100644
--- a/elasticsearch/src/main/java/org/apache/lucene/search/join/ZomboDBTermsCollector.java
+++ b/elasticsearch/src/main/java/org/apache/lucene/search/join/ZomboDBTermsCollector.java
@@ -20,7 +20,7 @@
  * that ZomboDB can implement
  */
 public abstract class ZomboDBTermsCollector extends TermsCollector {
-    protected ZomboDBTermsCollector(String field) {
-        super(field);
+    protected ZomboDBTermsCollector() {
+        super(null);
     }
 }
diff --git a/elasticsearch/src/main/jjtree/com/tcdi/elasticsearch/query_parser/QueryParser.jjt b/elasticsearch/src/main/jjtree/com/tcdi/elasticsearch/query_parser/QueryParser.jjt
index 9e105f4d2..1342734a8 100644
--- a/elasticsearch/src/main/jjtree/com/tcdi/elasticsearch/query_parser/QueryParser.jjt
+++ b/elasticsearch/src/main/jjtree/com/tcdi/elasticsearch/query_parser/QueryParser.jjt
@@ -352,7 +352,7 @@ void FieldListEntry() #FieldListEntry :
 
 void Visibility() #Visibility : {}
 {
-    "(" Number() /* current xid */ "," Number() /* xmin */ "," Number() /* xmax */ "," Array() /* snapshot.xips */ ")"
+    "(" Number() /* current xid */ "," Number() /* xmin */ "," Number() /* xmax */ "," Number() /* current command id */ "," Array() /* snapshot.xips */ ")"
 }
 
 void IndexLink() #IndexLink :
diff --git a/elasticsearch/src/main/resources/es-plugin.properties b/elasticsearch/src/main/resources/es-plugin.properties
index 911a336f0..c1b44c292 100644
--- a/elasticsearch/src/main/resources/es-plugin.properties
+++ b/elasticsearch/src/main/resources/es-plugin.properties
@@ -1,2 +1,2 @@
 plugin=com.tcdi.zombodb.ZombodbPlugin
-version=3.1.15
+version=3.2.0
diff
--git a/elasticsearch/src/test/java/com/tcdi/zombodb/query/TestHeapTuple.java b/elasticsearch/src/test/java/com/tcdi/zombodb/query/TestHeapTuple.java new file mode 100644 index 000000000..fae38215c --- /dev/null +++ b/elasticsearch/src/test/java/com/tcdi/zombodb/query/TestHeapTuple.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 ZomboDB, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tcdi.zombodb.query; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class TestHeapTuple { + + @Test + public void testHeapTupleComparator() throws Exception { + + assertTrue(new HeapTuple(41, 0).compareTo(new HeapTuple(42, 0)) < 0); + assertTrue(new HeapTuple(42, 0).compareTo(new HeapTuple(42, 0)) == 0); + assertTrue(new HeapTuple(43, 0).compareTo(new HeapTuple(42, 0)) > 0); + + assertTrue(new HeapTuple(0, 1).compareTo(new HeapTuple(0, 0)) > 0); + assertTrue(new HeapTuple(0, 1).compareTo(new HeapTuple(0, 1)) == 0); + assertTrue(new HeapTuple(0, 1).compareTo(new HeapTuple(0, 2)) < 0); + + assertTrue(new HeapTuple(42, 42).compareTo(new HeapTuple(0, 43)) > 0); + assertTrue(new HeapTuple(42, 42).compareTo(new HeapTuple(42, 42)) == 0); + assertTrue(new HeapTuple(42, 42).compareTo(new HeapTuple(43, 42)) < 0); + } + + @Test + public void testHeapTupleEquality() throws Exception { + assertEquals(new HeapTuple(42, 0), new HeapTuple(42, 0)); + assertNotEquals(new HeapTuple(41, 0), new HeapTuple(42, 0)); + } +} diff --git a/elasticsearch/src/test/java/com/tcdi/zombodb/query_parser/TestQueryRewriter.java b/elasticsearch/src/test/java/com/tcdi/zombodb/query_parser/TestQueryRewriter.java index a07b16273..a32622c3f 100644 --- a/elasticsearch/src/test/java/com/tcdi/zombodb/query_parser/TestQueryRewriter.java +++ b/elasticsearch/src/test/java/com/tcdi/zombodb/query_parser/TestQueryRewriter.java @@ -4892,7 +4892,7 @@ public void testIssue150() throws Exception { " }\n" + "}) beer,wine,cheese"); SearchRequestBuilder builder = new SearchRequestBuilder(client()); - builder.setQuery(qr.rewriteQuery(false)); + builder.setQuery(qr.rewriteQuery()); builder.addAggregation(qr.rewriteAggregations()); assertEquals( diff --git a/elasticsearch/src/test/java/com/tcdi/zombodb/test/ZomboDBTestCase.java b/elasticsearch/src/test/java/com/tcdi/zombodb/test/ZomboDBTestCase.java index 0caf109a0..da114fce1 100644 --- a/elasticsearch/src/test/java/com/tcdi/zombodb/test/ZomboDBTestCase.java +++ b/elasticsearch/src/test/java/com/tcdi/zombodb/test/ZomboDBTestCase.java @@ -152,7 +152,7 @@ protected void assertDifferentJson(String query1, String query2) throws Exceptio } protected String toJson(String query) { - return qr(query).rewriteQuery(false).toString().replaceAll("\r", "").trim(); + return qr(query).rewriteQuery().toString().replaceAll("\r", "").trim(); } protected String toAST(String query) { diff --git a/pom.xml b/pom.xml index 4e6df5e80..0a8599914 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 
+6,7 @@ com.tcdi.elasticsearch zombodb-parent - 3.1.15 + 3.2.0 pom diff --git a/postgres/CMakeLists.txt b/postgres/CMakeLists.txt index eb2910228..e1693de7b 100644 --- a/postgres/CMakeLists.txt +++ b/postgres/CMakeLists.txt @@ -28,4 +28,4 @@ set(SOURCE_FILES include_directories(src/main/c /usr/local/src/pg93/src/include /usr/local/src/pg93/src/backend) -add_executable(postgres ${SOURCE_FILES}) +add_library(zombodb OBJECT ${SOURCE_FILES}) diff --git a/postgres/pom.xml b/postgres/pom.xml index ff6516730..a02b08fd4 100644 --- a/postgres/pom.xml +++ b/postgres/pom.xml @@ -7,7 +7,7 @@ com.tcdi.elasticsearch zombodb-parent - 3.1.15 + 3.2.0 zombodb @@ -256,6 +256,27 @@ + + xenial_pg93 + package + + jdeb + + + ${project.build.directory}/${project.artifactId}_xenial_pg93-${project.version}_amd64.deb + + + + ${project.build.directory}/pg9.3/ubuntu_xenial/ + directory + + perm + / + + + + + jessie_pg93 @@ -321,6 +342,27 @@ + + xenial_pg94 + package + + jdeb + + + ${project.build.directory}/${project.artifactId}_xenial_pg94-${project.version}_amd64.deb + + + + ${project.build.directory}/pg9.4/ubuntu_xenial/ + directory + + perm + / + + + + + jessie_pg94 package @@ -385,6 +427,27 @@ + + xenial_pg95 + package + + jdeb + + + ${project.build.directory}/${project.artifactId}_xenial_pg95-${project.version}_amd64.deb + + + + ${project.build.directory}/pg9.5/ubuntu_xenial/ + directory + + perm + / + + + + + jessie_pg95 package diff --git a/postgres/src/main/c/am/elasticsearch.c b/postgres/src/main/c/am/elasticsearch.c index cffb0de16..2120beee9 100644 --- a/postgres/src/main/c/am/elasticsearch.c +++ b/postgres/src/main/c/am/elasticsearch.c @@ -23,9 +23,11 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" +#include "access/transam.h" #include "access/xact.h" #include "storage/bufmgr.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/json.h" #include "utils/lsyscache.h" @@ -41,29 +43,26 @@ #include "zdb_interface.h" #define SECONDARY_TYPES_MAPPING \ -" \"state\": {"\ +" \"xmax\": {"\ " \"_source\": { \"enabled\": false },"\ " \"_routing\": { \"required\": true },"\ " \"_all\": { \"enabled\": false },"\ " \"_field_names\": { \"index\": \"no\", \"store\": false },"\ -" \"date_detection\": false,"\ -" \"properties\": { \"_ctid\":{\"type\":\"string\",\"index\":\"not_analyzed\"} }"\ -" },"\ -" \"deleted\": {"\ -" \"_source\": { \"enabled\": false },"\ -" \"_all\": { \"enabled\": false },"\ -" \"_field_names\": { \"index\": \"no\", \"store\": false },"\ " \"properties\": {"\ -" \"_deleting_xid\": { \"type\": \"long\", \"index\": \"not_analyzed\" }"\ +" \"_xmax\": { \"type\": \"long\", \"index\": \"not_analyzed\",\"fielddata\":{\"format\":\"doc_values\"},\"include_in_all\":\"false\",\"norms\": {\"enabled\":false} },"\ +" \"_cmax\": { \"type\": \"long\", \"index\": \"not_analyzed\",\"fielddata\":{\"format\":\"doc_values\"},\"include_in_all\":\"false\",\"norms\": {\"enabled\":false} },"\ +" \"_replacement_ctid\": { \"type\": \"string\", \"index\": \"not_analyzed\",\"fielddata\":{\"format\":\"doc_values\"},\"include_in_all\":\"false\",\"norms\": {\"enabled\":false} },"\ +" \"_zdb_encoded_tuple\": { \"type\": \"binary\", \"doc_values\": true, \"compress\":false, \"compress_threshold\": 18 },"\ +" \"_zdb_reason\": { \"type\": \"string\", \"index\": \"not_analyzed\",\"fielddata\":{\"format\":\"doc_values\"},\"include_in_all\":\"false\",\"norms\": {\"enabled\":false} }"\ " }"\ " },"\ -" \"committed\": {"\ +" \"aborted\": {"\ " 
\"_source\": { \"enabled\": false },"\ " \"_routing\": { \"required\": true },"\ " \"_all\": { \"enabled\": false },"\ " \"_field_names\": { \"index\": \"no\", \"store\": false },"\ " \"properties\": {"\ -" \"_zdb_committed_xid\": { \"type\": \"long\",\"index\":\"not_analyzed\" }"\ +" \"_zdb_xid\": { \"type\": \"long\",\"index\":\"not_analyzed\",\"fielddata\":{\"format\":\"doc_values\"},\"include_in_all\":\"false\",\"norms\": {\"enabled\":false} }"\ " }"\ " }" @@ -128,9 +127,9 @@ static StringInfo buildQuery(ZDBIndexDescriptor *desc, char **queries, int nquer appendStringInfo(baseQuery, "#field_lists(%s) ", desc->fieldLists); if (!zdb_ignore_visibility_guc && useInvisibilityMap) { - Snapshot snapshot = GetActiveSnapshot(); + Snapshot snapshot = GetTransactionSnapshot(); - appendStringInfo(baseQuery, "#visibility(%lu, %lu, %lu, [", convert_xid(GetCurrentTransactionIdIfAny()), convert_xid(snapshot->xmin), convert_xid(snapshot->xmax)); + appendStringInfo(baseQuery, "#visibility(%lu, %lu, %lu, %u, [", convert_xid(GetCurrentTransactionIdIfAny()), convert_xid(snapshot->xmin), convert_xid(snapshot->xmax), GetCurrentCommandId(false)); if (snapshot->xcnt > 0) { for (i = 0; i < snapshot->xcnt; i++) { if (i > 0) appendStringInfoChar(baseQuery, ','); @@ -242,32 +241,17 @@ void elasticsearch_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shard freeStringInfo(endpoint); } -void elasticsearch_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor, HTAB *committedXids) { - HASH_SEQ_STATUS seq; - TransactionId *xid; +void elasticsearch_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor) { StringInfo endpoint = makeStringInfo(); - StringInfo request = makeStringInfo(); StringInfo indexSettings = makeStringInfo(); StringInfo response; Relation indexRel; - /* - * push out all committed transaction ids to ES - */ - hash_seq_init(&seq, committedXids); - while ( (xid = hash_seq_search(&seq)) != NULL) { - uint64 convertedXid = convert_xid(*xid); - if (request->len > 0) - appendStringInfoChar(request, '\n'); - appendStringInfo(request, "%lu", convertedXid); - } - if (request->len > 0) { - appendStringInfo(endpoint, "%s/%s/_zdbxid?refresh=true", indexDescriptor->url, indexDescriptor->fullyQualifiedName); - response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel); - checkForBulkError(response, "bulk committed xid"); - } - freeStringInfo(request); + /* first refresh the index */ + appendStringInfo(endpoint, "%s/%s/_refresh", indexDescriptor->url, indexDescriptor->fullyQualifiedName); + response = rest_call("GET", endpoint->data, NULL, indexDescriptor->compressionLevel); + checkForRefreshError(response); /* * set various index settings to make it live @@ -817,30 +801,25 @@ static uint64 count_deleted_docs(ZDBIndexDescriptor *indexDescriptor) { return (uint64) atoll(response->data); } -void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *itemPointers, bool isdeleted) { +void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *ctidsToDelete) { StringInfo endpoint = makeStringInfo(); StringInfo request = makeStringInfo(); StringInfo response; ListCell *lc; - if (isdeleted) - appendStringInfo(endpoint, "%s/%s/_bulk?consistency=default", indexDescriptor->url, indexDescriptor->fullyQualifiedName); - else - appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default", indexDescriptor->url, indexDescriptor->fullyQualifiedName); + appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default", indexDescriptor->url, 
indexDescriptor->fullyQualifiedName); if (strcmp("-1", indexDescriptor->refreshInterval) == 0) { appendStringInfo(endpoint, "&refresh=true"); } - foreach (lc, itemPointers) { + foreach (lc, ctidsToDelete) { ItemPointer item = lfirst(lc); appendStringInfo(request, "{\"delete\":{\"_type\":\"data\",\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(item), ItemPointerGetOffsetNumber(item)); - if (isdeleted) - appendStringInfo(request, "{\"delete\":{\"_type\":\"deleted\",\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(item), ItemPointerGetOffsetNumber(item)); if (request->len >= indexDescriptor->batch_size) { response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel); - checkForBulkError(response, "delete"); + checkForBulkError(response, "zombodb vacuum"); resetStringInfo(request); freeStringInfo(response); @@ -849,7 +828,7 @@ void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *itemPoi if (request->len > 0) { response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel); - checkForBulkError(response, "delete"); + checkForBulkError(response, "zombodb vacuum"); } if (indexDescriptor->optimizeAfter > 0) { @@ -868,21 +847,77 @@ void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *itemPoi freeStringInfo(request); } -char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor, char *type) { +char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor) { + StringInfo endpoint = makeStringInfo(); + StringInfo response; + Snapshot snapshot = GetActiveSnapshot(); + + appendStringInfo(endpoint, "%s/%s/_zdbvacuum?xmin=%lu&xmax=%lu&commandid=%u", indexDescriptor->url, indexDescriptor->fullyQualifiedName, convert_xid(snapshot->xmin), convert_xid(snapshot->xmax), GetCurrentCommandId(false)); + if (snapshot->xcnt > 0) { + int i; + appendStringInfo(endpoint, "&active="); + for (i = 0; i < snapshot->xcnt; i++) { + if (i > 0) appendStringInfoChar(endpoint, ','); + appendStringInfo(endpoint, "%lu", convert_xid(snapshot->xip[i])); + } + } + + response = rest_call("GET", endpoint->data, NULL, indexDescriptor->compressionLevel); + + freeStringInfo(endpoint); + if (response->len > 0 && response->data[0] == '{' && strstr(response->data, "error") != NULL) + elog(ERROR, "%s", response->data); + return response->data; +} + +static char *confirm_aborted_xids(ZDBIndexDescriptor *indexDescriptor, uint64 *nitems) { StringInfo endpoint = makeStringInfo(); StringInfo response; - Snapshot snapshot = GetActiveSnapshot(); - appendStringInfo(endpoint, "%s/%s/_zdbvacuum?type=%s&xmin=%lu&xmax=%lu", indexDescriptor->url, indexDescriptor->fullyQualifiedName, type, convert_xid(snapshot->xmin), convert_xid(snapshot->xmax)); + appendStringInfo(endpoint, "%s/%s/_zdbxidvacuumcandidates", indexDescriptor->url, indexDescriptor->fullyQualifiedName); + response = rest_call("GET", endpoint->data, NULL, indexDescriptor->compressionLevel); freeStringInfo(endpoint); + if (response->len > 0 && response->data[0] != 0) + elog(ERROR, "%s", response->data); + + memcpy(nitems, response->data + 1, sizeof(uint64)); + return response->data + 1 + sizeof(uint64); +} + +void elasticsearch_vacuumCleanup(ZDBIndexDescriptor *indexDescriptor) { + StringInfo endpoint = makeStringInfo(); + StringInfo request = makeStringInfo(); + StringInfo response; + uint64 nitems; + char *xids; + uint64 i; + + appendStringInfo(endpoint, "%s/%s/_zdbvacuumcleanup", indexDescriptor->url, indexDescriptor->fullyQualifiedName); + if (strcmp("-1", 
indexDescriptor->refreshInterval) == 0) {
+        appendStringInfo(endpoint, "?refresh=true");
+    }
+
+    xids = confirm_aborted_xids(indexDescriptor, &nitems);
+    for (i=0; i<nitems; i++) {
+        /* ... append each confirmed-aborted xid from 'xids' to 'request' ... */
+    }
+
+    response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel);
+
+    freeStringInfo(endpoint);
     if (response->len > 0 && response->data[0] == '{' && strstr(response->data, "error") != NULL)
         elog(ERROR, "%s", response->data);
-    return response->data;
 }
 
-static void appendBatchInsertData(ZDBIndexDescriptor *indexDescriptor, ItemPointer ht_ctid, text *value, StringInfo bulk, bool isupdate, ItemPointer old_ctid, TransactionId xmin, uint64 sequence) {
+static void appendBatchInsertData(ZDBIndexDescriptor *indexDescriptor, ItemPointer ht_ctid, text *value, StringInfo bulk, bool isupdate, ItemPointer old_ctid, TransactionId xmin, int64 sequence) {
     /* the data */
     appendStringInfo(bulk, "{\"index\":{\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(ht_ctid), ItemPointerGetOffsetNumber(ht_ctid));
@@ -896,10 +931,14 @@
         bulk->len--;
 
     /* ...append our transaction id to the json */
-    appendStringInfo(bulk, ",\"_xid\":%lu", convert_xid(xmin));
+    appendStringInfo(bulk, ",\"_xmin\":%lu", convert_xid(xmin));
+    appendStringInfo(bulk, ",\"_cmin\":%u", GetCurrentCommandId(true));
 
     /* and the sequence number */
-    appendStringInfo(bulk, ",\"_zdb_seq\":%lu", sequence);
+    appendStringInfo(bulk, ",\"_zdb_seq\":%ld", sequence);
+
+    /* and the block number as its own field */
+    appendStringInfo(bulk, ",\"_zdb_blockno\":%d", ItemPointerGetBlockNumber(ht_ctid));
 
     if (isupdate)
         appendStringInfo(bulk, ",\"_prev_ctid\":\"%d-%d\"", ItemPointerGetBlockNumber(old_ctid), ItemPointerGetOffsetNumber(old_ctid));
@@ -926,7 +965,7 @@ static PostDataEntry *checkout_batch_pool(BatchInsertData *batch) {
 }
 
 void
-elasticsearch_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xid, CommandId commandId, uint64 sequence) {
+elasticsearch_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xid, CommandId commandId, int64 sequence) {
     BatchInsertData *batch = lookup_batch_insert_data(indexDescriptor, true);
     bool fast_path = false;
@@ -948,7 +987,7 @@
             StringInfo endpoint = makeStringInfo();
 
             /* don't &refresh=true here as a full .refreshIndex() is called after batchInsertFinish() */
-            appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default", indexDescriptor->url, indexDescriptor->fullyQualifiedName);
+            appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default&request_no=%d", indexDescriptor->url, indexDescriptor->fullyQualifiedName, batch->nrequests);
 
             /* send the request to index this batch */
             rest_multi_call(batch->rest, "POST", endpoint->data, batch->bulk, indexDescriptor->compressionLevel);
@@ -978,7 +1017,7 @@ void elasticsearch_batchInsertFinish(ZDBIndexDescriptor *indexDescriptor) {
         StringInfo endpoint = makeStringInfo();
         StringInfo response;
 
-        appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default", indexDescriptor->url, indexDescriptor->fullyQualifiedName);
+        appendStringInfo(endpoint, "%s/%s/data/_zdbbulk?consistency=default&request_no=%d", indexDescriptor->url, indexDescriptor->fullyQualifiedName, batch->nrequests);
 
         if (batch->nrequests == 0) {
             /*
@@ -1041,16 +1080,15 @@ void elasticsearch_deleteTuples(ZDBIndexDescriptor
*indexDescriptor, List *ctids ListCell *lc; uint64 xid = convert_xid(GetCurrentTransactionId()); - appendStringInfo(endpoint, "%s/%s/deleted/_bulk", indexDescriptor->url, indexDescriptor->fullyQualifiedName); + appendStringInfo(endpoint, "%s/%s/_zdb_delete_tuples", indexDescriptor->url, indexDescriptor->fullyQualifiedName); if (strcmp("-1", indexDescriptor->refreshInterval) == 0) { appendStringInfo(endpoint, "?refresh=true"); } foreach (lc, ctids) { - ItemPointer ctid = (ItemPointer) lfirst(lc); + ZDBDeletedCtidAndCommand *deleted = (ZDBDeletedCtidAndCommand *) lfirst(lc); - appendStringInfo(request, "{\"index\":{\"_id\":\"%d-%d\"}}\n", ItemPointerGetBlockNumber(ctid), ItemPointerGetOffsetNumber(ctid)); - appendStringInfo(request, "{\"_deleting_xid\":%lu}\n", xid); + appendStringInfo(request, "%d-%d:%lu:%u\n", ItemPointerGetBlockNumber(&deleted->ctid), ItemPointerGetOffsetNumber(&deleted->ctid), xid, deleted->commandid); } response = rest_call("POST", endpoint->data, request, indexDescriptor->compressionLevel); diff --git a/postgres/src/main/c/am/elasticsearch.h b/postgres/src/main/c/am/elasticsearch.h index af88b417a..a004b045c 100644 --- a/postgres/src/main/c/am/elasticsearch.h +++ b/postgres/src/main/c/am/elasticsearch.h @@ -19,7 +19,7 @@ #include "zdb_interface.h" void elasticsearch_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shards, char *fieldProperties); -void elasticsearch_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor, HTAB *committedXids); +void elasticsearch_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor); void elasticsearch_updateMapping(ZDBIndexDescriptor *indexDescriptor, char *mapping); char *elasticsearch_dumpQuery(ZDBIndexDescriptor *indexDescriptor, char *userQuery); @@ -53,10 +53,11 @@ char *elasticsearch_highlight(ZDBIndexDescriptor *indexDescriptor, char *query, void elasticsearch_freeSearchResponse(ZDBSearchResponse *searchResponse); -void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *itemPointers, bool isdeleted); -char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor, char *type); +void elasticsearch_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *ctidsToDelete); +char *elasticsearch_vacuumSupport(ZDBIndexDescriptor *indexDescriptor); +void elasticsearch_vacuumCleanup(ZDBIndexDescriptor *indexDescriptor); -void elasticsearch_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xid, CommandId commandId, uint64 sequence); +void elasticsearch_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xid, CommandId commandId, int64 sequence); void elasticsearch_batchInsertFinish(ZDBIndexDescriptor *indexDescriptor); void elasticsearch_deleteTuples(ZDBIndexDescriptor *indexDescriptor, List *ctids); diff --git a/postgres/src/main/c/am/zdb_interface.c b/postgres/src/main/c/am/zdb_interface.c index e3692a4bd..3fd261bb6 100644 --- a/postgres/src/main/c/am/zdb_interface.c +++ b/postgres/src/main/c/am/zdb_interface.c @@ -55,7 +55,7 @@ static const struct config_enum_entry zdb_log_level_options[] = { }; static void wrapper_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shards, char *fieldProperties); -static void wrapper_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor, HTAB *committedXids); +static void wrapper_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor); static void wrapper_updateMapping(ZDBIndexDescriptor *indexDescriptor, char 
*mapping); static char *wrapper_dumpQuery(ZDBIndexDescriptor *indexDescriptor, char *userQuery); @@ -84,10 +84,11 @@ static char *wrapper_highlight(ZDBIndexDescriptor *indexDescriptor, char *query, static void wrapper_freeSearchResponse(ZDBSearchResponse *searchResponse); -static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *toDelete, bool isdeleted); -static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor, char *type); +static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *ctidsToDelete); +static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor); +static void wrapper_vacuumCleanup(ZDBIndexDescriptor *indexDescriptor); -static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence); +static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, int64 sequence); static void wrapper_batchInsertFinish(ZDBIndexDescriptor *indexDescriptor); static void wrapper_deleteTuples(ZDBIndexDescriptor *indexDescriptor, List *ctids); @@ -269,6 +270,7 @@ ZDBIndexDescriptor *zdb_alloc_index_descriptor(Relation indexRel) { desc->implementation->freeSearchResponse = wrapper_freeSearchResponse; desc->implementation->bulkDelete = wrapper_bulkDelete; desc->implementation->vacuumSupport = wrapper_vacuumSupport; + desc->implementation->vacuumCleanup = wrapper_vacuumCleanup; desc->implementation->batchInsertRow = wrapper_batchInsertRow; desc->implementation->batchInsertFinish = wrapper_batchInsertFinish; desc->implementation->deleteTuples = wrapper_deleteTuples; @@ -340,13 +342,13 @@ static void wrapper_createNewIndex(ZDBIndexDescriptor *indexDescriptor, int shar MemoryContextDelete(me); } -static void wrapper_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor, HTAB *committedXids) { +static void wrapper_finalizeNewIndex(ZDBIndexDescriptor *indexDescriptor) { MemoryContext me = AllocSetContextCreate(TopTransactionContext, "wrapper_finalizeNewIndex", 512, 64, 64); MemoryContext oldContext = MemoryContextSwitchTo(me); Assert(!indexDescriptor->isShadow); - elasticsearch_finalizeNewIndex(indexDescriptor, committedXids); + elasticsearch_finalizeNewIndex(indexDescriptor); MemoryContextSwitchTo(oldContext); MemoryContextDelete(me); @@ -571,30 +573,38 @@ static void wrapper_freeSearchResponse(ZDBSearchResponse *searchResponse) { MemoryContextSwitchTo(oldContext); } -static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *toDelete, bool isdeleted) { +static void wrapper_bulkDelete(ZDBIndexDescriptor *indexDescriptor, List *ctidsToDelete) { MemoryContext me = AllocSetContextCreate(TopTransactionContext, "wrapper_bulkDelete", 512, 64, 64); MemoryContext oldContext = MemoryContextSwitchTo(me); Assert(!indexDescriptor->isShadow); - elasticsearch_bulkDelete(indexDescriptor, toDelete, isdeleted); + elasticsearch_bulkDelete(indexDescriptor, ctidsToDelete); MemoryContextSwitchTo(oldContext); MemoryContextDelete(me); } -static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor, char *type) { +static char *wrapper_vacuumSupport(ZDBIndexDescriptor *indexDescriptor) { MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); char *ctids; - ctids = elasticsearch_vacuumSupport(indexDescriptor, type); + ctids = elasticsearch_vacuumSupport(indexDescriptor); 
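wrapper_vacuumSupport() only switches memory contexts around elasticsearch_vacuumSupport(), which encodes the caller's snapshot into the _zdbvacuum URL shown earlier in this diff. For reference, a hedged sketch of that URL construction, kept in Java to match the plugin-side examples; the helper and parameter names are illustrative, only the URL shape comes from the C code:

```java
// Illustrative only: the query-string layout of the _zdbvacuum call that
// elasticsearch_vacuumSupport() builds from the active snapshot.
final class VacuumSupportUrl {
    static String build(String baseUrl, String indexName,
                        long xmin, long xmax, int commandId, long[] activeXids) {
        StringBuilder sb = new StringBuilder();
        sb.append(baseUrl).append('/').append(indexName)
          .append("/_zdbvacuum?xmin=").append(xmin)
          .append("&xmax=").append(xmax)
          .append("&commandid=").append(commandId);
        for (int i = 0; i < activeXids.length; i++) {
            // first active xid introduces the parameter, the rest are comma-separated
            sb.append(i == 0 ? "&active=" : ",").append(activeXids[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // => http://localhost:9200/db.schema.table.index/_zdbvacuum?xmin=990&xmax=1010&commandid=2&active=995,997
        System.out.println(build("http://localhost:9200", "db.schema.table.index",
                990L, 1010L, 2, new long[]{995L, 997L}));
    }
}
```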
MemoryContextSwitchTo(oldContext); return ctids; } -static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence) { +static void wrapper_vacuumCleanup(ZDBIndexDescriptor *indexDescriptor) { + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + elasticsearch_vacuumCleanup(indexDescriptor); + + MemoryContextSwitchTo(oldContext); +} + +static void wrapper_batchInsertRow(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, int64 sequence) { MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); Assert(!indexDescriptor->isShadow); diff --git a/postgres/src/main/c/am/zdb_interface.h b/postgres/src/main/c/am/zdb_interface.h index 51758ad0f..2fd9a0e67 100644 --- a/postgres/src/main/c/am/zdb_interface.h +++ b/postgres/src/main/c/am/zdb_interface.h @@ -152,9 +152,14 @@ typedef struct { float4 max_score; } ZDBSearchResponse; +typedef struct { + ItemPointerData ctid; + CommandId commandid; +} ZDBDeletedCtidAndCommand; + typedef struct { ZDBIndexDescriptor *desc; - List *ctids; + List *deleted; } ZDBDeletedCtid; extern PGDLLEXPORT relopt_kind RELOPT_KIND_ZDB; @@ -178,7 +183,7 @@ void interface_transaction_cleanup(void); * Defines what an index implementation looks like */ typedef void (*ZDBCreateNewIndex_function)(ZDBIndexDescriptor *indexDescriptor, int shards, char *fieldProperties); -typedef void (*ZDBFinalizeNewIndex_function)(ZDBIndexDescriptor *indexDescriptor, HTAB *committedXids); +typedef void (*ZDBFinalizeNewIndex_function)(ZDBIndexDescriptor *indexDescriptor); typedef void (*ZDBUpdateMapping_function)(ZDBIndexDescriptor *indexDescriptor, char *mapping); typedef char *(*ZDBDumpQuery_function)(ZDBIndexDescriptor *indexDescriptor, char *userQuery); @@ -209,10 +214,11 @@ typedef char *(*ZDBHighlight_function)(ZDBIndexDescriptor *indexDescriptor, char typedef void (*ZDBFreeSearchResponse_function)(ZDBSearchResponse *searchResponse); -typedef void (*ZDBBulkDelete_function)(ZDBIndexDescriptor *indexDescriptor, List *toDelete, bool isdeleted); -typedef char *(*ZDBVacuumSupport_function)(ZDBIndexDescriptor *indexDescriptor, char *type); +typedef void (*ZDBBulkDelete_function)(ZDBIndexDescriptor *indexDescriptor, List *ctidsToDelete); +typedef char *(*ZDBVacuumSupport_function)(ZDBIndexDescriptor *indexDescriptor); +typedef void (*ZDBVacuumCleanup_function)(ZDBIndexDescriptor *indexDescriptor); -typedef void (*ZDBIndexBatchInsertRow_function)(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, uint64 sequence); +typedef void (*ZDBIndexBatchInsertRow_function)(ZDBIndexDescriptor *indexDescriptor, ItemPointer ctid, text *data, bool isupdate, ItemPointer old_ctid, TransactionId xmin, CommandId commandId, int64 sequence); typedef void (*ZDBIndexBatchInsertFinish_function)(ZDBIndexDescriptor *indexDescriptor); typedef void (*ZDBDeleteTuples_function)(ZDBIndexDescriptor *indexDescriptor, List *ctids); @@ -257,6 +263,7 @@ struct ZDBIndexImplementation { ZDBBulkDelete_function bulkDelete; ZDBVacuumSupport_function vacuumSupport; + ZDBVacuumCleanup_function vacuumCleanup; ZDBIndexBatchInsertRow_function batchInsertRow; ZDBIndexBatchInsertFinish_function batchInsertFinish; diff --git a/postgres/src/main/c/am/zdbam.c b/postgres/src/main/c/am/zdbam.c index 
diff --git a/postgres/src/main/c/am/zdbam.c b/postgres/src/main/c/am/zdbam.c index a3d4db669..4f82a81a1 100644 --- a/postgres/src/main/c/am/zdbam.c +++ b/postgres/src/main/c/am/zdbam.c @@ -24,6 +24,7 @@ #include "access/reloptions.h" #include "access/relscan.h" #include "access/transam.h" +#include "access/visibilitymap.h" #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/index.h" @@ -85,7 +86,6 @@ typedef struct { double indtuples; ZDBIndexDescriptor *desc; - HTAB *committedXids; } ZDBBuildState; typedef struct { @@ -178,7 +178,7 @@ static void process_deleted_tuples() { foreach (lc, deletedCtids) { ZDBDeletedCtid *entry = (ZDBDeletedCtid *) lfirst(lc); - entry->desc->implementation->deleteTuples(entry->desc, entry->ctids); + entry->desc->implementation->deleteTuples(entry->desc, entry->deleted); } deletedCtids = NULL; @@ -218,8 +218,6 @@ static void zdbam_xact_callback(XactEvent event, void *arg) { case XACT_EVENT_PRE_PREPARE: case XACT_EVENT_PRE_COMMIT: { - process_deleted_tuples(); - if (indexesInsertedList != NULL) { ListCell *lc; @@ -272,7 +270,8 @@ static void zdb_executor_start_hook(QueryDesc *queryDesc, int eflags) { static void zdb_executor_end_hook(QueryDesc *queryDesc) { if (executorDepth == 0) { process_inserted_indexes(!zdb_batch_mode_guc); - } + process_deleted_tuples(); + } if (prev_ExecutorEndHook == zdb_executor_end_hook) elog(ERROR, "zdb_executor_end_hook: Somehow prev_ExecutorEndHook was set to zdb_executor_end_hook");
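Moving process_deleted_tuples() out of the pre-commit transaction callback and into zdb_executor_end_hook means deleted ctids are now flushed at the end of each top-level statement instead of once at commit. For reference, a self-contained sketch of the standard PostgreSQL executor-hook chaining idiom this code relies on (flush_pending_deletes stands in for process_deleted_tuples, and the depth counter mirrors zdbam.c's executorDepth):

    #include "postgres.h"
    #include "executor/executor.h"

    static ExecutorEnd_hook_type prev_ExecutorEndHook = NULL;
    static int executorDepth = 0;   /* incremented/decremented around nested executions */

    static void flush_pending_deletes(void) {
        /* stand-in for process_deleted_tuples() */
    }

    static void sketch_executor_end_hook(QueryDesc *queryDesc) {
        /* only do extension work for top-level statements */
        if (executorDepth == 0)
            flush_pending_deletes();

        /* always chain to the previously installed hook, or the default */
        if (prev_ExecutorEndHook)
            prev_ExecutorEndHook(queryDesc);
        else
            standard_ExecutorEnd(queryDesc);
    }

    void sketch_install_hooks(void) {   /* would normally run from _PG_init() */
        prev_ExecutorEndHook = ExecutorEnd_hook;
        ExecutorEnd_hook = sketch_executor_end_hook;
    }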
@@ -494,7 +493,6 @@ Datum zdbbuild(PG_FUNCTION_ARGS) { buildstate.indtuples = 0; buildstate.desc = alloc_index_descriptor(indexRel, false); buildstate.desc->logit = true; - buildstate.committedXids = NULL; if (!buildstate.desc->isShadow) { HASHCTL hashctl; @@ -504,8 +502,6 @@ Datum zdbbuild(PG_FUNCTION_ARGS) { hashctl.hcxt = TopTransactionContext; hashctl.hash = tag_hash; - buildstate.committedXids = hash_create("committed xids", 1024, &hashctl, HASH_CONTEXT | HASH_FUNCTION | HASH_ELEM); - /* drop the existing index */ buildstate.desc->implementation->dropIndex(buildstate.desc); @@ -523,7 +519,7 @@ Datum zdbbuild(PG_FUNCTION_ARGS) { buildstate.desc->implementation->batchInsertFinish(buildstate.desc); /* reset the settings to reasonable values for production use */ - buildstate.desc->implementation->finalizeNewIndex(buildstate.desc, buildstate.committedXids); + buildstate.desc->implementation->finalizeNewIndex(buildstate.desc); if (heapRel->rd_rel->relkind != 'm') { @@ -572,25 +568,14 @@ static void zdbbuildCallback(Relation indexRel, HeapTuple htup, Datum *values, bool *isnull, bool tupleIsAlive, void *state) { ZDBBuildState *buildstate = (ZDBBuildState *) state; ZDBIndexDescriptor *desc = buildstate->desc; - TransactionId xmin, xmax; - bool found; + TransactionId xmin; if (HeapTupleIsHeapOnly(htup)) elog(ERROR, "Heap Only Tuple (HOT) found at (%d, %d). Run VACUUM FULL %s; and reindex", ItemPointerGetBlockNumber(&(htup->t_self)), ItemPointerGetOffsetNumber(&(htup->t_self)), desc->qualifiedTableName); xmin = HeapTupleHeaderGetXmin(htup->t_data); - xmax = HeapTupleHeaderGetRawXmax(htup->t_data); - - hash_search(buildstate->committedXids, &xmin, HASH_FIND, &found); - if (!found && (TransactionIdDidCommit(xmin) || xmin == GetCurrentTransactionId())) - hash_search(buildstate->committedXids, &xmin, HASH_ENTER, &found); - - hash_search(buildstate->committedXids, &xmax, HASH_FIND, &found); - if (!found && TransactionIdDidCommit(xmax)) - hash_search(buildstate->committedXids, &xmax, HASH_ENTER, &found); - - desc->implementation->batchInsertRow(desc, &htup->t_self, DatumGetTextP(values[1]), false, NULL, xmin, HeapTupleHeaderGetRawCommandId(htup->t_data), zdb_sequence++); + desc->implementation->batchInsertRow(desc, &htup->t_self, DatumGetTextP(values[1]), false, NULL, xmin, HeapTupleHeaderGetRawCommandId(htup->t_data), -1); buildstate->indtuples++; } @@ -831,18 +816,17 @@ Datum zdbrestrpos(PG_FUNCTION_ARGS) { * Result: a palloc'd struct containing statistical info for VACUUM displays. */ Datum zdbbulkdelete(PG_FUNCTION_ARGS) { - static char *types[] = {"data", "deleted"}; IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); IndexBulkDeleteResult *volatile stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); void *callback_state = (void *) PG_GETARG_POINTER(3); Relation indexRel = info->index; + Relation heapRel; ZDBIndexDescriptor *desc; struct timeval tv1, tv2; - char *deletedCtids; - uint64 cntDeletedCtids, i=0; - List *toDelete = NULL; - int z; + List *ctidsToDelete = NULL; + BlockNumber blockno; + BlockNumber numOfBlocks; gettimeofday(&tv1, NULL); @@ -854,37 +838,57 @@ Datum zdbbulkdelete(PG_FUNCTION_ARGS) { if (desc->isShadow) PG_RETURN_POINTER(stats); - for (z=0; z<2; z++) { - deletedCtids = desc->implementation->vacuumSupport(desc, types[z]); - memcpy(&cntDeletedCtids, deletedCtids, sizeof(uint64)); + heapRel = RelationIdGetRelation(desc->heapRelid); + numOfBlocks = RelationGetNumberOfBlocks(heapRel); + for (blockno=0; blockno<numOfBlocks; blockno++) { + Buffer vmap_buff = InvalidBuffer; + OffsetNumber ofs; + + if (visibilitymap_test(heapRel, blockno, &vmap_buff)) { + /* page is all-visible, so it cannot contain dead tuples */ + if (BufferIsValid(vmap_buff)) + ReleaseBuffer(vmap_buff); + continue; + } + + for (ofs=FirstOffsetNumber; ofs<=MaxOffsetNumber; ofs++) { + ItemPointerData ctid; + + ItemPointerSet(&ctid, blockno, ofs); + if (callback(&ctid, callback_state)) { + ItemPointer copy = palloc(sizeof(ItemPointerData)); + + ItemPointerCopy(&ctid, copy); + ctidsToDelete = lappend(ctidsToDelete, copy); + stats->tuples_removed++; + + if (((int) stats->tuples_removed) % 10000 == 0) { + /* + * push this set of 10k deleted tuples out to elasticsearch + * just to avoid keeping too much in memory at once + */ + desc->implementation->bulkDelete(desc, ctidsToDelete); + list_free_deep(ctidsToDelete); + ctidsToDelete = NULL; + } + } } - desc->implementation->bulkDelete(desc, toDelete, z == 1); + if (BufferIsValid(vmap_buff)) + ReleaseBuffer(vmap_buff); + } + RelationClose(heapRel); + + desc->implementation->bulkDelete(desc, ctidsToDelete); stats->num_pages = 1; stats->num_index_tuples = desc->implementation->estimateSelectivity(desc, ""); - stats->tuples_removed = list_length(toDelete);; gettimeofday(&tv2, NULL); - elog(ZDB_LOG_LEVEL, "[zombodb vacuum status] index=%s, num_removed=%d, num_index_tuples=%lu, ttl=%fs", RelationGetRelationName(indexRel), list_length(toDelete), (uint64) stats->num_index_tuples, TO_SECONDS(tv1, tv2)); + elog(ZDB_LOG_LEVEL, "[zombodb vacuum status] index=%s, num_removed=%d, num_index_tuples=%lu, ttl=%fs", RelationGetRelationName(indexRel), list_length(ctidsToDelete), (uint64) stats->num_index_tuples, TO_SECONDS(tv1, tv2)); PG_RETURN_POINTER(stats); }
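The rewritten zdbbulkdelete above walks the heap block by block rather than asking Elasticsearch for candidate ctids, and the visibility map lets it skip pages that cannot contain dead tuples. A self-contained sketch of that skip test against the PostgreSQL 9.3-9.5 API (the helper name is hypothetical; visibilitymap_test pins the map page into the buffer it is handed):

    #include "postgres.h"
    #include "access/visibilitymap.h"
    #include "storage/bufmgr.h"
    #include "utils/rel.h"

    /* Returns true when blockno may hold dead tuples and so must be scanned.
     * An all-visible block, by definition, has nothing for VACUUM to remove. */
    static bool block_needs_vacuum_scan(Relation heapRel, BlockNumber blockno) {
        Buffer vmap_buff   = InvalidBuffer;
        bool   all_visible = visibilitymap_test(heapRel, blockno, &vmap_buff);

        /* visibilitymap_test() leaves the VM page pinned; drop the pin */
        if (BufferIsValid(vmap_buff))
            ReleaseBuffer(vmap_buff);

        return !all_visible;
    }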
@@ -906,6 +910,8 @@ Datum zdbvacuumcleanup(PG_FUNCTION_ARGS) { desc = alloc_index_descriptor(indexRel, false); + desc->implementation->vacuumCleanup(desc); + if (stats == NULL) { stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); stats->num_index_tuples = desc->implementation->estimateSelectivity(desc, ""); @@ -1083,8 +1089,8 @@ Datum zdbdeletetrigger(PG_FUNCTION_ARGS) { TriggerData *trigdata = (TriggerData *) fcinfo->context; Oid indexRelId = (Oid) atoi(trigdata->tg_trigger->tgargs[0]); ZDBDeletedCtid *entry = NULL; - ItemPointer ctid; ListCell *lc; + ZDBDeletedCtidAndCommand *deleted; /* make sure it's called as a trigger at all */ if (!CALLED_AS_TRIGGER(fcinfo)) @@ -1098,7 +1104,6 @@ tmpcxt = MemoryContextSwitchTo(TopTransactionContext); - ctid = palloc0(sizeof(ItemPointerData)); foreach (lc, deletedCtids) { entry = (ZDBDeletedCtid *) lfirst(lc); if (entry->desc->indexRelid == indexRelId) @@ -1109,15 +1114,17 @@ if (entry == NULL) { Relation indexRel = RelationIdGetRelation(indexRelId); entry = palloc0(sizeof(ZDBDeletedCtid)); - entry->desc = alloc_index_descriptor(indexRel, false); + entry->desc = alloc_index_descriptor(indexRel, true); deletedCtids = lappend(deletedCtids, entry); RelationClose(indexRel); } Assert(entry->desc->indexRelid == indexRelId); - ItemPointerCopy(&trigdata->tg_trigtuple->t_self, ctid); - entry->ctids = lappend(entry->ctids, ctid); + deleted = palloc(sizeof(ZDBDeletedCtidAndCommand)); + ItemPointerCopy(&trigdata->tg_trigtuple->t_self, &deleted->ctid); + deleted->commandid = GetCurrentCommandId(true); + entry->deleted = lappend(entry->deleted, deleted); MemoryContextSwitchTo(tmpcxt); return PointerGetDatum(trigdata->tg_trigtuple);
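The delete trigger now records the CommandId of the deleting statement next to each ctid, so the Elasticsearch side can order deletes against other work within the same transaction. A sketch of walking the new list shape (the logging loop is illustrative only; the real consumer is each implementation's deleteTuples):

    #include "postgres.h"
    #include "nodes/pg_list.h"
    #include "storage/itemptr.h"
    /* ZDBDeletedCtid / ZDBDeletedCtidAndCommand come from zdb_interface.h above */

    static void sketch_log_deleted(ZDBDeletedCtid *entry) {
        ListCell *lc;

        foreach(lc, entry->deleted) {
            ZDBDeletedCtidAndCommand *d = (ZDBDeletedCtidAndCommand *) lfirst(lc);

            elog(DEBUG1, "deleted ctid=(%u,%u) by commandid=%u",
                 ItemPointerGetBlockNumber(&d->ctid),
                 (unsigned) ItemPointerGetOffsetNumber(&d->ctid),
                 (unsigned) d->commandid);
        }
    }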
"true" : "false"); appendStringInfo(result, "\"properties\": {"); - appendStringInfo(result, "\"_xid\": {" + appendStringInfo(result, "\"_xmin\": {" "\"type\":\"long\"," "\"fielddata\": {\"format\": \"doc_values\"}," "\"include_in_all\":\"false\"," @@ -501,10 +501,16 @@ Datum make_es_mapping(ZDBIndexDescriptor *desc, Oid tableRelId, TupleDesc tupdes "\"index\": \"not_analyzed\"" "},"); + appendStringInfo(result, "\"_cmin\": {" + "\"type\":\"integer\"," + "\"fielddata\": {\"format\": \"doc_values\"}," + "\"include_in_all\":\"false\"," + "\"norms\": {\"enabled\":false}," + "\"index\": \"not_analyzed\"" + "},"); + appendStringInfo(result, "\"_prev_ctid\": {" - "\"store\":true," "\"type\":\"string\"," - "\"fielddata\": {\"format\": \"paged_bytes\"}," "\"include_in_all\":\"false\"," "\"norms\": {\"enabled\":false}," "\"index\": \"not_analyzed\"" @@ -512,6 +518,20 @@ Datum make_es_mapping(ZDBIndexDescriptor *desc, Oid tableRelId, TupleDesc tupdes appendStringInfo(result, "\"_zdb_seq\": {" "\"type\":\"long\"," + "\"include_in_all\":\"false\"," + "\"norms\": {\"enabled\":false}," + "\"index\": \"not_analyzed\"" + "},"); + + appendStringInfo(result, "\"_zdb_encoded_tuple\": {" + "\"type\":\"binary\"," + "\"doc_values\": true," + "\"compress\":false," + "\"compress_threshold\": 18"\ + "},"); + + appendStringInfo(result, "\"_zdb_blockno\": {" + "\"type\":\"integer\"," "\"fielddata\": {\"format\": \"doc_values\"}," "\"include_in_all\":\"false\"," "\"norms\": {\"enabled\":false}," diff --git a/postgres/src/main/c/util/zdbutils.c b/postgres/src/main/c/util/zdbutils.c index 00845a7f6..3ea21179f 100644 --- a/postgres/src/main/c/util/zdbutils.c +++ b/postgres/src/main/c/util/zdbutils.c @@ -388,3 +388,4 @@ uint64 lookup_pkey(Oid heapRelOid, char *pkeyFieldname, ItemPointer ctid) { return DatumGetInt64(pkey); } + diff --git a/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/Dockerfile b/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/Dockerfile new file mode 100644 index 000000000..bbd87d0b9 --- /dev/null +++ b/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/Dockerfile @@ -0,0 +1,38 @@ +FROM ubuntu:xenial + +MAINTAINER ZomboDB, LLC (zombodb@gmail.com) + +RUN apt-get update -y -qq --fix-missing +RUN apt-get install -y wget +RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ trusty-pgdg main" >> /etc/apt/sources.list.d/pgdg.list +RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - +RUN apt-get update -y --fix-missing +RUN apt-get install -y postgresql-server-dev-9.3 +RUN apt-get install -y gcc make build-essential libz-dev zlib1g-dev strace + +COPY curl-7.53.1.tar.gz /usr/local/src/ +WORKDIR /usr/local/src/ +RUN tar xzf curl-7.53.1.tar.gz +WORKDIR /usr/local/src/curl-7.53.1/ +RUN ./configure --without-ssl \ + --disable-threaded-resolver \ + --disable-dict \ + --disable-file \ + --disable-ftp \ + --disable-gopher \ + --disable-imap \ + --disable-pop3 \ + --disable-rtsp \ + --disable-smb \ + --disable-smtp \ + --disable-telnet \ + --disable-tftp +RUN make -j2 && make install +RUN ldconfig && ldconfig +RUN rm -rf /usr/local/src/curl* + +ARG user +ARG uid +RUN useradd -ms /bin/bash --uid $uid $user +USER $user +ENV IS_DOCKER true diff --git a/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz b/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz new file mode 100644 index 000000000..7970c3234 Binary files /dev/null and 
b/postgres/src/main/docker/pg9.3/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz differ diff --git a/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/Dockerfile b/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/Dockerfile new file mode 100644 index 000000000..0ec382bdd --- /dev/null +++ b/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/Dockerfile @@ -0,0 +1,38 @@ +FROM ubuntu:xenial + +MAINTAINER ZomboDB, LLC (zombodb@gmail.com) + +RUN apt-get update -y -qq --fix-missing +RUN apt-get install -y wget +RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main" >> /etc/apt/sources.list.d/pgdg.list +RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - +RUN apt-get update -y --fix-missing +RUN apt-get install -y postgresql-server-dev-9.4 +RUN apt-get install -y gcc make build-essential libz-dev zlib1g-dev strace + +COPY curl-7.53.1.tar.gz /usr/local/src/ +WORKDIR /usr/local/src/ +RUN tar xzf curl-7.53.1.tar.gz +WORKDIR /usr/local/src/curl-7.53.1/ +RUN ./configure --without-ssl \ + --disable-threaded-resolver \ + --disable-dict \ + --disable-file \ + --disable-ftp \ + --disable-gopher \ + --disable-imap \ + --disable-pop3 \ + --disable-rtsp \ + --disable-smb \ + --disable-smtp \ + --disable-telnet \ + --disable-tftp +RUN make -j2 && make install +RUN ldconfig && ldconfig +RUN rm -rf /usr/local/src/curl* + +ARG user +ARG uid +RUN useradd -ms /bin/bash --uid $uid $user +USER $user +ENV IS_DOCKER true diff --git a/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz b/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz new file mode 100644 index 000000000..7970c3234 Binary files /dev/null and b/postgres/src/main/docker/pg9.4/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz differ diff --git a/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/Dockerfile b/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/Dockerfile new file mode 100644 index 000000000..d7c896119 --- /dev/null +++ b/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/Dockerfile @@ -0,0 +1,38 @@ +FROM ubuntu:xenial + +MAINTAINER ZomboDB, LLC (zombodb@gmail.com) + +RUN apt-get update -y -qq --fix-missing +RUN apt-get install -y wget +RUN echo "deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main" >> /etc/apt/sources.list.d/pgdg.list +RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - +RUN apt-get update -y --fix-missing +RUN apt-get install -y postgresql-server-dev-9.5 +RUN apt-get install -y gcc make build-essential libz-dev zlib1g-dev strace + +COPY curl-7.53.1.tar.gz /usr/local/src/ +WORKDIR /usr/local/src/ +RUN tar xzf curl-7.53.1.tar.gz +WORKDIR /usr/local/src/curl-7.53.1/ +RUN ./configure --without-ssl \ + --disable-threaded-resolver \ + --disable-dict \ + --disable-file \ + --disable-ftp \ + --disable-gopher \ + --disable-imap \ + --disable-pop3 \ + --disable-rtsp \ + --disable-smb \ + --disable-smtp \ + --disable-telnet \ + --disable-tftp +RUN make -j2 && make install +RUN ldconfig && ldconfig +RUN rm -rf /usr/local/src/curl* + +ARG user +ARG uid +RUN useradd -ms /bin/bash --uid $uid $user +USER $user +ENV IS_DOCKER true diff --git a/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz b/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz new file mode 100644 index 000000000..7970c3234 Binary files /dev/null and
b/postgres/src/main/docker/pg9.5/zombodb-build-ubuntu_xenial/curl-7.53.1.tar.gz differ diff --git a/postgres/src/main/shell/build-packages.sh b/postgres/src/main/shell/build-packages.sh index ccfe94f76..1595035a5 100755 --- a/postgres/src/main/shell/build-packages.sh +++ b/postgres/src/main/shell/build-packages.sh @@ -17,7 +17,7 @@ VERSION=$1 BASE=`pwd` -DISTROS="centos6 centos7 ubuntu_trusty ubuntu_precise debian_jessie" +DISTROS="centos6 centos7 ubuntu_trusty ubuntu_precise ubuntu_xenial debian_jessie" POSTGRES_VERSIONS="9.3 9.4 9.5" echo "Archiving working directory tree" diff --git a/postgres/src/main/sql/zombodb--3.1.15--3.2.0.sql b/postgres/src/main/sql/zombodb--3.1.15--3.2.0.sql new file mode 100644 index 000000000..0c3bccd7d --- /dev/null +++ b/postgres/src/main/sql/zombodb--3.1.15--3.2.0.sql @@ -0,0 +1,60 @@ +DROP VIEW IF EXISTS zdb_index_stats; +DROP VIEW IF EXISTS zdb_index_stats_fast; + +CREATE VIEW zdb_index_stats AS + WITH stats AS ( + SELECT + indrelid :: REGCLASS AS table_name, + indexrelid::regclass, + zdb_get_index_name(indexrelid) index_name, + zdb_get_url(indexrelid) url, + zdb_es_direct_request(indexrelid, 'GET', '_stats')::json stats, + zdb_es_direct_request(indexrelid, 'GET', '_settings')::json settings + FROM pg_index + WHERE pg_get_indexdef(indexrelid) ILIKE '%zombodb%' + ) + SELECT + index_name, + url, + table_name, + stats -> '_all' -> 'primaries' -> 'docs' -> 'count' AS es_docs, + pg_size_pretty((stats -> '_all' -> 'primaries' -> 'store' ->> 'size_in_bytes') :: INT8) AS es_size, + (stats -> '_all' -> 'primaries' -> 'store' ->> 'size_in_bytes') :: INT8 AS es_size_bytes, + count_of_table(table_name) AS pg_docs, + pg_size_pretty(pg_total_relation_size(table_name)) AS pg_size, + pg_total_relation_size(table_name) AS pg_size_bytes, + stats -> '_shards' -> 'total' AS shards, + settings -> index_name -> 'settings' -> 'index' ->> 'number_of_replicas' AS replicas, + (zdb_es_direct_request(indexrelid, 'GET', 'data/_count') :: JSON) -> 'count' AS data_count, + (zdb_es_direct_request(indexrelid, 'GET', 'xmax/_count') :: JSON) -> 'count' AS xmax_count, + (zdb_es_direct_request(indexrelid, 'GET', 'aborted/_count') :: JSON) -> 'count' AS aborted_count + FROM stats; + +CREATE VIEW zdb_index_stats_fast AS + WITH stats AS ( + SELECT + indrelid :: REGCLASS AS table_name, + indexrelid::regclass, + zdb_get_index_name(indexrelid) index_name, + zdb_get_url(indexrelid) url, + zdb_es_direct_request(indexrelid, 'GET', '_stats')::json stats, + zdb_es_direct_request(indexrelid, 'GET', '_settings')::json settings + FROM pg_index + WHERE pg_get_indexdef(indexrelid) ILIKE '%zombodb%' + ) + SELECT + index_name, + url, + table_name, + stats -> '_all' -> 'primaries' -> 'docs' -> 'count' AS es_docs, + pg_size_pretty((stats -> '_all' -> 'primaries' -> 'store' ->> 'size_in_bytes') :: INT8) AS es_size, + (stats -> '_all' -> 'primaries' -> 'store' ->> 'size_in_bytes') :: INT8 AS es_size_bytes, + (SELECT reltuples::int8 FROM pg_class WHERE oid = table_name) AS pg_docs_estimate, + pg_size_pretty(pg_total_relation_size(table_name)) AS pg_size, + pg_total_relation_size(table_name) AS pg_size_bytes, + stats -> '_shards' -> 'total' AS shards, + settings -> index_name -> 'settings' -> 'index' ->> 'number_of_replicas' AS replicas, + (zdb_es_direct_request(indexrelid, 'GET', 'data/_count') :: JSON) -> 'count' AS data_count, + (zdb_es_direct_request(indexrelid, 'GET', 'xmax/_count') :: JSON) -> 'count' AS xmax_count, + (zdb_es_direct_request(indexrelid, 'GET', 'aborted/_count') :: JSON) -> 'count' AS aborted_count + FROM stats;
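The 3.1.15 to 3.2.0 upgrade script rebuilds both stats views so they report per-type document counts for the new xmax and aborted Elasticsearch types in place of the old state/committed/deleted counts. A small libpq sketch of consuming those columns from a client (connection parameters are illustrative):

    /* Sketch: reading the new xmax_count/aborted_count columns with libpq. */
    #include <stdio.h>
    #include <libpq-fe.h>

    int main(void) {
        PGconn   *conn = PQconnectdb("dbname=postgres");   /* illustrative conninfo */
        PGresult *res;
        int       i;

        if (PQstatus(conn) != CONNECTION_OK) {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            return 1;
        }

        res = PQexec(conn, "SELECT index_name, data_count, xmax_count, aborted_count FROM zdb_index_stats");
        if (PQresultStatus(res) != PGRES_TUPLES_OK) {
            fprintf(stderr, "query failed: %s", PQerrorMessage(conn));
        } else {
            for (i = 0; i < PQntuples(res); i++)
                printf("%s: data=%s xmax=%s aborted=%s\n",
                       PQgetvalue(res, i, 0), PQgetvalue(res, i, 1),
                       PQgetvalue(res, i, 2), PQgetvalue(res, i, 3));
        }

        PQclear(res);
        PQfinish(conn);
        return 0;
    }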
aborted_count + FROM stats; diff --git a/postgres/src/main/sql/zombodb.sql b/postgres/src/main/sql/zombodb.sql index 0826c5d3d..35dbb1209 100644 --- a/postgres/src/main/sql/zombodb.sql +++ b/postgres/src/main/sql/zombodb.sql @@ -132,10 +132,9 @@ CREATE VIEW zdb_index_stats AS pg_total_relation_size(table_name) AS pg_size_bytes, stats -> '_shards' -> 'total' AS shards, settings -> index_name -> 'settings' -> 'index' ->> 'number_of_replicas' AS replicas, - (zdb_es_direct_request(indexrelid, 'GET', 'data/_count')::json) -> 'count' AS data_count, - (zdb_es_direct_request(indexrelid, 'GET', 'state/_count')::json) -> 'count' AS state_count, - (zdb_es_direct_request(indexrelid, 'GET', 'committed/_count')::json) -> 'count' AS xid_count, - (zdb_es_direct_request(indexrelid, 'GET', 'deleted/_count')::json) -> 'count' AS deleted_count + (zdb_es_direct_request(indexrelid, 'GET', 'data/_count') :: JSON) -> 'count' AS data_count, + (zdb_es_direct_request(indexrelid, 'GET', 'xmax/_count') :: JSON) -> 'count' AS xmax_count, + (zdb_es_direct_request(indexrelid, 'GET', 'aborted/_count') :: JSON) -> 'count' AS aborted_count FROM stats; CREATE VIEW zdb_index_stats_fast AS @@ -162,10 +161,9 @@ CREATE VIEW zdb_index_stats_fast AS pg_total_relation_size(table_name) AS pg_size_bytes, stats -> '_shards' -> 'total' AS shards, settings -> index_name -> 'settings' -> 'index' ->> 'number_of_replicas' AS replicas, - (zdb_es_direct_request(indexrelid, 'GET', 'data/_count')::json) -> 'count' AS data_count, - (zdb_es_direct_request(indexrelid, 'GET', 'state/_count')::json) -> 'count' AS state_count, - (zdb_es_direct_request(indexrelid, 'GET', 'committed/_count')::json) -> 'count' AS xid_count, - (zdb_es_direct_request(indexrelid, 'GET', 'deleted/_count')::json) -> 'count' AS deleted_count + (zdb_es_direct_request(indexrelid, 'GET', 'data/_count') :: JSON) -> 'count' AS data_count, + (zdb_es_direct_request(indexrelid, 'GET', 'xmax/_count') :: JSON) -> 'count' AS xmax_count, + (zdb_es_direct_request(indexrelid, 'GET', 'aborted/_count') :: JSON) -> 'count' AS aborted_count FROM stats; CREATE OR REPLACE FUNCTION zdb_internal_update_mapping(index_oid oid) RETURNS void STRICT IMMUTABLE LANGUAGE c AS '$libdir/plugins/zombodb'; diff --git a/postgres/src/test/expected/deletes.out b/postgres/src/test/expected/deletes.out index 88d186ee8..c24b870e7 100644 --- a/postgres/src/test/expected/deletes.out +++ b/postgres/src/test/expected/deletes.out @@ -20,8 +20,8 @@ SELECT assert(count(*), 162240, 'deleted 3k records from so_posts (id:*)') FROM t (1 row) --- ES will still see deleted rows via zdb_estimate_count -SELECT assert(zdb_estimate_count('so_posts', 'id:*'), 165240, 'make sure ES still sees deleted rows'); +-- ES shouldn't still see deleted rows via zdb_estimate_count +SELECT assert(zdb_estimate_count('so_posts', 'id:*'), 162240, 'make sure ES still sees deleted rows'); assert -------- t diff --git a/postgres/src/test/expected/issue-150.out b/postgres/src/test/expected/issue-150.out index 0481f6e96..6a647e057 100644 --- a/postgres/src/test/expected/issue-150.out +++ b/postgres/src/test/expected/issue-150.out @@ -1,14 +1,7 @@ -SELECT * -FROM zdb_json_aggregate('so_posts', '{ - "top-tags": { - "terms": { - "field": "tags", - "size": 3 - } - } - }', 'java'); - zdb_json_aggregate -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - 
{"top-tags":{"doc_count_error_upper_bound":136,"sum_other_doc_count":13143,"buckets":[{"key":"java","doc_count":2713},{"key":"design","doc_count":695},{"key":"programming","doc_count":528}]}} +SELECT v->'top-tags'->'buckets' +FROM zdb_json_aggregate('so_posts', '{ "top-tags":{"terms":{"field":"tags", "size" : 3}}}', 'java') v; + ?column? +---------------------------------------------------------------------------------------------------------- + [{"key":"java","doc_count":2713},{"key":"design","doc_count":695},{"key":"programming","doc_count":528}] (1 row) diff --git a/postgres/src/test/expected/issue-191.out b/postgres/src/test/expected/issue-191.out index e8c668f34..ff3b61b39 100644 --- a/postgres/src/test/expected/issue-191.out +++ b/postgres/src/test/expected/issue-191.out @@ -10,46 +10,46 @@ INSERT INTO "Issue191" ("BobIsYourUncle") VALUES ('def'); -- -- one field from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); - table_name | user_identifier | query | total | score | row_data -------------+-----------------+-------+-------+-------+------------------------------------------------------------------------------------ - "Issue191" | | | 2 | {1,1} | [{"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"BobIsYourUncle":"def","ctid":"(0,2)"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); + table_name | user_identifier | query | total | row_data +------------+-----------------+-------+-------+------------------------------------------------------------------------------------ + "Issue191" | | | 2 | [{"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"BobIsYourUncle":"def","ctid":"(0,2)"}] (1 row) -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); - table_name | user_identifier | query | total | score | row_data ------------------+-----------------+-------+-------+-------+---------------------------------------------------------------------------------- - "Issue191_View" | | | 2 | {1,1} | [{"_zdb_pkey":1,"BobIsYourUncle":"abc"}, {"_zdb_pkey":2,"BobIsYourUncle":"def"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); + table_name | user_identifier | query | total | row_data +-----------------+-----------------+-------+-------+---------------------------------------------------------------------------------- + "Issue191_View" | | | 2 | [{"_zdb_pkey":1,"BobIsYourUncle":"abc"}, {"_zdb_pkey":2,"BobIsYourUncle":"def"}] (1 row) -- -- two fields from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); - table_name | user_identifier | query | total | score | row_data -------------+-----------------+-------+-------+-------+-------------------------------------------------------------------------------------------------- - "Issue191" | | | 2 | {1,1} | [{"ID":1,"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"ID":2,"BobIsYourUncle":"def","ctid":"(0,2)"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); + table_name | user_identifier | query | total | row_data 
+------------+-----------------+-------+-------+-------------------------------------------------------------------------------------------------- + "Issue191" | | | 2 | [{"ID":1,"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"ID":2,"BobIsYourUncle":"def","ctid":"(0,2)"}] (1 row) -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); - table_name | user_identifier | query | total | score | row_data ------------------+-----------------+-------+-------+-------+------------------------------------------------------------------------------------------------ - "Issue191_View" | | | 2 | {1,1} | [{"_zdb_pkey":1,"ID":1,"BobIsYourUncle":"abc"}, {"_zdb_pkey":2,"ID":2,"BobIsYourUncle":"def"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); + table_name | user_identifier | query | total | row_data +-----------------+-----------------+-------+-------+------------------------------------------------------------------------------------------------ + "Issue191_View" | | | 2 | [{"_zdb_pkey":1,"ID":1,"BobIsYourUncle":"abc"}, {"_zdb_pkey":2,"ID":2,"BobIsYourUncle":"def"}] (1 row) -- -- all fields from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, NULL, ''); - table_name | user_identifier | query | total | score | row_data -------------+-----------------+-------+-------+-------+-------------------------------------------------------------------------------------------------- - "Issue191" | | | 2 | {1,1} | [{"ID":1,"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"ID":2,"BobIsYourUncle":"def","ctid":"(0,2)"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, NULL, ''); + table_name | user_identifier | query | total | row_data +------------+-----------------+-------+-------+-------------------------------------------------------------------------------------------------- + "Issue191" | | | 2 | [{"ID":1,"BobIsYourUncle":"abc","ctid":"(0,1)"}, {"ID":2,"BobIsYourUncle":"def","ctid":"(0,2)"}] (1 row) -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, NULL, ''); - table_name | user_identifier | query | total | score | row_data ------------------+-----------------+-------+-------+-------+---------------------------------------------------------------------------------------------------------------------------- - "Issue191_View" | | | 2 | {1,1} | [{"_zdb_pkey":1,"ID":1,"BobIsYourUncle":"abc","zdb":"(0,1)"}, {"_zdb_pkey":2,"ID":2,"BobIsYourUncle":"def","zdb":"(0,2)"}] +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, NULL, ''); + table_name | user_identifier | query | total | row_data +-----------------+-----------------+-------+-------+---------------------------------------------------------------------------------------------------------------------------- + "Issue191_View" | | | 2 | [{"_zdb_pkey":1,"ID":1,"BobIsYourUncle":"abc","zdb":"(0,1)"}, {"_zdb_pkey":2,"ID":2,"BobIsYourUncle":"def","zdb":"(0,2)"}] (1 row) DROP TABLE "Issue191" CASCADE; diff --git a/postgres/src/test/sql/deletes.sql b/postgres/src/test/sql/deletes.sql index cf2460ed4..767ddd1f6 100644 --- a/postgres/src/test/sql/deletes.sql +++ b/postgres/src/test/sql/deletes.sql @@ -16,8 +16,8 @@ DELETE FROM so_posts WHERE id IN (SELECT id FROM deltmp); SELECT assert(count(*), 162240, 'deleted 
3k records from so_posts') FROM so_posts; SELECT assert(count(*), 162240, 'deleted 3k records from so_posts (id:*)') FROM so_posts WHERE zdb('so_posts', ctid) ==> 'id:*'; --- ES will still see deleted rows via zdb_estimate_count -SELECT assert(zdb_estimate_count('so_posts', 'id:*'), 165240, 'make sure ES still sees deleted rows'); +-- ES shouldn't still see deleted rows via zdb_estimate_count +SELECT assert(zdb_estimate_count('so_posts', 'id:*'), 162240, 'make sure ES does not see deleted rows'); -- the *actual* number of records elasticsearch has in the 'data' type -- should actually be 165240 as well diff --git a/postgres/src/test/sql/issue-150.sql b/postgres/src/test/sql/issue-150.sql index 4ef6eba85..acb187b62 100644 --- a/postgres/src/test/sql/issue-150.sql +++ b/postgres/src/test/sql/issue-150.sql @@ -1,9 +1,2 @@ -SELECT * -FROM zdb_json_aggregate('so_posts', '{ - "top-tags": { - "terms": { - "field": "tags", - "size": 3 - } - } - }', 'java'); \ No newline at end of file +SELECT v->'top-tags'->'buckets' +FROM zdb_json_aggregate('so_posts', '{ "top-tags":{"terms":{"field":"tags", "size" : 3}}}', 'java') v; \ No newline at end of file diff --git a/postgres/src/test/sql/issue-191.sql b/postgres/src/test/sql/issue-191.sql index 1d7d452a9..f4e0e65b7 100644 --- a/postgres/src/test/sql/issue-191.sql +++ b/postgres/src/test/sql/issue-191.sql @@ -14,19 +14,19 @@ INSERT INTO "Issue191" ("BobIsYourUncle") VALUES ('def'); -- -- one field from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['BobIsYourUncle']], ''); -- -- two fields from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, ARRAY [ARRAY['ID', 'BobIsYourUncle']], ''); -- -- all fields from each of table and view -- -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, NULL, ''); -SELECT * FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, NULL, ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191"'], NULL, NULL, ''); +SELECT table_name, user_identifier, query, total, row_data FROM zdb_multi_search(ARRAY ['public."Issue191_View"'], NULL, NULL, ''); DROP TABLE "Issue191" CASCADE; \ No newline at end of file diff --git a/postgres/zombodb.control b/postgres/zombodb.control index 8faff0d14..bc6801620 100644 --- a/postgres/zombodb.control +++ b/postgres/zombodb.control @@ -1,6 +1,6 @@ # zombodb extension comment = 'Elasticsearch-enabled Index Type for Postgres' -default_version = '3.1.15' +default_version = '3.2.0' module_pathname = '$libdir/zombodb' relocatable = true requires = ''