diff --git a/cloudata-structured/pom.xml b/cloudata-structured/pom.xml index fa40bfa..0529654 100644 --- a/cloudata-structured/pom.xml +++ b/cloudata-structured/pom.xml @@ -13,6 +13,18 @@ + + com.facebook.presto + presto-parser + 0.55-SNAPSHOT + + + + com.facebook.presto + presto-main + 0.55-SNAPSHOT + + com.cloudata cloudata-server-shared diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlEngine.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlEngine.java new file mode 100644 index 0000000..d8b2934 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlEngine.java @@ -0,0 +1,70 @@ +package com.cloudata.structured.sql; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.facebook.presto.importer.PeriodicImportManager; +import com.facebook.presto.metadata.MetadataManager; +import com.facebook.presto.sql.analyzer.Analysis; +import com.facebook.presto.sql.analyzer.Analyzer; +import com.facebook.presto.sql.analyzer.QueryExplainer; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.LogicalPlanner; +import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.PlanNodeIdAllocator; +import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; +import com.facebook.presto.sql.tree.Statement; +import com.facebook.presto.storage.StorageManager; +import com.google.common.base.Optional; + +public class SqlEngine { + private static final Logger log = LoggerFactory.getLogger(SqlEngine.class); + + final MetadataManager metadata; + final List planOptimizers; + final PeriodicImportManager periodicImportManager; + final StorageManager storageManager; + + // SplitManager splitManager = buildSplitManager(); + + public SqlEngine(MetadataManager metadata, List planOptimizers, + PeriodicImportManager periodicImportManager, StorageManager storageManager) { + this.metadata = metadata; + this.planOptimizers = planOptimizers; + this.periodicImportManager = periodicImportManager; + this.storageManager = storageManager; + } + + public SqlStatement parse(SqlSession session, String sql) { + log.debug("Parsing sql: {}", sql); + + Statement statement = SqlParser.createStatement(sql); + + QueryExplainer queryExplainer = new QueryExplainer(session.prestoSession, planOptimizers, metadata, + periodicImportManager, storageManager); + // analyze query + Analyzer analyzer = new Analyzer(session.prestoSession, metadata, Optional.of(queryExplainer)); + + Analysis analysis = analyzer.analyze(statement); + + // System.out.println("analysis: " + analysis); + + PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); + // plan query + LogicalPlanner logicalPlanner = new LogicalPlanner(session.prestoSession, planOptimizers, idAllocator, + metadata, periodicImportManager, storageManager); + Plan plan = logicalPlanner.plan(analysis); + + return new SqlStatement(metadata, sql, plan); + // + // TableScanCountVisitor visitor = new TableScanCountVisitor(); + // plan.getRoot().accept(visitor, 0); + // Assert.assertEquals(1, visitor.count); + // String p = PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes()); + // + // System.out.println("plan: " + p); + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlSession.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlSession.java new file mode 100644 index 0000000..fedcf3d --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlSession.java @@ -0,0 +1,18 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.sql.analyzer.Session; + +public class SqlSession { + + final Session prestoSession; + + public SqlSession() { + String user = "user"; + String source = "source"; + String catalog = "default"; + String schema = "default"; + String remoteUserAddress = "remoteUserAddress"; + String userAgent = "userAgent"; + this.prestoSession = new Session(user, source, catalog, schema, remoteUserAddress, userAgent); + } +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlStatement.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlStatement.java new file mode 100644 index 0000000..78affba --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/SqlStatement.java @@ -0,0 +1,116 @@ +package com.cloudata.structured.sql; + +import com.cloudata.structured.sql.simple.ConvertToSimplePlanVisitor; +import com.cloudata.structured.sql.simple.SimpleNode; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.sql.planner.Plan; +import com.google.common.base.Optional; + +public class SqlStatement { + + private final String sql; + private final Plan plan; + + Optional simple; + private final Metadata metadata; + + public SqlStatement(Metadata metadata, String sql, Plan plan) { + this.metadata = metadata; + this.sql = sql; + this.plan = plan; + } + + public boolean isSimple() { + return getSimple() != null; + } + + public SimpleNode getSimple() { + if (simple == null) { + ConvertToSimplePlanVisitor visitor = new ConvertToSimplePlanVisitor(metadata); + SimpleNode accept = plan.getRoot().accept(visitor, null); + // plan.getRoot().accept(visitor, 0); + simple = Optional.fromNullable(accept); + } + return simple.orNull(); + } + + // class SimplePlan { + // final Table table; + // + // final Expression[] expressions; + // + // public void evaluate() { + // // Statement: Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=["key1" k1, + // // concat("key2", 'hello') k2]}, from=[Table{table1}], where=null, groupBy=[], having=null, orderBy=[], + // // limit=null}, orderBy=[]} + // // analysis: com.facebook.presto.sql.analyzer.Analysis@39f46204 + // // plan: - Output[k1, k2] + // // k1 := key1 + // // k2 := concat + // // - Project => [key1:varchar, concat:varchar] + // // concat := concat("key2", 'hello') + // // - TableScan[com.cloudata.structured.sql.MockTableHandle@737c45ee, domain={}] => [key1:varchar, + // // key2:varchar] + // // key1 := com.cloudata.structured.sql.MockColumnHandle@549448df + // // key2 := com.cloudata.structured.sql.MockColumnHandle@533c53da + // + // // ExpressionInterpreter.expressionInterpreter(expression, metadata, session) + // + // } + // } + // + // private boolean isSimple(Plan plan) { + // + // // TODO: Make this better, once we have a better grip on the logic + // PlanNode root = plan.getRoot(); + // if (root instanceof OutputNode) { + // OutputNode outputNode = (OutputNode) root; + // PlanNode source = outputNode.getSource(); + // if (source instanceof TableScanNode) { + // TableScanNode tableScanNode = (TableScanNode) source; + // + // List columns = Lists.newArrayList(); + // // List columns = Lists.newArrayList(); + // + // for (int i = 0; i < outputNode.getColumnNames().size(); i++) { + // String name = outputNode.getColumnNames().get(i); + // Symbol symbol = outputNode.getOutputSymbols().get(i); + // + // } + // + // // Statement: Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=["key1" k1, + // // "key2" k2]}, from=[Table{table1}], where=null, groupBy=[], having=null, orderBy=[], limit=null}, + // // orderBy=[]} + // // analysis: com.facebook.presto.sql.analyzer.Analysis@6fcc5b5d + // // plan: - Output[k1, k2] + // // k1 := key1 + // // k2 := key2 + // // - TableScan[com.cloudata.structured.sql.MockTableHandle@3aa92b03, domain={}] => [key1:varchar, + // // key2:varchar] + // // key1 := com.cloudata.structured.sql.MockColumnHandle@20bb82ca + // // key2 := com.cloudata.structured.sql.MockColumnHandle@7687ac8f + // + // SimpleQuery query = new SimpleQuery(); + // return true; + // } + // } + // return false; + // } + + // class CheckSimpleVisitor extends RecursivePlanVisitor { + // boolean simple = true; + // + // @Override + // public Boolean visitOutput(OutputNode node, Integer context) { + // simple = false; + // return false; + // } + // + // @Override + // protected Boolean visitPlan(PlanNode node, Integer context) { + // if (vi) + // } + // + // } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataColumnHandle.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataColumnHandle.java new file mode 100644 index 0000000..3431e3a --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataColumnHandle.java @@ -0,0 +1,18 @@ +package com.cloudata.structured.sql.provider; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; + +public class CloudataColumnHandle implements ColumnHandle { + + private final ColumnMetadata columnMetadata; + + public CloudataColumnHandle(CloudataTableHandle tableHandle, ColumnMetadata columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public ColumnMetadata getColumnMetadata() { + return columnMetadata; + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataConnectorMetadata.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataConnectorMetadata.java new file mode 100644 index 0000000..ed0f497 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataConnectorMetadata.java @@ -0,0 +1,145 @@ +package com.cloudata.structured.sql.provider; + +import java.util.List; +import java.util.Map; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorMetadata; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.TableHandle; +import com.google.common.collect.Lists; + +public class CloudataConnectorMetadata implements ConnectorMetadata { + + final String connectorId; + + public CloudataConnectorMetadata(String connectorId) { + this.connectorId = connectorId; + } + + @Override + public boolean canHandle(TableHandle tableHandle) { + return tableHandle instanceof CloudataTableHandle + && ((CloudataTableHandle) tableHandle).getConnectorId().equals(connectorId); + } + + @Override + public List listSchemaNames() { + List schemas = Lists.newArrayList(); + schemas.add("default"); + return schemas; + } + + @Override + public CloudataTableHandle getTableHandle(SchemaTableName tableName) { + if (!listSchemaNames().contains(tableName.getSchemaName())) { + return null; + } + + // ExampleTable table = exampleClient.getTable(tableName.getSchemaName(), tableName.getTableName()); + // if (table == null) { + // return null; + // } + + return new CloudataTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName()); + } + + @Override + public ConnectorTableMetadata getTableMetadata(TableHandle table) { + CloudataTableHandle tableHandle = promote(table); + return tableHandle.getTableMetadata(); + } + + private CloudataTableHandle promote(TableHandle table) { + assert (table != null); + // checkArgument(table instanceof CloudataTableHandle, "tableHandle is not an instance of CloudataTableHandle"); + assert (table instanceof CloudataTableHandle); + CloudataTableHandle tableHandle = (CloudataTableHandle) table; + assert tableHandle.getConnectorId().equals(connectorId); + // checkArgument(tableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); + return tableHandle; + } + + @Override + public List listTables(String schemaNameOrNull) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnHandle getColumnHandle(TableHandle tableHandle, String columnName) { + CloudataTableHandle exampleTableHandle = promote(tableHandle); + + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // if (table == null) { + // throw new TableNotFoundException(exampleTableHandle.toSchemaTableName()); + // } + + // ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + // for (ColumnMetadata columnMetadata : table.getColumnsMetadata()) { + // columnHandles.put(columnMetadata.getName(), new ExampleColumnHandle(connectorId, columnMetadata)); + // } + // return columnHandles.build(); + + return exampleTableHandle.getColumnHandle(columnName); + } + + @Override + public Map getColumnHandles(TableHandle tableHandle) { + CloudataTableHandle exampleTableHandle = promote(tableHandle); + + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // if (table == null) { + // throw new TableNotFoundException(exampleTableHandle.toSchemaTableName()); + // } + + // ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + // for (ColumnMetadata columnMetadata : table.getColumnsMetadata()) { + // columnHandles.put(columnMetadata.getName(), new ExampleColumnHandle(connectorId, columnMetadata)); + // } + // return columnHandles.build(); + + return exampleTableHandle.getColumnHandles(); + } + + @Override + public ColumnMetadata getColumnMetadata(TableHandle tableHandle, ColumnHandle column) { + // checkNotNull(tableHandle, "tableHandle is null"); + // checkArgument(tableHandle instanceof MockTableHandle, "tableHandle is not an instance of MockTableHandle"); + // checkArgument(((MockTableHandle) tableHandle).getConnectorId().equals(connectorId), + // "tableHandle is not for this connector"); + + CloudataColumnHandle columnHandle = promote(column); + + return columnHandle.getColumnMetadata(); + } + + private CloudataColumnHandle promote(ColumnHandle column) { + // checkNotNull(columnHandle, "columnHandle is null"); + assert column != null; + // checkArgument(columnHandle instanceof CloudataColumnHandle, + // "columnHandle is not an instance of CloudataColumnHandle"); + assert column instanceof CloudataColumnHandle; + return (CloudataColumnHandle) column; + } + + @Override + public Map> listTableColumns(SchemaTablePrefix prefix) { + throw new UnsupportedOperationException(); + } + + @Override + public TableHandle createTable(ConnectorTableMetadata tableMetadata) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTable(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataPartition.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataPartition.java new file mode 100644 index 0000000..a299c40 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataPartition.java @@ -0,0 +1,40 @@ +package com.cloudata.structured.sql.provider; + +import com.facebook.presto.spi.Partition; +import com.facebook.presto.spi.TupleDomain; +import com.google.common.base.Objects; + +public class CloudataPartition implements Partition { + + private final String schemaName; + private final String tableName; + + public CloudataPartition(String schemaName, String tableName) { + this.schemaName = schemaName; + this.tableName = tableName; + } + + @Override + public String getPartitionId() { + return schemaName + ":" + tableName; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + @Override + public TupleDomain getTupleDomain() { + return TupleDomain.all(); + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("schemaName", schemaName).add("tableName", tableName).toString(); + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplit.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplit.java new file mode 100644 index 0000000..0ca56fd --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplit.java @@ -0,0 +1,71 @@ +package com.cloudata.structured.sql.provider; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.Split; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +public class CloudataSplit implements Split { + private final String connectorId; + private final String schemaName; + private final String tableName; + private final boolean remotelyAccessible; + private final ImmutableList addresses; + private final HostAddress hostAddress; + + @JsonCreator + public CloudataSplit(@JsonProperty("connectorId") String connectorId, + @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, + @JsonProperty("hostAddress") HostAddress hostAddress) { + this.schemaName = checkNotNull(schemaName, "schema name is null"); + this.connectorId = checkNotNull(connectorId, "connector id is null"); + this.tableName = checkNotNull(tableName, "table name is null"); + this.hostAddress = checkNotNull(hostAddress, "hostAddress is null"); + + // if ("http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme())) { + remotelyAccessible = false; + addresses = ImmutableList.of(hostAddress); + } + + @JsonProperty + public String getConnectorId() { + return connectorId; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + // @JsonProperty + // public URI getUri() + // { + // return uri; + // } + + @Override + public boolean isRemotelyAccessible() { + // only http or https is remotely accessible + return remotelyAccessible; + } + + @Override + public List getAddresses() { + return addresses; + } + + @Override + public Object getInfo() { + return this; + } +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplitManager.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplitManager.java new file mode 100644 index 0000000..405c2be --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataSplitManager.java @@ -0,0 +1,84 @@ +package com.cloudata.structured.sql.provider; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Collections; +import java.util.List; + +import com.facebook.presto.metadata.NodeManager; +import com.facebook.presto.spi.ConnectorSplitManager; +import com.facebook.presto.spi.Partition; +import com.facebook.presto.spi.PartitionResult; +import com.facebook.presto.spi.Split; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.TupleDomain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class CloudataSplitManager implements ConnectorSplitManager { + final NodeManager nodeManager; + final String connectorId; + + public CloudataSplitManager(NodeManager nodeManager, String connectorId) { + this.nodeManager = nodeManager; + this.connectorId = connectorId; + } + + @Override + public String getConnectorId() { + return connectorId; + } + + @Override + public boolean canHandle(TableHandle tableHandle) { + return tableHandle instanceof CloudataTableHandle + && ((CloudataTableHandle) tableHandle).getConnectorId().equals(connectorId); + } + + @Override + public PartitionResult getPartitions(TableHandle table, TupleDomain tupleDomain) { + // checkArgument(table instanceof CloudataTableHandle, "tableHandle is not an instance of CloudataTableHandle"); + assert table instanceof CloudataTableHandle; + CloudataTableHandle tableHandle = (CloudataTableHandle) table; + + // example connector has only one partition + List partitions = ImmutableList. of(new CloudataPartition(tableHandle.getSchemaName(), + tableHandle.getTableName())); + // example connector does not do any additional processing/filtering with the TupleDomain, so just return the + // whole TupleDomain + + return new PartitionResult(partitions, tupleDomain); + } + + @Override + public Iterable getPartitionSplits(TableHandle table, List partitions) { + // checkNotNull(partitions, "partitions is null"); + assert partitions != null; + checkArgument(partitions.size() == 1, "Expected one partition but got %s", partitions.size()); + + // checkArgument(partition instanceof CloudataPartition, "partition is not an instance of CloudataPartition"); + assert partitions.get(0) instanceof CloudataPartition; + CloudataPartition partition = (CloudataPartition) partitions.get(0); + + CloudataTableHandle tableHandle = (CloudataTableHandle) table; + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // // this can happen if table is removed during a query + // checkState(table != null, "Table %s.%s no longer exists", exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + + List splits = Lists.newArrayList(); + // for (URI uri : table.getSources()) { + // splits.add(new ExampleSplit(connectorId, examplePartition.getSchemaName(), examplePartition.getTableName(), + // uri)); + // } + // URI.create("http://localhost:1234"); + + splits.add(new CloudataSplit(connectorId, partition.getSchemaName(), partition.getTableName(), nodeManager + .getCurrentNode().getHostAndPort())); + Collections.shuffle(splits); + + return splits; + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataTableHandle.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataTableHandle.java new file mode 100644 index 0000000..7c2e506 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/provider/CloudataTableHandle.java @@ -0,0 +1,74 @@ +package com.cloudata.structured.sql.provider; + +import java.util.List; +import java.util.Map; + +import org.weakref.jmx.com.google.common.collect.Lists; +import org.weakref.jmx.com.google.common.collect.Maps; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ColumnType; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableHandle; + +public class CloudataTableHandle implements TableHandle { + + private final String tableName; + private final ConnectorTableMetadata tableMetadata; + private final List columns; + private final String connectorId; + private final String schemaName; + + public CloudataTableHandle(String connectorId, String schemaName, String tableName) { + this.connectorId = connectorId; + this.schemaName = schemaName; + this.tableName = tableName; + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + + List keys = Lists.newArrayList(); + keys.add("key1"); + keys.add("key2"); + + List columns = Lists.newArrayList(); + for (int i = 0; i < keys.size(); i++) { + ColumnType type = ColumnType.STRING; + boolean paritionKey = false; + columns.add(new ColumnMetadata(keys.get(i), type, i, paritionKey)); + } + this.columns = columns; + ConnectorTableMetadata metadata = new ConnectorTableMetadata(schemaTableName, columns); + this.tableMetadata = metadata; + // new TableMetadata(connectorId, metadata); + } + + public ConnectorTableMetadata getTableMetadata() { + return tableMetadata; + } + + public Map getColumnHandles() { + Map handles = Maps.newHashMap(); + for (ColumnMetadata column : columns) { + handles.put(column.getName(), new CloudataColumnHandle(this, column)); + } + return handles; + } + + public String getConnectorId() { + return connectorId; + } + + public String getTableName() { + return tableName; + } + + public String getSchemaName() { + return schemaName; + } + + public ColumnHandle getColumnHandle(String columnName) { + return getColumnHandles().get(columnName); + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/ConvertToSimplePlanVisitor.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/ConvertToSimplePlanVisitor.java new file mode 100644 index 0000000..c14a821 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/ConvertToSimplePlanVisitor.java @@ -0,0 +1,190 @@ +package com.cloudata.structured.sql.simple; + +import java.util.List; +import java.util.Map; + +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.TableMetadata; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.TupleDomain; +import com.facebook.presto.sql.planner.Symbol; +import com.facebook.presto.sql.planner.plan.OutputNode; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.PlanVisitor; +import com.facebook.presto.sql.planner.plan.TableScanNode; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class ConvertToSimplePlanVisitor extends PlanVisitor { + + final Metadata metadata; + + public ConvertToSimplePlanVisitor(Metadata metadata) { + this.metadata = metadata; + } + + @Override + protected SimpleNode visitPlan(PlanNode node, Object context) { + // Not simple! + return null; + } + + // @Override + // public SimpleNode visitProject(ProjectNode node, Integer context) { + // SimpleNode source = node.getSource().accept(this, context); + // if (source == null) { + // return null; + // } + // + // if (source instanceof SimpleTableScan) { + // SimpleTableScan simpleTableScan = (SimpleTableScan) source; + // + // for (Map.Entry entry : node.getOutputMap().entrySet()) { + // entry.getValue().accept(visitor, context); + // if (entry.getValue() instanceof QualifiedNameReference && ((QualifiedNameReference) + // entry.getValue()).getName().equals(entry.getKey().toQualifiedName())) { + // // skip identity assignments + // continue; + // } + // print(indent + 2, "%s := %s", entry.getKey(), entry.getValue()); + // } + // + // return processChildren(node, indent + 1); + // + // + // // print(indent, "- Project => [%s]", formatOutputs(node.getOutputSymbols())); + // // for (Map.Entry entry : node.getOutputMap().entrySet()) { + // // if (entry.getValue() instanceof QualifiedNameReference && ((QualifiedNameReference) + // entry.getValue()).getName().equals(entry.getKey().toQualifiedName())) { + // // // skip identity assignments + // // continue; + // // } + // // print(indent + 2, "%s := %s", entry.getKey(), entry.getValue()); + // // } + // // + // // return processChildren(node, indent + 1); + // / - Project => [key1:varchar, concat:varchar, expr:varchar] + // // concat := concat("key2", 'hello') + // // expr := 'world' + // } else { + // throw new IllegalStateException(); + // } + // } + + @Override + public SimpleNode visitOutput(OutputNode node, Object context) { + assert node.getSources().size() == 1; + + SimpleNode source = node.getSource().accept(this, context); + if (source == null) { + return null; + } + + if (source instanceof SimpleTableScan) { + SimpleTableScan tableScan = (SimpleTableScan) source; + List columnNames = node.getColumnNames(); + List outputSymbols = node.getOutputSymbols(); + + List expressions = Lists.newArrayList(); + for (int i = 0; i < columnNames.size(); i++) { + // String name = columnNames.get(i); + Symbol symbol = outputSymbols.get(i); + + SimpleExpression expression = tableScan.getExpression(symbol); + if (expression == null) { + assert false; + return null; + } + expressions.add(expression); + } + + tableScan.expressions = expressions; + tableScan.columnNames = columnNames; + return tableScan; + } else { + throw new IllegalStateException(); + } + + // print(indent, "- Output[%s]", Joiner.on(", ").join(node.getColumnNames())); + // for (int i = 0; i < node.getColumnNames().size(); i++) { + // String name = node.getColumnNames().get(i); + // Symbol symbol = node.getOutputSymbols().get(i); + // if (!name.equals(symbol.toString())) { + // print(indent + 2, "%s := %s", name, symbol); + // } + // } + // + // return processChildren(node, indent + 1); + } + + @Override + public SimpleNode visitTableScan(TableScanNode node, Object context) { + assert node.getSources().size() == 0; + + TupleDomain partitionsDomainSummary = node.getPartitionsDomainSummary(); + if (!partitionsDomainSummary.isAll()) { + return null; + } + + // plan: - Output[k1, k2, k3] + // k1 := key1 + // k2 := concat + // k3 := expr + // - Project => [key1:varchar, concat:varchar, expr:varchar] + // concat := concat("key2", 'hello') + // expr := 'world' + // - TableScan[com.cloudata.structured.sql.MockTableHandle@5850abcc, domain={}] => [key1:varchar, + // key2:varchar] + // key1 := com.cloudata.structured.sql.MockColumnHandle@56300388 + // key2 := com.cloudata.structured.sql.MockColumnHandle@6a3801ec + + TableHandle tableHandle = node.getTable(); + TableMetadata table = metadata.getTableMetadata(tableHandle); + // SchemaTableName schemaTable = table.getTable(); + // + // TableEntry entry = new TableEntry(table.getConnectorId(), schemaTable.getSchemaName(), + // schemaTable.getTableName()); + String tableName = table.getTable().getTableName(); + + Map symbolToExpression = Maps.newHashMap(); + List columnNames = Lists.newArrayList(); + List expressions = Lists.newArrayList(); + + for (Map.Entry entry : node.getAssignments().entrySet()) { + Symbol symbol = entry.getKey(); + String name = symbol.getName(); + columnNames.add(name); + + ColumnHandle columnHandle = entry.getValue(); + + ColumnMetadata columnMetadata = metadata.getColumnMetadata(tableHandle, columnHandle); + if (columnMetadata == null) { + assert false; + return null; + } + + SimpleColumnExpression expression = new SimpleColumnExpression(tableName, columnMetadata); + expressions.add(expression); + symbolToExpression.put(symbol, expression); + + // QualifiedNameReference expression = new QualifiedNameReference(entry.getKey().toQualifiedName()); + // + // if (node.getOutputSymbols().contains(entry.getKey()) + // || (!partitionsDomainSummary.isNone() && partitionsDomainSummary.getDomains().keySet() + // .contains(entry.getValue()))) { + // print(indent + 2, "%s := %s", entry.getKey(), entry.getValue()); + // } + } + + SimpleTableScan tableScan = new SimpleTableScan(table); + tableScan.columnNames = columnNames; + tableScan.expressions = Lists. newArrayList(expressions); + tableScan.symbolToExpression = symbolToExpression; + tableScan.columns = expressions.toArray(new SimpleColumnExpression[expressions.size()]); + + return tableScan; + } + +} \ No newline at end of file diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleColumnExpression.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleColumnExpression.java new file mode 100644 index 0000000..f34fcb0 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleColumnExpression.java @@ -0,0 +1,27 @@ +package com.cloudata.structured.sql.simple; + +import com.facebook.presto.spi.ColumnMetadata; + +public class SimpleColumnExpression extends SimpleExpression { + private final String tableName; + private final ColumnMetadata columnMetadata; + + public SimpleColumnExpression(String tableName, ColumnMetadata columnMetadata) { + this.tableName = tableName; + this.columnMetadata = columnMetadata; + } + + public String getColumnName() { + return columnMetadata.getName(); + } + + @Override + public R accept(SimpleExpressionVisitor visitor, C context) { + return visitor.visitColumnExpression(this, context); + } + + public String getTableName() { + return tableName; + } + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpression.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpression.java new file mode 100644 index 0000000..752d7e9 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpression.java @@ -0,0 +1,7 @@ +package com.cloudata.structured.sql.simple; + +public abstract class SimpleExpression { + + public abstract R accept(SimpleExpressionVisitor visitor, C context); + +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionPrinter.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionPrinter.java new file mode 100644 index 0000000..005924d --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionPrinter.java @@ -0,0 +1,19 @@ +package com.cloudata.structured.sql.simple; + +public class SimpleExpressionPrinter extends SimpleExpressionVisitor { + + @Override + public String visitGeneric(SimpleExpression node, Object context) { + return "(unknown:" + node.toString() + ")"; + } + + @Override + public String visitColumnExpression(SimpleColumnExpression node, Object context) { + return node.getTableName() + "." + node.getColumnName(); + } + + public static String toString(SimpleExpression node) { + SimpleExpressionPrinter visitor = new SimpleExpressionPrinter(); + return node.accept(visitor, null); + } +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionVisitor.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionVisitor.java new file mode 100644 index 0000000..f88849d --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleExpressionVisitor.java @@ -0,0 +1,12 @@ +package com.cloudata.structured.sql.simple; + +public class SimpleExpressionVisitor { + + public R visitGeneric(SimpleExpression node, C context) { + return null; + } + + public R visitColumnExpression(SimpleColumnExpression node, C context) { + return visitGeneric(node, context); + } +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNode.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNode.java new file mode 100644 index 0000000..7f1a644 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNode.java @@ -0,0 +1,6 @@ +package com.cloudata.structured.sql.simple; + +public abstract class SimpleNode { + + public abstract R accept(SimpleNodeVisitor visitor, C context); +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNodeVisitor.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNodeVisitor.java new file mode 100644 index 0000000..32abab8 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleNodeVisitor.java @@ -0,0 +1,11 @@ +package com.cloudata.structured.sql.simple; + +public class SimpleNodeVisitor { + public R visitGeneric(SimpleNode node, C context) { + return null; + } + + public R visitTableScan(SimpleTableScan node, C context) { + return visitGeneric(node, context); + } +} diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTableScan.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTableScan.java new file mode 100644 index 0000000..a606ee9 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTableScan.java @@ -0,0 +1,41 @@ +package com.cloudata.structured.sql.simple; + +import java.util.List; +import java.util.Map; + +import com.facebook.presto.metadata.TableMetadata; +import com.facebook.presto.spi.RecordCursor; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.sql.planner.Symbol; + +public class SimpleTableScan extends SimpleNode { + RecordSet recordset; + + RecordCursor cursor; + + public List columnNames; + public List expressions; + + SimpleColumnExpression[] columns; + + Map symbolToExpression; + + final TableMetadata tableMetadata; + + public SimpleTableScan(TableMetadata tableMetadata) { + this.tableMetadata = tableMetadata; + } + + public SimpleExpression getExpression(Symbol symbol) { + return symbolToExpression.get(symbol); + } + + public String getTableName() { + return tableMetadata.getTable().getTableName(); + } + + @Override + public R accept(SimpleNodeVisitor visitor, C context) { + return visitor.visitTableScan(this, context); + } +} \ No newline at end of file diff --git a/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTreePrinter.java b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTreePrinter.java new file mode 100644 index 0000000..03e1096 --- /dev/null +++ b/cloudata-structured/src/main/java/com/cloudata/structured/sql/simple/SimpleTreePrinter.java @@ -0,0 +1,45 @@ +package com.cloudata.structured.sql.simple; + +import static java.lang.String.format; + +import com.google.common.base.Strings; + +public class SimpleTreePrinter extends SimpleNodeVisitor { + private final StringBuilder output = new StringBuilder(); + + @Override + public Void visitGeneric(SimpleNode node, Integer indent) { + print(indent, "[Generic]: %s", node); + return null; + } + + @Override + public Void visitTableScan(SimpleTableScan node, Integer indent) { + print(indent, "[TableScan]: %s", node.getTableName()); + for (int i = 0; i < node.expressions.size(); i++) { + SimpleExpression expression = node.expressions.get(i); + String columnName = node.columnNames.get(i); + + print(indent + 2, "%s := %s", columnName, SimpleExpressionPrinter.toString(expression)); + } + return null; + } + + private void print(int indent, String format, Object... args) { + String value; + + if (args.length == 0) { + value = format; + } else { + value = format(format, args); + } + output.append(Strings.repeat(" ", indent)).append(value).append('\n'); + } + + public static String toString(SimpleNode simple) { + SimpleTreePrinter printer = new SimpleTreePrinter(); + simple.accept(printer, 0); + return printer.output.toString(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/FakeRemoteTaskFactory.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/FakeRemoteTaskFactory.java new file mode 100644 index 0000000..bb8f615 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/FakeRemoteTaskFactory.java @@ -0,0 +1,165 @@ +package com.cloudata.structured.sql; + +import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS; +import static com.facebook.presto.util.Failures.toFailures; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import io.airlift.units.DataSize; +import io.airlift.units.DataSize.Unit; +import io.airlift.units.Duration; + +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; + +import javax.annotation.concurrent.GuardedBy; + +import org.joda.time.DateTime; + +import com.facebook.presto.OutputBuffers; +import com.facebook.presto.client.FailureInfo; +import com.facebook.presto.execution.RemoteTask; +import com.facebook.presto.execution.RemoteTaskFactory; +import com.facebook.presto.execution.SharedBuffer; +import com.facebook.presto.execution.StateMachine.StateChangeListener; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.execution.TaskInfo; +import com.facebook.presto.execution.TaskState; +import com.facebook.presto.execution.TaskStateMachine; +import com.facebook.presto.metadata.Node; +import com.facebook.presto.operator.TaskContext; +import com.facebook.presto.spi.Split; +import com.facebook.presto.sql.analyzer.Session; +import com.facebook.presto.sql.planner.OutputReceiver; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.PlanNodeId; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; + +public class FakeRemoteTaskFactory implements RemoteTaskFactory { + private final Executor executor; + + public FakeRemoteTaskFactory(Executor executor) { + this.executor = executor; + } + + @Override + public RemoteTask createRemoteTask(Session session, TaskId taskId, Node node, PlanFragment fragment, + Multimap initialSplits, Map outputReceivers, + OutputBuffers outputBuffers) { + return new MockRemoteTask(taskId, fragment, executor); + } + + private static class MockRemoteTask implements RemoteTask { + private final AtomicLong nextTaskInfoVersion = new AtomicLong(TaskInfo.STARTING_VERSION); + + private final URI location; + private final TaskStateMachine taskStateMachine; + private final TaskContext taskContext; + private final SharedBuffer sharedBuffer; + + private final PlanFragment fragment; + + @GuardedBy("this") + private final Set noMoreSplits = new HashSet<>(); + + @GuardedBy("this") + private final Multimap splits = HashMultimap.create(); + + public MockRemoteTask(TaskId taskId, PlanFragment fragment, Executor executor) { + this.taskStateMachine = new TaskStateMachine(checkNotNull(taskId, "taskId is null"), checkNotNull(executor, + "executor is null")); + + Session session = new Session("user", "source", "catalog", "schema", "address", "agent"); + this.taskContext = new TaskContext(taskStateMachine, executor, session, new DataSize(256, MEGABYTE), + new DataSize(1, MEGABYTE), true); + + this.location = URI.create("fake://task/" + taskId); + + this.sharedBuffer = new SharedBuffer(checkNotNull(new DataSize(1, Unit.BYTE), "maxBufferSize is null"), + INITIAL_EMPTY_OUTPUT_BUFFERS); + this.fragment = checkNotNull(fragment, "fragment is null"); + } + + @Override + public String getNodeId() { + return "node"; + } + + @Override + public TaskInfo getTaskInfo() { + TaskState state = taskStateMachine.getState(); + List failures = ImmutableList.of(); + if (state == TaskState.FAILED) { + failures = toFailures(taskStateMachine.getFailureCauses()); + } + + return new TaskInfo(taskStateMachine.getTaskId(), nextTaskInfoVersion.getAndIncrement(), state, location, + DateTime.now(), sharedBuffer.getInfo(), ImmutableSet. of(), taskContext.getTaskStats(), + failures, taskContext.getOutputItems()); + } + + @Override + public void start() { + } + + @Override + public void addSplit(PlanNodeId sourceId, Split split) { + checkNotNull(split, "split is null"); + splits.put(sourceId, split); + } + + @Override + public void noMoreSplits(PlanNodeId sourceId) { + noMoreSplits.add(sourceId); + if (noMoreSplits.containsAll(fragment.getSources())) { + taskStateMachine.finished(); + } + } + + @Override + public void setOutputBuffers(OutputBuffers outputBuffers) { + sharedBuffer.setOutputBuffers(outputBuffers); + } + + @Override + public void addStateChangeListener(final StateChangeListener stateChangeListener) { + taskStateMachine.addStateChangeListener(new StateChangeListener() { + @Override + public void stateChanged(TaskState newValue) { + stateChangeListener.stateChanged(getTaskInfo()); + } + }); + } + + @Override + public void cancel() { + taskStateMachine.cancel(); + } + + @Override + public Duration waitForTaskToFinish(Duration maxWait) throws InterruptedException { + while (true) { + TaskState currentState = taskStateMachine.getState(); + if (maxWait.toMillis() <= 1 || currentState.isDone()) { + return maxWait; + } + maxWait = taskStateMachine.waitForStateChange(currentState, maxWait); + } + } + + @Override + public int getQueuedSplits() { + if (taskStateMachine.getState().isDone()) { + return 0; + } + return splits.size(); + } + } +} \ No newline at end of file diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockColumnHandle.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockColumnHandle.java new file mode 100644 index 0000000..a15a7a4 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockColumnHandle.java @@ -0,0 +1,18 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; + +public class MockColumnHandle implements ColumnHandle { + + private final ColumnMetadata columnMetadata; + + public MockColumnHandle(MockTableHandle mockTableHandle, ColumnMetadata columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public ColumnMetadata getColumnMetadata() { + return columnMetadata; + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockConnectorMetadata.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockConnectorMetadata.java new file mode 100644 index 0000000..95d5df8 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockConnectorMetadata.java @@ -0,0 +1,141 @@ +package com.cloudata.structured.sql; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; +import java.util.Map; + +import org.weakref.jmx.com.google.common.collect.Lists; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorMetadata; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.TableHandle; + +public class MockConnectorMetadata implements ConnectorMetadata { + + final String connectorId; + + public MockConnectorMetadata(String connectorId) { + this.connectorId = connectorId; + } + + @Override + public boolean canHandle(TableHandle tableHandle) { + return tableHandle instanceof MockTableHandle + && ((MockTableHandle) tableHandle).getConnectorId().equals(connectorId); + } + + @Override + public List listSchemaNames() { + List schemas = Lists.newArrayList(); + schemas.add("default"); + return schemas; + } + + @Override + public MockTableHandle getTableHandle(SchemaTableName tableName) { + if (!listSchemaNames().contains(tableName.getSchemaName())) { + return null; + } + + // ExampleTable table = exampleClient.getTable(tableName.getSchemaName(), tableName.getTableName()); + // if (table == null) { + // return null; + // } + + return new MockTableHandle(connectorId, tableName.getSchemaName(), tableName.getTableName()); + } + + @Override + public ConnectorTableMetadata getTableMetadata(TableHandle table) { + checkArgument(table instanceof MockTableHandle, "tableHandle is not an instance of MockTableHandle"); + MockTableHandle exampleTableHandle = (MockTableHandle) table; + checkArgument(exampleTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); + return exampleTableHandle.getTableMetadata(); + } + + @Override + public List listTables(String schemaNameOrNull) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnHandle getColumnHandle(TableHandle tableHandle, String columnName) { + MockTableHandle exampleTableHandle = promote(tableHandle); + + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // if (table == null) { + // throw new TableNotFoundException(exampleTableHandle.toSchemaTableName()); + // } + + // ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + // for (ColumnMetadata columnMetadata : table.getColumnsMetadata()) { + // columnHandles.put(columnMetadata.getName(), new ExampleColumnHandle(connectorId, columnMetadata)); + // } + // return columnHandles.build(); + + return exampleTableHandle.getColumnHandle(columnName); + } + + @Override + public Map getColumnHandles(TableHandle tableHandle) { + MockTableHandle exampleTableHandle = promote(tableHandle); + + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // if (table == null) { + // throw new TableNotFoundException(exampleTableHandle.toSchemaTableName()); + // } + + // ImmutableMap.Builder columnHandles = ImmutableMap.builder(); + // for (ColumnMetadata columnMetadata : table.getColumnsMetadata()) { + // columnHandles.put(columnMetadata.getName(), new ExampleColumnHandle(connectorId, columnMetadata)); + // } + // return columnHandles.build(); + + return exampleTableHandle.getColumnHandles(); + } + + private MockTableHandle promote(TableHandle tableHandle) { + checkNotNull(tableHandle, "tableHandle is null"); + checkArgument(tableHandle instanceof MockTableHandle, "tableHandle is not an instance of MockTableHandle"); + MockTableHandle exampleTableHandle = (MockTableHandle) tableHandle; + checkArgument(exampleTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector"); + return exampleTableHandle; + } + + @Override + public ColumnMetadata getColumnMetadata(TableHandle tableHandle, ColumnHandle columnHandle) { + checkNotNull(tableHandle, "tableHandle is null"); + checkNotNull(columnHandle, "columnHandle is null"); + checkArgument(tableHandle instanceof MockTableHandle, "tableHandle is not an instance of MockTableHandle"); + checkArgument(((MockTableHandle) tableHandle).getConnectorId().equals(connectorId), + "tableHandle is not for this connector"); + checkArgument(columnHandle instanceof MockColumnHandle, + "columnHandle is not an instance of ExampleColumnHandle"); + + return ((MockColumnHandle) columnHandle).getColumnMetadata(); + } + + @Override + public Map> listTableColumns(SchemaTablePrefix prefix) { + throw new UnsupportedOperationException(); + } + + @Override + public TableHandle createTable(ConnectorTableMetadata tableMetadata) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTable(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockExchangeClientSupplier.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockExchangeClientSupplier.java new file mode 100644 index 0000000..26aff2f --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockExchangeClientSupplier.java @@ -0,0 +1,11 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.operator.ExchangeClient; +import com.google.common.base.Supplier; + +public class MockExchangeClientSupplier implements Supplier { + @Override + public ExchangeClient get() { + throw new UnsupportedOperationException(); + } +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocalStorageManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocalStorageManager.java new file mode 100644 index 0000000..0d7be03 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocalStorageManager.java @@ -0,0 +1,45 @@ +package com.cloudata.structured.sql; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import com.facebook.presto.block.BlockIterable; +import com.facebook.presto.metadata.ColumnFileHandle; +import com.facebook.presto.metadata.LocalStorageManager; +import com.facebook.presto.spi.ColumnHandle; + +public class MockLocalStorageManager implements LocalStorageManager { + + @Override + public BlockIterable getBlocks(UUID shardUuid, ColumnHandle columnHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean shardExists(UUID shardUuid) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropShard(UUID shardUuid) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShardActive(UUID shardUuid) { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnFileHandle createStagingFileHandles(UUID shardUuid, List columnHandles) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void commit(ColumnFileHandle columnFileHandle) throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocationFactory.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocationFactory.java new file mode 100644 index 0000000..7499f78 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockLocationFactory.java @@ -0,0 +1,65 @@ +package com.cloudata.structured.sql; + +import java.net.URI; + +import com.facebook.presto.execution.LocationFactory; +import com.facebook.presto.execution.QueryId; +import com.facebook.presto.execution.StageId; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.metadata.Node; + +public class MockLocationFactory implements LocationFactory { + + // private final URI baseUri; + // private final NodeManager nodeManager; + // + // public MockLocationFactory(NodeManager nodeManager, URI baseUri) { + // this.nodeManager = nodeManager; + // this.baseUri = baseUri; + // } + + // @Override + // public URI createQueryLocation(QueryId queryId) { + // Preconditions.checkNotNull(queryId, "queryId is null"); + // return uriBuilderFrom(baseUri).appendPath("/v1/query").appendPath(queryId.toString()).build(); + // } + // + // @Override + // public URI createStageLocation(StageId stageId) { + // Preconditions.checkNotNull(stageId, "stageId is null"); + // return uriBuilderFrom(baseUri).appendPath("v1/stage").appendPath(stageId.toString()).build(); + // } + // + // @Override + // public URI createLocalTaskLocation(TaskId taskId) { + // return createTaskLocation(nodeManager.getCurrentNode(), taskId); + // } + // + // @Override + // public URI createTaskLocation(Node node, TaskId taskId) { + // Preconditions.checkNotNull(node, "node is null"); + // Preconditions.checkNotNull(taskId, "taskId is null"); + // return uriBuilderFrom(node.getHttpUri()).appendPath("/v1/task").appendPath(taskId.toString()).build(); + // } + + @Override + public URI createQueryLocation(QueryId queryId) { + return URI.create("fake://query/" + queryId); + } + + @Override + public URI createStageLocation(StageId stageId) { + return URI.create("fake://stage/" + stageId); + } + + @Override + public URI createLocalTaskLocation(TaskId taskId) { + return URI.create("fake://task/" + taskId); + } + + @Override + public URI createTaskLocation(Node node, TaskId taskId) { + return URI.create("fake://task/" + node.getNodeIdentifier() + "/" + taskId); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockNodeManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockNodeManager.java new file mode 100644 index 0000000..824b5ee --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockNodeManager.java @@ -0,0 +1,41 @@ +package com.cloudata.structured.sql; + +import java.util.Collections; +import java.util.Set; + +import com.facebook.presto.metadata.AllNodes; +import com.facebook.presto.metadata.Node; +import com.facebook.presto.metadata.NodeManager; +import com.google.common.collect.Sets; + +public class MockNodeManager implements NodeManager { + + final Node me; + final AllNodes allNodes; + + public MockNodeManager(Node me) { + this.me = me; + this.allNodes = new AllNodes(Sets.newHashSet(me), Collections. emptySet()); + } + + @Override + public Set getActiveDatasourceNodes(String datasourceName) { + return Sets.newHashSet(me); + } + + @Override + public AllNodes getAllNodes() { + return allNodes; + } + + @Override + public Node getCurrentNode() { + return me; + } + + @Override + public void refreshNodes() { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPartition.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPartition.java new file mode 100644 index 0000000..a8f6a7e --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPartition.java @@ -0,0 +1,40 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.spi.Partition; +import com.facebook.presto.spi.TupleDomain; +import com.google.common.base.Objects; + +public class MockPartition implements Partition { + + private final String schemaName; + private final String tableName; + + public MockPartition(String schemaName, String tableName) { + this.schemaName = schemaName; + this.tableName = tableName; + } + + @Override + public String getPartitionId() { + return schemaName + ":" + tableName; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + @Override + public TupleDomain getTupleDomain() { + return TupleDomain.all(); + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("schemaName", schemaName).add("tableName", tableName).toString(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPeriodicImportManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPeriodicImportManager.java new file mode 100644 index 0000000..57572f5 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockPeriodicImportManager.java @@ -0,0 +1,54 @@ +package com.cloudata.structured.sql; + +import java.util.List; + +import com.facebook.presto.importer.PeriodicImportJob; +import com.facebook.presto.importer.PeriodicImportManager; +import com.facebook.presto.importer.PersistentPeriodicImportJob; +import com.google.common.base.Predicate; + +public class MockPeriodicImportManager implements PeriodicImportManager { + + @Override + public long insertJob(PeriodicImportJob job) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropJob(long jobId) { + throw new UnsupportedOperationException(); + + } + + @Override + public void dropJobs(Predicate jobPredicate) { + throw new UnsupportedOperationException(); + + } + + @Override + public long getJobCount() { + throw new UnsupportedOperationException(); + } + + @Override + public PersistentPeriodicImportJob getJob(long jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public List getJobs() { + throw new UnsupportedOperationException(); + } + + @Override + public long beginRun(long jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public void endRun(long runId, boolean result) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockRemoteTaskFactory.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockRemoteTaskFactory.java new file mode 100644 index 0000000..5f40f45 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockRemoteTaskFactory.java @@ -0,0 +1,26 @@ +package com.cloudata.structured.sql; + +import java.util.Map; + +import com.facebook.presto.OutputBuffers; +import com.facebook.presto.execution.RemoteTask; +import com.facebook.presto.execution.RemoteTaskFactory; +import com.facebook.presto.execution.TaskId; +import com.facebook.presto.metadata.Node; +import com.facebook.presto.spi.Split; +import com.facebook.presto.sql.analyzer.Session; +import com.facebook.presto.sql.planner.OutputReceiver; +import com.facebook.presto.sql.planner.PlanFragment; +import com.facebook.presto.sql.planner.plan.PlanNodeId; +import com.google.common.collect.Multimap; + +public class MockRemoteTaskFactory implements RemoteTaskFactory { + + @Override + public RemoteTask createRemoteTask(Session session, TaskId taskId, Node node, PlanFragment fragment, + Multimap initialSplits, Map outputReceivers, + OutputBuffers outputBuffers) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockShardManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockShardManager.java new file mode 100644 index 0000000..ad5e27a --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockShardManager.java @@ -0,0 +1,75 @@ +package com.cloudata.structured.sql; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import com.facebook.presto.metadata.ShardManager; +import com.facebook.presto.metadata.TablePartition; +import com.facebook.presto.spi.PartitionKey; +import com.facebook.presto.spi.TableHandle; +import com.google.common.base.Optional; +import com.google.common.collect.Multimap; + +public class MockShardManager implements ShardManager { + + @Override + public void disassociateShard(long shardId, String nodeIdentifier) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropShard(long shardId) { + throw new UnsupportedOperationException(); + } + + @Override + public void commitPartition(TableHandle tableHandle, String partition, List partitionKeys, + Map shards) { + throw new UnsupportedOperationException(); + } + + @Override + public Set getPartitions(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap getAllPartitionKeys(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public Multimap> getShardNodesByPartition(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public Set getTableNodes(TableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable getAllNodesInUse() { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition(TableHandle tableHandle, String partitionName) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropOrphanedPartitions() { + throw new UnsupportedOperationException(); + + } + + @Override + public Iterable getOrphanedShardIds(Optional nodeIdentifier) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplit.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplit.java new file mode 100644 index 0000000..9b5d56d --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplit.java @@ -0,0 +1,71 @@ +package com.cloudata.structured.sql; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.util.List; + +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.Split; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +public class MockSplit implements Split { + private final String connectorId; + private final String schemaName; + private final String tableName; + private final URI uri; + private final boolean remotelyAccessible; + private final ImmutableList addresses; + + @JsonCreator + public MockSplit(@JsonProperty("connectorId") String connectorId, @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, @JsonProperty("uri") URI uri) { + this.schemaName = checkNotNull(schemaName, "schema name is null"); + this.connectorId = checkNotNull(connectorId, "connector id is null"); + this.tableName = checkNotNull(tableName, "table name is null"); + this.uri = checkNotNull(uri, "uri is null"); + + // if ("http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme())) { + remotelyAccessible = true; + addresses = ImmutableList.of(HostAddress.fromUri(uri)); + } + + @JsonProperty + public String getConnectorId() { + return connectorId; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + // @JsonProperty + // public URI getUri() + // { + // return uri; + // } + + @Override + public boolean isRemotelyAccessible() { + // only http or https is remotely accessible + return remotelyAccessible; + } + + @Override + public List getAddresses() { + return addresses; + } + + @Override + public Object getInfo() { + return this; + } +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplitManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplitManager.java new file mode 100644 index 0000000..05bd297 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockSplitManager.java @@ -0,0 +1,80 @@ +package com.cloudata.structured.sql; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import com.facebook.presto.spi.ConnectorSplitManager; +import com.facebook.presto.spi.Partition; +import com.facebook.presto.spi.PartitionResult; +import com.facebook.presto.spi.Split; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.TupleDomain; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class MockSplitManager implements ConnectorSplitManager { + + final String connectorId; + + public MockSplitManager(String connectorId) { + this.connectorId = connectorId; + } + + @Override + public String getConnectorId() { + return connectorId; + } + + @Override + public boolean canHandle(TableHandle tableHandle) { + return tableHandle instanceof MockTableHandle + && ((MockTableHandle) tableHandle).getConnectorId().equals(connectorId); + } + + @Override + public PartitionResult getPartitions(TableHandle tableHandle, TupleDomain tupleDomain) { + checkArgument(tableHandle instanceof MockTableHandle, "tableHandle is not an instance of MockTableHandle"); + MockTableHandle exampleTableHandle = (MockTableHandle) tableHandle; + + // example connector has only one partition + List partitions = ImmutableList. of(new MockPartition(exampleTableHandle.getSchemaName(), + exampleTableHandle.getTableName())); + // example connector does not do any additional processing/filtering with the TupleDomain, so just return the + // whole TupleDomain + ; + return new PartitionResult(partitions, tupleDomain); + } + + @Override + public Iterable getPartitionSplits(TableHandle tableHandle, List partitions) { + checkNotNull(partitions, "partitions is null"); + checkArgument(partitions.size() == 1, "Expected one partition but got %s", partitions.size()); + Partition partition = partitions.get(0); + + checkArgument(partition instanceof MockPartition, "partition is not an instance of MockPartition"); + MockPartition examplePartition = (MockPartition) partition; + + MockTableHandle exampleTableHandle = (MockTableHandle) tableHandle; + // ExampleTable table = exampleClient.getTable(exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + // // this can happen if table is removed during a query + // checkState(table != null, "Table %s.%s no longer exists", exampleTableHandle.getSchemaName(), + // exampleTableHandle.getTableName()); + + List splits = Lists.newArrayList(); + // for (URI uri : table.getSources()) { + // splits.add(new ExampleSplit(connectorId, examplePartition.getSchemaName(), examplePartition.getTableName(), + // uri)); + // } + splits.add(new MockSplit(connectorId, examplePartition.getSchemaName(), examplePartition.getTableName(), URI + .create("http://localhost:1234"))); + Collections.shuffle(splits); + + return splits; + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockStorageManager.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockStorageManager.java new file mode 100644 index 0000000..d098d41 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockStorageManager.java @@ -0,0 +1,25 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.metadata.NativeTableHandle; +import com.facebook.presto.metadata.QualifiedTableName; +import com.facebook.presto.storage.StorageManager; + +public class MockStorageManager implements StorageManager { + + @Override + public void insertTableSource(NativeTableHandle tableHandle, QualifiedTableName sourceTableName) { + throw new UnsupportedOperationException(); + } + + @Override + public QualifiedTableName getTableSource(NativeTableHandle tableHandle) { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTableSource(NativeTableHandle tableHandle) { + throw new UnsupportedOperationException(); + + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockTableHandle.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockTableHandle.java new file mode 100644 index 0000000..8e31276 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/MockTableHandle.java @@ -0,0 +1,74 @@ +package com.cloudata.structured.sql; + +import java.util.List; +import java.util.Map; + +import org.weakref.jmx.com.google.common.collect.Lists; +import org.weakref.jmx.com.google.common.collect.Maps; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ColumnType; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.TableHandle; + +public class MockTableHandle implements TableHandle { + + private final String tableName; + private final ConnectorTableMetadata tableMetadata; + private final List columns; + private final String connectorId; + private final String schemaName; + + public MockTableHandle(String connectorId, String schemaName, String tableName) { + this.connectorId = connectorId; + this.schemaName = schemaName; + this.tableName = tableName; + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + + List keys = Lists.newArrayList(); + keys.add("key1"); + keys.add("key2"); + + List columns = Lists.newArrayList(); + for (int i = 0; i < keys.size(); i++) { + ColumnType type = ColumnType.STRING; + boolean paritionKey = false; + columns.add(new ColumnMetadata(keys.get(i), type, i, paritionKey)); + } + this.columns = columns; + ConnectorTableMetadata metadata = new ConnectorTableMetadata(schemaTableName, columns); + this.tableMetadata = metadata; + // new TableMetadata(connectorId, metadata); + } + + public ConnectorTableMetadata getTableMetadata() { + return tableMetadata; + } + + public Map getColumnHandles() { + Map handles = Maps.newHashMap(); + for (ColumnMetadata column : columns) { + handles.put(column.getName(), new MockColumnHandle(this, column)); + } + return handles; + } + + public String getConnectorId() { + return connectorId; + } + + public String getTableName() { + return tableName; + } + + public String getSchemaName() { + return schemaName; + } + + public ColumnHandle getColumnHandle(String columnName) { + return getColumnHandles().get(columnName); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/Query.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/Query.java new file mode 100644 index 0000000..92504a3 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/Query.java @@ -0,0 +1,442 @@ +//package com.cloudata.structured.sql; +// +//import static com.google.common.base.Preconditions.checkArgument; +//import static com.google.common.base.Preconditions.checkNotNull; +//import io.airlift.units.Duration; +// +//import java.io.Closeable; +//import java.net.URI; +//import java.util.ArrayList; +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.List; +//import java.util.Set; +//import java.util.concurrent.TimeUnit; +//import java.util.concurrent.atomic.AtomicLong; +// +//import javax.annotation.concurrent.GuardedBy; +//import javax.ws.rs.WebApplicationException; +//import javax.ws.rs.core.Response.Status; +//import javax.ws.rs.core.UriInfo; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.facebook.presto.block.BlockCursor; +//import com.facebook.presto.client.Column; +//import com.facebook.presto.client.FailureInfo; +//import com.facebook.presto.client.QueryError; +//import com.facebook.presto.client.QueryResults; +//import com.facebook.presto.client.StageStats; +//import com.facebook.presto.client.StatementStats; +//import com.facebook.presto.execution.BufferInfo; +//import com.facebook.presto.execution.QueryId; +//import com.facebook.presto.execution.QueryInfo; +//import com.facebook.presto.execution.QueryManager; +//import com.facebook.presto.execution.QueryState; +//import com.facebook.presto.execution.QueryStats; +//import com.facebook.presto.execution.StageInfo; +//import com.facebook.presto.execution.StageState; +//import com.facebook.presto.execution.TaskInfo; +//import com.facebook.presto.operator.ExchangeClient; +//import com.facebook.presto.operator.Page; +//import com.facebook.presto.sql.analyzer.Session; +//import com.facebook.presto.tuple.TupleInfo; +//import com.facebook.presto.util.IterableTransformer; +//import com.google.common.base.Preconditions; +//import com.google.common.base.Predicate; +//import com.google.common.collect.AbstractIterator; +//import com.google.common.collect.ImmutableList; +//import com.google.common.collect.ImmutableSet; +//import com.google.common.collect.Iterables; +//import com.google.common.collect.Lists; +// +//public class Query implements Closeable { +// +// private static final Logger log = LoggerFactory.getLogger(Query.class); +// +// private final QueryManager queryManager; +// private final QueryId queryId; +// private final ExchangeClient exchangeClient; +// +// private final AtomicLong resultId = new AtomicLong(); +// +// @GuardedBy("this") +// private QueryResults lastResult; +// +// @GuardedBy("this") +// private String lastResultPath; +// +// @GuardedBy("this") +// private List columns; +// +// public Query(Session session, String query, QueryManager queryManager, ExchangeClient exchangeClient) { +// checkNotNull(session, "session is null"); +// checkNotNull(query, "query is null"); +// checkNotNull(queryManager, "queryManager is null"); +// checkNotNull(exchangeClient, "exchangeClient is null"); +// +// this.queryManager = queryManager; +// +// QueryInfo queryInfo = queryManager.createQuery(session, query); +// queryId = queryInfo.getQueryId(); +// this.exchangeClient = exchangeClient; +// } +// +// @Override +// public void close() { +// queryManager.cancelQuery(queryId); +// } +// +// public QueryId getQueryId() { +// return queryId; +// } +// +// public synchronized QueryResults getResults(long token, UriInfo uriInfo, Duration maxWaitTime) +// throws InterruptedException { +// // is the a repeated request for the last results? +// String requestedPath = uriInfo.getAbsolutePath().getPath(); +// if (lastResultPath != null && requestedPath.equals(lastResultPath)) { +// // tell query manager we are still interested in the query +// queryManager.getQueryInfo(queryId); +// return lastResult; +// } +// +// if (token < resultId.get()) { +// throw new WebApplicationException(Status.GONE); +// } +// +// // if this is not a request for the next results, return not found +// if (lastResult.getNextUri() == null || !requestedPath.equals(lastResult.getNextUri().getPath())) { +// // unknown token +// throw new WebApplicationException(Status.NOT_FOUND); +// } +// +// return getNextResults(uriInfo, maxWaitTime); +// } +// +// public synchronized QueryResults getNextResults(UriInfo uriInfo, Duration maxWaitTime) throws InterruptedException { +// Iterable> data = getData(maxWaitTime); +// +// // get the query info before returning +// // force update if query manager is closed +// QueryInfo queryInfo = queryManager.getQueryInfo(queryId); +// +// // if we have received all of the output data and the query is not marked as done, wait for the query to finish +// if (exchangeClient.isClosed() && !queryInfo.getState().isDone()) { +// queryManager.waitForStateChange(queryId, queryInfo.getState(), maxWaitTime); +// queryInfo = queryManager.getQueryInfo(queryId); +// } +// +// // close exchange client if the query has failed +// if (queryInfo.getState().isDone()) { +// if (queryInfo.getState() != QueryState.FINISHED) { +// exchangeClient.close(); +// } else if (queryInfo.getOutputStage() == null) { +// // For simple executions (e.g. drop table), there will never be an output stage, +// // so close the exchange as soon as the query is done. +// exchangeClient.close(); +// +// // this is a hack to suppress the warn message in the client saying that there are no columns. +// // The reason for this is that the current API definition assumes that everything is a query, +// // so statements without results produce an error in the client otherwise. +// // +// // TODO: add support to the API for non-query statements. +// columns = ImmutableList.of(new Column("result", "varchar")); +// data = ImmutableSet.> of(ImmutableList. of("true")); +// } +// } +// +// // only return a next if the query is not done or there is more data to send (due to buffering) +// URI nextResultsUri = null; +// if ((!queryInfo.getState().isDone()) || (!exchangeClient.isClosed())) { +// nextResultsUri = createNextResultsUri(uriInfo); +// } +// +// // first time through, self is null +// QueryResults queryResults = new QueryResults(queryId.toString(), uriInfo.getRequestUriBuilder() +// .replaceQuery("").replacePath(queryInfo.getSelf().getPath()).build(), +// findCancelableLeafStage(queryInfo), nextResultsUri, columns, data, toStatementStats(queryInfo), +// toQueryError(queryInfo)); +// +// // cache the last results +// if (lastResult != null) { +// lastResultPath = lastResult.getNextUri().getPath(); +// } else { +// lastResultPath = null; +// } +// lastResult = queryResults; +// return queryResults; +// } +// +// private synchronized Iterable> getData(Duration maxWait) throws InterruptedException { +// // wait for query to start +// QueryInfo queryInfo = queryManager.getQueryInfo(queryId); +// while (maxWait.toMillis() > 1 && !isQueryStarted(queryInfo)) { +// maxWait = queryManager.waitForStateChange(queryId, queryInfo.getState(), maxWait); +// queryInfo = queryManager.getQueryInfo(queryId); +// } +// +// // if query did not finish starting or does not have output, just return +// if (!isQueryStarted(queryInfo) || queryInfo.getOutputStage() == null) { +// return null; +// } +// +// if (columns == null) { +// columns = createColumnsList(queryInfo); +// } +// +// updateExchangeClient(queryInfo.getOutputStage()); +// +// ImmutableList.Builder pages = ImmutableList.builder(); +// // wait up to max wait for data to arrive; then try to return at least DESIRED_RESULT_BYTES +// int bytes = 0; +// while (bytes < DESIRED_RESULT_BYTES) { +// Page page = exchangeClient.getNextPage(maxWait); +// if (page == null) { +// break; +// } +// bytes += page.getDataSize().toBytes(); +// pages.add(new RowIterable(page)); +// +// // only wait on first call +// maxWait = new Duration(0, TimeUnit.MILLISECONDS); +// } +// +// if (bytes == 0) { +// return null; +// } +// +// return Iterables.concat(pages.build()); +// } +// +// private static boolean isQueryStarted(QueryInfo queryInfo) { +// QueryState state = queryInfo.getState(); +// return state != QueryState.QUEUED && queryInfo.getState() != QueryState.PLANNING +// && queryInfo.getState() != QueryState.STARTING; +// } +// +// private synchronized void updateExchangeClient(StageInfo outputStage) { +// // update the exchange client with any additional locations +// for (TaskInfo taskInfo : outputStage.getTasks()) { +// List buffers = taskInfo.getOutputBuffers().getBuffers(); +// Preconditions.checkState(buffers.size() == 1, "Expected a single output buffer for task %s, but found %s", +// taskInfo.getTaskId(), buffers); +// +// String bufferId = Iterables.getOnlyElement(buffers).getBufferId(); +// URI uri = uriBuilderFrom(taskInfo.getSelf()).appendPath("results").appendPath(bufferId).build(); +// exchangeClient.addLocation(uri); +// } +// if ((outputStage.getState() != StageState.PLANNED) && (outputStage.getState() != StageState.SCHEDULING)) { +// exchangeClient.noMoreLocations(); +// } +// } +// +// private synchronized URI createNextResultsUri(UriInfo uriInfo) { +// return uriInfo.getBaseUriBuilder().replacePath("/v1/statement").path(queryId.toString()) +// .path(String.valueOf(resultId.incrementAndGet())).replaceQuery("").build(); +// } +// +// private static List createColumnsList(QueryInfo queryInfo) { +// checkNotNull(queryInfo, "queryInfo is null"); +// StageInfo outputStage = queryInfo.getOutputStage(); +// if (outputStage == null) { +// checkNotNull(outputStage, "outputStage is null"); +// } +// +// List names = queryInfo.getFieldNames(); +// ArrayList types = new ArrayList<>(); +// for (TupleInfo tupleInfo : outputStage.getTupleInfos()) { +// types.addAll(tupleInfo.getTypes()); +// } +// +// checkArgument(names.size() == types.size(), "names and types size mismatch"); +// +// ImmutableList.Builder list = ImmutableList.builder(); +// for (int i = 0; i < names.size(); i++) { +// String name = names.get(i); +// Type type = types.get(i); +// switch (type) { +// case BOOLEAN: +// list.add(new Column(name, "boolean")); +// break; +// case FIXED_INT_64: +// list.add(new Column(name, "bigint")); +// break; +// case DOUBLE: +// list.add(new Column(name, "double")); +// break; +// case VARIABLE_BINARY: +// list.add(new Column(name, "varchar")); +// break; +// default: +// throw new IllegalArgumentException("unhandled type: " + type); +// } +// } +// return list.build(); +// } +// +// private static StatementStats toStatementStats(QueryInfo queryInfo) { +// QueryStats queryStats = queryInfo.getQueryStats(); +// +// return StatementStats.builder().setState(queryInfo.getState().toString()).setScheduled(isScheduled(queryInfo)) +// .setNodes(globalUniqueNodes(queryInfo.getOutputStage()).size()) +// .setTotalSplits(queryStats.getTotalDrivers()).setQueuedSplits(queryStats.getQueuedDrivers()) +// .setRunningSplits(queryStats.getRunningDrivers()).setCompletedSplits(queryStats.getCompletedDrivers()) +// .setUserTimeMillis(queryStats.getTotalUserTime().toMillis()) +// .setCpuTimeMillis(queryStats.getTotalCpuTime().toMillis()) +// .setWallTimeMillis(queryStats.getTotalScheduledTime().toMillis()) +// .setProcessedRows(queryStats.getRawInputPositions()) +// .setProcessedBytes(queryStats.getRawInputDataSize().toBytes()) +// .setRootStage(toStageStats(queryInfo.getOutputStage())).build(); +// } +// +// private static StageStats toStageStats(StageInfo stageInfo) { +// if (stageInfo == null) { +// return null; +// } +// +// com.facebook.presto.execution.StageStats stageStats = stageInfo.getStageStats(); +// +// ImmutableList.Builder subStages = ImmutableList.builder(); +// for (StageInfo subStage : stageInfo.getSubStages()) { +// subStages.add(toStageStats(subStage)); +// } +// +// Set uniqueNodes = new HashSet<>(); +// for (TaskInfo task : stageInfo.getTasks()) { +// // todo add nodeId to TaskInfo +// URI uri = task.getSelf(); +// uniqueNodes.add(uri.getHost() + ":" + uri.getPort()); +// } +// +// return StageStats.builder().setStageId(String.valueOf(stageInfo.getStageId().getId())) +// .setState(stageInfo.getState().toString()).setDone(stageInfo.getState().isDone()) +// .setNodes(uniqueNodes.size()).setTotalSplits(stageStats.getTotalDrivers()) +// .setQueuedSplits(stageStats.getQueuedDrivers()).setRunningSplits(stageStats.getRunningDrivers()) +// .setCompletedSplits(stageStats.getCompletedDrivers()) +// .setUserTimeMillis(stageStats.getTotalUserTime().toMillis()) +// .setCpuTimeMillis(stageStats.getTotalCpuTime().toMillis()) +// .setWallTimeMillis(stageStats.getTotalScheduledTime().toMillis()) +// .setProcessedRows(stageStats.getRawInputPositions()) +// .setProcessedBytes(stageStats.getRawInputDataSize().toBytes()).setSubStages(subStages.build()).build(); +// } +// +// private static Set globalUniqueNodes(StageInfo stageInfo) { +// if (stageInfo == null) { +// return ImmutableSet.of(); +// } +// ImmutableSet.Builder nodes = ImmutableSet.builder(); +// for (TaskInfo task : stageInfo.getTasks()) { +// // todo add nodeId to TaskInfo +// URI uri = task.getSelf(); +// nodes.add(uri.getHost() + ":" + uri.getPort()); +// } +// +// for (StageInfo subStage : stageInfo.getSubStages()) { +// nodes.addAll(globalUniqueNodes(subStage)); +// } +// return nodes.build(); +// } +// +// private static boolean isScheduled(QueryInfo queryInfo) { +// StageInfo stage = queryInfo.getOutputStage(); +// if (stage == null) { +// return false; +// } +// return IterableTransformer.on(getAllStages(stage)).transform(stageStateGetter()).all(isStageRunningOrDone()); +// } +// +// private static Predicate isStageRunningOrDone() { +// return new Predicate() { +// @Override +// public boolean apply(StageState state) { +// return (state == StageState.RUNNING) || state.isDone(); +// } +// }; +// } +// +// private static URI findCancelableLeafStage(QueryInfo queryInfo) { +// if (queryInfo.getOutputStage() == null) { +// // query is not running yet, cannot cancel leaf stage +// return null; +// } +// +// // query is running, find the leaf-most running stage +// return findCancelableLeafStage(queryInfo.getOutputStage()); +// } +// +// private static URI findCancelableLeafStage(StageInfo stage) { +// // if this stage is already done, we can't cancel it +// if (stage.getState().isDone()) { +// return null; +// } +// +// // attempt to find a cancelable sub stage +// // check in reverse order since build side of a join will be later in the list +// for (StageInfo subStage : Lists.reverse(stage.getSubStages())) { +// URI leafStage = findCancelableLeafStage(subStage); +// if (leafStage != null) { +// return leafStage; +// } +// } +// +// // no matching sub stage, so return this stage +// return stage.getSelf(); +// } +// +// private static QueryError toQueryError(QueryInfo queryInfo) { +// FailureInfo failure = queryInfo.getFailureInfo(); +// if (failure == null) { +// QueryState state = queryInfo.getState(); +// if ((!state.isDone()) || (state == QueryState.FINISHED)) { +// return null; +// } +// log.warn("Query %s in state %s has no failure info", queryInfo.getQueryId(), state); +// failure = toFailure(new RuntimeException(format("Query is %s (reason unknown)", state))); +// } +// return new QueryError(failure.getMessage(), null, 0, failure.getErrorLocation(), failure); +// } +// +// private static class RowIterable implements Iterable> { +// private final Page page; +// +// private RowIterable(Page page) { +// this.page = checkNotNull(page, "page is null"); +// } +// +// @Override +// public Iterator> iterator() { +// return new RowIterator(page); +// } +// } +// +// private static class RowIterator extends AbstractIterator> { +// private final BlockCursor[] cursors; +// private final int columnCount; +// +// private RowIterator(Page page) { +// int columnCount = 0; +// cursors = new BlockCursor[page.getChannelCount()]; +// for (int channel = 0; channel < cursors.length; channel++) { +// cursors[channel] = page.getBlock(channel).cursor(); +// columnCount = cursors[channel].getTupleInfo().getFieldCount(); +// } +// this.columnCount = columnCount; +// } +// +// @Override +// protected List computeNext() { +// List row = new ArrayList<>(columnCount); +// for (BlockCursor cursor : cursors) { +// if (!cursor.advanceNextPosition()) { +// Preconditions.checkState(row.isEmpty(), "Page is unaligned"); +// return endOfData(); +// } +// +// row.addAll(cursor.getTuple().toValues()); +// } +// return row; +// } +// } +// } diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/RecursivePlanVisitor.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/RecursivePlanVisitor.java new file mode 100644 index 0000000..409a4b6 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/RecursivePlanVisitor.java @@ -0,0 +1,140 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.sql.planner.plan.AggregationNode; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.FilterNode; +import com.facebook.presto.sql.planner.plan.JoinNode; +import com.facebook.presto.sql.planner.plan.LimitNode; +import com.facebook.presto.sql.planner.plan.MaterializedViewWriterNode; +import com.facebook.presto.sql.planner.plan.OutputNode; +import com.facebook.presto.sql.planner.plan.PlanNode; +import com.facebook.presto.sql.planner.plan.PlanVisitor; +import com.facebook.presto.sql.planner.plan.ProjectNode; +import com.facebook.presto.sql.planner.plan.SampleNode; +import com.facebook.presto.sql.planner.plan.SemiJoinNode; +import com.facebook.presto.sql.planner.plan.SinkNode; +import com.facebook.presto.sql.planner.plan.SortNode; +import com.facebook.presto.sql.planner.plan.TableScanNode; +import com.facebook.presto.sql.planner.plan.TopNNode; +import com.facebook.presto.sql.planner.plan.UnionNode; +import com.facebook.presto.sql.planner.plan.WindowNode; + +public class RecursivePlanVisitor extends PlanVisitor { + + // @Override + // protected R visitPlan(PlanNode node, C context) { + // return super.visitPlan(node, context); + // } + + @Override + public R visitExchange(ExchangeNode node, C context) { + return super.visitExchange(node, context); + } + + @Override + public R visitAggregation(AggregationNode node, C context) { + node.getSource().accept(this, context); + + return super.visitAggregation(node, context); + } + + @Override + public R visitFilter(FilterNode node, C context) { + node.getSource().accept(this, context); + + return super.visitFilter(node, context); + } + + @Override + public R visitProject(ProjectNode node, C context) { + node.getSource().accept(this, context); + + return super.visitProject(node, context); + } + + @Override + public R visitTopN(TopNNode node, C context) { + node.getSource().accept(this, context); + + return super.visitTopN(node, context); + } + + @Override + public R visitOutput(OutputNode node, C context) { + node.getSource().accept(this, context); + + return super.visitOutput(node, context); + } + + @Override + public R visitLimit(LimitNode node, C context) { + node.getSource().accept(this, context); + + return super.visitLimit(node, context); + } + + @Override + public R visitSample(SampleNode node, C context) { + node.getSource().accept(this, context); + + return super.visitSample(node, context); + } + + @Override + public R visitTableScan(TableScanNode node, C context) { + return super.visitTableScan(node, context); + } + + @Override + public R visitJoin(JoinNode node, C context) { + node.getLeft().accept(this, context); + node.getRight().accept(this, context); + + return super.visitJoin(node, context); + } + + @Override + public R visitSemiJoin(SemiJoinNode node, C context) { + node.getSource().accept(this, context); + node.getFilteringSource().accept(this, context); + + return super.visitSemiJoin(node, context); + } + + @Override + public R visitSort(SortNode node, C context) { + node.getSource().accept(this, context); + + return super.visitSort(node, context); + } + + @Override + public R visitSink(SinkNode node, C context) { + node.getSource().accept(this, context); + + return super.visitSink(node, context); + } + + @Override + public R visitWindow(WindowNode node, C context) { + node.getSource().accept(this, context); + + return super.visitWindow(node, context); + } + + @Override + public R visitMaterializedViewWriter(MaterializedViewWriterNode node, C context) { + node.getSource().accept(this, context); + + return super.visitMaterializedViewWriter(node, context); + } + + @Override + public R visitUnion(UnionNode node, C context) { + for (PlanNode subPlanNode : node.getSources()) { + subPlanNode.accept(this, context); + } + return super.visitUnion(node, context); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/SqlTest.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/SqlTest.java new file mode 100644 index 0000000..e595d8e --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/SqlTest.java @@ -0,0 +1,458 @@ +package com.cloudata.structured.sql; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Assert; +import org.junit.Test; + +import com.cloudata.structured.sql.provider.CloudataConnectorMetadata; +import com.cloudata.structured.sql.provider.CloudataSplitManager; +import com.cloudata.structured.sql.simple.SimpleNode; +import com.cloudata.structured.sql.simple.SimpleTreePrinter; +import com.facebook.presto.client.Column; +import com.facebook.presto.connector.dual.DualMetadata; +import com.facebook.presto.connector.dual.DualSplitManager; +import com.facebook.presto.execution.LocationFactory; +import com.facebook.presto.execution.NodeScheduler; +import com.facebook.presto.execution.NodeSchedulerConfig; +import com.facebook.presto.execution.QueryId; +import com.facebook.presto.execution.QueryInfo; +import com.facebook.presto.execution.QueryState; +import com.facebook.presto.execution.RemoteTaskFactory; +import com.facebook.presto.execution.SqlQueryExecution; +import com.facebook.presto.execution.StageInfo; +import com.facebook.presto.execution.StateMachine.StateChangeListener; +import com.facebook.presto.importer.PeriodicImportManager; +import com.facebook.presto.metadata.InMemoryNodeManager; +import com.facebook.presto.metadata.MetadataManager; +import com.facebook.presto.metadata.NodeManager; +import com.facebook.presto.metadata.ShardManager; +import com.facebook.presto.spi.ConnectorSplitManager; +import com.facebook.presto.split.SplitManager; +import com.facebook.presto.sql.analyzer.Analysis; +import com.facebook.presto.sql.analyzer.Analyzer; +import com.facebook.presto.sql.analyzer.QueryExplainer; +import com.facebook.presto.sql.analyzer.Session; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.LogicalPlanner; +import com.facebook.presto.sql.planner.Plan; +import com.facebook.presto.sql.planner.PlanNodeIdAllocator; +import com.facebook.presto.sql.planner.PlanOptimizersFactory; +import com.facebook.presto.sql.planner.PlanPrinter; +import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; +import com.facebook.presto.sql.tree.Statement; +import com.facebook.presto.storage.StorageManager; +import com.facebook.presto.tuple.TupleInfo; +import com.facebook.presto.tuple.TupleInfo.Type; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; + +public class SqlTest { + String connectorId = "connector0"; + String catalog = "default"; + + // @Test + public void execute() throws InterruptedException { + MetadataManager metadata = new MetadataManager(); + { + DualMetadata dualMetadata = new DualMetadata(); + metadata.addInternalSchemaMetadata(MetadataManager.INTERNAL_CONNECTOR_ID, dualMetadata); + } + + // TaskExecutor taskExecutor = new TaskExecutor(8); + // taskExecutor.start(); + + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + + SplitManager splitManager = new SplitManager(Sets. newHashSet()); + { + DualSplitManager dualSplitManager = new DualSplitManager(nodeManager); + splitManager.addConnectorSplitManager(dualSplitManager); + } + + QueryId queryId = new QueryId(UUID.randomUUID().toString().replace("-", "")); + // String query = "SELECT * FROM table1 JOIN table2 ON table1.key1=table2.key1 WHERE table1.key2='1'"; + String query = "SELECT " + DualMetadata.COLUMN_NAME + " FROM " + DualMetadata.NAME; + + // URI self = URI.create("localhost"); + + Statement statement = SqlParser.createStatement(query); + + // URI baseUri = URI.create("http://localhost:1234"); + // Node me = new Node("me", baseUri, NodeVersion.UNKNOWN); + + // NodeManager nodeManager = new InMemoryNodeManager(me); + NodeSchedulerConfig config = new NodeSchedulerConfig(); + NodeScheduler nodeScheduler = new NodeScheduler(nodeManager, config); + // + List planOptimizers = buildPlanOptimizers(metadata, splitManager); + // + + ExecutorService queryExecutor = Executors.newCachedThreadPool(); // threadsNamed("query-scheduler-%d")); + + RemoteTaskFactory remoteTaskFactory = new FakeRemoteTaskFactory(queryExecutor); + LocationFactory locationFactory = new MockLocationFactory(); // nodeManager, baseUri); + int maxPendingSplitsPerNode = 1; + int initialHashPartitions = 1; + ShardManager shardManager = new MockShardManager(); + StorageManager storageManager = new MockStorageManager(); + PeriodicImportManager periodicImportManager = new MockPeriodicImportManager(); + + URI selfUri = nodeManager.getCurrentNode().getHttpUri(); + SqlQueryExecution queryExecution = new SqlQueryExecution(queryId, query, buildSession(), selfUri, statement, + metadata, splitManager, nodeScheduler, planOptimizers, remoteTaskFactory, locationFactory, + maxPendingSplitsPerNode, initialHashPartitions, queryExecutor, shardManager, storageManager, + periodicImportManager); + + queryExecution.addStateChangeListener(new StateChangeListener() { + + @Override + public void stateChanged(QueryState newValue) { + System.out.println("New state: " + newValue); + + } + }); + + queryExecution.start(); + + StageInfo outputStage = queryExecution.getQueryInfo().getOutputStage(); + Thread.sleep(30000); + + System.out.println(queryExecution.getQueryInfo().getFailureInfo()); + + // List sources = ImmutableList. of(new TaskSource(tableScanNodeId, ImmutableSet + // .of(new ScheduledSplit(0, split)), true)); + + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.RUNNING); + // + // BufferResult results = sqlTaskManager.getTaskResults(taskId, "out", 0, new DataSize(1, Unit.MEGABYTE), + // new Duration(1, TimeUnit.SECONDS)); + // assertEquals(results.isBufferClosed(), false); + // assertEquals(results.getPages().size(), 1); + // assertEquals(results.getPages().get(0).getPositionCount(), 1); + // + // results = sqlTaskManager.getTaskResults(taskId, "out", results.getToken() + results.getPages().size(), + // new DataSize(1, Unit.MEGABYTE), new Duration(1, TimeUnit.SECONDS)); + // // todo this should be true + // assertEquals(results.isBufferClosed(), false); + // assertEquals(results.getPages().size(), 0); + // + // sqlTaskManager.waitForStateChange(taskInfo.getTaskId(), taskInfo.getState(), new Duration(1, + // TimeUnit.SECONDS)); + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.FINISHED); + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.FINISHED); + } + + private static List createColumnsList(QueryInfo queryInfo) { + checkNotNull(queryInfo, "queryInfo is null"); + StageInfo outputStage = queryInfo.getOutputStage(); + if (outputStage == null) { + checkNotNull(outputStage, "outputStage is null"); + } + + List names = queryInfo.getFieldNames(); + ArrayList types = new ArrayList<>(); + for (TupleInfo tupleInfo : outputStage.getTupleInfos()) { + types.addAll(tupleInfo.getTypes()); + } + + checkArgument(names.size() == types.size(), "names and types size mismatch"); + + ImmutableList.Builder list = ImmutableList.builder(); + for (int i = 0; i < names.size(); i++) { + String name = names.get(i); + Type type = types.get(i); + switch (type) { + case BOOLEAN: + list.add(new Column(name, "boolean")); + break; + case FIXED_INT_64: + list.add(new Column(name, "bigint")); + break; + case DOUBLE: + list.add(new Column(name, "double")); + break; + case VARIABLE_BINARY: + list.add(new Column(name, "varchar")); + break; + default: + throw new IllegalArgumentException("unhandled type: " + type); + } + } + return list.build(); + } + + // @Test + // public void runQuery() throws IOException, InterruptedException { + // DualMetadata dualMetadata = new DualMetadata(); + // // TableHandle tableHandle = dualMetadata.getTableHandle(new SchemaTableName("default", DualMetadata.NAME)); + // // assertNotNull(tableHandle); + // // + // // ColumnHandle columnHandle = dualMetadata.getColumnHandle(tableHandle, DualMetadata.COLUMN_NAME); + // // assertNotNull(columnHandle); + // // Symbol symbol = new Symbol(DualMetadata.COLUMN_NAME); + // + // MetadataManager metadata = new MetadataManager(); + // metadata.addInternalSchemaMetadata(MetadataManager.INTERNAL_CONNECTOR_ID, dualMetadata); + // + // TaskExecutor taskExecutor = new TaskExecutor(8); + // taskExecutor.start(); + // + // InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + // + // SplitManager splitManager = new SplitManager(Sets. newHashSet()); + // { + // DualSplitManager dualSplitManager = new DualSplitManager(nodeManager); + // splitManager.addConnectorSplitManager(dualSplitManager); + // } + // + // StorageManager storageManager = new MockStorageManager(); + // PeriodicImportManager periodicImportManager = new MockPeriodicImportManager(); + // + // List planOptimizers = buildPlanOptimizers(metadata, splitManager); + // + // PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); + // + // for (int i = 0; i < 5; i++) { + // String sql = "SELECT " + DualMetadata.COLUMN_NAME + " FROM " + DualMetadata.NAME; + // + // Statement statement = SqlParser.createStatement(sql); + // + // System.out.println("Statement: " + statement); + // + // Session session = buildSession(); + // QueryExplainer queryExplainer = new QueryExplainer(session, planOptimizers, metadata, + // periodicImportManager, storageManager); + // // analyze query + // Analyzer analyzer = new Analyzer(session, metadata, Optional.of(queryExplainer)); + // + // Analysis analysis = analyzer.analyze(statement); + // + // System.out.println("analysis: " + analysis); + // + // // plan query + // LogicalPlanner logicalPlanner = new LogicalPlanner(session, planOptimizers, idAllocator, metadata, + // periodicImportManager, storageManager); + // Plan plan = logicalPlanner.plan(analysis); + // + // String p = PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes()); + // System.out.println("plan: " + p); + // + // List inputs = new InputExtractor(metadata).extract(plan.getRoot()); + // // stateMachine.setInputs(inputs); + // + // // fragment the plan + // SubPlan subplan = new DistributedLogicalPlanner(metadata, idAllocator).createSubPlans(plan, false); + // + // PlanFragment planFragment = subplan.getFragment(); + // // PartitionResult partitionResult = splitManager.getPartitions(tableHandle, + // // Optional.of(TupleDomain.all())); + // // Split split = Iterables.getOnlyElement(splitManager.getPartitionSplits(tableHandle, + // // partitionResult.getPartitions()).getSplits()); + // + // LocalExecutionPlanner planner = new LocalExecutionPlanner(new NodeInfo("test"), metadata, + // new DataStreamManager(new DualDataStreamProvider()), new MockLocalStorageManager(), + // new MockExchangeClientSupplier(), new ExpressionCompiler(metadata)); + // + // SqlTaskManager sqlTaskManager = new SqlTaskManager(planner, new MockLocationFactory(), taskExecutor, + // new QueryMonitor(new ObjectMapperProvider().get(), new NullEventClient(), new NodeInfo("test")), + // new TaskManagerConfig()); + // + // // PlanNodeId tableScanNodeId = new PlanNodeId("tableScan"); + // // PlanFragment testFragment = new PlanFragment(new PlanFragmentId("fragment"), new TableScanNode( + // // tableScanNodeId, tableHandle, ImmutableList.of(symbol), ImmutableMap.of(symbol, columnHandle), + // // Optional. absent()), ImmutableMap. of(symbol, Type.VARCHAR), + // // Partitioning.SOURCE, tableScanNodeId); + // + // TaskId taskId = new TaskId("query", "stage", "task"); + // + // List sources; + // { + // ImmutableList.Builder sb = ImmutableList.builder(); + // for (PlanNodeId planNodeId : planFragment.getSourceIds()) { + // ImmutableMultimap.Builder initialSplits = ImmutableMultimap.builder(); + // if (sourceId != null) { + // initialSplits.put(sourceId, sourceSplit); + // } + // for (Entry entry : exchangeLocations.get().entries()) { + // initialSplits.put(entry.getKey(), + // createRemoteSplitFor(node.getNodeIdentifier(), entry.getValue())); + // } + // + // Set splits = pendingSplits.get(planNodeId); + // boolean noMoreSplits = this.noMoreSplits.contains(planNodeId); + // if (!splits.isEmpty() || noMoreSplits) { + // sb.add(new TaskSource(planNodeId, splits, noMoreSplits)); + // } + // } + // sources = sb.build(); + // } + // // List sources = ImmutableList. of(new TaskSource(tableScanNodeId, ImmutableSet + // // .of(new ScheduledSplit(0, split)), true)); + // OutputBuffers outputBuffers = INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer("out", + // new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds(); + // TaskInfo taskInfo = sqlTaskManager.updateTask(session, taskId, testFragment, sources, outputBuffers); + // assertEquals(taskInfo.getState(), TaskState.RUNNING); + // + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.RUNNING); + // + // BufferResult results = sqlTaskManager.getTaskResults(taskId, "out", 0, new DataSize(1, Unit.MEGABYTE), + // new Duration(1, TimeUnit.SECONDS)); + // assertEquals(results.isBufferClosed(), false); + // assertEquals(results.getPages().size(), 1); + // assertEquals(results.getPages().get(0).getPositionCount(), 1); + // + // results = sqlTaskManager.getTaskResults(taskId, "out", results.getToken() + results.getPages().size(), + // new DataSize(1, Unit.MEGABYTE), new Duration(1, TimeUnit.SECONDS)); + // // todo this should be true + // assertEquals(results.isBufferClosed(), false); + // assertEquals(results.getPages().size(), 0); + // + // sqlTaskManager.waitForStateChange(taskInfo.getTaskId(), taskInfo.getState(), new Duration(1, + // TimeUnit.SECONDS)); + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.FINISHED); + // taskInfo = sqlTaskManager.getTaskInfo(taskInfo.getTaskId(), false); + // assertEquals(taskInfo.getState(), TaskState.FINISHED); + // } + // } + + private Session buildSession() { + String user = "user"; + String source = "test"; + String schema = "default"; + String remoteUserAddress = "remoteUserAddress"; + String userAgent = "userAgent"; + Session session = new Session(user, source, catalog, schema, remoteUserAddress, userAgent); + return session; + } + + @Test + public void testParse() { + // String sql = "SELECT * FROM table1 JOIN table2 ON table1.key1=table2.key1 WHERE table1.key2='1'"; + // String sql = "SELECT * FROM table1 WHERE table1.key2>'1'"; + + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + + MetadataManager metadata = buildMetadata(); + StorageManager storageManager = new MockStorageManager(); + PeriodicImportManager periodicImportManager = new MockPeriodicImportManager(); + + SplitManager splitManager = buildSplitManager(nodeManager); + List planOptimizers = buildPlanOptimizers(metadata, splitManager); + + for (int i = 0; i < 1; i++) { + // String sql = "SELECT key1 as k1, key2 || 'hello' as k2, 'world' as k3 FROM table1"; + String sql = "SELECT * FROM table1 JOIN table1 t2 ON table1.key1=t2.key1"; + + Statement statement = SqlParser.createStatement(sql); + + // System.out.println("Statement: " + statement); + + Session session = buildSession(); + QueryExplainer queryExplainer = new QueryExplainer(session, planOptimizers, metadata, + periodicImportManager, storageManager); + // analyze query + Analyzer analyzer = new Analyzer(session, metadata, Optional.of(queryExplainer)); + + Analysis analysis = analyzer.analyze(statement); + + // System.out.println("analysis: " + analysis); + + PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); + // plan query + LogicalPlanner logicalPlanner = new LogicalPlanner(session, planOptimizers, idAllocator, metadata, + periodicImportManager, storageManager); + Plan plan = logicalPlanner.plan(analysis); + + TableScanCountVisitor visitor = new TableScanCountVisitor(); + plan.getRoot().accept(visitor, 0); + // Assert.assertEquals(1, visitor.count); + String p = PlanPrinter.textLogicalPlan(plan.getRoot(), plan.getTypes()); + + System.out.println("plan: " + p); + } + + // plan: - Output[key1, key2] + // - TableScan[com.cloudata.structured.sql.MockTableHandle@6c6dba0d, domain={}] => [key1:varchar, key2:varchar] + // key1 := com.cloudata.structured.sql.MockColumnHandle@319560e6 + // key2 := com.cloudata.structured.sql.MockColumnHandle@460cb578 + + // plan: - Output[key1, key2] + // - Filter[("key2" > '1')] => [key1:varchar, key2:varchar] + // - TableScan[com.cloudata.structured.sql.MockTableHandle@6d3a3c8e, domain={}] => [key1:varchar, key2:varchar] + // key1 := com.cloudata.structured.sql.MockColumnHandle@5bb10cf0 + // key2 := com.cloudata.structured.sql.MockColumnHandle@6311c509 + // + + // List inputs = new InputExtractor(metadata).extract(plan.getRoot()); + // stateMachine.setInputs(inputs); + + // // fragment the plan + // SubPlan subplan = new DistributedLogicalPlanner(metadata, idAllocator).createSubPlans(plan, false); + // + // stateMachine.recordAnalysisTime(analysisStart); + // return subplan; + // } + + } + + @Test + public void testEngine() { + MetadataManager metadata = buildMetadata(); + StorageManager storageManager = new MockStorageManager(); + PeriodicImportManager periodicImportManager = new MockPeriodicImportManager(); + + NodeManager nodeManager = new InMemoryNodeManager(); + + SplitManager splitManager = buildSplitManager(nodeManager); + List planOptimizers = buildPlanOptimizers(metadata, splitManager); + + SqlSession session = new SqlSession(); + SqlEngine engine = new SqlEngine(metadata, planOptimizers, periodicImportManager, storageManager); + + String sql = "SELECT key1 as k1 FROM table1"; + + SqlStatement statement = engine.parse(session, sql); + Assert.assertTrue(statement.isSimple()); + + SimpleNode simple = statement.getSimple(); + Assert.assertNotNull(simple); + + System.out.println(SimpleTreePrinter.toString(simple)); + } + + private List buildPlanOptimizers(MetadataManager metadata, SplitManager splitManager) { + PlanOptimizersFactory planOptimizersFactory = new PlanOptimizersFactory(metadata, splitManager); + List planOptimizers = planOptimizersFactory.get(); + return planOptimizers; + } + + private SplitManager buildSplitManager(NodeManager nodeManager) { + SplitManager splitManager = new SplitManager(Sets. newHashSet()); + splitManager.addConnectorSplitManager(new CloudataSplitManager(nodeManager, connectorId)); + return splitManager; + } + + private MetadataManager buildMetadata() { + // Metadata metadata = new TestMetadata(); + MetadataManager metadata = new MetadataManager(); + + metadata.addConnectorMetadata(connectorId, catalog, new CloudataConnectorMetadata(connectorId)); + + return metadata; + } +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/TableScanCountVisitor.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/TableScanCountVisitor.java new file mode 100644 index 0000000..eeed1f0 --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/TableScanCountVisitor.java @@ -0,0 +1,15 @@ +package com.cloudata.structured.sql; + +import com.facebook.presto.sql.planner.plan.TableScanNode; + +public class TableScanCountVisitor extends RecursivePlanVisitor { + + int count; + + @Override + public Integer visitTableScan(TableScanNode node, Integer context) { + count++; + return super.visitTableScan(node, context); + } + +} diff --git a/cloudata-structured/src/test/java/com/cloudata/structured/sql/TestMetadata.java b/cloudata-structured/src/test/java/com/cloudata/structured/sql/TestMetadata.java new file mode 100644 index 0000000..22893fa --- /dev/null +++ b/cloudata-structured/src/test/java/com/cloudata/structured/sql/TestMetadata.java @@ -0,0 +1,119 @@ +package com.cloudata.structured.sql; +//package com.cloudata.structured; +// +//import java.util.List; +//import java.util.Map; +// +//import com.facebook.presto.metadata.FunctionHandle; +//import com.facebook.presto.metadata.FunctionInfo; +//import com.facebook.presto.metadata.Metadata; +//import com.facebook.presto.metadata.QualifiedTableName; +//import com.facebook.presto.metadata.QualifiedTablePrefix; +//import com.facebook.presto.metadata.TableMetadata; +//import com.facebook.presto.spi.ColumnHandle; +//import com.facebook.presto.spi.ColumnMetadata; +//import com.facebook.presto.spi.SchemaTableName; +//import com.facebook.presto.spi.TableHandle; +//import com.facebook.presto.sql.analyzer.Type; +//import com.facebook.presto.sql.tree.QualifiedName; +//import com.google.common.base.Optional; +// +//public class TestMetadata implements Metadata { +// +// @Override +// public FunctionInfo getFunction(QualifiedName name, List parameterTypes) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public FunctionInfo getFunction(FunctionHandle handle) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public boolean isAggregationFunction(QualifiedName name) { +// throw new UnsupportedOperationException(); +// } +// +// @Override +// public List listFunctions() { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public List listSchemaNames(String catalogName) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public Optional getTableHandle(QualifiedTableName tableName) { +// return Optional.of((TableHandle) new MockTableHandle(connectorId, tableName)); +// } +// +// @Override +// public TableMetadata getTableMetadata(TableHandle tableHandle) { +// return ((MockTableHandle) tableHandle).getTableMetadata(); +// +// } +// +// @Override +// public List listTables(QualifiedTablePrefix prefix) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public Optional getColumnHandle(TableHandle tableHandle, String columnName) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public Map getColumnHandles(TableHandle tableHandle) { +// return ((MockTableHandle) tableHandle).getColumnHandles(); +// } +// +// @Override +// public ColumnMetadata getColumnMetadata(TableHandle tableHandle, ColumnHandle columnHandle) { +// return ((MockColumnHandle) columnHandle).getColumnMetadata(); +// } +// +// @Override +// public Map> listTableColumns(QualifiedTablePrefix prefix) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public TableHandle createTable(String catalogName, TableMetadata tableMetadata) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public void dropTable(TableHandle tableHandle) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public String getConnectorId(TableHandle tableHandle) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public Optional getTableHandle(String connectorId, SchemaTableName tableName) { +// throw new UnsupportedOperationException(); +// +// } +// +// @Override +// public Map getCatalogNames() { +// throw new UnsupportedOperationException(); +// +// } +// +// }