Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion extensions/stackdriver/metric/record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ void recordTCP(bool is_outbound,
opencensus::stats::Record({{clientReceivedBytesCountMeasure(),
request_info.tcp_received_bytes}},
tagMap);
std::cout << absl::StrCat("tcp_received_bytes: ",
request_info.tcp_received_bytes)
<< std::endl;
for (const auto& [tag, value] : tagMap) {
std::cout << absl::StrCat("tag: ", tag.name(), " value: ", value)
<< std::endl;
}
} else {
opencensus::stats::Record({{clientReceivedBytesCountMeasure(),
request_info.tcp_received_bytes}},
Expand Down Expand Up @@ -548,4 +555,4 @@ void recordTCP(bool is_outbound,

} // namespace Metric
} // namespace Stackdriver
} // namespace Extensions
} // namespace Extensions
10 changes: 8 additions & 2 deletions extensions/stackdriver/stackdriver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,15 @@ bool StackdriverRootContext::recordTCP(uint32_t id) {
bool log_open_on_timeout =
!record_info.tcp_open_entry_logged &&
(cur - request_info.start_time) > tcp_log_entry_timeout_;
if (waiting_for_metadata && no_error && !log_open_on_timeout) {
return false;
if (waiting_for_metadata && no_error) {
LOG_TRACE(absl::StrCat("Waiting for peer metadata, timeout: ",
log_open_on_timeout));
if (!log_open_on_timeout) {
return false;
}
}
LOG_TRACE(absl::StrCat("Reporting sent: ", request_info.tcp_sent_bytes,
" received: ", request_info.tcp_received_bytes));
if (!request_info.is_populated) {
::Wasm::Common::populateTCPRequestInfo(outbound, &request_info);
}
Expand Down
2 changes: 2 additions & 0 deletions extensions/stackdriver/stackdriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class StackdriverContext : public Context {
if (!is_initialized_) {
return FilterStatus::Continue;
}
LOG_TRACE(absl::StrCat("SD incrementReceivedBytes: ", size));
getRootContext()->incrementReceivedBytes(context_id_, size);
getRootContext()->setConnectionState(
context_id_, ::Wasm::Common::TCPConnectionState::Connected);
Expand All @@ -235,6 +236,7 @@ class StackdriverContext : public Context {
if (!is_initialized_) {
return FilterStatus::Continue;
}
LOG_TRACE(absl::StrCat("SD incrementSentBytes: ", size));
getRootContext()->incrementSentBytes(context_id_, size);
getRootContext()->setConnectionState(
context_id_, ::Wasm::Common::TCPConnectionState::Connected);
Expand Down
17 changes: 10 additions & 7 deletions test/envoye2e/driver/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func (e *Envoy) Run(p *Params) error {
}
log.Printf("envoy bootstrap:\n%s\n", bootstrap)

e.adminPort, err = getAdminPort(bootstrap)
var node string
e.adminPort, node, err = getAdminPortAndNode(bootstrap)
if err != nil {
return err
}
Expand All @@ -106,6 +107,7 @@ func (e *Envoy) Run(p *Params) error {
concurrency = fmt.Sprint(e.Concurrency)
}
args := []string{
"--log-format", "[" + node + ` %T.%e][%t][%l][%n] [%g:%#] %v`,
"-c", e.tmpFile,
"-l", debugLevel,
"--concurrency", concurrency,
Expand Down Expand Up @@ -172,23 +174,24 @@ func (e *Envoy) Cleanup() {
}
}

func getAdminPort(bootstrap string) (uint32, error) {
func getAdminPortAndNode(bootstrap string) (uint32, string, error) {
pb := &bootstrap_v3.Bootstrap{}
if err := ReadYAML(bootstrap, pb); err != nil {
return 0, err
return 0, "", err
}
if pb.Admin == nil || pb.Admin.Address == nil {
return 0, fmt.Errorf("missing admin section in bootstrap: %v", bootstrap)
return 0, "", fmt.Errorf("missing admin section in bootstrap: %v", bootstrap)
}
socket, ok := pb.Admin.Address.Address.(*core.Address_SocketAddress)
if !ok {
return 0, fmt.Errorf("missing socket in bootstrap: %v", bootstrap)
return 0, "", fmt.Errorf("missing socket in bootstrap: %v", bootstrap)
}
port, ok := socket.SocketAddress.PortSpecifier.(*core.SocketAddress_PortValue)
if !ok {
return 0, fmt.Errorf("missing port in bootstrap: %v", bootstrap)
return 0, "", fmt.Errorf("missing port in bootstrap: %v", bootstrap)
}
return port.PortValue, nil
node := pb.Node.Id
return port.PortValue, node, nil
}

// downloads env based on the given branch name. Return location of downloaded envoy.
Expand Down
7 changes: 5 additions & 2 deletions test/envoye2e/driver/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

// XDS creates an xDS server
type XDS struct {
lis net.Listener
grpc *grpc.Server
}

Expand All @@ -56,13 +57,14 @@ func (x *XDS) Run(p *Params) error {
xdsServer := server.NewServer(context.Background(), p.Config.Cache, nil)
discovery.RegisterAggregatedDiscoveryServiceServer(x.grpc, xdsServer)
secret.RegisterSecretDiscoveryServiceServer(x.grpc, xdsServer)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", p.Ports.XDSPort))
var err error
x.lis, err = net.Listen("tcp", fmt.Sprintf(":%d", p.Ports.XDSPort))
if err != nil {
return err
}

go func() {
_ = x.grpc.Serve(lis)
_ = x.grpc.Serve(x.lis)
}()
return nil
}
Expand All @@ -71,6 +73,7 @@ func (x *XDS) Run(p *Params) error {
func (x *XDS) Cleanup() {
log.Println("stopping XDS server")
x.grpc.GracefulStop()
x.lis.Close()
}

func (x *XDS) Debugf(format string, args ...interface{}) {
Expand Down
1 change: 0 additions & 1 deletion test/envoye2e/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func init() {
"TestStackdriverRbacAccessDenied/ActionAllow",
"TestStackdriverRbacAccessDenied/ActionBoth",
"TestStackdriverRbacAccessDenied/ActionDeny",
"TestStackdriverRbacTCPDryRun",
"TestStackdriverRbacTCPDryRun/BaseCase",
"TestStackdriverRbacTCPDryRun/NoAlpn",
"TestStackdriverReload",
Expand Down
12 changes: 6 additions & 6 deletions test/envoye2e/stackdriver_plugin/fake_stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *TracesServer) CreateSpan(ctx context.Context, req *cloudtracev2.Span) (
// NewFakeStackdriver creates a new fake Stackdriver server.
func NewFakeStackdriver(port uint16, delay time.Duration,
enableTLS bool, bearer string,
) (*MetricServer, *LoggingServer, *TracesServer, *grpc.Server) {
) (*MetricServer, *LoggingServer, *TracesServer, *grpc.Server, net.Listener) {
log.Printf("Stackdriver server listening on port %v\n", port)

var options []grpc.ServerOption
Expand Down Expand Up @@ -403,17 +403,17 @@ func NewFakeStackdriver(port uint16, delay time.Duration,
cloudtracev1.RegisterTraceServiceServer(grpcServer, traceSvc)
cloudtracev2.RegisterTraceServiceServer(grpcServer, traceSvc)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
err = grpcServer.Serve(lis)
if err != nil {
log.Fatalf("fake stackdriver server terminated abnormally: %v", err)
}
}()
return fsdms, fsdls, traceSvc, grpcServer
return fsdms, fsdls, traceSvc, grpcServer, lis
}

func RunFakeStackdriver(port uint16) error {
Expand Down
11 changes: 10 additions & 1 deletion test/envoye2e/stackdriver_plugin/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package stackdriverplugin
import (
"fmt"
"log"
"net"
"reflect"
"strings"
"sync"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
logging "google.golang.org/genproto/googleapis/logging/v2"
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/testing/protocmp"

Expand All @@ -44,6 +46,9 @@ type Stackdriver struct {
tsReq []*monitoring.CreateTimeSeriesRequest
ts map[string]int64
ls map[string]struct{}

grpcServer *grpc.Server
lis net.Listener
}

type SDLogEntry struct {
Expand All @@ -59,7 +64,9 @@ func (sd *Stackdriver) Run(p *driver.Params) error {
sd.ls = make(map[string]struct{})
sd.ts = make(map[string]int64)
sd.tsReq = make([]*monitoring.CreateTimeSeriesRequest, 0, 20)
metrics, logging, _, _ := NewFakeStackdriver(sd.Port, sd.Delay, true, ExpectedBearer)
var metrics *MetricServer
var logging *LoggingServer
metrics, logging, _, sd.grpcServer, sd.lis = NewFakeStackdriver(sd.Port, sd.Delay, true, ExpectedBearer)

go func() {
for {
Expand Down Expand Up @@ -124,6 +131,8 @@ func (sd *Stackdriver) Run(p *driver.Params) error {

func (sd *Stackdriver) Cleanup() {
close(sd.done)
sd.grpcServer.GracefulStop()
sd.lis.Close()
}

func (sd *Stackdriver) Check(p *driver.Params, tsFiles []string, lsFiles []SDLogEntry, verifyLatency bool) driver.Step {
Expand Down