From 183f8d3a7462712bea6e5413708e7b062774c35c Mon Sep 17 00:00:00 2001 From: simcap Date: Mon, 7 Aug 2017 11:02:03 +0200 Subject: [PATCH] Sync performance: better CPU / mem. See below: Avoid snapshotting datastore for querying when not necessary when building relations (issue #116) * Go profiling: - making Snapshotting not on top 5 CPU anymore (now below top25) - making Snaphotting not on top 5 Mem anymore (now below top30) --- aws/services/gen_services.go | 289 ++++++++++++++++++--------------- aws/services/relations.go | 27 ++- gen/aws/generators/services.go | 9 +- 3 files changed, 173 insertions(+), 152 deletions(-) diff --git a/aws/services/gen_services.go b/aws/services/gen_services.go index 264a24544..15afa234d 100644 --- a/aws/services/gen_services.go +++ b/aws/services/gen_services.go @@ -69,6 +69,7 @@ import ( "github.com/wallix/awless/graph" "github.com/wallix/awless/logger" "github.com/wallix/awless/template/driver" + tstore "github.com/wallix/triplestore" ) const accessDenied = "Access Denied" @@ -389,6 +390,8 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.infra.instance.sync", true) { @@ -402,14 +405,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Instance) { for _, fn := range addParentsFns["instance"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Instance) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Instance) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -424,14 +427,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Subnet) { for _, fn := range addParentsFns["subnet"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Subnet) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Subnet) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -446,14 +449,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Vpc) { for _, fn := range addParentsFns["vpc"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Vpc) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Vpc) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -468,14 +471,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.KeyPairInfo) { for _, fn := range addParentsFns["keypair"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.KeyPairInfo) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.KeyPairInfo) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -490,14 +493,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.SecurityGroup) { for _, fn := range addParentsFns["securitygroup"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.SecurityGroup) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.SecurityGroup) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -512,14 +515,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Volume) { for _, fn := range addParentsFns["volume"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Volume) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Volume) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -534,14 +537,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.InternetGateway) { for _, fn := range addParentsFns["internetgateway"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.InternetGateway) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.InternetGateway) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -556,14 +559,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.NatGateway) { for _, fn := range addParentsFns["natgateway"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.NatGateway) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.NatGateway) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -578,14 +581,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.RouteTable) { for _, fn := range addParentsFns["routetable"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.RouteTable) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.RouteTable) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -600,14 +603,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.AvailabilityZone) { for _, fn := range addParentsFns["availabilityzone"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.AvailabilityZone) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.AvailabilityZone) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -622,14 +625,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Image) { for _, fn := range addParentsFns["image"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Image) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Image) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -644,14 +647,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.ImportImageTask) { for _, fn := range addParentsFns["importimagetask"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.ImportImageTask) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.ImportImageTask) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -666,14 +669,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Address) { for _, fn := range addParentsFns["elasticip"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Address) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Address) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -688,14 +691,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ec2.Snapshot) { for _, fn := range addParentsFns["snapshot"] { wg.Add(1) - go func(f addParentFn, region string, res *ec2.Snapshot) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ec2.Snapshot) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -710,14 +713,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*elbv2.LoadBalancer) { for _, fn := range addParentsFns["loadbalancer"] { wg.Add(1) - go func(f addParentFn, region string, res *elbv2.LoadBalancer) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *elbv2.LoadBalancer) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -732,14 +735,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*elbv2.TargetGroup) { for _, fn := range addParentsFns["targetgroup"] { wg.Add(1) - go func(f addParentFn, region string, res *elbv2.TargetGroup) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *elbv2.TargetGroup) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -754,14 +757,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*elbv2.Listener) { for _, fn := range addParentsFns["listener"] { wg.Add(1) - go func(f addParentFn, region string, res *elbv2.Listener) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *elbv2.Listener) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -776,14 +779,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*rds.DBInstance) { for _, fn := range addParentsFns["database"] { wg.Add(1) - go func(f addParentFn, region string, res *rds.DBInstance) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *rds.DBInstance) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -798,14 +801,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*rds.DBSubnetGroup) { for _, fn := range addParentsFns["dbsubnetgroup"] { wg.Add(1) - go func(f addParentFn, region string, res *rds.DBSubnetGroup) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *rds.DBSubnetGroup) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -820,14 +823,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*autoscaling.LaunchConfiguration) { for _, fn := range addParentsFns["launchconfiguration"] { wg.Add(1) - go func(f addParentFn, region string, res *autoscaling.LaunchConfiguration) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *autoscaling.LaunchConfiguration) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -842,14 +845,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*autoscaling.Group) { for _, fn := range addParentsFns["scalinggroup"] { wg.Add(1) - go func(f addParentFn, region string, res *autoscaling.Group) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *autoscaling.Group) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -864,14 +867,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*autoscaling.ScalingPolicy) { for _, fn := range addParentsFns["scalingpolicy"] { wg.Add(1) - go func(f addParentFn, region string, res *autoscaling.ScalingPolicy) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *autoscaling.ScalingPolicy) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -886,14 +889,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ecr.Repository) { for _, fn := range addParentsFns["repository"] { wg.Add(1) - go func(f addParentFn, region string, res *ecr.Repository) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ecr.Repository) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -908,14 +911,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ecs.Cluster) { for _, fn := range addParentsFns["containercluster"] { wg.Add(1) - go func(f addParentFn, region string, res *ecs.Cluster) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ecs.Cluster) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -930,14 +933,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ecs.TaskDefinition) { for _, fn := range addParentsFns["containertask"] { wg.Add(1) - go func(f addParentFn, region string, res *ecs.TaskDefinition) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ecs.TaskDefinition) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -952,14 +955,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ecs.Container) { for _, fn := range addParentsFns["container"] { wg.Add(1) - go func(f addParentFn, region string, res *ecs.Container) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ecs.Container) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -974,14 +977,14 @@ func (s *Infra) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*ecs.ContainerInstance) { for _, fn := range addParentsFns["containerinstance"] { wg.Add(1) - go func(f addParentFn, region string, res *ecs.ContainerInstance) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *ecs.ContainerInstance) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1102,6 +1105,8 @@ func (s *Access) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.access.user.sync", true) { @@ -1115,14 +1120,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.UserDetail) { for _, fn := range addParentsFns["user"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.UserDetail) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.UserDetail) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1137,14 +1142,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.GroupDetail) { for _, fn := range addParentsFns["group"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.GroupDetail) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.GroupDetail) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1159,14 +1164,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.RoleDetail) { for _, fn := range addParentsFns["role"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.RoleDetail) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.RoleDetail) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1181,14 +1186,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.Policy) { for _, fn := range addParentsFns["policy"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.Policy) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.Policy) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1203,14 +1208,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.AccessKeyMetadata) { for _, fn := range addParentsFns["accesskey"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.AccessKeyMetadata) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.AccessKeyMetadata) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1225,14 +1230,14 @@ func (s *Access) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*iam.InstanceProfile) { for _, fn := range addParentsFns["instanceprofile"] { wg.Add(1) - go func(f addParentFn, region string, res *iam.InstanceProfile) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *iam.InstanceProfile) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1344,6 +1349,8 @@ func (s *Storage) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.storage.bucket.sync", true) { @@ -1357,14 +1364,14 @@ func (s *Storage) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*s3.Bucket) { for _, fn := range addParentsFns["bucket"] { wg.Add(1) - go func(f addParentFn, region string, res *s3.Bucket) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *s3.Bucket) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1379,14 +1386,14 @@ func (s *Storage) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*s3.Object) { for _, fn := range addParentsFns["s3object"] { wg.Add(1) - go func(f addParentFn, region string, res *s3.Object) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *s3.Object) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1504,6 +1511,8 @@ func (s *Messaging) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.messaging.subscription.sync", true) { @@ -1517,14 +1526,14 @@ func (s *Messaging) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*sns.Subscription) { for _, fn := range addParentsFns["subscription"] { wg.Add(1) - go func(f addParentFn, region string, res *sns.Subscription) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *sns.Subscription) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1539,14 +1548,14 @@ func (s *Messaging) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*sns.Topic) { for _, fn := range addParentsFns["topic"] { wg.Add(1) - go func(f addParentFn, region string, res *sns.Topic) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *sns.Topic) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1561,14 +1570,14 @@ func (s *Messaging) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*string) { for _, fn := range addParentsFns["queue"] { wg.Add(1) - go func(f addParentFn, region string, res *string) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *string) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1680,6 +1689,8 @@ func (s *Dns) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.dns.zone.sync", true) { @@ -1693,14 +1704,14 @@ func (s *Dns) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*route53.HostedZone) { for _, fn := range addParentsFns["zone"] { wg.Add(1) - go func(f addParentFn, region string, res *route53.HostedZone) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *route53.HostedZone) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1715,14 +1726,14 @@ func (s *Dns) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*route53.ResourceRecordSet) { for _, fn := range addParentsFns["record"] { wg.Add(1) - go func(f addParentFn, region string, res *route53.ResourceRecordSet) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *route53.ResourceRecordSet) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1833,6 +1844,8 @@ func (s *Lambda) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.lambda.function.sync", true) { @@ -1846,14 +1859,14 @@ func (s *Lambda) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*lambda.FunctionConfiguration) { for _, fn := range addParentsFns["function"] { wg.Add(1) - go func(f addParentFn, region string, res *lambda.FunctionConfiguration) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *lambda.FunctionConfiguration) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -1965,6 +1978,8 @@ func (s *Monitoring) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.monitoring.metric.sync", true) { @@ -1978,14 +1993,14 @@ func (s *Monitoring) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*cloudwatch.Metric) { for _, fn := range addParentsFns["metric"] { wg.Add(1) - go func(f addParentFn, region string, res *cloudwatch.Metric) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *cloudwatch.Metric) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -2000,14 +2015,14 @@ func (s *Monitoring) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*cloudwatch.MetricAlarm) { for _, fn := range addParentsFns["alarm"] { wg.Add(1) - go func(f addParentFn, region string, res *cloudwatch.MetricAlarm) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *cloudwatch.MetricAlarm) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -2118,6 +2133,8 @@ func (s *Cdn) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.cdn.distribution.sync", true) { @@ -2131,14 +2148,14 @@ func (s *Cdn) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*cloudfront.DistributionSummary) { for _, fn := range addParentsFns["distribution"] { wg.Add(1) - go func(f addParentFn, region string, res *cloudfront.DistributionSummary) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *cloudfront.DistributionSummary) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } @@ -2249,6 +2266,8 @@ func (s *Cloudformation) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup if s.config.getBool("aws.cloudformation.stack.sync", true) { @@ -2262,14 +2281,14 @@ func (s *Cloudformation) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*cloudformation.Stack) { for _, fn := range addParentsFns["stack"] { wg.Add(1) - go func(f addParentFn, region string, res *cloudformation.Stack) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *cloudformation.Stack) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } } diff --git a/aws/services/relations.go b/aws/services/relations.go index f7bab4eb6..881754b8c 100644 --- a/aws/services/relations.go +++ b/aws/services/relations.go @@ -31,6 +31,7 @@ import ( "github.com/wallix/awless/aws/conv" "github.com/wallix/awless/cloud" "github.com/wallix/awless/graph" + tstore "github.com/wallix/triplestore" ) const ( @@ -45,7 +46,7 @@ type funcBuilder struct { relation int } -type addParentFn func(*graph.Graph, string, interface{}) error +type addParentFn func(*graph.Graph, tstore.RDFGraph, string, interface{}) error var addParentsFns = map[string][]addParentFn{ // Infra @@ -154,7 +155,7 @@ func (fb funcBuilder) build() addParentFn { } func (fb funcBuilder) addRelationWithField() addParentFn { - return func(g *graph.Graph, region string, i interface{}) error { + return func(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { structField, err := verifyValidStructField(i, fb.fieldName) if err != nil { return err @@ -180,7 +181,7 @@ func (fb funcBuilder) addRelationWithField() addParentFn { } func (fb funcBuilder) addRelationListWithStringField() addParentFn { - return func(g *graph.Graph, region string, i interface{}) error { + return func(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { structField, err := verifyValidStructField(i, fb.stringListName) if err != nil { return err @@ -215,7 +216,7 @@ func (fb funcBuilder) addRelationListWithStringField() addParentFn { } func (fb funcBuilder) addRelationListWithField() addParentFn { - return func(g *graph.Graph, region string, i interface{}) error { + return func(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { structField, err := verifyValidStructField(i, fb.listName) if err != nil { return err @@ -293,7 +294,7 @@ func addRelation(g *graph.Graph, first, other *graph.Resource, relation int) err return nil } -func addRegionParent(g *graph.Graph, region string, i interface{}) error { +func addRegionParent(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { res, err := awsconv.InitResource(i) if err != nil { return err @@ -302,7 +303,7 @@ func addRegionParent(g *graph.Graph, region string, i interface{}) error { return nil } -func addManagedPoliciesRelations(g *graph.Graph, region string, i interface{}) error { +func addManagedPoliciesRelations(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { res, err := awsconv.InitResource(i) if err != nil { return err @@ -325,9 +326,8 @@ func addManagedPoliciesRelations(g *graph.Graph, region string, i interface{}) e return fmt.Errorf("add parent to %s: not a valid attached policy list: %T", res.Id(), structField.Interface()) } - immutableRDFGraph := g.AsRDFGraphSnaphot() for _, policy := range policies { - policies, err := graph.ResolveResourcesOnSnapShot(immutableRDFGraph, &graph.And{Resolvers: []graph.Resolver{&graph.ByProperty{Key: "Name", Value: awssdk.StringValue(policy.PolicyName)}, &graph.ByType{Typ: cloud.Policy}}}) + policies, err := graph.ResolveResourcesOnSnapShot(snap, &graph.And{Resolvers: []graph.Resolver{&graph.ByProperty{Key: "Name", Value: awssdk.StringValue(policy.PolicyName)}, &graph.ByType{Typ: cloud.Policy}}}) if err != nil { return err } @@ -340,7 +340,7 @@ func addManagedPoliciesRelations(g *graph.Graph, region string, i interface{}) e return nil } -func userAddGroupsRelations(g *graph.Graph, region string, i interface{}) error { +func userAddGroupsRelations(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { user, ok := i.(*iam.UserDetail) if !ok { return fmt.Errorf("aws fetch: not a user, but a %T", i) @@ -350,10 +350,9 @@ func userAddGroupsRelations(g *graph.Graph, region string, i interface{}) error return err } - immutableRDFGraph := g.AsRDFGraphSnaphot() for _, group := range user.GroupList { groupName := awssdk.StringValue(group) - resources, err := graph.ResolveResourcesOnSnapShot(immutableRDFGraph, &graph.And{Resolvers: []graph.Resolver{ + resources, err := graph.ResolveResourcesOnSnapShot(snap, &graph.And{Resolvers: []graph.Resolver{ &graph.ByProperty{Key: "Name", Value: groupName}, &graph.ByType{Typ: cloud.Group}, }}) @@ -372,7 +371,7 @@ func userAddGroupsRelations(g *graph.Graph, region string, i interface{}) error return nil } -func fetchTargetsAndAddRelations(g *graph.Graph, region string, i interface{}) error { +func fetchTargetsAndAddRelations(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { group, ok := i.(*elbv2.TargetGroup) if !ok { return fmt.Errorf("add targets relation: not a target group, but a %T", i) @@ -397,7 +396,7 @@ func fetchTargetsAndAddRelations(g *graph.Graph, region string, i interface{}) e return nil } -func addScalingGroupSubnets(g *graph.Graph, region string, i interface{}) error { +func addScalingGroupSubnets(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { group, ok := i.(*autoscaling.Group) if !ok { return fmt.Errorf("add autoscaling group relation: not a autoscaling group, but a %T", i) @@ -419,7 +418,7 @@ func addScalingGroupSubnets(g *graph.Graph, region string, i interface{}) error return nil } -func addAlarmMetric(g *graph.Graph, region string, i interface{}) error { +func addAlarmMetric(g *graph.Graph, snap tstore.RDFGraph, region string, i interface{}) error { alarm, ok := i.(*cloudwatch.MetricAlarm) if !ok { return fmt.Errorf("add alarm metric relation: not a alarm, but a %T", i) diff --git a/gen/aws/generators/services.go b/gen/aws/generators/services.go index 8df1a77bb..df8d1cea2 100644 --- a/gen/aws/generators/services.go +++ b/gen/aws/generators/services.go @@ -92,6 +92,7 @@ import ( "github.com/wallix/awless/aws/driver" "github.com/wallix/awless/fetch" "github.com/wallix/awless/aws/fetch" + tstore "github.com/wallix/triplestore" ) const accessDenied = "Access Denied" @@ -239,6 +240,8 @@ func (s *{{ Title $service.Name }}) FetchResources() (*graph.Graph, error) { return gph, err } + snap := gph.AsRDFGraphSnaphot() + errc := make(chan error) var wg sync.WaitGroup @@ -254,14 +257,14 @@ func (s *{{ Title $service.Name }}) FetchResources() (*graph.Graph, error) { for _, r := range list.([]*{{ $fetcher.AWSType }}) { for _, fn := range addParentsFns["{{ $fetcher.ResourceType }}"] { wg.Add(1) - go func(f addParentFn, region string, res *{{ $fetcher.AWSType }}) { + go func(f addParentFn, snap tstore.RDFGraph, region string, res *{{ $fetcher.AWSType }}) { defer wg.Done() - err := f(gph, region, res) + err := f(gph, snap, region, res) if err != nil { errc <- err return } - }(fn, s.region, r) + }(fn, snap, s.region, r) } } }