Skip to content

Commit

Permalink
concise-analytics (#432)
Browse files Browse the repository at this point in the history
Summary:

- Reorder and uplift dataflow analysis.
- Shims to support broad query pattern.
- Added robot test `In Clause Split of List And Details Dataflow View Works As Exemplified By AWS KMS Key Cloud Control`.
- Added robot test `Materialized View of In Clause Split of List And Details Dataflow View Works As Exemplified By AWS KMS Key Cloud Control`.
  • Loading branch information
general-kroll-4-life committed Jun 24, 2024
1 parent 78f0bb0 commit a901fde
Show file tree
Hide file tree
Showing 26 changed files with 512 additions and 130 deletions.
3 changes: 3 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@
"select * from aws_cc_bucket_unfiltered where data__Identifier = 'stackql-trial-bucket-01' ;",
"select * from google.cloudkms.key_rings where projectsId = 'testing-project' and locationsId = 'australia-southeast1';",
"select json_extract(detail.Properties, '$.KeyPolicy.Id') as key_policy_id, json_extract(detail.Properties, '$.Tags') as key_tags, json_extract(detail.Properties, '$.KeyUsage') as key_usage, json_extract(detail.Properties, '$.Origin') as key_origin, json_extract(detail.Properties, '$.MultiRegion') as key_is_multi_region, detail.region from aws.cloud_control.resources listing inner join aws.cloud_control.resource detail on detail.data__Identifier = listing.Identifier and detail.region = listing.region where listing.data__TypeName = 'AWS::KMS::Key' and listing.region = 'us-east-1' and detail.data__TypeName = 'AWS::KMS::Key' ;",
"select json_extract(detail.Properties, '$.KeyPolicy.Id') as key_policy_id, json_extract(detail.Properties, '$.Tags') as key_tags, json_extract(detail.Properties, '$.KeyUsage') as key_usage, json_extract(detail.Properties, '$.Origin') as key_origin, json_extract(detail.Properties, '$.MultiRegion') as key_is_multi_region, detail.region from aws.cloud_control.resources listing inner join aws.cloud_control.resource detail on detail.data__Identifier = listing.Identifier and detail.region = listing.region where listing.data__TypeName = 'AWS::KMS::Key' and listing.region in ('us-east-1', 'ap-southeast-1') and detail.data__TypeName = 'AWS::KMS::Key' order by key_policy_id;",
"select listing.Identifier as key_policy_id, listing.region from aws.cloud_control.resources listing where listing.data__TypeName = 'AWS::KMS::Key' and listing.region in ('us-east-1', 'ap-southeast-1') order by key_policy_id;",
"create or replace materialized view de_gen_01 as select json_extract_path_text(detail.Properties, 'KeyPolicy', 'Id') as key_policy_id, json_extract_path_text(detail.Properties, 'Tags') as key_tags, json_extract_path_text(detail.Properties, 'KeyUsage') as key_usage, json_extract_path_text(detail.Properties, 'Origin') as key_origin, case when json_extract_path_text(detail.Properties, 'MultiRegion') = 'true' then 1 else 0 end as key_is_multi_region, detail.region from aws.cloud_control.resources listing inner join aws.cloud_control.resource detail on detail.data__Identifier = listing.Identifier and detail.region = listing.region where listing.data__TypeName = 'AWS::KMS::Key' and listing.region IN ('us-east-1', 'ap-southeast-1') and detail.data__TypeName = 'AWS::KMS::Key' order by key_policy_id ASC ; select key_policy_id, key_tags, key_usage, key_origin, key_is_multi_region, region from de_gen_01 order by key_policy_id ASC ; drop materialized view if exists de_gen_01 ;",
],
"default": "show providers;"
},
Expand Down
4 changes: 4 additions & 0 deletions internal/stackql/asyncmonitor/asyncmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (pr *AsyncHTTPMonitorPrimitive) GetUndoLog() (binlog.LogEntry, bool) {
return nil, false
}

