From cfeba3780e2f027a15f5e4943d2903ca9201fbba Mon Sep 17 00:00:00 2001 From: Vincent Royer Date: Thu, 2 Mar 2017 20:40:37 +0100 Subject: [PATCH] Release 2.4.2-10 * Add support for blob and inet CQL types in primary key (in elasticsearch _id). * Fix partitioned index #83 and provides customizable partition function implementations. * Include optimized Cassandra serializers CASSANDRA-13271 --- CHANGES.txt | 5 + .../db/marshal/DynamicCompositeType.java | 21 +- .../apache/cassandra/db/marshal/ListType.java | 14 +- .../apache/cassandra/db/marshal/MapType.java | 17 +- .../apache/cassandra/db/marshal/SetType.java | 13 +- .../cassandra/serializers/ListSerializer.java | 18 +- .../cassandra/serializers/MapSerializer.java | 11 +- .../cassandra/serializers/SetSerializer.java | 11 +- .../InternalCassandraClusterService.java | 116 ++-- .../index/ElasticSecondaryIndex.java | 519 ++++++++++-------- .../index/MessageFormatPartitionFunction.java | 33 ++ .../elassandra/index/PartitionFunction.java | 25 + .../index/StringPartitionFunction.java | 25 + .../cluster/metadata/IndexMetaData.java | 11 +- .../index/mapper/core/BinaryFieldMapper.java | 1 - .../java/org/elassandra/CqlTypesTests.java | 88 +++ .../org/elassandra/PartitionedIndexTests.java | 93 ++++ docs/elassandra/source/limitations.rst | 5 + 18 files changed, 655 insertions(+), 371 deletions(-) rename core/{cassandra/src => src/main}/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java (96%) rename core/{cassandra/src => src/main}/java/org/apache/cassandra/db/marshal/MapType.java (92%) rename core/{cassandra/src => src/main}/java/org/apache/cassandra/serializers/ListSerializer.java (92%) rename core/{cassandra/src => src/main}/java/org/apache/cassandra/serializers/MapSerializer.java (92%) rename core/{cassandra/src => src/main}/java/org/apache/cassandra/serializers/SetSerializer.java (90%) create mode 100644 core/src/main/java/org/elassandra/index/MessageFormatPartitionFunction.java create mode 100644 core/src/main/java/org/elassandra/index/PartitionFunction.java create mode 100644 core/src/main/java/org/elassandra/index/StringPartitionFunction.java create mode 100644 core/src/test/java/org/elassandra/PartitionedIndexTests.java diff --git a/CHANGES.txt b/CHANGES.txt index 873aac55589..837590906c3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,8 @@ +2.4.2-10 - 2017-03-02 + * Add support for blob and inet CQL types in primary key (in elasticsearch _id). + * Fix partitioned index #83 and provides customizable partition function implementations. + * Include optimized Cassandra serializers CASSANDRA-13271 + 2.4.2-9 - 2017-02-26 * Significant write performance improvement. * New optimized version less internal engine (don't store any more version number in lucene files). diff --git a/core/cassandra/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java b/core/src/main/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java similarity index 96% rename from core/cassandra/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java rename to core/src/main/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java index 657f126706b..54b93be718a 100644 --- a/core/cassandra/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java +++ b/core/src/main/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java @@ -17,20 +17,20 @@ */ package org.apache.cassandra.db.marshal; -import java.nio.charset.CharacterCodingException; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.nio.charset.CharacterCodingException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.cassandra.cql3.Term; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.utils.ByteBufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /* * The encoding of a DynamicCompositeType column name should be: @@ -58,21 +58,18 @@ public class DynamicCompositeType extends AbstractCompositeType private final Map> aliases; // interning instances - private static final Map>, DynamicCompositeType> instances = new HashMap>, DynamicCompositeType>(); + private static final ConcurrentMap>, DynamicCompositeType> instances = new ConcurrentHashMap>, DynamicCompositeType>(); public static synchronized DynamicCompositeType getInstance(TypeParser parser) throws ConfigurationException, SyntaxException { return getInstance(parser.getAliasParameters()); } - public static synchronized DynamicCompositeType getInstance(Map> aliases) + public static DynamicCompositeType getInstance(Map> aliases) { DynamicCompositeType dct = instances.get(aliases); if (dct == null) - { - dct = new DynamicCompositeType(aliases); - instances.put(aliases, dct); - } + dct = instances.computeIfAbsent(aliases, K -> new DynamicCompositeType(K)); return dct; } diff --git a/core/src/main/java/org/apache/cassandra/db/marshal/ListType.java b/core/src/main/java/org/apache/cassandra/db/marshal/ListType.java index 8ad453270ea..658aba275e9 100644 --- a/core/src/main/java/org/apache/cassandra/db/marshal/ListType.java +++ b/core/src/main/java/org/apache/cassandra/db/marshal/ListType.java @@ -18,11 +18,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,7 +31,7 @@ import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.serializers.ListSerializer; -import org.apache.cassandra.serializers.MarshalException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,11 +61,7 @@ public static ListType getInstance(AbstractType elements, final Boolea ConcurrentMap, ListType> internMap = isMultiCell ? instances : frozenInstances; ListType t = internMap.get(elements); if (t == null) - { - t = new ListType(elements, isMultiCell); - ListType t2 = internMap.putIfAbsent(elements, t); - t = (t2 == null) ? t : t2; - } + t = internMap.computeIfAbsent(elements, K -> new ListType<>(K, isMultiCell) ); return t; } diff --git a/core/cassandra/src/java/org/apache/cassandra/db/marshal/MapType.java b/core/src/main/java/org/apache/cassandra/db/marshal/MapType.java similarity index 92% rename from core/cassandra/src/java/org/apache/cassandra/db/marshal/MapType.java rename to core/src/main/java/org/apache/cassandra/db/marshal/MapType.java index 425ffc25f57..61e4eb39f77 100644 --- a/core/cassandra/src/java/org/apache/cassandra/db/marshal/MapType.java +++ b/core/src/main/java/org/apache/cassandra/db/marshal/MapType.java @@ -19,6 +19,8 @@ import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.cassandra.cql3.Json; import org.apache.cassandra.cql3.Maps; @@ -27,16 +29,16 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.CollectionSerializer; -import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.serializers.MapSerializer; +import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.Pair; public class MapType extends CollectionType> { // interning instances - private static final Map, AbstractType>, MapType> instances = new HashMap<>(); - private static final Map, AbstractType>, MapType> frozenInstances = new HashMap<>(); + private static final ConcurrentMap, AbstractType>, MapType> instances = new ConcurrentHashMap<>(); + private static final ConcurrentMap, AbstractType>, MapType> frozenInstances = new ConcurrentHashMap<>(); private final AbstractType keys; private final AbstractType values; @@ -52,16 +54,13 @@ public class MapType extends CollectionType> return getInstance(l.get(0), l.get(1), true); } - public static synchronized MapType getInstance(AbstractType keys, AbstractType values, boolean isMultiCell) + public static MapType getInstance(AbstractType keys, AbstractType values, boolean isMultiCell) { - Map, AbstractType>, MapType> internMap = isMultiCell ? instances : frozenInstances; + ConcurrentMap, AbstractType>, MapType> internMap = isMultiCell ? instances : frozenInstances; Pair, AbstractType> p = Pair., AbstractType>create(keys, values); MapType t = internMap.get(p); if (t == null) - { - t = new MapType<>(keys, values, isMultiCell); - internMap.put(p, t); - } + t = internMap.computeIfAbsent(p, P -> new MapType<>(P.left, P.right, isMultiCell) ); return t; } diff --git a/core/src/main/java/org/apache/cassandra/db/marshal/SetType.java b/core/src/main/java/org/apache/cassandra/db/marshal/SetType.java index 05fc43f1589..d6ab0902862 100644 --- a/core/src/main/java/org/apache/cassandra/db/marshal/SetType.java +++ b/core/src/main/java/org/apache/cassandra/db/marshal/SetType.java @@ -18,12 +18,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -60,11 +55,7 @@ public static SetType getInstance(AbstractType elements, boolean isMul ConcurrentMap, SetType> internMap = isMultiCell ? instances : frozenInstances; SetType t = internMap.get(elements); if (t == null) - { - t = new SetType(elements, isMultiCell); - SetType t2 = internMap.putIfAbsent(elements, t); - t = (t2 == null) ? t : t2; - } + t = internMap.computeIfAbsent(elements, K -> new SetType<>(K, isMultiCell) ); return t; } diff --git a/core/cassandra/src/java/org/apache/cassandra/serializers/ListSerializer.java b/core/src/main/java/org/apache/cassandra/serializers/ListSerializer.java similarity index 92% rename from core/cassandra/src/java/org/apache/cassandra/serializers/ListSerializer.java rename to core/src/main/java/org/apache/cassandra/serializers/ListSerializer.java index 3fd0803138f..dc355d79693 100644 --- a/core/cassandra/src/java/org/apache/cassandra/serializers/ListSerializer.java +++ b/core/src/main/java/org/apache/cassandra/serializers/ListSerializer.java @@ -18,27 +18,27 @@ package org.apache.cassandra.serializers; -import org.apache.cassandra.transport.Server; - import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.cassandra.transport.Server; public class ListSerializer extends CollectionSerializer> { // interning instances - private static final Map, ListSerializer> instances = new HashMap, ListSerializer>(); + private static final ConcurrentMap, ListSerializer> instances = new ConcurrentHashMap, ListSerializer>(); public final TypeSerializer elements; - public static synchronized ListSerializer getInstance(TypeSerializer elements) + public static ListSerializer getInstance(TypeSerializer elements) { ListSerializer t = instances.get(elements); if (t == null) - { - t = new ListSerializer(elements); - instances.put(elements, t); - } + t = instances.computeIfAbsent(elements, K -> new ListSerializer<>(K) ); return t; } diff --git a/core/cassandra/src/java/org/apache/cassandra/serializers/MapSerializer.java b/core/src/main/java/org/apache/cassandra/serializers/MapSerializer.java similarity index 92% rename from core/cassandra/src/java/org/apache/cassandra/serializers/MapSerializer.java rename to core/src/main/java/org/apache/cassandra/serializers/MapSerializer.java index 67e56371d5c..abfe30fbd19 100644 --- a/core/cassandra/src/java/org/apache/cassandra/serializers/MapSerializer.java +++ b/core/src/main/java/org/apache/cassandra/serializers/MapSerializer.java @@ -21,6 +21,8 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.transport.Server; @@ -29,21 +31,18 @@ public class MapSerializer extends CollectionSerializer> { // interning instances - private static final Map, TypeSerializer>, MapSerializer> instances = new HashMap, TypeSerializer>, MapSerializer>(); + private static final ConcurrentMap, TypeSerializer>, MapSerializer> instances = new ConcurrentHashMap, TypeSerializer>, MapSerializer>(); public final TypeSerializer keys; public final TypeSerializer values; private final Comparator> comparator; - public static synchronized MapSerializer getInstance(TypeSerializer keys, TypeSerializer values, Comparator comparator) + public static MapSerializer getInstance(TypeSerializer keys, TypeSerializer values, Comparator comparator) { Pair, TypeSerializer> p = Pair., TypeSerializer>create(keys, values); MapSerializer t = instances.get(p); if (t == null) - { - t = new MapSerializer(keys, values, comparator); - instances.put(p, t); - } + t = instances.computeIfAbsent(p, P -> new MapSerializer<>(P.left, P.right, comparator) ); return t; } diff --git a/core/cassandra/src/java/org/apache/cassandra/serializers/SetSerializer.java b/core/src/main/java/org/apache/cassandra/serializers/SetSerializer.java similarity index 90% rename from core/cassandra/src/java/org/apache/cassandra/serializers/SetSerializer.java rename to core/src/main/java/org/apache/cassandra/serializers/SetSerializer.java index da7744b863b..f26eb43a316 100644 --- a/core/cassandra/src/java/org/apache/cassandra/serializers/SetSerializer.java +++ b/core/src/main/java/org/apache/cassandra/serializers/SetSerializer.java @@ -21,23 +21,22 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class SetSerializer extends CollectionSerializer> { // interning instances - private static final Map, SetSerializer> instances = new HashMap, SetSerializer>(); + private static final ConcurrentMap, SetSerializer> instances = new ConcurrentHashMap, SetSerializer>(); public final TypeSerializer elements; private final Comparator comparator; - public static synchronized SetSerializer getInstance(TypeSerializer elements, Comparator elementComparator) + public static SetSerializer getInstance(TypeSerializer elements, Comparator elementComparator) { SetSerializer t = instances.get(elements); if (t == null) - { - t = new SetSerializer(elements, elementComparator); - instances.put(elements, t); - } + t = instances.computeIfAbsent(elements, K -> new SetSerializer<>(K, elementComparator) ); return t; } diff --git a/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java b/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java index fca8f4643b7..dc2989d76ae 100644 --- a/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java +++ b/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java @@ -19,10 +19,9 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.util.ArrayList; import java.util.Arrays; @@ -84,25 +83,9 @@ import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.ReplicationParams; -import org.apache.cassandra.serializers.AsciiSerializer; -import org.apache.cassandra.serializers.BooleanSerializer; -import org.apache.cassandra.serializers.BytesSerializer; import org.apache.cassandra.serializers.CollectionSerializer; -import org.apache.cassandra.serializers.DecimalSerializer; -import org.apache.cassandra.serializers.DoubleSerializer; -import org.apache.cassandra.serializers.EmptySerializer; -import org.apache.cassandra.serializers.FloatSerializer; -import org.apache.cassandra.serializers.Int32Serializer; -import org.apache.cassandra.serializers.IntegerSerializer; -import org.apache.cassandra.serializers.ListSerializer; -import org.apache.cassandra.serializers.LongSerializer; import org.apache.cassandra.serializers.MapSerializer; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.serializers.SetSerializer; -import org.apache.cassandra.serializers.TimeUUIDSerializer; -import org.apache.cassandra.serializers.TimestampSerializer; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.serializers.UTF8Serializer; import org.apache.cassandra.serializers.UUIDSerializer; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ElassandraDaemon; @@ -118,7 +101,6 @@ import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.NumericRangeQuery; import org.apache.lucene.search.Query; -import org.apache.lucene.util.BytesRef; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; @@ -129,7 +111,6 @@ import org.elassandra.index.ExtendedElasticSecondaryIndex; import org.elassandra.index.mapper.internal.NodeFieldMapper; import org.elassandra.index.mapper.internal.TokenFieldMapper; -import org.elassandra.shard.CassandraShardStartedBarrier; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -152,6 +133,7 @@ import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.InternalClusterService; +import org.elasticsearch.common.Base64; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -211,7 +193,6 @@ import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.mapper.object.ObjectMapper; -import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesLifecycle; @@ -225,12 +206,9 @@ import com.fasterxml.jackson.core.JsonFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.net.InetAddresses; - -/** - * - */ public class InternalCassandraClusterService extends InternalClusterService { public static final String ELASTIC_ID_COLUMN_NAME = "_id"; @@ -262,7 +240,6 @@ public class InternalCassandraClusterService extends InternalClusterService { .put("uuid", "string" ) .put("timeuuid", "string" ) .build(); - public static Map mapperToCql = new java.util.HashMap() { { @@ -305,7 +282,7 @@ public class InternalCassandraClusterService extends InternalClusterService { private final DiscoveryService discoveryService; protected final MappingUpdatedAction mappingUpdatedAction; - public final static Class defaultSecondaryIndexClass = ExtendedElasticSecondaryIndex.class; + public final static Class defaultSecondaryIndexClass = ExtendedElasticSecondaryIndex.class; protected final PrimaryFirstSearchStrategy primaryFirstSearchStrategy = new PrimaryFirstSearchStrategy(); protected final Map strategies = new ConcurrentHashMap(); @@ -319,9 +296,7 @@ public class InternalCassandraClusterService extends InternalClusterService { private final String selectMetadataQuery; private final String insertMetadataQuery; private final String updateMetaDataQuery; - - private CassandraShardStartedBarrier shardStateObserver = null; - + @Inject public InternalCassandraClusterService(Settings settings, DiscoveryService discoveryService, OperationRouting operationRouting, TransportService transportService, NodeSettingsService nodeSettingsService, @@ -463,7 +438,6 @@ public boolean processConditional(final ConsistencyLevel cl, final ConsistencyLe /** * Don't use QueryProcessor.executeInternal, we need to propagate this on all nodes. - * * @see org.elasticsearch.cassandra.ElasticSchemaService#createIndexKeyspace(java.lang.String, int) **/ @Override @@ -523,7 +497,6 @@ public static MapType getMapType(final String ksName, final String cfName, } } } catch (Exception e) { - } return null; } @@ -535,40 +508,54 @@ public static boolean isReservedKeyword(String identifier) { return keywordsPattern.matcher(identifier.toUpperCase(Locale.ROOT)).matches(); } - public static String toJsonValue(Object o) { + private static Object toJsonValue(Object o) { + if (o instanceof UUID) + return o.toString(); if (o instanceof Date) - return Long.toString( ((Date)o).getTime() ); - if (o instanceof Integer) - return Integer.toString((Integer)o); - if (o instanceof Long) - return Long.toString((Long)o); - if (o instanceof Double) - return Double.toString((Double)o); - if (o instanceof Float) - return Float.toString((Float)o); - return o.toString(); + return ((Date)o).getTime(); + if (o instanceof ByteBuffer) { + // encode byte[] as Base64 encoded string + ByteBuffer bb = ByteBufferUtil.clone((ByteBuffer)o); + CharBuffer encoded = CharBuffer.allocate(4 * (bb.capacity()+2) / 3); + Base64.encode(bb, encoded); + encoded.position(0); + return encoded.toString(); + } + if (o instanceof InetAddress) + return InetAddresses.toAddrString((InetAddress)o); + return o; } + // wrap string values with quotes private static String stringify(Object o) { - if (o instanceof String || o instanceof UUID) - return "\""+o+"\""; - return toJsonValue(o); + Object v = toJsonValue(o); + return v instanceof String ? "\""+v+"\"" : v.toString(); } public static String stringify(Object[] cols, int length) { if (cols.length == 1) - return toJsonValue(cols[0]); + return toJsonValue(cols[0]).toString(); StringBuilder sb = new StringBuilder(); sb.append("["); for(int i = 0; i < length; i++) { if (i > 0) sb.append(","); - sb.append(stringify(cols[i])); + Object val = toJsonValue(cols[i]); + if (val instanceof String) { + sb.append('"').append(val).append('"'); + } else { + sb.append(val); + } } return sb.append("]").toString(); } + public static ByteBuffer fromString(AbstractType atype, String v) throws IOException { + if (atype instanceof BytesType) + return ByteBuffer.wrap(Base64.decode(v)); + return atype.fromString(v); + } public static void toXContent(XContentBuilder builder, Mapper mapper, String field, Object value) throws IOException { if (value instanceof Collection) { @@ -1693,8 +1680,6 @@ public Object[] rowAsArray(final String index, final String type, UntypedResultS return values; } - - public static class BlockingActionListener implements ActionListener { private final CountDownLatch latch = new CountDownLatch(1); private volatile Throwable error = null; @@ -1723,7 +1708,6 @@ public void onFailure(Throwable e) { } } - @Override public void blockingMappingUpdate(IndexService indexService, String type, String source) throws Exception { BlockingActionListener mappingUpdateListener = new BlockingActionListener(); @@ -1743,7 +1727,7 @@ public void insertDocument(final IndicesService indicesService, final IndexReque upsertDocument(indicesService, request, indexMetaData, false); } - public void upsertDocument(final IndicesService indicesService, final IndexRequest request, final IndexMetaData indexMetaData, boolean updateOperation) throws Exception { + private void upsertDocument(final IndicesService indicesService, final IndexRequest request, final IndexMetaData indexMetaData, boolean updateOperation) throws Exception { final IndexService indexService = indicesService.indexServiceSafe(request.index()); final IndexShard indexShard = indexService.shardSafe(0); @@ -2002,9 +1986,9 @@ public DocPrimaryKey parseElasticId(final String index, final String type, final AbstractType atype = cd.type; if (map == null) { names[i] = cd.name.toString(); - values[i] = atype.compose( atype.fromString(elements[i].toString()) ); + values[i] = atype.compose( fromString(atype, elements[i].toString()) ); } else { - map.put(cd.name.toString(), atype.compose( atype.fromString(elements[i].toString()) ) ); + map.put(cd.name.toString(), atype.compose( fromString(atype, elements[i].toString()) ) ); } } return (map != null) ? null : new DocPrimaryKey(names, values, (clusteringColumns.size() > 0 && elements.length == partitionColumns.size()) ) ; @@ -2012,14 +1996,14 @@ public DocPrimaryKey parseElasticId(final String index, final String type, final // _id is a single columns, parse its value. AbstractType atype = partitionColumns.get(0).type; if (map == null) { - return new DocPrimaryKey( new String[] { partitionColumns.get(0).name.toString() } , new Object[] { atype.compose( atype.fromString(id) ) }, clusteringColumns.size() != 0); + return new DocPrimaryKey( new String[] { partitionColumns.get(0).name.toString() } , new Object[] { atype.compose(fromString(atype, id)) }, clusteringColumns.size() != 0); } else { - map.put(partitionColumns.get(0).name.toString(), atype.compose( atype.fromString(id) ) ); + map.put(partitionColumns.get(0).name.toString(), atype.compose( fromString(atype, id) ) ); return null; } } } - + public DocPrimaryKey parseElasticRouting(final String index, final String type, final String routing) throws JsonParseException, JsonMappingException, IOException { IndexService indexService = indexServiceSafe(index); String ksName = indexService.settingsService().getSettings().get(IndexMetaData.SETTING_KEYSPACE,index); @@ -2040,14 +2024,14 @@ public DocPrimaryKey parseElasticRouting(final String index, final String type, ColumnDefinition cd = partitionColumns.get(i); AbstractType atype = cd.type; names[i] = cd.name.toString(); - values[i] = atype.compose( atype.fromString(elements[i].toString()) ); + values[i] = atype.compose( fromString(atype, elements[i].toString()) ); i++; } return new DocPrimaryKey(names, values) ; } else { // _id is a single columns, parse its value. AbstractType atype = partitionColumns.get(0).type; - return new DocPrimaryKey( new String[] { partitionColumns.get(0).name.toString() } , new Object[] { atype.compose( atype.fromString(routing) ) }); + return new DocPrimaryKey( new String[] { partitionColumns.get(0).name.toString() } , new Object[] { atype.compose( fromString(atype, routing) ) }); } } @@ -2269,20 +2253,6 @@ public void createOrUpdateElasticAdminKeyspace() { } } - /* - private boolean checkConsistency(String ksName, ConsistencyLevel cl) { - Keyspace adminKeypsace = Schema.instance.getKeyspaceInstance(ksName); - DatacenterReplicationStrategy replicationStrategy = (DatacenterReplicationStrategy)adminKeypsace.getReplicationStrategy(); - Collection aliveNodes = replicationStrategy.getAliveEndpoints(); - if (!cl.isSufficientLiveNodes(adminKeypsace, aliveNodes)) { - logger.warn("Only {}/{} live nodes on keyspace [{}], cannot succeed transaction with CL={}", - aliveNodes.size(), replicationStrategy.getEndpoints().size(), ksName, metadataWriteCL); - return false; - } - return true; - } - */ - @Override public ShardInfo shardInfo(String index, ConsistencyLevel cl) { Keyspace keyspace = Schema.instance.getKeyspaceInstance(state().metaData().index(index).keyspace()); diff --git a/core/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java b/core/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java index e2e46c6feb1..b8eb5841a92 100644 --- a/core/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java +++ b/core/src/main/java/org/elassandra/index/ElasticSecondaryIndex.java @@ -25,7 +25,9 @@ import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; +import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -48,7 +50,6 @@ import java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; @@ -286,7 +287,7 @@ protected Context initialValue() { return new Context(); } }; - + abstract class FilterableDocument extends ParseContext.Document implements Predicate { boolean applyFilter = false; @@ -852,50 +853,165 @@ public IndexShard shard() { return indexShard; } + public void refresh() { + if (this.refresh) { + IndexShard shard = shard(); + if (shard != null) { + try { + shard.refresh("synchronous_refresh"); + } catch (Throwable e) { + logger.error("error", e); + } + } + } + } + + public void deleteByQuery(RangeTombstone tombstone) { + IndexShard shard = shard(); + if (shard != null) { + Slice slice = tombstone.deletedSlice(); + Bound start = slice.start(); + Bound end = slice.end(); + + DocumentMapper docMapper = indexService.mapperService().documentMapper(typeName); + BooleanQuery.Builder builder = new BooleanQuery.Builder(); + builder.add( new TermQuery(new Term(TypeFieldMapper.NAME, docMapper.typeMapper().fieldType().indexedValueForSearch(typeName))), Occur.FILTER); + + // build the primary key part of the delete by query + int i = 0; + for(ColumnDefinition cd : baseCfs.metadata.primaryKeyColumns()) { + if (i >= start.size()) + break; + if (indexedPkColumns[i]) { + FieldMapper mapper = docMapper.mappers().smartNameFieldMapper(cd.name.toString()); + builder.add( buildQuery( cd, mapper, start.get(i), end.get(i), start.isInclusive(), end.isInclusive()), Occur.FILTER); + } + i++; + } + + Query query = builder.build(); + if (logger.isTraceEnabled()) { + logger.trace("delete rangeTombstone from ks.cf={}.{} query={} in elasticsearch index=[{}]", baseCfs.metadata.ksName, baseCfs.name, query, name); + } + if (!updated) + updated = true; + DeleteByQuery deleteByQuery = new DeleteByQuery(query, null, null, null, null, Operation.Origin.PRIMARY, System.currentTimeMillis(), typeName); + shard.engine().delete(deleteByQuery); + } + } + + /** + * Build range query to remove a row slice. + * @param cd + * @param mapper + * @param lower + * @param upper + * @param includeLower + * @param includeUpper + * @return + */ + @SuppressForbidden(reason="unchecked") + private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, ByteBuffer lower, ByteBuffer upper, boolean includeLower, boolean includeUpper) { + Object start = cd.type.compose(lower); + Object end = cd.type.compose(upper); + Query query = null; + if (mapper != null) { + CQL3Type cql3Type = cd.type.asCQL3Type(); + if (cql3Type instanceof CQL3Type.Native) { + switch ((CQL3Type.Native) cql3Type) { + case ASCII: + case TEXT: + case VARCHAR: + if (start.equals(end)) { + query = new TermQuery(new Term(cd.name.toString(), mapper.fieldType().indexedValueForSearch(start))); + } else { + query = new TermRangeQuery(cd.name.toString(), mapper.fieldType().indexedValueForSearch(start), mapper.fieldType().indexedValueForSearch(end),includeLower, includeUpper); + } + break; + case INT: + case SMALLINT: + case TINYINT: + query = NumericRangeQuery.newIntRange(cd.name.toString(), (Integer) start, (Integer) end, includeLower, includeUpper); + break; + case INET: + case TIMESTAMP: + case BIGINT: + query = NumericRangeQuery.newLongRange(cd.name.toString(), (Long) start, (Long) start, includeLower, includeUpper); + break; + case DOUBLE: + query = NumericRangeQuery.newDoubleRange(cd.name.toString(), (Double) start, (Double) start, includeLower, includeUpper); + break; + case FLOAT: + query = NumericRangeQuery.newFloatRange(cd.name.toString(), (Float) start, (Float) start, includeLower, includeUpper); + break; + + case DECIMAL: + case TIMEUUID: + case UUID: + case BLOB: + case BOOLEAN: + throw new UnsupportedOperationException("Unsupported data type in primary key"); + } + } + } else { + throw new UnsupportedOperationException("Object type in primary key not supported"); + } + return query; + } + public boolean indexStaticOnly() { return this.index_static_only; } + + public String toString() { + return this.name; + } + + } class ImmutablePartitionFunction { final String name; final String pattern; final String[] fields; // indexed fields used in the partition function - final int[] fieldsIndices; // column position in Rowcument.values - final Set indices; + final int[] fieldsIdx; // column position in Rowcument.values + final Set indices; // associated indices + final PartitionFunction partitionFunction; - // args = field names used in the partition function ImmutablePartitionFunction(String[] args) { + this(args, new MessageFormatPartitionFunction()); + } + + ImmutablePartitionFunction(String[] args, PartitionFunction partitionFunc) { this.name = args[0]; this.pattern = args[1]; this.fields = new String[args.length-2]; - this.fieldsIndices = new int[args.length-2]; + this.fieldsIdx = new int[args.length-2]; System.arraycopy(args, 2, this.fields, 0, args.length-2); this.indices = new HashSet(); + this.partitionFunction = partitionFunc; } // values = indexed values in the same order as MappingInfo.fields - @SuppressForbidden(reason="unchecked") String indexName(Object[] values) { Object[] args = new Object[fields.length]; - for(int i=0; i < fieldsIndices.length; i++) { - args[i] = (fieldsIndices[i] < values.length) ? values[fieldsIndices[i]] : null; - } - return String.format(pattern, args); - //return MessageFormat.format(pattern, args); + for(int i=0; i < fieldsIdx.length; i++) + args[i] = (fieldsIdx[i] < values.length) ? values[fieldsIdx[i]] : null; + return partitionFunction.format(pattern, args); } public String toString() { return this.name; } + + } final Map partitionFunctions; - final Map indices = new HashMap(); - //final String[] fields; - final ObjectIntHashMap fieldsToIdx; - final Map columnsDefs = new HashMap(); + final ImmutableIndexInfo[] indices; + final ObjectIntHashMap indexToIdx; + final ObjectIntHashMap fieldsToIdx; final BitSet fieldsToRead; final BitSet staticColumns; final boolean indexSomeStaticColumns; @@ -910,17 +1026,20 @@ public String toString() { if (state.blocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) { logger.debug("global write blocked"); + this.indices = null; + this.indexToIdx = null; this.fieldsToIdx = null; this.fieldsToRead = null; this.staticColumns = null; this.indexSomeStaticColumns = false; this.indexedPkColumns = null; this.partitionFunctions = null; - return; + return; } Map fieldsMap = new HashMap(); Map partFuncs = null; + List indexList = new ArrayList(); for(Iterator indexMetaDataIterator = state.metaData().iterator(); indexMetaDataIterator.hasNext(); ) { IndexMetaData indexMetaData = indexMetaDataIterator.next(); @@ -937,7 +1056,7 @@ public String toString() { continue; } - if ( ElasticSecondaryIndex.this.baseCfs.metadata.ksName.equals(indexMetaData.keyspace()) && (mappingMetaData = indexMetaData.mapping(typeName)) != null) { + if (ElasticSecondaryIndex.this.baseCfs.metadata.ksName.equals(indexMetaData.keyspace()) && (mappingMetaData = indexMetaData.mapping(typeName)) != null) { try { Map mappingMap = (Map)mappingMetaData.getSourceAsMap(); if (mappingMap.get("properties") != null) { @@ -948,7 +1067,7 @@ public String toString() { continue; } ImmutableIndexInfo indexInfo = new ImmutableIndexInfo(index, indexService, mappingMetaData, state.metaData(), indexMetaData.isIndexUsingVersionLessEngine(indexMetaData.getSettings())); - this.indices.put(index, indexInfo); + indexList.add(indexInfo); Map props = (Map)mappingMap.get("properties"); for(String fieldName : props.keySet() ) { @@ -976,10 +1095,9 @@ public String toString() { if (pf != null) { if (partFuncs == null) partFuncs = new HashMap(); - ImmutablePartitionFunction func = partFuncs.get(pf[0]); if (func == null) { - func = new ImmutablePartitionFunction(pf); + func = new ImmutablePartitionFunction(pf, indexMetaData.partitionFunctionClass()); partFuncs.put(func.name, func); } if (!func.pattern.equals(pf[1])) { @@ -994,9 +1112,11 @@ public String toString() { } } - if (indices.size() == 0) { + if (indexList.size() == 0) { if (logger.isTraceEnabled()) logger.warn("No active elasticsearch index for keyspace.table=[{}.{}] state={}",baseCfs.metadata.ksName, baseCfs.name, state); + this.indices = null; + this.indexToIdx = null; this.fieldsToIdx = null; this.fieldsToRead = null; this.staticColumns = null; @@ -1006,6 +1126,13 @@ public String toString() { return; } + // build indices array and indexToIdx map + this.indices = new ImmutableIndexInfo[indexList.size()]; + this.indexToIdx = new ObjectIntHashMap(indexList.size()); + for(int i = 0; i < indexList.size(); i++) { + indices[i] = indexList.get(i); + indexToIdx.put(indexList.get(i).name, i); + } // order fields with pk columns first final String[] fields = new String[fieldsMap.size()]; @@ -1031,7 +1158,7 @@ public String toString() { } } // build a map for fields, as it is O(1) rather than O(n) for an array. - this.fieldsToIdx = new ObjectIntHashMap(fields.length); + this.fieldsToIdx = new ObjectIntHashMap(fields.length); for(int i=0; i < fields.length; i++) this.fieldsToIdx.put(fields[i], i); @@ -1040,7 +1167,6 @@ public String toString() { for(int i=0; i < fields.length; i++) { ColumnIdentifier colId = new ColumnIdentifier(fields[i],true); ColumnDefinition colDef = baseCfs.metadata.getColumnDefinition(colId); - columnsDefs.put(fields[i], colDef); this.fieldsToRead.set(i, fieldsMap.get(fields[i]) && !colDef.isPrimaryKeyColumn()); if (staticColumns != null) this.staticColumns.set(i,colDef.isStatic()); @@ -1050,7 +1176,7 @@ public String toString() { for(ImmutablePartitionFunction func : partFuncs.values()) { int i = 0; for(String field : func.fields) - func.fieldsIndices[i++] = this.fieldsToIdx.getOrDefault(field, -1); + func.fieldsIdx[i++] = this.fieldsToIdx.getOrDefault(field, -1); } this.partitionFunctions = partFuncs; } else { @@ -1058,7 +1184,7 @@ public String toString() { } // build InderInfo.mappers arrays. - for(ImmutableIndexInfo indexInfo : indices.values()) { + for(ImmutableIndexInfo indexInfo : this.indices) { indexInfo.mappers = new Mapper[fields.length]; for(int i=0; i < fields.length; i++) { DocumentMapper docMapper = indexInfo.indexService.mapperService().documentMapper(typeName); @@ -1068,7 +1194,7 @@ public String toString() { } boolean _indexSomeStaticColumns = false; - for(ImmutableIndexInfo indexInfo : this.indices.values()) { + for(ImmutableIndexInfo indexInfo : this.indices) { if (indexInfo.index_static_columns) { _indexSomeStaticColumns = true; break; @@ -1077,44 +1203,50 @@ public String toString() { this.indexSomeStaticColumns = _indexSomeStaticColumns; } - public Collection targetIndices(final Object[] values) { + public BitSet targetIndices(final Object[] values) { if (this.partitionFunctions == null) - return this.indices.values(); + return null; - Set targetIndices = new HashSet(this.partitionFunctions.size()); + BitSet targets = new BitSet(this.indices.length); for(ImmutablePartitionFunction func : this.partitionFunctions.values()) { String indexName = func.indexName(values); - ImmutableIndexInfo targetIndexInfo = this.indices.get(indexName); - if (targetIndexInfo != null) { - targetIndices.add( targetIndexInfo ); + int indexIdx = this.indexToIdx.getOrDefault(indexName, -1); + if (indexIdx >= 0) { + targets.set(indexIdx); } else { if (logger.isDebugEnabled()) - logger.debug("No target index=[{}] found for partition function name=[{}] pattern=[{}] indices={}", indexName, func.name, func.pattern, this.indices.keySet()); + logger.debug("No target index=[{}] found for partition function name=[{}] pattern=[{}] indices={}", + indexName, func.name, func.pattern, Arrays.stream(mappingInfo.indices).map(i -> i.name)); } } if (logger.isTraceEnabled()) - logger.trace("Partition target indices={}", targetIndices.stream().map( e -> e.name ).collect( Collectors.toList() )); - return targetIndices; + logger.trace("Partition index bitset={} indices={}", targets, this.indices); + return targets; } - public Collection targetIndicesForDelete(final Object[] values) { + public BitSet targetIndicesForDelete(final Object[] values) { if (this.partitionFunctions == null) - return this.indices.values(); + return null; - Set targetIndices = new HashSet(this.partitionFunctions.size()); + BitSet targets = new BitSet(this.indices.length); for(ImmutablePartitionFunction func : this.partitionFunctions.values()) { String indexName = func.indexName(values); - ImmutableIndexInfo targetIndexInfo = this.indices.get(indexName); - if (targetIndexInfo != null) { - targetIndices.add( targetIndexInfo ); + int indexIdx = this.indexToIdx.getOrDefault(indexName, -1); + if (indexIdx >= 0) { + targets.set(indexIdx); } else { if (logger.isWarnEnabled()) - logger.warn("No target index=[{}] found, function name=[{}] pattern=[{}], return all indices={}", indexName, func.name, func.pattern, this.indices); - for(String index : func.indices) - targetIndices.add( this.indices.get(index) ); + logger.warn("No target index=[{}] found, function name=[{}] pattern=[{}], return all indices={}", + indexName, func.name, func.pattern, Arrays.stream(mappingInfo.indices).map(i -> i.name)); + for(String index : func.indices) { + int i = this.indexToIdx.getOrDefault(index, -1); + if (i >= 0) + targets.set(i); + } + } } - return targetIndices; + return targets; } class WideRowcumentIndexer extends RowcumentIndexer { @@ -1191,103 +1323,19 @@ public void flush() { */ @Override public void rangeTombstone(RangeTombstone tombstone) { - Slice slice = tombstone.deletedSlice(); - Bound start = slice.start(); - Bound end = slice.end(); - try { - for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : targetIndices(pkCols)) { - IndexShard indexShard = indexInfo.indexService.shard(0); - if (indexShard != null) { - DocumentMapper docMapper = indexInfo.indexService.mapperService().documentMapper(typeName); - BooleanQuery.Builder builder = new BooleanQuery.Builder(); - builder.add( new TermQuery(new Term(TypeFieldMapper.NAME, docMapper.typeMapper().fieldType().indexedValueForSearch(typeName))), Occur.FILTER); - - // build the primary key part of the delete by query - int i = 0; - for(ColumnDefinition cd : baseCfs.metadata.primaryKeyColumns()) { - if (i >= start.size()) - break; - if (indexedPkColumns[i]) { - FieldMapper mapper = docMapper.mappers().smartNameFieldMapper(cd.name.toString()); - builder.add( buildQuery( cd, mapper, start.get(i), end.get(i), start.isInclusive(), end.isInclusive()), Occur.FILTER); - } - i++; - } - - Query query = builder.build(); - if (logger.isTraceEnabled()) { - logger.trace("delete rangeTombstone from ks.cf={}.{} query={} in elasticsearch index=[{}]", baseCfs.metadata.ksName, baseCfs.name, query, indexInfo.name); - } - if (!indexInfo.updated) - indexInfo.updated = true; - DeleteByQuery deleteByQuery = new DeleteByQuery(query, null, null, null, null, Operation.Origin.PRIMARY, System.currentTimeMillis(), typeName); - indexShard.engine().delete(deleteByQuery); - } + BitSet targets = targetIndices(pkCols); + if (targets == null) { + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : indices) + indexInfo.deleteByQuery(tombstone); + } else { + for(int i = targets.nextSetBit(0); i >= 0 && i < indices.length; i = targets.nextSetBit(i+1)) + indices[i].deleteByQuery(tombstone); } } catch(Throwable t) { logger.error("Unexpected error", t); } - } - - /** - * Build range query to remove a row slice. - * @param cd - * @param mapper - * @param lower - * @param upper - * @param includeLower - * @param includeUpper - * @return - */ - @SuppressForbidden(reason="unchecked") - private Query buildQuery(ColumnDefinition cd, FieldMapper mapper, ByteBuffer lower, ByteBuffer upper, boolean includeLower, boolean includeUpper) { - Object start = cd.type.compose(lower); - Object end = cd.type.compose(upper); - Query query = null; - if (mapper != null) { - CQL3Type cql3Type = cd.type.asCQL3Type(); - if (cql3Type instanceof CQL3Type.Native) { - switch ((CQL3Type.Native) cql3Type) { - case ASCII: - case TEXT: - case VARCHAR: - if (start.equals(end)) { - query = new TermQuery(new Term(cd.name.toString(), mapper.fieldType().indexedValueForSearch(start))); - } else { - query = new TermRangeQuery(cd.name.toString(), mapper.fieldType().indexedValueForSearch(start), mapper.fieldType().indexedValueForSearch(end),includeLower, includeUpper); - } - break; - case INT: - case SMALLINT: - case TINYINT: - query = NumericRangeQuery.newIntRange(cd.name.toString(), (Integer) start, (Integer) end, includeLower, includeUpper); - break; - case INET: - case TIMESTAMP: - case BIGINT: - query = NumericRangeQuery.newLongRange(cd.name.toString(), (Long) start, (Long) start, includeLower, includeUpper); - break; - case DOUBLE: - query = NumericRangeQuery.newDoubleRange(cd.name.toString(), (Double) start, (Double) start, includeLower, includeUpper); - break; - case FLOAT: - query = NumericRangeQuery.newFloatRange(cd.name.toString(), (Float) start, (Float) start, includeLower, includeUpper); - break; - - case DECIMAL: - case TIMEUUID: - case UUID: - case BLOB: - case BOOLEAN: - throw new UnsupportedOperationException("Unsupported data type in primary key"); - } - } - } else { - throw new UnsupportedOperationException("Object type in primary key not supported"); - } - return query; - } + } } class SkinnyRowcumentIndexer extends RowcumentIndexer { @@ -1346,6 +1394,7 @@ abstract class RowcumentIndexer implements Index.Indexer { final OpOrder.Group opGroup; final Object[] pkCols = new Object[baseCfs.metadata.partitionKeyColumns().size()+baseCfs.metadata.clusteringColumns().size()]; final String partitionKey; + BitSet targets = null; public RowcumentIndexer(final DecoratedKey key, final PartitionColumns columns, @@ -1445,17 +1494,14 @@ public void removeRow(Row row) { @Override public void finish() { flush(); - for (ImmutableMappingInfo.ImmutableIndexInfo indexInfo : targetIndices(pkCols)) { - IndexShard indexShard = indexInfo.indexService.shard(0); - if (indexShard != null) { - if (indexInfo.refresh) { - try { - indexShard.refresh("synchronous_refresh"); - } catch (Throwable e) { - logger.error("error", e); - } - } - } + if (this.targets == null) { + // refresh all associated indices. + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : indices) + indexInfo.refresh(); + } else { + // refresh matching partition indices. + for(int i = targets.nextSetBit(0); i >= 0 && i < indices.length; i = targets.nextSetBit(i+1)) + indices[i].refresh(); } } @@ -1469,7 +1515,6 @@ public RowIterator read(SinglePartitionReadCommand command) { } class Rowcument { - //final Row row; final String id; final Object[] values = new Object[fieldsToIdx.size()]; final BitSet fieldsNotNull = new BitSet(fieldsToIdx.size()); // regular or static columns only @@ -1518,18 +1563,16 @@ public boolean isStatic() { } public void readCellValues(Row row, boolean indexOp) throws IOException { - for(Cell cell : row.cells()) { + for(Cell cell : row.cells()) readCellValue(cell, indexOp); - } } public void readCellValue(Cell cell, boolean indexOp) throws IOException { final String cellNameString = cell.column().name.toString(); int idx = fieldsToIdx.getOrDefault(cellNameString, -1); - if (idx == - 1) { - //ignore cell, not indexed. - return; - } + if (idx == - 1) + return; //ignore cell, not indexed. + if (cell.isLive(nowInSec) && indexOp) { docTtl = Math.min(cell.localDeletionTime(), docTtl); @@ -1714,81 +1757,98 @@ public void index() { long startTime = System.nanoTime(); long ttl = (long)((this.docTtl < Integer.MAX_VALUE) ? this.docTtl : 0); - for(ImmutableIndexInfo indexInfo : ImmutableMappingInfo.this.targetIndices(values)) { - try { - Context context = buildContext(indexInfo, isStatic()); - - Field uid = context.uid(); - if (isStatic()) { - uid = new Field(UidFieldMapper.NAME, Uid.createUid(typeName, partitionKey), Defaults.FIELD_TYPE); - for(Document doc : context.docs()) { - if (doc instanceof Context.StaticDocument) - ((Context.StaticDocument)doc).applyFilter(isStatic()); - } - - } - context.finalize(); - final ParsedDocument parsedDoc = new ParsedDocument( - uid, - context.version(), - (isStatic()) ? partitionKey : context.id(), - context.type(), - InternalCassandraClusterService.stringify(pkCols, baseCfs.metadata.partitionKeyColumns().size()), // routing - System.currentTimeMillis(), // timstamp - ttl, - ((Long)key.getToken().getTokenValue()).longValue(), - context.docs(), - context.source(), // source - (Mapping)null); // mappingUpdate - - parsedDoc.parent(context.parent()); - - if (logger.isTraceEnabled()) - logger.trace("index={} id={} type={} uid={} routing={} docs={}", context.indexInfo.name, parsedDoc.id(), parsedDoc.type(), parsedDoc.uid(), parsedDoc.routing(), parsedDoc.docs()); - - final IndexShard indexShard = context.indexInfo.shard(); - if (indexShard != null) { - if (!indexInfo.updated) - indexInfo.updated = true; - final Engine.Index operation = new Engine.Index(context.docMapper.uidMapper().term(uid.stringValue()), - parsedDoc, - indexInfo.versionLessEngine ? 1 : Versions.MATCH_ANY, - indexInfo.versionLessEngine ? VersionType.EXTERNAL : VersionType.INTERNAL, - Engine.Operation.Origin.PRIMARY, - startTime, - false); - - final boolean created = operation.execute(indexShard); - - if (logger.isDebugEnabled()) { - logger.debug("document CF={}.{} index={} type={} id={} version={} created={} ttl={} refresh={} ", - baseCfs.metadata.ksName, baseCfs.metadata.cfName, - context.indexInfo.name, typeName, - parsedDoc.id(), operation.version(), created, ttl, context.indexInfo.refresh); - } - } - } catch (IOException e) { - logger.error("error", e); - } + targets = ImmutableMappingInfo.this.targetIndices(values); + if (targets == null) { + // index for associated indices + for(ImmutableIndexInfo indexInfo : indices) + index(indexInfo, startTime, ttl); + } else { + // delete for matching target indices. + for(int i = targets.nextSetBit(0); i >= 0 && i < indices.length; i = targets.nextSetBit(i+1)) + index(indices[i], startTime, ttl); } } - public void delete() { - for (ImmutableMappingInfo.ImmutableIndexInfo indexInfo : targetIndices(values)) { - final IndexShard indexShard = indexInfo.shard(); + private void index(ImmutableIndexInfo indexInfo, long startTime, long ttl) { + try { + Context context = buildContext(indexInfo, isStatic()); + Field uid = context.uid(); + if (isStatic()) { + uid = new Field(UidFieldMapper.NAME, Uid.createUid(typeName, partitionKey), Defaults.FIELD_TYPE); + for(Document doc : context.docs()) { + if (doc instanceof Context.StaticDocument) + ((Context.StaticDocument)doc).applyFilter(isStatic()); + } + } + context.finalize(); + final ParsedDocument parsedDoc = new ParsedDocument( + uid, + context.version(), + (isStatic()) ? partitionKey : context.id(), + context.type(), + InternalCassandraClusterService.stringify(pkCols, baseCfs.metadata.partitionKeyColumns().size()), // routing + System.currentTimeMillis(), // timstamp + ttl, + ((Long)key.getToken().getTokenValue()).longValue(), + context.docs(), + context.source(), // source + (Mapping)null); // mappingUpdate + + parsedDoc.parent(context.parent()); + + if (logger.isTraceEnabled()) + logger.trace("index={} id={} type={} uid={} routing={} docs={}", context.indexInfo.name, parsedDoc.id(), parsedDoc.type(), parsedDoc.uid(), parsedDoc.routing(), parsedDoc.docs()); + + final IndexShard indexShard = context.indexInfo.shard(); if (indexShard != null) { - if (logger.isDebugEnabled()) - logger.debug("deleting document from index.type={}.{} id={}", indexInfo.name, typeName, id); if (!indexInfo.updated) indexInfo.updated = true; - Engine.Delete delete = indexShard.prepareDeleteOnPrimary(typeName, id, + final Engine.Index operation = new Engine.Index(context.docMapper.uidMapper().term(uid.stringValue()), + parsedDoc, indexInfo.versionLessEngine ? 1 : Versions.MATCH_ANY, - indexInfo.versionLessEngine ? VersionType.EXTERNAL : VersionType.INTERNAL); - indexShard.delete(delete); - } + indexInfo.versionLessEngine ? VersionType.EXTERNAL : VersionType.INTERNAL, + Engine.Operation.Origin.PRIMARY, + startTime, + false); + + if (logger.isDebugEnabled()) { + logger.debug("document CF={}.{} index={} type={} id={} version={} created={} ttl={} refresh={} ", + baseCfs.metadata.ksName, baseCfs.metadata.cfName, + context.indexInfo.name, typeName, + parsedDoc.id(), operation.version(), operation.execute(indexShard), ttl, context.indexInfo.refresh); + } + } + } catch (IOException e) { + logger.error("error", e); + } + } + + public void delete() { + targets = ImmutableMappingInfo.this.targetIndices(values); + if (targets == null) { + // delete for associated indices + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : indices) + delete(indexInfo); + } else { + // delete for matching target indices. + for(int i = targets.nextSetBit(0); i >= 0 && i < indices.length; i = targets.nextSetBit(i+1)) + delete(indices[i]); } } + private void delete(ImmutableIndexInfo indexInfo) { + final IndexShard indexShard = indexInfo.shard(); + if (indexShard != null) { + if (logger.isDebugEnabled()) + logger.debug("deleting document from index.type={}.{} id={}", indexInfo.name, typeName, id); + if (!indexInfo.updated) + indexInfo.updated = true; + Engine.Delete delete = indexShard.prepareDeleteOnPrimary(typeName, id, + indexInfo.versionLessEngine ? 1 : Versions.MATCH_ANY, + indexInfo.versionLessEngine ? VersionType.EXTERNAL : VersionType.INTERNAL); + indexShard.delete(delete); + } + } } @@ -1805,7 +1865,7 @@ public void partitionDelete(DeletionTime deletionTime) { mappingInfoLock.readLock().lock(); try { // Delete documents where _token = token_long + _type = typeName - for (ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices.values()) { + for (ImmutableMappingInfo.ImmutableIndexInfo indexInfo : indices) { if (logger.isTraceEnabled()) logger.trace("deleting documents where _token={} from index.type={}.{} id={}", token_long, indexInfo.name, typeName); IndexShard indexShard = indexInfo.indexService.shard(0); @@ -1836,9 +1896,6 @@ public void partitionDelete(DeletionTime deletionTime) { public void rangeTombstone(RangeTombstone tombstone) { logger.warn("Ignoring range tombstone {}", tombstone); } - - - } } @@ -1851,7 +1908,7 @@ public boolean isIndexing() { logger.warn("No Elasticsearch index ready"); return false; } - if (mappingInfo.indices.size() == 0) { + if (mappingInfo.indices == null || mappingInfo.indices.length == 0) { if (logger.isWarnEnabled()) logger.warn("No Elasticsearch index configured for {}.{}",this.baseCfs.metadata.ksName, this.baseCfs.metadata.cfName); return false; @@ -1864,7 +1921,7 @@ public void initMapping() { try { mappingInfo = new ImmutableMappingInfo(this.clusterService.state()); logger.debug("Secondary index=[{}] initialized, metadata.version={} mappingInfo.indices={}", - index_name, mappingInfo.metadataVersion, mappingInfo.indices.keySet()); + index_name, mappingInfo.metadataVersion, Arrays.stream(mappingInfo.indices).map(i -> i.name)); } catch(Exception e) { logger.error("Failed to update mapping index=[{}]",e ,index_name); } finally { @@ -1894,7 +1951,7 @@ public void clusterChanged(ClusterChangedEvent event) { try { mappingInfo = new ImmutableMappingInfo(event.state()); logger.debug("secondary index=[{}] metadata.version={} mappingInfo.indices={}", - this.index_name, event.state().metaData().version(), mappingInfo.indices.keySet() ); + this.index_name, event.state().metaData().version(), Arrays.stream(mappingInfo.indices).map(i -> i.name)); } catch(Exception e) { logger.error("Failed to update mapping index=[{}]", e, index_name); } finally { @@ -1931,7 +1988,7 @@ public Callable getBlockingFlushTask() { return () -> { if (isIndexing()) { - for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices.values()) { + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices) { try { IndexShard indexShard = indexInfo.indexService.shard(0); if (indexShard != null && indexInfo.updated) { @@ -1972,7 +2029,7 @@ public Callable getSnapshotWithoutFlushTask(String snapshotName) { return () -> { if (isIndexing()) { - for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices.values()) { + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices) { IndexShard indexShard = indexInfo.indexService.shard(0); if (indexShard != null && indexInfo.snapshot) { if (indexShard.state() == IndexShardState.STARTED) { @@ -2022,7 +2079,7 @@ public Callable getInvalidateTask() { public Callable getTruncateTask(long truncatedAt) { return () -> { if (isIndexing()) { - for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices.values()) { + for(ImmutableMappingInfo.ImmutableIndexInfo indexInfo : mappingInfo.indices) { try { IndexShard indexShard = indexInfo.indexService.shard(0); if (indexShard != null) { diff --git a/core/src/main/java/org/elassandra/index/MessageFormatPartitionFunction.java b/core/src/main/java/org/elassandra/index/MessageFormatPartitionFunction.java new file mode 100644 index 00000000000..cafb22d601a --- /dev/null +++ b/core/src/main/java/org/elassandra/index/MessageFormatPartitionFunction.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2017 Strapdata (http://www.strapdata.com) + * Contains some code from Elasticsearch (http://www.elastic.co) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elassandra.index; + +import java.text.MessageFormat; + +import org.elasticsearch.common.SuppressForbidden; + +/** + * Default PartitionFunction implementation. + * @author vroyer + * + */ +@SuppressForbidden(reason="unchecked MessageFormat.format()") +public class MessageFormatPartitionFunction implements PartitionFunction { + @Override + public String format(String pattern, Object... args) { + return MessageFormat.format(pattern, args); + } +} diff --git a/core/src/main/java/org/elassandra/index/PartitionFunction.java b/core/src/main/java/org/elassandra/index/PartitionFunction.java new file mode 100644 index 00000000000..841a1aab279 --- /dev/null +++ b/core/src/main/java/org/elassandra/index/PartitionFunction.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2017 Strapdata (http://www.strapdata.com) + * Contains some code from Elasticsearch (http://www.elastic.co) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elassandra.index; + +/** + * Allows to override partition function with various implementation. + * @author vroyer + * + */ +public interface PartitionFunction { + public String format(String pattern, Object...args); +} diff --git a/core/src/main/java/org/elassandra/index/StringPartitionFunction.java b/core/src/main/java/org/elassandra/index/StringPartitionFunction.java new file mode 100644 index 00000000000..31b3429fabe --- /dev/null +++ b/core/src/main/java/org/elassandra/index/StringPartitionFunction.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2017 Strapdata (http://www.strapdata.com) + * Contains some code from Elasticsearch (http://www.elastic.co) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elassandra.index; + +import java.util.Locale; + +public class StringPartitionFunction implements PartitionFunction { + @Override + public String format(String pattern, Object... args) { + return String.format(Locale.ROOT, pattern, args); + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 492faef5eab..e72fc9b7ea3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -32,9 +32,10 @@ import java.util.Locale; import java.util.Map; +import org.apache.cassandra.utils.FBUtilities; import org.elassandra.cluster.InternalCassandraClusterService; -import org.elassandra.cluster.routing.AbstractSearchStrategy; -import org.elassandra.cluster.routing.PrimaryFirstSearchStrategy; +import org.elassandra.index.MessageFormatPartitionFunction; +import org.elassandra.index.PartitionFunction; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.Diff; @@ -187,6 +188,7 @@ public static State fromString(String state) { public static final String SETTING_SECONDARY_INDEX_CLASS = "index.secondary_index_class"; public static final String SETTING_SEARCH_STRATEGY_CLASS = "index.search_strategy_class"; public static final String SETTING_PARTITION_FUNCTION = "index.partition_function"; + public static final String SETTING_PARTITION_FUNCTION_CLASS = "index.partition_function_class"; public static final String SETTING_INCLUDE_NODE_ID = "index.include_node_id"; public static final String SETTING_SYNCHRONOUS_REFRESH = "index.synchronous_refresh"; public static final String SETTING_DROP_ON_DELETE_INDEX = "index.drop_on_delete_index"; @@ -418,6 +420,11 @@ public String[] partitionFunction() { return null; } + public PartitionFunction partitionFunctionClass() { + String partFuncClass = getSettings().get(IndexMetaData.SETTING_PARTITION_FUNCTION_CLASS, MessageFormatPartitionFunction.class.getName()); + return FBUtilities.instanceOrConstruct(partFuncClass, "PartitionFunction class used to generate index name from document fields"); + } + public ImmutableOpenMap getAliases() { return this.aliases; } diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java index e4d83bea3a7..1fe3d0fa6f2 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/BinaryFieldMapper.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Objects; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.lucene.document.Field; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.store.ByteArrayDataOutput; diff --git a/core/src/test/java/org/elassandra/CqlTypesTests.java b/core/src/test/java/org/elassandra/CqlTypesTests.java index fa11ca8321c..cbe5f93e03d 100644 --- a/core/src/test/java/org/elassandra/CqlTypesTests.java +++ b/core/src/test/java/org/elassandra/CqlTypesTests.java @@ -18,8 +18,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Locale; import java.util.Map; import java.util.TimeZone; @@ -28,6 +30,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.DoubleType; import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.service.StorageService; import org.elassandra.cluster.InternalCassandraClusterService; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.geo.GeoPoint; @@ -36,6 +39,8 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.Test; +import com.google.common.net.InetAddresses; + /** * Elassandra CQL types mapping tests. * @author vroyer @@ -112,6 +117,89 @@ public void testAllTypesTest() throws Exception { assertThat(fields.get("c8"),equalTo(false)); } + // mvn test -Pdev -pl com.strapdata:elassandra -Dtests.seed=622A2B0618CE4676 -Dtests.class=org.elassandra.CqlTypesTests -Dtests.method="testSinglePkTypesTest" -Des.logger.level=ERROR -Dtests.assertion.disabled=false -Dtests.security.manager=false -Dtests.heap.size=1024m -Dtests.locale=ro-RO -Dtests.timezone=America/Toronto + @Test + public void testSinglePkTypesTest() throws Exception { + createIndex("ks1"); + ensureGreen("ks1"); + + String[] types = new String[] { "text","int","bigint","double","float","boolean","blob","timestamp","inet","uuid" }; + Object[] values = new Object[] { "foo",1,2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID() }; + for(int i=0; i < types.length; i++) { + String type = types[i]; + Object value = values[i]; + System.out.println("insert pk type="+type); + process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"CREATE TABLE ks1.t%s (pk%s %s PRIMARY KEY, v text)", type, type, type)); + process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"INSERT INTO ks1.t%s (pk%s, v) VALUES (?, 'foobar')", type, type), value); + } + + // flush for rebuild_index + StorageService.instance.forceKeyspaceFlush("ks1"); + for(int i=0; i < types.length; i++) { + String type = types[i]; + System.out.println("discover pk type="+type); + assertAcked(client().admin().indices() + .preparePutMapping("ks1") + .setType(String.format(Locale.ROOT,"t%s",type)) + .setSource(String.format(Locale.ROOT,"{ \"t%s\" : { \"discover\" : \".*\" }}",type)).get()); + } + + // search + Thread.sleep(2000); + for(int i=0; i < types.length; i++) { + String type = types[i]; + System.out.println("search pk type="+type); + + assertThat(client().prepareSearch() + .setIndices("ks1") + .setTypes(String.format(Locale.ROOT,"t%s",type)) + .setQuery(QueryBuilders.queryStringQuery("*:*")) + .get().getHits().getTotalHits(), equalTo(1L)); + } + } + + // mvn test -Pdev -pl com.strapdata:elassandra -Dtests.seed=622A2B0618CE4676 -Dtests.class=org.elassandra.CqlTypesTests -Dtests.method="testCompoundPkTypesTest" -Des.logger.level=ERROR -Dtests.assertion.disabled=false -Dtests.security.manager=false -Dtests.heap.size=1024m -Dtests.locale=ro-RO -Dtests.timezone=America/Toronto + @Test + public void testCompoundPkTypesTest() throws Exception { + createIndex("ks2"); + ensureGreen("ks2"); + + String[] types = new String[] { "text", "int","bigint","double","float","boolean","blob","timestamp","inet","uuid" }; + Object[] values = new Object[] { "foo", 1, 2L, new Double(3.14), new Float(3.14), true, ByteBuffer.wrap("toto".getBytes("UTF-8")), new Date(), InetAddresses.forString("127.0.0.1"), UUID.randomUUID() }; + int randomCk = this.randomInt(types.length); + int randomVal= this.randomInt(types.length); + for(int i=0; i < types.length; i++) { + String type = types[i]; + System.out.println("insert pk type="+type); + process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"CREATE TABLE ks2.t%s (pk%s %s, ck %s, v %s, PRIMARY KEY (pk%s,ck))", type, type, type, types[randomCk], types[randomVal], type)); + process(ConsistencyLevel.ONE,String.format(Locale.ROOT,"INSERT INTO ks2.t%s (pk%s, ck, v) VALUES (?, ?, ?)", type, type), values[i], values[randomCk], values[randomVal]); + } + + // flush for rebuild_index + StorageService.instance.forceKeyspaceFlush("ks2"); + for(int i=0; i < types.length; i++) { + String type = types[i]; + System.out.println("discover pk type="+type); + assertAcked(client().admin().indices() + .preparePutMapping("ks2") + .setType(String.format(Locale.ROOT,"t%s",type)) + .setSource(String.format(Locale.ROOT,"{ \"t%s\" : { \"discover\" : \".*\" }}",type)).get()); + } + + // search + Thread.sleep(2000); + for(int i=0; i < types.length; i++) { + String type = types[i]; + System.out.println("search pk type="+type); + + assertThat(client().prepareSearch() + .setIndices("ks2") + .setTypes(String.format(Locale.ROOT,"t%s",type)) + .setQuery(QueryBuilders.queryStringQuery("*:*")) + .get().getHits().getTotalHits(), equalTo(1L)); + } + } + @Test public void testTextGeohashMapping() throws Exception { createIndex("test"); diff --git a/core/src/test/java/org/elassandra/PartitionedIndexTests.java b/core/src/test/java/org/elassandra/PartitionedIndexTests.java new file mode 100644 index 00000000000..241f1d8d200 --- /dev/null +++ b/core/src/test/java/org/elassandra/PartitionedIndexTests.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2017 Strapdata (http://www.strapdata.com) + * Contains some code from Elasticsearch (http://www.elastic.co) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elassandra; + +import static org.hamcrest.Matchers.equalTo; + +import java.util.Locale; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ConsistencyLevel; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.Test; + +/** + * Elassandra partitioned index tests. + * @author vroyer + * + */ +//mvn test -Pdev -pl com.strapdata:elassandra -Dtests.seed=622A2B0618CE4676 -Dtests.class=org.elassandra.PartitionedIndexTests -Des.logger.level=ERROR -Dtests.assertion.disabled=false -Dtests.security.manager=false -Dtests.heap.size=1024m -Dtests.locale=ro-RO -Dtests.timezone=America/Toronto +public class PartitionedIndexTests extends ESSingleNodeTestCase { + + @Test + public void basicPartitionFunctionTest() throws Exception { + process(ConsistencyLevel.ONE,String.format(Locale.ROOT, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', '%s': '1'}",DatabaseDescriptor.getLocalDataCenter())); + process(ConsistencyLevel.ONE,"CREATE TABLE ks.t1 ( name text, age int, primary key (name))"); + + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("t1") + .field("discover", ".*") + .endObject() + .endObject(); + + for(long i=20; i < 30; i++) { + createIndex("ks_"+i, Settings.builder().put("index.keyspace","ks") + .put("index.partition_function", "byage ks_{0,number,##} age") + .build(),"t1", mapping); + ensureGreen("ks_"+i); + } + for(long i=20; i < 30; i++) { + for(int j=0; j < i; j++) + process(ConsistencyLevel.ONE,String.format(Locale.ROOT, "INSERT INTO ks.t1 (name, age) VALUES ('name%d-%d', %d)",i,j,i)); + } + + for(long i=20; i < 30; i++) + assertThat(client().prepareSearch().setIndices("ks_"+i).setTypes("t1").setQuery(QueryBuilders.queryStringQuery("*:*")).get().getHits().getTotalHits(), equalTo(i)); + } + + @Test + public void basicStringPartitionFunctionTest() throws Exception { + process(ConsistencyLevel.ONE,String.format(Locale.ROOT, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', '%s': '1'}",DatabaseDescriptor.getLocalDataCenter())); + process(ConsistencyLevel.ONE,"CREATE TABLE ks.t1 ( name text, age int, primary key (name))"); + + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject("t1") + .field("discover", ".*") + .endObject() + .endObject(); + + for(long i=20; i < 30; i++) { + createIndex("ks_"+i, Settings.builder().put("index.keyspace","ks") + .put("index.partition_function", "byage ks_%d age") + .put("index.partition_function_class", "org.elassandra.index.StringPartitionFunction") + .build(),"t1", mapping); + ensureGreen("ks_"+i); + } + for(long i=20; i < 30; i++) { + for(int j=0; j < i; j++) + process(ConsistencyLevel.ONE,String.format(Locale.ROOT, "INSERT INTO ks.t1 (name, age) VALUES ('name%d-%d', %d)",i,j,i)); + } + + for(long i=20; i < 30; i++) + assertThat(client().prepareSearch().setIndices("ks_"+i).setTypes("t1").setQuery(QueryBuilders.queryStringQuery("*:*")).get().getHits().getTotalHits(), equalTo(i)); + } +} diff --git a/docs/elassandra/source/limitations.rst b/docs/elassandra/source/limitations.rst index 18ece9f6a97..273a23636a9 100644 --- a/docs/elassandra/source/limitations.rst +++ b/docs/elassandra/source/limitations.rst @@ -111,6 +111,11 @@ When such a modification occurs, Elassandra keeps this change in memory to corre Morever, Cassandra table names are limited to 48 caraters, so Elasticsearch type names are also limted to 48 caraters. +Column names +------------ + +For Elasticsearch, field mapping is unique in an index. So, two columns having the same name, indexed in an index, should have the same CQL type and share the same Elasticsearch mapping. + Elasticsearch unsupported feature ---------------------------------