degenerate-dataflow-edges (#431)
Summary:

- Degenerate dataflow edges are now accommodated, up to configurable limits; a query of this shape is sketched below.
- Added robot test `Degenerate List And Details Dataflow View Works As Exemplified By AWS KMS Key Cloud Control`.
- Added robot test `Union of Degenerate List And Details Dataflow View Works As Exemplified By AWS KMS Key Cloud Control`.
- Added robot test `Materialized View of Union of Degenerate List And Details Dataflow View Works As Exemplified By AWS KMS Key Cloud Control`.
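A degenerate list-and-details dataflow arises when the listing and the per-item detail lookup both resolve to the same resource type (here AWS::KMS::Key), so the dataflow graph gains a self-referential edge. Below is a minimal sketch of exercising such a query from Go over stackql's PostgreSQL wire protocol; the DSN, port, and user are illustrative assumptions, and the SQL is an abridged form of the sample query added to launch.json below.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any Postgres driver can talk to `stackql srv`
)

func main() {
	// Hypothetical DSN; point it at wherever `stackql srv` is listening.
	db, err := sql.Open("postgres", "host=127.0.0.1 port=5466 user=stackql sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Listing and detail both target AWS::KMS::Key: a degenerate join.
	rows, err := db.Query(`
select json_extract(detail.Properties, '$.KeyUsage') as key_usage, detail.region
from aws.cloud_control.resources listing
inner join aws.cloud_control.resource detail
  on detail.data__Identifier = listing.Identifier
 and detail.region = listing.region
where listing.data__TypeName = 'AWS::KMS::Key'
  and listing.region = 'us-east-1'
  and detail.data__TypeName = 'AWS::KMS::Key'`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var keyUsage, region string
		if err := rows.Scan(&keyUsage, &region); err != nil {
			log.Fatal(err)
		}
		fmt.Println(keyUsage, region)
	}
}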
general-kroll-4-life committed Jun 20, 2024
1 parent 97601a8 commit 78f0bb0
Showing 10 changed files with 362 additions and 17 deletions.
29 changes: 29 additions & 0 deletions .vscode/launch.json
@@ -145,6 +145,7 @@
"show methods in aws.pseudo_s3.s3_bucket_detail;",
"select * from aws_cc_bucket_unfiltered where data__Identifier = 'stackql-trial-bucket-01' ;",
"select * from google.cloudkms.key_rings where projectsId = 'testing-project' and locationsId = 'australia-southeast1';",
"select json_extract(detail.Properties, '$.KeyPolicy.Id') as key_policy_id, json_extract(detail.Properties, '$.Tags') as key_tags, json_extract(detail.Properties, '$.KeyUsage') as key_usage, json_extract(detail.Properties, '$.Origin') as key_origin, json_extract(detail.Properties, '$.MultiRegion') as key_is_multi_region, detail.region from aws.cloud_control.resources listing inner join aws.cloud_control.resource detail on detail.data__Identifier = listing.Identifier and detail.region = listing.region where listing.data__TypeName = 'AWS::KMS::Key' and listing.region = 'us-east-1' and detail.data__TypeName = 'AWS::KMS::Key' ;",
],
"default": "show providers;"
},
@@ -201,6 +202,32 @@
"50"
]
},
{
"type": "pickString",
"id": "dataflowDependencyMax",
"description": "Data Flow Dependency Limit",
"default": "1",
"options": [
"1",
"3",
"5",
"10",
"50"
]
},
{
"type": "pickString",
"id": "dataflowComponentsMax",
"description": "Data Flow Weakly Connected Components Limit",
"default": "1",
"options": [
"1",
"3",
"5",
"10",
"50"
]
},

{
"type": "pickString",
@@ -346,6 +373,8 @@
"--dbInternal=${input:dbInternalString}",
"--execution.concurrency.limit=${input:concurrencyLimit}",
"--export.alias=${input:exportAliasString}",
"--dataflow.dependency.max=${input:dataflowDependencyMax}",
"--dataflow.components.max=${input:dataflowComponentsMax}",
"${input:queryString}"
],
},
1 change: 1 addition & 0 deletions internal/stackql/cmd/root.go
@@ -123,6 +123,7 @@ func init() {
rootCmd.PersistentFlags().IntVar(&runtimeCtx.ExecutionConcurrencyLimit, dto.ExecutionConcurrencyLimitKey, 1, "concurrency limit for query execution")
rootCmd.PersistentFlags().IntVar(&runtimeCtx.IndirectDepthMax, dto.IndirectDepthMaxKey, 5, "max depth for indirect queries: views and subqueries") //nolint:gomnd // TODO: investigate
rootCmd.PersistentFlags().IntVar(&runtimeCtx.DataflowDependencyMax, dto.DataflowDependencyMaxKey, 1, "max dataflow dependency depth for a given query")
rootCmd.PersistentFlags().IntVar(&runtimeCtx.DataflowComponentsMax, dto.DataflowComponentsMaxKey, 1, "max dataflow weakly connected components for a given query")
rootCmd.PersistentFlags().StringVar(&runtimeCtx.HTTPProxyHost, dto.HTTPProxyHostKey, "", "http proxy host, empty means no proxy")
rootCmd.PersistentFlags().StringVar(&runtimeCtx.HTTPProxyScheme, dto.HTTPProxySchemeKey, "http", "http proxy scheme, eg 'http'")
rootCmd.PersistentFlags().StringVar(&runtimeCtx.HTTPProxyPassword, dto.HTTPProxyPasswordKey, "", "http proxy password")
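The new limits follow the existing persistent-flag pattern in root.go. Here is a self-contained sketch of that wiring, assuming the same cobra conventions; the local variable names are illustrative rather than the real runtimeCtx fields.

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var dataflowDependencyMax, dataflowComponentsMax int
	cmd := &cobra.Command{
		Use: "stackql",
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("dataflow.dependency.max =", dataflowDependencyMax)
			fmt.Println("dataflow.components.max =", dataflowComponentsMax)
		},
	}
	// Mirrors the IntVar registrations added in the hunk above.
	cmd.PersistentFlags().IntVar(&dataflowDependencyMax, "dataflow.dependency.max", 1,
		"max dataflow dependency depth for a given query")
	cmd.PersistentFlags().IntVar(&dataflowComponentsMax, "dataflow.components.max", 1,
		"max dataflow weakly connected components for a given query")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}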
40 changes: 39 additions & 1 deletion internal/stackql/dataflow/collection.go
@@ -6,6 +6,7 @@ import (

"github.com/stackql/stackql-parser/go/vt/sqlparser"
"github.com/stackql/stackql/internal/stackql/logging"
"github.com/stackql/stackql/internal/stackql/taxonomy"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
@@ -30,6 +31,8 @@ type Collection interface {
OutDegree(v Vertex) int
Sort() error
Vertices() []Vertex
UpsertStandardDataFlowVertex(taxonomy.AnnotationCtx, sqlparser.TableExpr) Vertex
getVertex(annotation taxonomy.AnnotationCtx, tableExpr sqlparser.TableExpr) (Vertex, bool)
}

func NewStandardDataFlowCollection() Collection {
@@ -41,6 +44,36 @@ func NewStandardDataFlowCollection() Collection {
}
}

// UpsertStandardDataFlowVertex returns the existing vertex matching the
// given annotation and table expression, if present; otherwise it creates
// and returns a new vertex with a fresh ID. This makes subsequent handling
// of degenerate (self-referential) edges easier.
func (dc *standardDataFlowCollection) UpsertStandardDataFlowVertex(
annotation taxonomy.AnnotationCtx,
tableExpr sqlparser.TableExpr,
) Vertex {
if v, ok := dc.getVertex(annotation, tableExpr); ok {
return v
}
return &standardDataFlowVertex{
annotation: annotation,
tableExpr: tableExpr,
id: dc.GetNextID(),
}
}

func (dc *standardDataFlowCollection) getVertex(
annotation taxonomy.AnnotationCtx,
tableExpr sqlparser.TableExpr,
) (Vertex, bool) {
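	// Note: these are interface comparisons, so for pointer-backed
	// implementations a match means identity, not deep equality.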
for v := range dc.vertices {
if v.GetAnnotation() == annotation && v.GetTableExpr() == tableExpr {
return v, true
}
}
return nil, false
}

type standardDataFlowCollection struct {
idMutex *sync.Mutex
maxID int64
@@ -69,7 +102,9 @@ func (dc *standardDataFlowCollection) AddOrUpdateEdge(
) error {
dc.AddVertex(source)
dc.AddVertex(dest)
existingEdge := dc.g.WeightedEdge(source.ID(), dest.ID())
sourceID := source.ID()
destID := dest.ID()
existingEdge := dc.g.WeightedEdge(sourceID, destID)
if existingEdge == nil {
edge := newStandardDataFlowEdge(source, dest, comparisonExpr, sourceExpr, destColumn)
dc.edges = append(dc.edges, edge)
@@ -98,6 +133,9 @@ func (dc *standardDataFlowCollection) AddVertex(v Vertex) {
dc.g.AddNode(v)
}

// Sort topologically sorts the underlying graph and caches the result,
// returning an error if the graph contains a cycle; this is where the
// DAG invariant is enforced.
func (dc *standardDataFlowCollection) Sort() error {
var err error
dc.sorted, err = topo.Sort(dc.g)
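For intuition on the invariant Sort enforces: gonum's topo.Sort rejects cyclic input with a non-nil error. A minimal, self-contained sketch (illustrative, not stackql code) using the same gonum packages imported above:

package main

import (
	"fmt"

	"gonum.org/v1/gonum/graph/simple"
	"gonum.org/v1/gonum/graph/topo"
)

func main() {
	g := simple.NewWeightedDirectedGraph(0, 0)
	a, b := simple.Node(1), simple.Node(2)
	g.AddNode(a)
	g.AddNode(b)
	g.SetWeightedEdge(g.NewWeightedEdge(a, b, 1))
	g.SetWeightedEdge(g.NewWeightedEdge(b, a, 1)) // completes a cycle
	if _, err := topo.Sort(g); err != nil {
		// topo.Sort reports the unorderable nodes in the error.
		fmt.Println("cycle rejected:", err)
	}
}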
11 changes: 0 additions & 11 deletions internal/stackql/dataflow/vertex.go
@@ -22,17 +22,6 @@ type standardDataFlowVertex struct {
tableExpr sqlparser.TableExpr
}

func NewStandardDataFlowVertex(
annotation taxonomy.AnnotationCtx,
tableExpr sqlparser.TableExpr,
id int64) Vertex {
return &standardDataFlowVertex{
annotation: annotation,
tableExpr: tableExpr,
id: id,
}
}

func (dv *standardDataFlowVertex) iDataFlowUnit() {}

func (dv *standardDataFlowVertex) ID() int64 {
8 changes: 7 additions & 1 deletion internal/stackql/dependencyplanner/dependencyplanner.go
@@ -232,7 +232,12 @@ func (dp *standardDependencyPlanner) Plan() error {
dp.dataflowToEdges[toIdx] = append(dp.dataflowToEdges[toIdx], fromIdx)
}
}
}
for _, n := range orderedNodes {
// A second pass for ahead-of-time (AOT) dataflows, namely ON-clause dependencies.
if _, ok := idsVisited[n.ID()]; ok {
continue
}
for _, e := range edges {
toIdx, toFound := dp.nodeIDIdxMap[e.To().ID()]
if !toFound {
@@ -249,7 +254,8 @@
return fmt.Errorf("cannot support dependency unit of type = '%T'", unit)
}
}
if weaklyConnectedComponentCount > 1 {
maxWeaklyConnectedComponents := dp.handlerCtx.GetRuntimeContext().DataflowComponentsMax
if weaklyConnectedComponentCount > maxWeaklyConnectedComponents {
return fmt.Errorf(
"data flow: there are too many weakly connected components; found = %d, max = 1",
weaklyConnectedComponentCount)
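The limit above is checked against a count of weakly connected components, i.e. connectivity with edge direction ignored. A self-contained sketch of that counting technique (illustrative; the planner's actual bookkeeping differs):

package main

import "fmt"

type unionFind map[int64]int64

func (u unionFind) find(x int64) int64 {
	if u[x] != x {
		u[x] = u.find(u[x]) // path compression
	}
	return u[x]
}

func (u unionFind) union(a, b int64) { u[u.find(a)] = u.find(b) }

func main() {
	vertices := []int64{1, 2, 3, 4}
	// Direction is ignored: each edge merges two components.
	edges := [][2]int64{{1, 2}, {3, 4}}
	uf := unionFind{}
	for _, v := range vertices {
		uf[v] = v
	}
	for _, e := range edges {
		uf.union(e[0], e[1])
	}
	roots := map[int64]bool{}
	for _, v := range vertices {
		roots[uf.find(v)] = true
	}
	found, max := len(roots), 1 // 1 mirrors the default dataflow.components.max
	if found > max {
		fmt.Printf("data flow: there are too many weakly connected components; found = %d, max = %d\n", found, max)
	}
}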
1 change: 1 addition & 0 deletions internal/stackql/dto/dto.go
@@ -29,6 +29,7 @@
ErrorPresentationKey string = "errorpresentation"
IndirectDepthMaxKey string = "indirect.depth.max"
DataflowDependencyMaxKey string = "dataflow.dependency.max"
DataflowComponentsMaxKey string = "dataflow.components.max"
HTTPLogEnabledKey string = "http.log.enabled"
HTTPMaxResultsKey string = "http.response.maxResults"
HTTPPAgeLimitKey string = "http.response.pageLimit"
3 changes: 3 additions & 0 deletions internal/stackql/dto/runtime_ctx.go
@@ -26,6 +26,7 @@ type RuntimeCtx struct {
HTTPProxyScheme string
HTTPProxyUser string
IndirectDepthMax int
DataflowComponentsMax int
DataflowDependencyMax int
InfilePath string
LogLevelStr string
@@ -129,6 +130,8 @@ func (rc *RuntimeCtx) Set(key string, val string) error {
retVal = setInt(&rc.HTTPProxyPort, val)
case IndirectDepthMaxKey:
retVal = setInt(&rc.IndirectDepthMax, val)
case DataflowComponentsMaxKey:
retVal = setInt(&rc.DataflowComponentsMax, val)
case DataflowDependencyMaxKey:
retVal = setInt(&rc.DataflowDependencyMax, val)
case HTTPProxySchemeKey:
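The setInt helper dispatched above sits outside this diff; a plausible shape, offered purely as an assumption to make the new Set cases concrete:

package dto

import "strconv"

// setInt (assumed): parse val and assign through dst, propagating
// any parse error back to Set.
func setInt(dst *int, val string) error {
	i, err := strconv.Atoi(val)
	if err != nil {
		return err
	}
	*dst = i
	return nil
}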
8 changes: 4 additions & 4 deletions internal/stackql/router/parameter_router.go
@@ -231,8 +231,8 @@ func (pr *standardParameterRouter) GetOnConditionDataFlows() (dataflow.Collectio
}
// rv[dependency] = destHierarchy

srcVertex := dataflow.NewStandardDataFlowVertex(dependency, dependencyTable, rv.GetNextID())
destVertex := dataflow.NewStandardDataFlowVertex(destHierarchy, destinationTable, rv.GetNextID())
srcVertex := rv.UpsertStandardDataFlowVertex(dependency, dependencyTable)
destVertex := rv.UpsertStandardDataFlowVertex(destHierarchy, destinationTable)

err := rv.AddOrUpdateEdge(
srcVertex,
@@ -271,15 +271,15 @@
v.GetTableMeta().Clone(),
clonedParams,
)
sourceVertexIteration := dataflow.NewStandardDataFlowVertex(clonedAnnotationCtx, k, rv.GetNextID())
sourceVertexIteration := rv.UpsertStandardDataFlowVertex(clonedAnnotationCtx, k)
sourceVertexIteration.SetEquivalencyGroup(tableEquivalencyID)
rv.AddVertex(sourceVertexIteration)
}
return rv, nil
}
}
}
rv.AddVertex(dataflow.NewStandardDataFlowVertex(v, k, rv.GetNextID()))
rv.AddVertex(rv.UpsertStandardDataFlowVertex(v, k))
}
return rv, nil
}
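Switching from NewStandardDataFlowVertex to UpsertStandardDataFlowVertex is what lets a degenerate pair collapse onto a single vertex: a repeated (annotation, table expression) pair yields the vertex already seen instead of minting a second ID. A self-contained sketch of the pattern with illustrative string keys (the real code keys on taxonomy.AnnotationCtx and sqlparser.TableExpr, and registers vertices separately via AddVertex):

package main

import "fmt"

type vertex struct {
	id         int64
	annotation string
	tableExpr  string
}

type collection struct {
	nextID   int64
	vertices []*vertex
}

// upsert returns the existing vertex for an (annotation, tableExpr)
// pair, or creates and registers a new one. Registering on creation is
// a simplification relative to the commit, where AddVertex is separate.
func (c *collection) upsert(annotation, tableExpr string) *vertex {
	for _, v := range c.vertices {
		if v.annotation == annotation && v.tableExpr == tableExpr {
			return v
		}
	}
	v := &vertex{id: c.nextID, annotation: annotation, tableExpr: tableExpr}
	c.nextID++
	c.vertices = append(c.vertices, v)
	return v
}

func main() {
	c := &collection{}
	src := c.upsert("AWS::KMS::Key listing", "aws.cloud_control.resources")
	dst := c.upsert("AWS::KMS::Key listing", "aws.cloud_control.resources")
	fmt.Println(src == dst, src.id, dst.id) // true 0 0: one vertex, one ID
}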