// WithDebugName discards the supplied name and returns the receiver
// unchanged; this primitive does not record a debug name.
func (pr *AsyncHTTPMonitorPrimitive) WithDebugName(_ string) primitive.IPrimitive {
return pr
}

// SetUndoLog is a no-op: the supplied log entry is ignored.
func (pr *AsyncHTTPMonitorPrimitive) SetUndoLog(_ binlog.LogEntry) {
}

Expand Down
46 changes: 31 additions & 15 deletions internal/stackql/dataflow/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ type Collection interface {
Vertices() []Vertex
UpsertStandardDataFlowVertex(taxonomy.AnnotationCtx, sqlparser.TableExpr) Vertex
getVertex(annotation taxonomy.AnnotationCtx, tableExpr sqlparser.TableExpr) (Vertex, bool)
GetSplitAnnotationContextMap() taxonomy.AnnotationCtxSplitMap
WithSplitAnnotationContextMap(taxonomy.AnnotationCtxSplitMap) Collection
}

// NewStandardDataFlowCollection returns a Collection backed by a
// standardDataFlowCollection. The returned value has its ID mutex, weighted
// directed graph, vertex bookkeeping maps and split annotation context map
// already initialised.
func NewStandardDataFlowCollection() Collection {
return &standardDataFlowCollection{
idMutex: &sync.Mutex{},
g: simple.NewWeightedDirectedGraph(0.0, 0.0),
vertices: make(map[Vertex]struct{}),
verticesForTableExprs: make(map[sqlparser.TableExpr]struct{}),
idMutex: &sync.Mutex{},
g: simple.NewWeightedDirectedGraph(0.0, 0.0),
vertices: make(map[Vertex]struct{}),
verticesForTableExprs: make(map[sqlparser.TableExpr]struct{}),
splitAnnotationContextMap: taxonomy.NewAnnotationCtxSplitMap(),
}
}

Expand Down Expand Up @@ -75,15 +78,16 @@ func (dc *standardDataFlowCollection) getVertex(
}

// standardDataFlowCollection is the Collection implementation constructed by
// NewStandardDataFlowCollection. It holds the weighted directed graph,
// the sets of known vertices and table expressions, the accumulated edges, and
// the split annotation context map exposed through
// GetSplitAnnotationContextMap / WithSplitAnnotationContextMap.
type standardDataFlowCollection struct {
idMutex *sync.Mutex
maxID int64
g *simple.WeightedDirectedGraph
sorted []graph.Node
orphans []Vertex
weaklyConnnectedGraphs []WeaklyConnectedComponent
vertices map[Vertex]struct{}
verticesForTableExprs map[sqlparser.TableExpr]struct{}
edges []Edge
idMutex *sync.Mutex
maxID int64
g *simple.WeightedDirectedGraph
sorted []graph.Node
orphans []Vertex
weaklyConnnectedGraphs []WeaklyConnectedComponent
vertices map[Vertex]struct{}
verticesForTableExprs map[sqlparser.TableExpr]struct{}
edges []Edge
splitAnnotationContextMap taxonomy.AnnotationCtxSplitMap
}

