diff --git a/CHANGES.txt b/CHANGES.txt index 1ca26432a7d..0d2049605bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +2.4.2-8 - 2017-02-15 + * Fix support for uuid and timeuuid Cassandra Types #74 + 2.4.2-7 - 2017-02-09 * Enable support for multi-threading rebuild_index on Cassandra 3. * Add new _meta fields in order to index static columns. diff --git a/core/src/main/java/org/apache/cassandra/security/SSLFactory.java b/core/cassandra/src/java/org/apache/cassandra/security/SSLFactory.java similarity index 51% rename from core/src/main/java/org/apache/cassandra/security/SSLFactory.java rename to core/cassandra/src/java/org/apache/cassandra/security/SSLFactory.java index 8de5910d96f..a327de9f1a4 100644 --- a/core/src/main/java/org/apache/cassandra/security/SSLFactory.java +++ b/core/cassandra/src/java/org/apache/cassandra/security/SSLFactory.java @@ -29,7 +29,6 @@ import java.util.Enumeration; import java.util.List; -import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocket; @@ -57,41 +56,17 @@ public final class SSLFactory public static final String[] ACCEPTED_PROTOCOLS = new String[] {"SSLv2Hello", "TLSv1", "TLSv1.1", "TLSv1.2"}; private static boolean checkedExpiry = false; - private static SSLContext ctx; - private static TrustManager trustManagers[] = null; - private static KeyManager keyManagers[] = null; - - public static SSLContext getSSLContext() { - return ctx; - } - - public static TrustManager[] getTrustManagers() { - return trustManagers; - } - - public static KeyManager[] getKetManagers() { - return keyManagers; - } - public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException { SSLContext ctx = createSSLContext(options, true); - SSLServerSocket serverSocket = (SSLServerSocket) ctx.getServerSocketFactory().createServerSocket(); - try - { - serverSocket.setReuseAddress(true); - String[] suites = filterCipherSuites(serverSocket.getSupportedCipherSuites(), options.cipher_suites); - serverSocket.setEnabledCipherSuites(suites); - serverSocket.setNeedClientAuth(options.require_client_auth); - serverSocket.setEnabledProtocols(ACCEPTED_PROTOCOLS); - serverSocket.bind(new InetSocketAddress(address, port), 500); - return serverSocket; - } - catch (IllegalArgumentException | SecurityException | IOException e) - { - serverSocket.close(); - throw e; - } + SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket(); + serverSocket.setReuseAddress(true); + String[] suites = filterCipherSuites(serverSocket.getSupportedCipherSuites(), options.cipher_suites); + serverSocket.setEnabledCipherSuites(suites); + serverSocket.setNeedClientAuth(options.require_client_auth); + serverSocket.setEnabledProtocols(ACCEPTED_PROTOCOLS); + serverSocket.bind(new InetSocketAddress(address, port), 500); + return serverSocket; } /** Create a socket and connect */ @@ -99,18 +74,10 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort); - try - { - String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); - socket.setEnabledCipherSuites(suites); - socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } + String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); + socket.setEnabledCipherSuites(suites); + socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); + return socket; } /** Create a socket and connect, using any local address */ @@ -118,18 +85,10 @@ public static SSLSocket getSocket(EncryptionOptions options, InetAddress address { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port); - try - { - String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); - socket.setEnabledCipherSuites(suites); - socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } + String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); + socket.setEnabledCipherSuites(suites); + socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); + return socket; } /** Just create a socket */ @@ -137,18 +96,10 @@ public static SSLSocket getSocket(EncryptionOptions options) throws IOException { SSLContext ctx = createSSLContext(options, true); SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(); - try - { - String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); - socket.setEnabledCipherSuites(suites); - socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } + String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); + socket.setEnabledCipherSuites(suites); + socket.setEnabledProtocols(ACCEPTED_PROTOCOLS); + return socket; } @SuppressWarnings("resource") @@ -156,53 +107,53 @@ public static SSLContext createSSLContext(EncryptionOptions options, boolean bui { FileInputStream tsf = null; FileInputStream ksf = null; - - if (ctx == null) { - try + SSLContext ctx; + try + { + ctx = SSLContext.getInstance(options.protocol); + TrustManager[] trustManagers = null; + + if(buildTruststore) { - ctx = SSLContext.getInstance(options.protocol); - - if(buildTruststore) - { - tsf = new FileInputStream(options.truststore); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm); - KeyStore ts = KeyStore.getInstance(options.store_type); - ts.load(tsf, options.truststore_password.toCharArray()); - tmf.init(ts); - trustManagers = tmf.getTrustManagers(); - } - - ksf = new FileInputStream(options.keystore); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(options.algorithm); - KeyStore ks = KeyStore.getInstance(options.store_type); - ks.load(ksf, options.keystore_password.toCharArray()); - if (!checkedExpiry) + tsf = new FileInputStream(options.truststore); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm); + KeyStore ts = KeyStore.getInstance(options.store_type); + ts.load(tsf, options.truststore_password.toCharArray()); + tmf.init(ts); + trustManagers = tmf.getTrustManagers(); + } + + ksf = new FileInputStream(options.keystore); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(options.algorithm); + KeyStore ks = KeyStore.getInstance(options.store_type); + ks.load(ksf, options.keystore_password.toCharArray()); + if (!checkedExpiry) + { + for (Enumeration aliases = ks.aliases(); aliases.hasMoreElements(); ) { - for (Enumeration aliases = ks.aliases(); aliases.hasMoreElements(); ) + String alias = aliases.nextElement(); + if (ks.getCertificate(alias).getType().equals("X.509")) { - String alias = aliases.nextElement(); - if (ks.getCertificate(alias).getType().equals("X.509")) - { - Date expires = ((X509Certificate) ks.getCertificate(alias)).getNotAfter(); - if (expires.before(new Date())) - logger.warn("Certificate for {} expired on {}", alias, expires); - } + Date expires = ((X509Certificate) ks.getCertificate(alias)).getNotAfter(); + if (expires.before(new Date())) + logger.warn("Certificate for {} expired on {}", alias, expires); } - checkedExpiry = true; } - kmf.init(ks, options.keystore_password.toCharArray()); - keyManagers = kmf.getKeyManagers(); - ctx.init(kmf.getKeyManagers(), trustManagers, null); - } - catch (Exception e) - { - throw new IOException("Error creating the initializing the SSL Context", e); - } - finally - { - FileUtils.closeQuietly(tsf); - FileUtils.closeQuietly(ksf); + checkedExpiry = true; } + kmf.init(ks, options.keystore_password.toCharArray()); + + ctx.init(kmf.getKeyManagers(), trustManagers, null); + + } + catch (Exception e) + { + throw new IOException("Error creating the initializing the SSL Context", e); + } + finally + { + FileUtils.closeQuietly(tsf); + FileUtils.closeQuietly(ksf); } return ctx; } diff --git a/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java b/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java index 76112cdf469..de8ad3acd1f 100644 --- a/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java +++ b/core/src/main/java/org/elassandra/cluster/InternalCassandraClusterService.java @@ -66,11 +66,9 @@ import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.SetType; -import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -348,6 +346,9 @@ public InternalCassandraClusterService(Settings settings, DiscoveryService disco updateMetaDataQuery = String.format(Locale.ROOT, "UPDATE \"%s\".\"%s\" SET owner = ?, version = ?, metadata = ? WHERE cluster_name = ? IF version < ?", elasticAdminKeyspaceName, ELASTIC_ADMIN_METADATA_TABLE); } + public String getElasticAdminKeyspaceName() { + return this.elasticAdminKeyspaceName; + } public boolean isNativeCql3Type(String cqlType) { return cqlMapping.keySet().contains(cqlType) && !cqlType.startsWith("geo_"); @@ -396,35 +397,40 @@ public AbstractSearchStrategy.Router getRouter(IndexMetaData indexMetaData, Clus return router; } - - /** - * Binds values with query and executes with consistency level. - * - * @param cl - * @param query - * @param values - * @return - * @throws RequestExecutionException - * @throws RequestValidationException - */ - public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLevel serialConsistencyLevel, final String query, final Object... values) throws RequestExecutionException, - RequestValidationException, InvalidRequestException { - return process(cl, serialConsistencyLevel, query, new Long(0), values); + @Override + public UntypedResultSet process(final ConsistencyLevel cl, final ClientState clientState, final String query) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { + return process(cl, null, clientState, query, new Long(0), new Object[] {}); } - public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLevel serialConsistencyLevel, final String query, Long writetime, final Object... values) + @Override + public UntypedResultSet process(final ConsistencyLevel cl, final ClientState clientState, final String query, Object... values) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { + return process(cl, null, clientState, query, new Long(0), values); + } + + public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLevel serialConsistencyLevel, final String query, final Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { - if (logger.isDebugEnabled()) { + return process(cl, serialConsistencyLevel, ClientState.forInternalCalls(), query, new Long(0), values); + } + + public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLevel serialConsistencyLevel, final String query, Long writetime, final Object... values) { + return process(cl, serialConsistencyLevel, ClientState.forInternalCalls(), query, writetime, values); + } + + public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLevel serialConsistencyLevel, final ClientState clientState, final String query, Long writetime, final Object... values) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { + if (logger.isDebugEnabled()) logger.debug("processing CL={} SERIAL_CL={} query={}", cl, serialConsistencyLevel, query); - } - ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls()); + + ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, clientState); List boundValues = new ArrayList(values.length); for (int i = 0; i < values.length; i++) { Object v = values[i]; AbstractType type = prepared.boundNames.get(i).type; boundValues.add(v instanceof ByteBuffer || v == null ? (ByteBuffer) v : type.decompose(v)); } - QueryState queryState = QueryState.forInternalCalls(); + QueryState queryState = new QueryState(clientState); QueryOptions queryOptions = QueryOptions.forInternalCalls(cl, serialConsistencyLevel, boundValues); ResultMessage result = QueryProcessor.instance.process(query, queryState, queryOptions); writetime = queryState.getTimestamp(); @@ -434,15 +440,9 @@ public UntypedResultSet process(final ConsistencyLevel cl, final ConsistencyLeve return null; } - public UntypedResultSet process(final ConsistencyLevel cl, final String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException { - return process(cl, null, query, new Object[] {}); - } - - public UntypedResultSet process(final ConsistencyLevel cl, final String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { - return process(cl, null, query, values); - } - - public boolean processConditional(final ConsistencyLevel cl, final ConsistencyLevel serialCl, final String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { + + public boolean processConditional(final ConsistencyLevel cl, final ConsistencyLevel serialCl, final String query, Object... values) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { try { UntypedResultSet result = process(cl, serialCl, query, values); if (serialCl != null) { @@ -478,7 +478,8 @@ public void createIndexKeyspace(final String ksname, final int replicationFactor } try { - process(ConsistencyLevel.LOCAL_ONE, String.format(Locale.ROOT, "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = {'class':'NetworkTopologyStrategy', '%s':'%d' };", + process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), + String.format(Locale.ROOT, "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = {'class':'NetworkTopologyStrategy', '%s':'%d' };", ksname, DatabaseDescriptor.getLocalDataCenter(), replicationFactor)); } catch (Throwable e) { throw new IOException(e.getMessage(), e); @@ -1365,7 +1366,7 @@ public Map> flattenTree(final Set neededFiedls, fin public boolean rowExists(final MapperService mapperService, final String type, final String id) throws InvalidRequestException, RequestExecutionException, RequestValidationException, IOException { DocPrimaryKey docPk = parseElasticId(mapperService.index().name(), type, id); - return process(ConsistencyLevel.LOCAL_ONE, buildExistsQuery(mapperService.documentMapper(type), mapperService.keyspace(), typeToCfName(type), id), docPk.values).size() > 0; + return process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), buildExistsQuery(mapperService.documentMapper(type), mapperService.keyspace(), typeToCfName(type), id), docPk.values).size() > 0; } /* @@ -1397,7 +1398,7 @@ public UntypedResultSet fetchRow(final String ksName,final String index, final S @Override public UntypedResultSet fetchRow(final String ksName, final String index, final String type, final DocPrimaryKey docPk, final String[] columns, final ConsistencyLevel cl, Map columnDefs) throws InvalidRequestException, RequestExecutionException, RequestValidationException, IOException { - return process(cl, buildFetchQuery(ksName, index, type, columns, docPk.isStaticDocument, columnDefs), docPk. values); + return process(cl, ClientState.forInternalCalls(), buildFetchQuery(ksName, index, type, columns, docPk.isStaticDocument, columnDefs), docPk. values); } public Engine.GetResult fetchSourceInternal(final String ksName, String index, String type, String id, Map columnDefs) throws IOException { @@ -1575,7 +1576,7 @@ public void deleteRow(final String index, final String type, final String id, fi IndexService indexService = this.indexServiceSafe(index); String cfName = typeToCfName(type); DocumentMapper docMapper = indexService.mapperService().documentMapper(type); - process(cl, buildDeleteQuery(docMapper, indexService.keyspace(), cfName, id), parseElasticId(index, type, id).values); + process(cl, ClientState.forInternalCalls(), buildDeleteQuery(docMapper, indexService.keyspace(), cfName, id), parseElasticId(index, type, id).values); } @Override @@ -1962,7 +1963,7 @@ public void upsertDocument(final IndicesService indicesService, final IndexReque } - map.put(field, serializeType(request.index(), cfName, cd.type, field, fieldValue, mapper)); + map.put(field, serialize(request.index(), cfName, cd.type, field, fieldValue, mapper)); } catch (Exception e) { logger.error("[{}].[{}] failed to parse field {}={}", e, request.index(), cfName, field, fieldValue ); throw e; @@ -2000,7 +2001,7 @@ public void upsertDocument(final IndicesService indicesService, final IndexReque (request.ttl() != null) ? request.ttl().getSeconds() : null, timestamp, values, 0); - process(request.consistencyLevel().toCassandraConsistencyLevel(), null, query, values); + process(request.consistencyLevel().toCassandraConsistencyLevel(), ClientState.forInternalCalls(), query, values); } } @@ -2283,7 +2284,7 @@ public MetaData checkForNewMetaData(Long expectedVersion) throws NoPersistedMeta public MetaData readMetaDataAsRow(ConsistencyLevel cl) throws NoPersistedMetaDataException { UntypedResultSet result; try { - result = process(cl, selectMetadataQuery, DatabaseDescriptor.getClusterName()); + result = process(cl, ClientState.forInternalCalls(), selectMetadataQuery, DatabaseDescriptor.getClusterName()); Row row = result.one(); if (row != null && row.has("metadata")) { return parseMetaDataString(row.getString("metadata")); @@ -2327,15 +2328,15 @@ public void createOrUpdateElasticAdminKeyspace() { String createKeyspace = String.format(Locale.ROOT, "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s;", elasticAdminKeyspaceName, FBUtilities.json(replication).replaceAll("\"", "'")); logger.info(createKeyspace); - process(ConsistencyLevel.LOCAL_ONE, createKeyspace); + process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), createKeyspace); String createTable = String.format(Locale.ROOT, "CREATE TABLE IF NOT EXISTS \"%s\".%s ( cluster_name text PRIMARY KEY, owner uuid, version bigint, metadata text) WITH comment='%s';", elasticAdminKeyspaceName, ELASTIC_ADMIN_METADATA_TABLE, MetaData.Builder.toXContent(metadata)); logger.info(createTable); - process(ConsistencyLevel.LOCAL_ONE, createTable); + process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), createTable); // initialize a first row if needed - process(ConsistencyLevel.LOCAL_ONE, insertMetadataQuery, + process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), insertMetadataQuery, DatabaseDescriptor.getClusterName(), UUID.fromString(StorageService.instance.getLocalHostId()), metadata.version(), metaDataString); logger.info("Succefully initialize {}.{} = {}", elasticAdminKeyspaceName, ELASTIC_ADMIN_METADATA_TABLE,metaDataString); writeMetaDataAsComment(metaDataString); @@ -2360,7 +2361,7 @@ public void createOrUpdateElasticAdminKeyspace() { String query = String.format(Locale.ROOT, "ALTER KEYSPACE \"%s\" WITH replication = %s", elasticAdminKeyspaceName, FBUtilities.json(replication).replaceAll("\"", "'")); logger.info(query); - process(ConsistencyLevel.LOCAL_ONE, query); + process(ConsistencyLevel.LOCAL_ONE, ClientState.forInternalCalls(), query); } catch (Throwable e) { logger.error("Failed to alter keyspace [{}]", e, this.elasticAdminKeyspaceName); throw e; @@ -2477,6 +2478,9 @@ private static Object value(FieldMapper mapper, Object value) throws IOException } else if (mapper instanceof SourceFieldMapper) { byte[] bytes = (byte[]) ((SourceFieldMapper)mapper).fieldType().value(value); return ByteBuffer.wrap(bytes, 0, bytes.length); + } else if (value instanceof UUID) { + // workaround for #74 + return value; } else { Object v = mapper.fieldType().value(value); if (v instanceof Uid) { @@ -2546,7 +2550,7 @@ public static Collection flattenCollection(Collection c) { * @throws JsonMappingException * @throws JsonGenerationException */ - public static ByteBuffer serializeType(final String ksName, final String cfName, final AbstractType type, final String name, final Object value, final Mapper mapper) + public static ByteBuffer serialize(final String ksName, final String cfName, final AbstractType type, final String name, final Object value, final Mapper mapper) throws SyntaxException, ConfigurationException, JsonGenerationException, JsonMappingException, IOException { if (value == null) { return null; @@ -2566,15 +2570,15 @@ public static ByteBuffer serializeType(final String ksName, final String cfName, Map mapValue = (Map) value; geoPoint.reset((Double)mapValue.get(GeoPointFieldMapper.Names.LAT), (Double)mapValue.get(GeoPointFieldMapper.Names.LON)); } - components[i++]=serializeType(ksName, cfName, udt.fieldType(0), GeoPointFieldMapper.Names.LAT, geoPoint.lat(), null); - components[i++]=serializeType(ksName, cfName, udt.fieldType(1), GeoPointFieldMapper.Names.LON, geoPoint.lon(), null); + components[i++]=serialize(ksName, cfName, udt.fieldType(0), GeoPointFieldMapper.Names.LAT, geoPoint.lat(), null); + components[i++]=serialize(ksName, cfName, udt.fieldType(1), GeoPointFieldMapper.Names.LON, geoPoint.lon(), null); } else if (COMPLETION_TYPE.equals(ByteBufferUtil.string(udt.name))) { // input list, output text, weight int, payload text Map mapValue = (Map) value; - components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_INPUT) == null) ? null : serializeType(ksName, cfName, udt.fieldType(0), Fields.CONTENT_FIELD_NAME_INPUT, mapValue.get(Fields.CONTENT_FIELD_NAME_INPUT), null); - components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_OUTPUT) == null) ? null : serializeType(ksName, cfName, udt.fieldType(1), Fields.CONTENT_FIELD_NAME_OUTPUT, mapValue.get(Fields.CONTENT_FIELD_NAME_OUTPUT), null); - components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_WEIGHT) == null) ? null : serializeType(ksName, cfName, udt.fieldType(2), Fields.CONTENT_FIELD_NAME_WEIGHT, new Long((Integer) mapValue.get(Fields.CONTENT_FIELD_NAME_WEIGHT)), null); - components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_PAYLOAD) == null) ? null : serializeType(ksName, cfName, udt.fieldType(3), Fields.CONTENT_FIELD_NAME_PAYLOAD, stringify(mapValue.get(Fields.CONTENT_FIELD_NAME_PAYLOAD)), null); + components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_INPUT) == null) ? null : serialize(ksName, cfName, udt.fieldType(0), Fields.CONTENT_FIELD_NAME_INPUT, mapValue.get(Fields.CONTENT_FIELD_NAME_INPUT), null); + components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_OUTPUT) == null) ? null : serialize(ksName, cfName, udt.fieldType(1), Fields.CONTENT_FIELD_NAME_OUTPUT, mapValue.get(Fields.CONTENT_FIELD_NAME_OUTPUT), null); + components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_WEIGHT) == null) ? null : serialize(ksName, cfName, udt.fieldType(2), Fields.CONTENT_FIELD_NAME_WEIGHT, new Long((Integer) mapValue.get(Fields.CONTENT_FIELD_NAME_WEIGHT)), null); + components[i++]=(mapValue.get(Fields.CONTENT_FIELD_NAME_PAYLOAD) == null) ? null : serialize(ksName, cfName, udt.fieldType(3), Fields.CONTENT_FIELD_NAME_PAYLOAD, stringify(mapValue.get(Fields.CONTENT_FIELD_NAME_PAYLOAD)), null); } else { Map mapValue = (Map) value; for (int j = 0; j < udt.size(); j++) { @@ -2582,7 +2586,7 @@ public static ByteBuffer serializeType(final String ksName, final String cfName, AbstractType subType = udt.fieldType(j); Object subValue = mapValue.get(subName); Mapper subMapper = (mapper instanceof ObjectMapper) ? ((ObjectMapper) mapper).getMapper(subName) : null; - components[i++]=serializeType(ksName, cfName, subType, subName, subValue, subMapper); + components[i++]=serialize(ksName, cfName, subType, subName, subValue, subMapper); } } return TupleType.buildValue(components); @@ -2600,8 +2604,8 @@ public static ByteBuffer serializeType(final String ksName, final String cfName, UserType udt = (UserType)elementType; List values = (List)value; ByteBuffer[] elements = new ByteBuffer[] { - serializeType(ksName, cfName, udt.fieldType(0), GeoPointFieldMapper.Names.LAT, values.get(1), null), - serializeType(ksName, cfName, udt.fieldType(1), GeoPointFieldMapper.Names.LON, values.get(0), null) + serialize(ksName, cfName, udt.fieldType(0), GeoPointFieldMapper.Names.LAT, values.get(1), null), + serialize(ksName, cfName, udt.fieldType(1), GeoPointFieldMapper.Names.LON, values.get(0), null) }; ByteBuffer geo_point = TupleType.buildValue(elements); return CollectionSerializer.pack(ImmutableList.of(geo_point), 1, Server.VERSION_3); @@ -2611,21 +2615,22 @@ public static ByteBuffer serializeType(final String ksName, final String cfName, // list of elementType List elements = new ArrayList(); for(Object v : flattenCollection((Collection) value)) { - ByteBuffer bb = serializeType(ksName, cfName, elementType, name, v, mapper); + ByteBuffer bb = serialize(ksName, cfName, elementType, name, v, mapper); elements.add(bb); } return CollectionSerializer.pack(elements, elements.size(), Server.VERSION_3); } else { // singleton list - ByteBuffer bb = serializeType(ksName, cfName, elementType, name, value, mapper); + ByteBuffer bb = serialize(ksName, cfName, elementType, name, value, mapper); return CollectionSerializer.pack(ImmutableList.of(bb), 1, Server.VERSION_3); } + } else if (type.getSerializer() instanceof UUIDSerializer) { + // #74 workaround + return type.decompose( (value instanceof UUID) ? value : UUID.fromString((String)value)); } else { // Native cassandra type, encoded with mapper if available. if (mapper != null) { if (mapper instanceof FieldMapper) { - if (mapper instanceof StringFieldMapper && (type instanceof TimeUUIDType || type instanceof UUIDType)) - return type.decompose( UUID.fromString((String)value) ) ; // #74 uuid is mapped as text return type.decompose( value((FieldMapper) mapper, value) ); } else if (mapper instanceof ObjectMapper && !((ObjectMapper)mapper).isEnabled()) { // enabled=false => store field as json text diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java index d67c48b638c..3d93c96d2d9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetAddress; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; @@ -38,6 +37,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.codehaus.jackson.JsonGenerationException; @@ -300,6 +300,7 @@ public DocPrimaryKey(String[] names, Object[] values) { public Map> flattenTree(final Set neededFiedls, final String path, final Object node, Map> fields); public void createOrUpdateElasticAdminKeyspace(); + public String getElasticAdminKeyspaceName(); public void createIndexKeyspace(String index, int replicationFactor) throws IOException; public void dropIndexKeyspace(String ksName) throws IOException; @@ -403,6 +404,6 @@ public void persistMetaData(MetaData currentMetadData, MetaData newMetaData, Str public ShardInfo shardInfo(String index, ConsistencyLevel cl); - public UntypedResultSet process(ConsistencyLevel cl, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException; - public UntypedResultSet process(ConsistencyLevel cl, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException; + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException; + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException; } diff --git a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 6f0f06eafa3..9a7efcb4eb9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -51,6 +51,8 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.codehaus.jackson.JsonGenerationException; @@ -1518,19 +1520,19 @@ public ShardInfo shardInfo(String index, ConsistencyLevel cl) { } @Override - public UntypedResultSet process(ConsistencyLevel cl, String query) + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException { // TODO Auto-generated method stub return null; } @Override - public UntypedResultSet process(ConsistencyLevel cl, String query, Object... values) + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { // TODO Auto-generated method stub return null; } - + @Override public BytesReference source(DocumentMapper docMapper, Map sourceAsMap, String index, String type, String id) throws JsonParseException, JsonMappingException, IOException { @@ -1544,4 +1546,9 @@ public BytesReference source(DocumentMapper docMapper, Map sourceAsMap, String i // TODO Auto-generated method stub return null; } + + @Override + public String getElasticAdminKeyspaceName() { + return null; + } } diff --git a/core/src/test/java/org/elassandra/CompositeTests.java b/core/src/test/java/org/elassandra/CompositeTests.java index 27c856ac86c..c513721dbd1 100644 --- a/core/src/test/java/org/elassandra/CompositeTests.java +++ b/core/src/test/java/org/elassandra/CompositeTests.java @@ -34,7 +34,7 @@ public void testCompositeWithStaticColumnTest() throws Exception { ensureGreen("test"); // INSERT INTO test.t2 (id, surname, name, phonetic_name, nicks) VALUES (22, 'Genesis', 'Abraham', 'ai-b-ram', ['the-A', 'ab']) - clusterService().process(ConsistencyLevel.ONE,"INSERT INTO test.t2 (id, surname, name, phonetic_name, nicks) VALUES (22, 'Genesis', 'Abraham', 'ai-b-ram', ['the-A', 'ab'])"); + process(ConsistencyLevel.ONE,"INSERT INTO test.t2 (id, surname, name, phonetic_name, nicks) VALUES (22, 'Genesis', 'Abraham', 'ai-b-ram', ['the-A', 'ab'])"); SearchResponse rsp = client().prepareSearch().setQuery("{ \"match_all\" : {} }").get(); assertThat(rsp.getHits().getTotalHits(), equalTo(2L)); diff --git a/core/src/test/java/org/elassandra/CqlTypesTests.java b/core/src/test/java/org/elassandra/CqlTypesTests.java index 2b30b6c58ed..b37f115afab 100644 --- a/core/src/test/java/org/elassandra/CqlTypesTests.java +++ b/core/src/test/java/org/elassandra/CqlTypesTests.java @@ -103,8 +103,8 @@ public void testTextGeohashMapping() throws Exception { .setSource("{ \"geoloc\" : { \"discover\":\"^((?!geohash).*)\", \"properties\": { \"geohash\": { \"type\": \"geo_point\", \"cql_collection\":\"singleton\",\"cql_partition_key\" : true,\"cql_primary_key_order\" : 0, \"index\" : \"not_analyzed\" } }}}").get()); GeoPoint geo_point = new GeoPoint(-25.068403, 29.411767); ByteBuffer[] elements = new ByteBuffer[] { - InternalCassandraClusterService.serializeType("test", "geoloc", DoubleType.instance, GeoPointFieldMapper.Names.LAT, -25.068403, null), - InternalCassandraClusterService.serializeType("test", "geoloc", DoubleType.instance, GeoPointFieldMapper.Names.LON, 29.411767, null) + InternalCassandraClusterService.serialize("test", "geoloc", DoubleType.instance, GeoPointFieldMapper.Names.LAT, -25.068403, null), + InternalCassandraClusterService.serialize("test", "geoloc", DoubleType.instance, GeoPointFieldMapper.Names.LON, 29.411767, null) }; process(ConsistencyLevel.ONE,"INSERT INTO test.geoloc (geohash, id, coord, comment) VALUES (?,?,?,?)", geo_point.geohash(), UUID.randomUUID(), TupleType.buildValue(elements), "blabla"); @@ -115,4 +115,20 @@ public void testTextGeohashMapping() throws Exception { .get(); assertThat(rsp.getHits().getTotalHits(),equalTo(1L)); } + + // #74 test + @Test + public void testUUID() throws Exception { + createIndex("test"); + ensureGreen("test"); + + process(ConsistencyLevel.ONE,"create table test.pk_uuid (pk_uuid uuid, column_not_uuid text, primary key(pk_uuid));"); + process(ConsistencyLevel.ONE,"create table test.pk_not_uuid (pk_not_uuid text, column_uuid uuid, primary key(pk_not_uuid));"); + + assertAcked(client().admin().indices().preparePutMapping("test").setType("pk_uuid").setSource("{ \"pk_uuid\" : { \"discover\" : \".*\"}}").get()); + assertAcked(client().admin().indices().preparePutMapping("test").setType("pk_not_uuid").setSource("{ \"pk_not_uuid\" : { \"discover\" : \".*\"}}").get()); + + assertThat(client().prepareIndex("test", "pk_uuid", "bacc6c75-91b8-4a86-a408-ff7bafac535d").setSource("{ \"column_not_uuid\": \"a value\" }").get().isCreated(), equalTo(true)); + assertThat(client().prepareIndex("test", "pk_not_uuid", "pk2").setSource("{ \"column_uuid\": \"bacc6c75-91b8-4a86-a408-ff7bafac535d\" }").get().isCreated(), equalTo(true)); + } } diff --git a/core/src/test/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/core/src/test/java/org/elasticsearch/test/ESSingleNodeTestCase.java index cbbfab2d13d..128b72febce 100644 --- a/core/src/test/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -34,7 +34,9 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ElassandraDaemon; +import org.apache.commons.compress.utils.Charsets; import org.elassandra.cluster.InternalCassandraClusterService; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; @@ -128,10 +130,15 @@ static void initNode(Settings testSettings, Collection> ClusterHealthResponse clusterHealthResponse = clusterAdminClient.prepareHealth().setWaitForGreenStatus().get(); assertFalse(clusterHealthResponse.isTimedOut()); + String usernameAndPassword = "cassandra" + ":" + "cassandra"; + String token = "Basic " + java.util.Base64.getEncoder().encodeToString( usernameAndPassword.getBytes(Charsets.UTF_8) ); + Settings.Builder settingsBuilder = Settings.builder() .put(InternalCassandraClusterService.SETTING_CLUSTER_DEFAULT_SYNCHRONOUS_REFRESH, true) .put(InternalCassandraClusterService.SETTING_CLUSTER_DEFAULT_DROP_ON_DELETE_INDEX, true); - ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = clusterAdminClient.prepareUpdateSettings().setTransientSettings(settingsBuilder).get(); + ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = clusterAdminClient.prepareUpdateSettings() + .putHeader("Authorization", token) + .setTransientSettings(settingsBuilder).get(); assertTrue(clusterUpdateSettingsResponse.isAcknowledged()); } @@ -238,11 +245,19 @@ public ClusterService clusterService() { } public UntypedResultSet process(ConsistencyLevel cl, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException { - return clusterService().process(cl, query); + return clusterService().process(cl, ClientState.forInternalCalls(), query); + } + + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException { + return clusterService().process(cl, clientState, query); } public UntypedResultSet process(ConsistencyLevel cl, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { - return clusterService().process(cl, query, values); + return clusterService().process(cl, ClientState.forInternalCalls(), query, values); + } + + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { + return clusterService().process(cl, clientState, query, values); } /** diff --git a/core/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java b/core/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java index fccab6c7f63..da2b061d54c 100644 --- a/core/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java +++ b/core/src/test/java/org/elasticsearch/test/cluster/NoopClusterService.java @@ -37,6 +37,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.codehaus.jackson.JsonGenerationException; @@ -495,19 +496,19 @@ public void publishGossipStates() { } @Override - public UntypedResultSet process(ConsistencyLevel cl, String query) + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query) throws RequestExecutionException, RequestValidationException, InvalidRequestException { // TODO Auto-generated method stub return null; } @Override - public UntypedResultSet process(ConsistencyLevel cl, String query, Object... values) + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query, Object... values) throws RequestExecutionException, RequestValidationException, InvalidRequestException { // TODO Auto-generated method stub return null; } - + @Override public void updateDocument(IndicesService indicesService, IndexRequest request, IndexMetaData indexMetaData) throws Exception { @@ -626,4 +627,10 @@ public UntypedResultSet fetchRowInternal(String ksName, String index, String cfN // TODO Auto-generated method stub return null; } + + @Override + public String getElasticAdminKeyspaceName() { + // TODO Auto-generated method stub + return null; + } } diff --git a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java index d4637d96066..a4ff9d5ca25 100644 --- a/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java +++ b/core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java @@ -42,6 +42,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.codehaus.jackson.JsonGenerationException; @@ -626,20 +627,6 @@ public void publishGossipStates() { } - @Override - public UntypedResultSet process(ConsistencyLevel cl, String query) - throws RequestExecutionException, RequestValidationException, InvalidRequestException { - // TODO Auto-generated method stub - return null; - } - - @Override - public UntypedResultSet process(ConsistencyLevel cl, String query, Object... values) - throws RequestExecutionException, RequestValidationException, InvalidRequestException { - // TODO Auto-generated method stub - return null; - } - @Override public void updateDocument(IndicesService indicesService, IndexRequest request, IndexMetaData indexMetaData) throws Exception { @@ -758,4 +745,24 @@ public UntypedResultSet fetchRowInternal(String ksName, String index, String cfN // TODO Auto-generated method stub return null; } + + @Override + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { + // TODO Auto-generated method stub + return null; + } + + @Override + public UntypedResultSet process(ConsistencyLevel cl, ClientState clientState, String query, Object... values) + throws RequestExecutionException, RequestValidationException, InvalidRequestException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getElasticAdminKeyspaceName() { + // TODO Auto-generated method stub + return null; + } }