Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Refactor meta data refresher with spi for DDL (apache#13947)
Co-authored-by: shardingsphere <dev@shardingsphere.apache.org>
  • Loading branch information
menghaoranss and shardingsphere committed Dec 6, 2021
1 parent e784e0e commit 00e067d
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 19 deletions.
Expand Up @@ -21,17 +21,16 @@
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.refresher.FederationMetaDataRefresher;
import org.apache.shardingsphere.infra.metadata.MetaDataRefresher;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.mapper.SQLStatementEventMapper;
import org.apache.shardingsphere.infra.metadata.mapper.SQLStatementEventMapperFactory;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.event.SchemaAlteredEvent;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.schema.SchemaRulesBuilder;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;

import javax.sql.DataSource;
Expand All @@ -46,6 +45,10 @@
*/
public final class MetaDataRefreshEngine {

static {
ShardingSphereServiceLoader.register(MetaDataRefresher.class);
}

private final ShardingSphereMetaData schemaMetaData;

private final FederationSchemaMetaData federationMetaData;
Expand All @@ -66,9 +69,10 @@ public MetaDataRefreshEngine(final ShardingSphereMetaData schemaMetaData, final
* @throws SQLException SQL exception
*/
public void refresh(final SQLStatement sqlStatement, final Collection<String> logicDataSourceNames) throws SQLException {
Collection<MetaDataRefresher> metaDataRefreshers = MetaDataRefresherFactory.newInstance(sqlStatement);
if (!metaDataRefreshers.isEmpty()) {
refresh(sqlStatement, logicDataSourceNames, metaDataRefreshers);
Optional<MetaDataRefresher> schemaRefresher = TypedSPIRegistry.findRegisteredService(MetaDataRefresher.class, sqlStatement.getClass().getSuperclass().getCanonicalName(), null);
if (schemaRefresher.isPresent()) {
schemaRefresher.get().refresh(schemaMetaData, federationMetaData, logicDataSourceNames, sqlStatement, props);
ShardingSphereEventBus.getInstance().post(new SchemaAlteredEvent(schemaMetaData.getName(), loadActualSchema(schemaMetaData)));
}
Optional<SQLStatementEventMapper> sqlStatementEventMapper = SQLStatementEventMapperFactory.newInstance(sqlStatement);
if (sqlStatementEventMapper.isPresent()) {
Expand All @@ -77,19 +81,6 @@ public void refresh(final SQLStatement sqlStatement, final Collection<String> lo
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void refresh(final SQLStatement sqlStatement, final Collection<String> logicDataSourceNames, final Collection<MetaDataRefresher> refreshers) throws SQLException {
for (MetaDataRefresher each : refreshers) {
if (each instanceof SchemaRefresher) {
((SchemaRefresher) each).refresh(schemaMetaData, logicDataSourceNames, sqlStatement, props);
}
if (each instanceof FederationMetaDataRefresher) {
((FederationMetaDataRefresher) each).refresh(federationMetaData, logicDataSourceNames, sqlStatement, schemaMetaData, props);
}
}
ShardingSphereEventBus.getInstance().post(new SchemaAlteredEvent(schemaMetaData.getName(), loadActualSchema(schemaMetaData)));
}

private ShardingSphereSchema loadActualSchema(final ShardingSphereMetaData schemaMetaData) throws SQLException {
Map<String, Map<String, DataSource>> dataSourcesMap = Collections.singletonMap(schemaMetaData.getName(), schemaMetaData.getResource().getDataSources());
Map<String, Collection<RuleConfiguration>> schemaRuleConfigs = Collections.singletonMap(schemaMetaData.getName(), schemaMetaData.getRuleMetaData().getConfigurations());
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.context.refresher;

import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.spi.typed.TypedSPI;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;

import java.sql.SQLException;
import java.util.Collection;

/**
* ShardingSphere schema refresher.
*
* @param <T> type of SQL statement
*/
public interface MetaDataRefresher<T extends SQLStatement> extends TypedSPI {

/**
* Refresh ShardingSphere schema.
*
* @param schemaMetaData schema meta data
* @param schema federation schema meta data
* @param logicDataSourceNames route data source names
* @param sqlStatement SQL statement
* @param props configuration properties
* @throws SQLException SQL exception
*/
void refresh(ShardingSphereMetaData schemaMetaData, FederationSchemaMetaData schema, Collection<String> logicDataSourceNames, T sqlStatement, ConfigurationProperties props) throws SQLException;
}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.context.refresher.type;

import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.model.IndexMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.AlterIndexStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.handler.ddl.AlterIndexStatementHandler;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Optional;

/**
* Schema refresher for alter index statement.
*/
public final class AlterIndexStatementSchemaRefresher implements MetaDataRefresher<AlterIndexStatement> {

@Override
public void refresh(final ShardingSphereMetaData schemaMetaData, final FederationSchemaMetaData schema, final Collection<String> logicDataSourceNames, final AlterIndexStatement sqlStatement,
final ConfigurationProperties props) throws SQLException {
Optional<IndexSegment> renameIndex = AlterIndexStatementHandler.getRenameIndexSegment(sqlStatement);
if (!sqlStatement.getIndex().isPresent() || !renameIndex.isPresent()) {
return;
}
String indexName = sqlStatement.getIndex().get().getIdentifier().getValue();
Optional<String> logicTableName = findLogicTableName(schemaMetaData.getSchema(), indexName);
if (logicTableName.isPresent()) {
TableMetaData tableMetaData = schemaMetaData.getSchema().get(logicTableName.get());
Preconditions.checkNotNull(tableMetaData, "Can not get the table '%s' metadata!", logicTableName.get());
tableMetaData.getIndexes().remove(indexName);
String renameIndexName = renameIndex.get().getIdentifier().getValue();
tableMetaData.getIndexes().put(renameIndexName, new IndexMetaData(renameIndexName));
}
}

private Optional<String> findLogicTableName(final ShardingSphereSchema schema, final String indexName) {
return schema.getAllTableNames().stream().filter(each -> schema.get(each).getIndexes().containsKey(indexName)).findFirst();
}

@Override
public String getType() {
return AlterIndexStatement.class.getCanonicalName();
}
}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.context.refresher.type;

import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
import org.apache.shardingsphere.infra.metadata.schema.builder.TableMetaDataBuilder;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.AlterTableStatement;

import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

/**
* Schema refresher for alter table statement.
*/
public final class AlterTableStatementSchemaRefresher implements MetaDataRefresher<AlterTableStatement> {

@Override
public void refresh(final ShardingSphereMetaData schemaMetaData, final FederationSchemaMetaData schema, final Collection<String> logicDataSourceNames, final AlterTableStatement sqlStatement,
final ConfigurationProperties props) throws SQLException {
String tableName = sqlStatement.getTable().getTableName().getIdentifier().getValue();
if (sqlStatement.getRenameTable().isPresent()) {
putTableMetaData(schemaMetaData, schema, logicDataSourceNames, sqlStatement.getRenameTable().get().getTableName().getIdentifier().getValue(), props);
removeTableMetaData(schemaMetaData, schema, tableName);
} else {
putTableMetaData(schemaMetaData, schema, logicDataSourceNames, tableName, props);
}
}

private void removeTableMetaData(final ShardingSphereMetaData schemaMetaData, final FederationSchemaMetaData schema, final String tableName) {
schemaMetaData.getSchema().remove(tableName);
schemaMetaData.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.remove(tableName));
schema.remove(tableName);
}

private void putTableMetaData(final ShardingSphereMetaData schemaMetaData, final FederationSchemaMetaData schema, final Collection<String> logicDataSourceNames, final String tableName,
final ConfigurationProperties props) throws SQLException {
if (!containsInDataNodeContainedRule(tableName, schemaMetaData)) {
schemaMetaData.getRuleMetaData().findRules(MutableDataNodeRule.class).forEach(each -> each.put(tableName, logicDataSourceNames.iterator().next()));
}
SchemaBuilderMaterials materials = new SchemaBuilderMaterials(
schemaMetaData.getResource().getDatabaseType(), schemaMetaData.getResource().getDataSources(), schemaMetaData.getRuleMetaData().getRules(), props);
Optional<TableMetaData> actualTableMetaData = Optional.ofNullable(TableMetaDataBuilder.load(Collections.singletonList(tableName), materials).get(tableName));
actualTableMetaData.ifPresent(tableMetaData -> {
schemaMetaData.getSchema().put(tableName, TableMetaDataBuilder.decorateKernelTableMetaData(tableMetaData, materials.getRules()));
schema.put(TableMetaDataBuilder.decorateFederationTableMetaData(tableMetaData, materials.getRules()));
});
}

private boolean containsInDataNodeContainedRule(final String tableName, final ShardingSphereMetaData schemaMetaData) {
return schemaMetaData.getRuleMetaData().findRules(DataNodeContainedRule.class).stream().anyMatch(each -> each.getAllTables().contains(tableName));
}

@Override
public String getType() {
return AlterTableStatement.class.getCanonicalName();
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.infra.context.refresher.type;

import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.builder.util.IndexMetaDataUtil;
import org.apache.shardingsphere.infra.metadata.schema.model.IndexMetaData;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CreateIndexStatement;

import java.sql.SQLException;
import java.util.Collection;

/**
* Schema refresher for create index statement.
*/
public final class CreateIndexStatementSchemaRefresher implements MetaDataRefresher<CreateIndexStatement> {

@Override
public void refresh(final ShardingSphereMetaData schemaMetaData, final FederationSchemaMetaData schema, final Collection<String> logicDataSourceNames, final CreateIndexStatement sqlStatement,
final ConfigurationProperties props) throws SQLException {
String indexName = null != sqlStatement.getIndex() ? sqlStatement.getIndex().getIdentifier().getValue() : IndexMetaDataUtil.getGeneratedLogicIndexName(sqlStatement.getColumns());
if (Strings.isNullOrEmpty(indexName)) {
return;
}
String tableName = sqlStatement.getTable().getTableName().getIdentifier().getValue();
schemaMetaData.getSchema().get(tableName).getIndexes().put(indexName, new IndexMetaData(indexName));
}

@Override
public String getType() {
return CreateIndexStatement.class.getCanonicalName();
}
}

0 comments on commit 00e067d

Please sign in to comment.