Skip to content

Commit

Permalink
[mixer:stackdriver] Initial changes to support dst svc edges in graph (
Browse files Browse the repository at this point in the history
…#11426)

* Initial changes to support dst svc edges

* Add istio service to k8s service member relation

* Refactor of edge logic and add test

* Add <workload, service> relations
  • Loading branch information
douglas-reid authored and wenchenglu committed Feb 8, 2019
1 parent 2810bae commit db48306
Show file tree
Hide file tree
Showing 13 changed files with 650 additions and 89 deletions.
17 changes: 12 additions & 5 deletions mixer/adapter/stackdriver/contextgraph/contextgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,19 @@ func (h *handler) HandleEdge(ctx context.Context, insts []*edgepb.Instance) erro
i.DestinationWorkloadName,
i.DestinationWorkloadNamespace,
}
destinationService := service{
meshUID: h.meshUID,
namespace: i.DestinationServiceNamespace,
name: i.DestinationServiceName,
istioProject: h.projectID,
}
h.traffics <- trafficAssertion{
source,
destination,
i.ContextProtocol,
i.ApiProtocol,
i.Timestamp,
source: source,
destination: destination,
contextProtocol: i.ContextProtocol,
apiProtocol: i.ApiProtocol,
destinationService: destinationService,
timestamp: i.Timestamp,
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion mixer/adapter/stackdriver/contextgraph/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (h *handler) send(ctx context.Context, t time.Time, entitiesToSend []entity
}

func (h *handler) call(ctx context.Context, req *contextgraphpb.AssertBatchRequest) error {
h.env.Logger().Debugf("Sending %v entities and %v relationships",
h.env.Logger().Debugf("Sending Context Graph AssertBatch with %d entities, and %d relationships",
len(req.EntityPresentAssertions), len(req.RelationshipPresentAssertions))
if _, err := h.assertBatch(ctx, req); err != nil {
s, _ := status.FromError(err)
Expand Down
127 changes: 88 additions & 39 deletions mixer/adapter/stackdriver/contextgraph/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,23 @@ import (
"istio.io/istio/mixer/pkg/adapter"
)

const membershipTypeName = "google.cloud.contextgraph.Membership"
const (
membershipTypeName = "google.cloud.contextgraph.Membership"

grpcComm = "google.cloud.contextgraph.Communication.Grpc"
httpComm = "google.cloud.contextgraph.Communication.Http"
httpsComm = "google.cloud.contextgraph.Communication.Https"
tcpComm = "google.cloud.contextgraph.Communication.Tcp"
)

var (
protocolMap = map[string]string{
"http": httpComm,
"https": httpsComm,
"tcp": tcpComm,
"grpc": grpcComm,
}
)

type workloadInstance struct {
// N.B. The projects can potentially be different for each workload.
Expand All @@ -35,6 +51,13 @@ type workloadInstance struct {
workloadName, workloadNamespace string
}

type service struct {
meshUID string
namespace string
name string
istioProject string
}

// Reify turns wi into a set of Context API entities and edges.
func (wi workloadInstance) Reify(logger adapter.Logger) ([]entity, []edge) {
gcpContainer := fmt.Sprintf("//cloudresourcemanager.googleapis.com/projects/%s", wi.istioProject)
Expand Down Expand Up @@ -82,13 +105,7 @@ func (wi workloadInstance) Reify(logger adapter.Logger) ([]entity, []edge) {
"global",
[4]string{meshUID, workloadNamespace, workloadName, ""},
}
// TODO: Figure out what the container is for non-GCE clusters.
clusterLocationType := "locations"
if strings.Count(clusterLocation, "-") == 2 {
clusterLocationType = "zones"
}
clusterContainer := fmt.Sprintf("//container.googleapis.com/projects/%s/%s/%s/clusters/%s",
wi.clusterProject, clusterLocationType, wi.clusterLocation, wi.clusterName)
clusterContainer := clusterContainer(wi.clusterProject, wi.clusterLocation, wi.clusterName)

var ownerK8sFullName string
t := strings.Split(wi.owner, "/")
Expand Down Expand Up @@ -151,56 +168,88 @@ func (wi workloadInstance) Reify(logger adapter.Logger) ([]entity, []edge) {
return []entity{wiEntity, owner, workload}, edges
}

func (s service) Reify() entity {
return entity{
containerFullName: fmt.Sprintf("//cloudresourcemanager.googleapis.com/projects/%s",
s.istioProject),
typeName: "io.istio.Service",
fullName: fmt.Sprintf("//istio.io/projects/%s/meshes/%s/services/%s/%s",
s.istioProject,
url.QueryEscape(s.meshUID),
url.QueryEscape(s.namespace),
url.QueryEscape(s.name)),
location: "global",
shortNames: [4]string{
url.QueryEscape(s.meshUID),
url.QueryEscape(s.namespace),
url.QueryEscape(s.name),
"",
},
}
}

type trafficAssertion struct {
source, destination workloadInstance
contextProtocol, apiProtocol string
destinationService service
timestamp time.Time
}

func (t trafficAssertion) Reify(logger adapter.Logger) ([]entity, []edge) {
var sourceFullNames, destinationFullNames []string

commType, ok := protocolMap[t.contextProtocol]
if !ok {
if commType, ok = protocolMap[t.apiProtocol]; !ok {
logger.Warningf("Unknown type of protocol: %s", t.apiProtocol)
}
}

serviceEntity := t.destinationService.Reify()
entities, edges := t.source.Reify(logger)
var sourceFullNames []string
for _, entity := range entities {
sourceFullNames = append(sourceFullNames, entity.fullName)
if len(commType) > 0 {
edges = append(edges, edge{entity.fullName, serviceEntity.fullName, commType})
}
}
destEntities, destEdges := t.destination.Reify(logger)
for _, entity := range destEntities {
entities = append(entities, entity)
destinationFullNames = append(destinationFullNames, entity.fullName)
}
entities = append(entities, destEntities...)
edges = append(edges, destEdges...)

var typeName string
var protocol string
switch t.contextProtocol {
case "tcp", "http", "grpc":
protocol = t.contextProtocol
default:
protocol = t.apiProtocol
}
switch protocol {
case "http":
typeName = "google.cloud.contextgraph.Communication.Http"
case "tcp":
typeName = "google.cloud.contextgraph.Communication.Tcp"
case "https":
typeName = "google.cloud.contextgraph.Communication.Https"
case "grpc":
typeName = "google.cloud.contextgraph.Communication.Grpc"
default:
logger.Warningf("Unknown type of protocol: %s", protocol)
}
if typeName != "" {
// Publish the full N-way relationships.
for _, s := range sourceFullNames {
for _, d := range destinationFullNames {
edges = append(edges, edge{s, d, typeName})
var k8sSvc string
for _, entity := range destEntities {
if len(commType) > 0 {
for _, s := range sourceFullNames {
edges = append(edges, edge{s, entity.fullName, commType})
}
edges = append(edges, edge{serviceEntity.fullName, entity.fullName, commType})
if k8sSvc == "" {
k8sSvc = k8sSvcFullname(entity.shortNames[1], entity.location, entity.shortNames[2], t.destinationService.namespace, t.destinationService.name)
}
}
}
entities = append(entities, serviceEntity)
if len(k8sSvc) > 0 {
edges = append(edges, edge{serviceEntity.fullName, k8sSvc, membershipTypeName})
}

return entities, edges
}

// example: //container.googleapis.com/projects/<project>/locations/us-central1-a/clusters/<cluster>/k8s/namespaces/default/services/<service>
func k8sSvcFullname(project, location, cluster, namespace, name string) string {
return fmt.Sprintf("%s/k8s/namespaces/%s/services/%s", clusterContainer(project, location, cluster), namespace, name)
}

func clusterContainer(project, location, cluster string) string {
// TODO: Figure out what the container is for non-GCE clusters.
locType := "locations"
if strings.Count(location, "-") == 2 {
locType = "zones"
}
return fmt.Sprintf("//container.googleapis.com/projects/%s/%s/%s/clusters/%s", project, locType, location, cluster)
}

type entity struct {
containerFullName string
typeName string
Expand Down

0 comments on commit db48306

Please sign in to comment.