Skip to content

Commit

Permalink
在建立和修改Schema时可以配置复制参数
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed May 17, 2015
1 parent 6200b3c commit cdcac4b
Show file tree
Hide file tree
Showing 13 changed files with 316 additions and 98 deletions.

This file was deleted.

This file was deleted.

74 changes: 53 additions & 21 deletions lealone-cluster/src/main/java/org/lealone/cluster/db/Keyspace.java
@@ -1,31 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.cluster.db; package org.lealone.cluster.db;


import java.util.HashMap;

import org.lealone.cluster.config.DatabaseDescriptor; import org.lealone.cluster.config.DatabaseDescriptor;
import org.lealone.cluster.exceptions.ConfigurationException;
import org.lealone.cluster.locator.AbstractReplicationStrategy; import org.lealone.cluster.locator.AbstractReplicationStrategy;

import org.lealone.cluster.service.StorageService;
public class Keyspace { import org.lealone.dbobject.Schema;
private static Keyspace INSTANCE = new Keyspace(); import org.lealone.dbobject.Schema.ReplicationPropertiesChangeListener;


public static Keyspace open(String keyspaceName) { public class Keyspace implements ReplicationPropertiesChangeListener {
return INSTANCE; private static final Keyspace INSTANCE = new Keyspace();
} private static final HashMap<Schema, AbstractReplicationStrategy> replicationStrategys = new HashMap<>();

private static final AbstractReplicationStrategy defaultReplicationStrategy = DatabaseDescriptor
public static Iterable<Keyspace> all() { .getDefaultReplicationStrategy();
return null;
} public static AbstractReplicationStrategy getReplicationStrategy(Schema schema) {

if (schema.getReplicationProperties() == null)
private final AbstractReplicationStrategy replicationStrategy; return defaultReplicationStrategy;

AbstractReplicationStrategy replicationStrategy = replicationStrategys.get(schema);
public Keyspace() { if (replicationStrategy == null) {
//TODO 按表或按数据库配置 HashMap<String, String> map = new HashMap<>(schema.getReplicationProperties());
replicationStrategy = DatabaseDescriptor.getDefaultReplicationStrategy(); String className = map.remove("class");
if (className == null) {
throw new ConfigurationException("Missing replication strategy class");
}

replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(schema.getFullName(),
AbstractReplicationStrategy.getClass(className), StorageService.instance.getTokenMetaData(),
DatabaseDescriptor.getEndpointSnitch(), map);
schema.setReplicationPropertiesChangeListener(INSTANCE);
replicationStrategys.put(schema, replicationStrategy);
}
return replicationStrategy;
} }


public AbstractReplicationStrategy getReplicationStrategy() { private Keyspace() {
return replicationStrategy;
} }


public String getName() { @Override
return null; public void replicationPropertiesChanged(Schema schema) {
replicationStrategys.remove(schema);
getReplicationStrategy(schema);
} }
} }
Expand Up @@ -49,6 +49,7 @@
import org.lealone.command.router.Router; import org.lealone.command.router.Router;
import org.lealone.command.router.SerializedResult; import org.lealone.command.router.SerializedResult;
import org.lealone.command.router.SortedResult; import org.lealone.command.router.SortedResult;
import org.lealone.dbobject.Schema;
import org.lealone.dbobject.table.TableFilter; import org.lealone.dbobject.table.TableFilter;
import org.lealone.message.DbException; import org.lealone.message.DbException;
import org.lealone.result.ResultInterface; import org.lealone.result.ResultInterface;
Expand Down Expand Up @@ -110,9 +111,7 @@ public int executeMerge(Merge merge) {
private static int executeInsertOrMerge(InsertOrMerge iom) { private static int executeInsertOrMerge(InsertOrMerge iom) {
final String localDataCenter = DatabaseDescriptor.getEndpointSnitch() final String localDataCenter = DatabaseDescriptor.getEndpointSnitch()
.getDatacenter(Utils.getBroadcastAddress()); .getDatacenter(Utils.getBroadcastAddress());
String keyspaceName = iom.getTable().getSchema().getName(); Schema schema = iom.getTable().getSchema();
//AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();

List<Row> localRows = null; List<Row> localRows = null;
Map<InetAddress, List<Row>> localDataCenterRows = null; Map<InetAddress, List<Row>> localDataCenterRows = null;
Map<InetAddress, List<Row>> remoteDataCenterRows = null; Map<InetAddress, List<Row>> remoteDataCenterRows = null;
Expand All @@ -124,9 +123,9 @@ private static int executeInsertOrMerge(InsertOrMerge iom) {
if (partitionKey == null) if (partitionKey == null)
partitionKey = ValueUuid.getNewRandom(); partitionKey = ValueUuid.getNewRandom();
Token tk = StorageService.getPartitioner().getToken(ByteBuffer.wrap(partitionKey.getBytesNoCopy())); Token tk = StorageService.getPartitioner().getToken(ByteBuffer.wrap(partitionKey.getBytesNoCopy()));
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(schema, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetaData().pendingEndpointsFor( Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetaData().pendingEndpointsFor(
tk, keyspaceName); tk, schema.getFullName());


Iterable<InetAddress> targets = Iterables.concat(naturalEndpoints, pendingEndpoints); Iterable<InetAddress> targets = Iterables.concat(naturalEndpoints, pendingEndpoints);
for (InetAddress destination : targets) { for (InetAddress destination : targets) {
Expand Down Expand Up @@ -350,11 +349,11 @@ private static List<InetAddress> getTargetEndpointsIfEqual(TableFilter tableFilt
Value endPK = getPartitionKey(endRow); Value endPK = getPartitionKey(endRow);


if (startPK != null && endPK != null && startPK == endPK) { if (startPK != null && endPK != null && startPK == endPK) {
String keyspaceName = tableFilter.getTable().getSchema().getName(); Schema schema = tableFilter.getTable().getSchema();
Token tk = StorageService.getPartitioner().getToken(ByteBuffer.wrap(startPK.getBytesNoCopy())); Token tk = StorageService.getPartitioner().getToken(ByteBuffer.wrap(startPK.getBytesNoCopy()));
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(schema, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetaData().pendingEndpointsFor( Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetaData().pendingEndpointsFor(
tk, keyspaceName); tk, schema.getFullName());


naturalEndpoints.addAll(pendingEndpoints); naturalEndpoints.addAll(pendingEndpoints);
return naturalEndpoints; return naturalEndpoints;
Expand Down
Expand Up @@ -69,6 +69,7 @@
import org.lealone.cluster.utils.Pair; import org.lealone.cluster.utils.Pair;
import org.lealone.cluster.utils.Utils; import org.lealone.cluster.utils.Utils;
import org.lealone.cluster.utils.WrappedRunnable; import org.lealone.cluster.utils.WrappedRunnable;
import org.lealone.dbobject.Schema;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -484,8 +485,8 @@ private void setTokens(Collection<Token> tokens) {
setMode(Mode.NORMAL, false); setMode(Mode.NORMAL, false);
} }


public Collection<Range<Token>> getLocalRanges(String keyspaceName) { public Collection<Range<Token>> getLocalRanges(Schema schema) {
return getRangesForEndpoint(keyspaceName, Utils.getBroadcastAddress()); return getRangesForEndpoint(schema, Utils.getBroadcastAddress());
} }


public void register(IEndpointLifecycleSubscriber subscriber) { public void register(IEndpointLifecycleSubscriber subscriber) {
Expand Down Expand Up @@ -1135,8 +1136,8 @@ private List<String> stringify(Iterable<InetAddress> endpoints) {
* @param ep endpoint we are interested in. * @param ep endpoint we are interested in.
* @return ranges for the specified endpoint. * @return ranges for the specified endpoint.
*/ */
Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddress ep) { Collection<Range<Token>> getRangesForEndpoint(Schema schema, InetAddress ep) {
return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep); return Keyspace.getReplicationStrategy(schema).getAddressRanges().get(ep);
} }


/** /**
Expand All @@ -1163,36 +1164,36 @@ public List<Range<Token>> getAllRanges(List<Token> sortedTokens) {
return ranges; return ranges;
} }


public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key) { public List<InetAddress> getNaturalEndpoints(Schema schema, ByteBuffer key) {
return getNaturalEndpoints(keyspaceName, getPartitioner().getToken(key)); return getNaturalEndpoints(schema, getPartitioner().getToken(key));
} }


/** /**
* This method returns the N endpoints that are responsible for storing the * This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication. * specified key i.e for replication.
* *
* @param keyspaceName keyspace name also known as keyspace * @param schema the schema
* @param pos position for which we need to find the endpoint * @param pos position for which we need to find the endpoint
* @return the endpoint responsible for this token * @return the endpoint responsible for this token
*/ */
public List<InetAddress> getNaturalEndpoints(String keyspaceName, RingPosition<?> pos) { public List<InetAddress> getNaturalEndpoints(Schema schema, RingPosition<?> pos) {
return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos); return Keyspace.getReplicationStrategy(schema).getNaturalEndpoints(pos);
} }


/** /**
* This method attempts to return N endpoints that are responsible for storing the * This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication. * specified key i.e for replication.
* *
* @param keyspace keyspace name also known as keyspace * @param schema the schema
* @param key key for which we need to find the endpoint * @param key key for which we need to find the endpoint
* @return the endpoint responsible for this key * @return the endpoint responsible for this key
*/ */
public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key) { public List<InetAddress> getLiveNaturalEndpoints(Schema schema, ByteBuffer key) {
return getLiveNaturalEndpoints(keyspace, getPartitioner().decorateKey(key)); return getLiveNaturalEndpoints(schema, getPartitioner().decorateKey(key));
} }


public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition<?> pos) { public List<InetAddress> getLiveNaturalEndpoints(Schema schema, RingPosition<?> pos) {
List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos); List<InetAddress> endpoints = Keyspace.getReplicationStrategy(schema).getNaturalEndpoints(pos);
List<InetAddress> liveEps = new ArrayList<>(endpoints.size()); List<InetAddress> liveEps = new ArrayList<>(endpoints.size());


for (InetAddress endpoint : endpoints) { for (InetAddress endpoint : endpoints) {
Expand Down
Expand Up @@ -448,6 +448,11 @@ public interface CommandInterface {
*/ */
int SHUTDOWN_DEFRAG = 84; int SHUTDOWN_DEFRAG = 84;


/**
* The type of a ALTER SCHEMA WTIH REPLICATION statement.
*/
int ALTER_SCHEMA_WTIH_REPLICATION = 85;

/** /**
* Get command type. * Get command type.
* *
Expand Down
46 changes: 45 additions & 1 deletion lealone-sql/src/main/java/org/lealone/command/Parser.java
Expand Up @@ -11,12 +11,15 @@
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.text.Collator; import java.text.Collator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.api.Trigger; import org.lealone.api.Trigger;
import org.lealone.command.ddl.AlterIndexRename; import org.lealone.command.ddl.AlterIndexRename;
import org.lealone.command.ddl.AlterSchemaRename; import org.lealone.command.ddl.AlterSchemaRename;
import org.lealone.command.ddl.AlterSchemaWithReplication;
import org.lealone.command.ddl.AlterSequence; import org.lealone.command.ddl.AlterSequence;
import org.lealone.command.ddl.AlterTableAddConstraint; import org.lealone.command.ddl.AlterTableAddConstraint;
import org.lealone.command.ddl.AlterTableAlterColumn; import org.lealone.command.ddl.AlterTableAlterColumn;
Expand Down Expand Up @@ -4093,9 +4096,31 @@ private CreateSchema parseCreateSchema() {
} else { } else {
command.setAuthorization(session.getUser().getName()); command.setAuthorization(session.getUser().getName());
} }

if (readIf("WITH")) {
read("REPLICATION");
read("=");
Map<String, String> replicationProperties = parseMap();
command.setReplicationProperties(replicationProperties);
checkReplicationProperties(replicationProperties);
}

return command; return command;
} }


private Map<String, String> parseMap() {
Map<String, String> map = new HashMap<>();
read("(");
while (!readIf(")")) {
String k = readString();
read(":");
String v = readString();
readIf(",");
map.put(k, v);
}
return map;
}

private CreateSequence parseCreateSequence() { private CreateSequence parseCreateSequence() {
boolean ifNotExists = readIfNoExists(); boolean ifNotExists = readIfNoExists();
String sequenceName = readIdentifierWithSchema(); String sequenceName = readIdentifierWithSchema();
Expand Down Expand Up @@ -4431,8 +4456,27 @@ private AlterView parseAlterView() {
return command; return command;
} }


private AlterSchemaRename parseAlterSchema() { private DefineCommand parseAlterSchema() {
String schemaName = readIdentifierWithSchema(); String schemaName = readIdentifierWithSchema();
if (readIf("WITH")) {
read("REPLICATION");
read("=");
AlterSchemaWithReplication command = new AlterSchemaWithReplication(session);
command.setSchema(getSchema(schemaName));
Map<String, String> replicationProperties = parseMap();
command.setReplicationProperties(replicationProperties);
checkReplicationProperties(replicationProperties);
return command;
}
return parseAlterSchemaRename(schemaName);
}

private void checkReplicationProperties(Map<String, String> replicationProperties) {
if (replicationProperties != null && !replicationProperties.containsKey("class"))
throw DbException.get(ErrorCode.SYNTAX_ERROR_1, sqlCommand + ", missing replication strategy class");
}

private AlterSchemaRename parseAlterSchemaRename(String schemaName) {
Schema old = getSchema(); Schema old = getSchema();
AlterSchemaRename command = new AlterSchemaRename(session); AlterSchemaRename command = new AlterSchemaRename(session);
command.setOldSchema(getSchema(schemaName)); command.setOldSchema(getSchema(schemaName));
Expand Down
Expand Up @@ -37,6 +37,7 @@ public void setNewName(String name) {
newSchemaName = name; newSchemaName = name;
} }


@Override
public int update() { public int update() {
session.commit(true); session.commit(true);
Database db = session.getDatabase(); Database db = session.getDatabase();
Expand All @@ -55,6 +56,7 @@ public int update() {
return 0; return 0;
} }


@Override
public int getType() { public int getType() {
return CommandInterface.ALTER_SCHEMA_RENAME; return CommandInterface.ALTER_SCHEMA_RENAME;
} }
Expand Down

0 comments on commit cdcac4b

Please sign in to comment.