func (dc *standardDataFlowCollection) GetNextID() int64 {
Expand All @@ -93,6 +97,15 @@ func (dc *standardDataFlowCollection) GetNextID() int64 {
return dc.maxID
}

// GetSplitAnnotationContextMap returns the split annotation context map
// currently held by the collection.
func (dc *standardDataFlowCollection) GetSplitAnnotationContextMap() taxonomy.AnnotationCtxSplitMap {
return dc.splitAnnotationContextMap
}

// WithSplitAnnotationContextMap replaces the collection's split annotation
// context map with m and returns the same collection, so calls can be chained.
// The receiver is mutated in place; no copy of the collection is made.
func (dc *standardDataFlowCollection) WithSplitAnnotationContextMap(m taxonomy.AnnotationCtxSplitMap) Collection {
dc.splitAnnotationContextMap = m
return dc
}

func (dc *standardDataFlowCollection) AddOrUpdateEdge(
source Vertex,
dest Vertex,
Expand Down Expand Up @@ -123,10 +136,13 @@ func (dc *standardDataFlowCollection) AddOrUpdateEdge(
func (dc *standardDataFlowCollection) AddVertex(v Vertex) {
_, ok := dc.verticesForTableExprs[v.GetTableExpr()]
if ok {
var isNew bool
if v.GetEquivalencyGroup() > 0 {
dc.g.AddNode(v) // TODO: change to acceptable idion
_, isNew = dc.g.NodeWithID(v.ID()) // TODO: change to acceptable idion
}
if !isNew {
return
}
return
}
dc.vertices[v] = struct{}{}
dc.verticesForTableExprs[v.GetTableExpr()] = struct{}{}
Expand Down
3 changes: 2 additions & 1 deletion internal/stackql/dataflow/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (wc *standardDataFlowWeaklyConnectedComponent) Analyze() error {
for _, node := range wc.collection.sorted {
incidentNodes := wc.collection.g.To(node.ID())
wc.idsVisited[node.ID()] = struct{}{}
if incidentNodes.Len() > 1 {
//nolint:gomnd // TODO: pass through configurable limit
if incidentNodes.Len() > 5 {
return fmt.Errorf(
"data flow: too complex for now; %d dependencies detected when max allowed = 1",
incidentNodes.Len())
Expand Down
7 changes: 4 additions & 3 deletions internal/stackql/dependencyplanner/dependencyplanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (dp *standardDependencyPlanner) Plan() error {
}
toNode := e.GetDest()
toTableExpr := toNode.GetTableExpr()
toAnnotation := toNode.GetAnnotation()
toAnnotation := toNode.GetAnnotation().Clone() // this bodge protects split source vertices
stream, streamErr := dp.getStreamFromEdge(e, toAnnotation, tcc)
if streamErr != nil {
return streamErr
Expand Down Expand Up @@ -329,6 +329,7 @@ func (dp *standardDependencyPlanner) Plan() error {
dp.execSlice,
selBld,
dp.dataflowToEdges,
dp.handlerCtx.GetSQLSystem(),
)
dp.selCtx = selCtx
return nil
Expand Down Expand Up @@ -552,7 +553,7 @@ func (dp *standardDependencyPlanner) getStreamFromEdge(
if err != nil {
return nil, err
}
transformedStaticParams, paramTRansformErr := util.TransformSQLRawParameters(toAc.GetParameters())
transformedStaticParams, paramTRansformErr := util.TransformSQLRawParameters(toAc.GetParameters(), false)
if paramTRansformErr != nil {
return nil, paramTRansformErr
}
Expand Down Expand Up @@ -581,7 +582,7 @@ func (dp *standardDependencyPlanner) getStreamFromEdge(
}
}
if len(staticParams) > 0 {
staticParams, err = util.TransformSQLRawParameters(staticParams)
staticParams, err = util.TransformSQLRawParameters(staticParams, false)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions internal/stackql/drm/drm_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,9 @@ func (dc *staticDRMConfig) GenerateInsertDML(
if err != nil {
return nil, err
}
transformedParams, paramErr := util.TransformSQLRawParameters(tabAnnotated.GetParameters())
priorParameters := tabAnnotated.GetParameters()
// TODO: fix this for dependent table where dependency has `IN` clause!!!
transformedParams, paramErr := util.TransformSQLRawParameters(priorParameters, false)
if paramErr != nil {
return nil, paramErr
}
Expand Down Expand Up @@ -670,7 +672,11 @@ func (dc *staticDRMConfig) generateControlVarArgs(
) ([]interface{}, error) {
var varArgs []interface{}
if cp.IsControlArgsRequired() {
ctrSlice := cp.GetCtx().GetAllCtrlCtrs()
ctrSlice := cp.GetCtx().GetOrderedTccs()
legacyCtrSlice := cp.GetCtx().GetAllCtrlCtrs()
if len(ctrSlice) < len(legacyCtrSlice) {
ctrSlice = legacyCtrSlice
}
for _, ctrs := range ctrSlice {
if ctrs == nil {
continue
Expand Down
24 changes: 24 additions & 0 deletions internal/stackql/drm/prepared_statement_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type PreparedStatementCtx interface {
SetIndirectContexts(indirectContexts []PreparedStatementCtx)
SetKind(kind string)
UpdateQuery(q string)
WithAliasToTccMap(map[string][]internaldto.TxnControlCounters) PreparedStatementCtx
WithAliasOrdering([]string) PreparedStatementCtx
GetOrderedTccs() []internaldto.TxnControlCounters
}

type standardPreparedStatementCtx struct {
Expand All @@ -44,6 +47,27 @@ type standardPreparedStatementCtx struct {
sqlSystem sql_system.SQLSystem
indirectContexts []PreparedStatementCtx
parameters map[string]interface{}
aliasToTccMap map[string][]internaldto.TxnControlCounters
aliasOrdering []string
}

// GetOrderedTccs flattens the per-alias transaction control counters into one
// slice. Aliases are visited in the order held in aliasOrdering, and each
// alias contributes its counters in the order they appear in aliasToTccMap.
// An alias with no entry in the map contributes nothing. The result is nil
// when no counters are collected.
func (ps *standardPreparedStatementCtx) GetOrderedTccs() []internaldto.TxnControlCounters {
	var ordered []internaldto.TxnControlCounters
	for i := range ps.aliasOrdering {
		countersForAlias := ps.aliasToTccMap[ps.aliasOrdering[i]]
		ordered = append(ordered, countersForAlias...)
	}
	return ordered
}

// WithAliasToTccMap sets the alias-to-transaction-control-counters map that
// GetOrderedTccs reads, and returns the receiver so calls can be chained.
// The map is stored as passed in; it is not copied.
func (ps *standardPreparedStatementCtx) WithAliasToTccMap(
aliasToTccMap map[string][]internaldto.TxnControlCounters) PreparedStatementCtx {
ps.aliasToTccMap = aliasToTccMap
return ps
}

// WithAliasOrdering sets the alias order that GetOrderedTccs iterates over,
// and returns the receiver so calls can be chained. The slice is stored as
// passed in; it is not copied.
func (ps *standardPreparedStatementCtx) WithAliasOrdering(aliasOrdering []string) PreparedStatementCtx {
ps.aliasOrdering = aliasOrdering
return ps
}

func (ps *standardPreparedStatementCtx) SetIndirectContexts(indirectContexts []PreparedStatementCtx) {
Expand Down
8 changes: 8 additions & 0 deletions internal/stackql/primitive/http_rest_primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/stackql/stackql/internal/stackql/drm"
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/primitive_context"
"github.com/stackql/stackql/internal/stackql/logging"
"github.com/stackql/stackql/internal/stackql/provider"
)

Expand All @@ -19,6 +20,7 @@ type HTTPRestPrimitive struct {
isReadOnly bool
undoLog binlog.LogEntry
redoLog binlog.LogEntry
debugName string
}

func NewHTTPRestPrimitive(
Expand All @@ -39,6 +41,11 @@ func NewHTTPRestPrimitive(
}
}

// WithDebugName stores name as the primitive's debug name and returns the
// receiver so calls can be chained. Execute includes this name in its debug
// log line.
func (pr *HTTPRestPrimitive) WithDebugName(name string) IPrimitive {
pr.debugName = name
return pr
}

// SetUndoLog stores log as the primitive's undo log entry, replacing any
// previously stored entry.
func (pr *HTTPRestPrimitive) SetUndoLog(log binlog.LogEntry) {
pr.undoLog = log
}
Expand Down Expand Up @@ -94,6 +101,7 @@ func (pr *HTTPRestPrimitive) GetInputFromAlias(alias string) (internaldto.Execut

func (pr *HTTPRestPrimitive) Execute(pc IPrimitiveCtx) internaldto.ExecutorOutput {
if pr.Executor != nil {
logging.GetLogger().Debugf("running HTTP rest primitive %s", pr.debugName)
op := pr.Executor(pc)
return op
}
Expand Down
8 changes: 8 additions & 0 deletions internal/stackql/primitive/local_primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/stackql/stackql/internal/stackql/acid/binlog"
"github.com/stackql/stackql/internal/stackql/drm"
"github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
"github.com/stackql/stackql/internal/stackql/logging"
)

type LocalPrimitive struct {
Expand All @@ -13,6 +14,7 @@ type LocalPrimitive struct {
id int64
undoLog binlog.LogEntry
redoLog binlog.LogEntry
debugName string
}

func NewLocalPrimitive(executor func(pc IPrimitiveCtx) internaldto.ExecutorOutput) IPrimitive {
Expand Down Expand Up @@ -72,8 +74,14 @@ func (pr *LocalPrimitive) ID() int64 {
return pr.id
}

// WithDebugName stores name as the primitive's debug name and returns the
// receiver so calls can be chained.
func (pr *LocalPrimitive) WithDebugName(name string) IPrimitive {
pr.debugName = name
return pr
}

func (pr *LocalPrimitive) Execute(pc IPrimitiveCtx) internaldto.ExecutorOutput {
if pr.Executor != nil {
logging.GetLogger().Infof("running local primitive")
return pr.Executor(pc)
}
return internaldto.NewExecutorOutput(nil, nil, nil, nil, nil)
Expand Down
6 changes: 6 additions & 0 deletions internal/stackql/primitive/metadata_primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type MetaDataPrimitive struct {
id int64
undoLog binlog.LogEntry
redoLog binlog.LogEntry
debugName string
}

func (pr *MetaDataPrimitive) SetTxnID(_ int) {
Expand Down Expand Up @@ -74,6 +75,11 @@ func (pr *MetaDataPrimitive) Execute(pc IPrimitiveCtx) internaldto.ExecutorOutpu
return internaldto.NewExecutorOutput(nil, nil, nil, nil, nil)
}

// WithDebugName stores name as the primitive's debug name and returns the
// receiver so calls can be chained.
func (pr *MetaDataPrimitive) WithDebugName(name string) IPrimitive {
pr.debugName = name
return pr
}

func NewMetaDataPrimitive(
provider provider.IProvider,
executor func(pc IPrimitiveCtx) internaldto.ExecutorOutput,
Expand Down
6 changes: 6 additions & 0 deletions internal/stackql/primitive/pass_through_primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type PassThroughPrimitive struct {
txnControlCounterSlice []internaldto.TxnControlCounters
undoLog binlog.LogEntry
redoLog binlog.LogEntry
debugName string
}

func NewPassThroughPrimitive(
Expand Down Expand Up @@ -74,6 +75,11 @@ func (pr *PassThroughPrimitive) IncidentData(fromID int64, input internaldto.Exe
return nil
}

// WithDebugName stores name as the primitive's debug name and returns the
// receiver so calls can be chained.
func (pr *PassThroughPrimitive) WithDebugName(name string) IPrimitive {
pr.debugName = name
return pr
}

func (pr *PassThroughPrimitive) collectGarbage() {
// placeholder
}
Expand Down
2 changes: 2 additions & 0 deletions internal/stackql/primitive/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ type IPrimitive interface {
SetInputAlias(string, int64) error

GetInputFromAlias(string) (internaldto.ExecutorOutput, bool)

WithDebugName(string) IPrimitive
}
Loading

0 comments on commit a901fde

Please sign in to comment.