Skip to content
Permalink
Browse files
feat(telemetry): fix telemetry lags; telemetry logs; ignore commands …
…list

* Optionally write telemetry log to the provided WERF_TELEMETRY_LOG_FILE.
* Introduce ignore commands list, werf will not send telemetry for following commands:
    * werf version;
    * werf completion;
    * werf synchronization.
* Lower overall sending timeout to 1sec.

Signed-off-by: Timofey Kirillov <timofey.kirillov@flant.com>
  • Loading branch information
distorhead committed Jul 13, 2022
1 parent 9039b36 commit 9a2310f97941e651730fd5f72823580e3210d983
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 37 deletions.
@@ -3,7 +3,6 @@ package common
import (
"context"
"fmt"
"os"
"strings"

"github.com/go-git/go-git/v5/plumbing/transport"
@@ -14,41 +13,49 @@ import (
"github.com/werf/werf/pkg/util"
)

var telemetryIgnoreCommands = []string{
"werf version",
"werf synchronization",
"werf completion",
}

func InitTelemetry(ctx context.Context) {
if err := telemetry.Init(ctx, telemetry.TelemetryOptions{
ErrorHandlerFunc: func(err error) {
if err == nil {
return
}
logTelemetryError(err.Error())

telemetry.LogF("error: %s", err)
},
}); err != nil {
logTelemetryError(fmt.Sprintf("unable to init: %s", err))
telemetry.LogF("error: %s", err)
}
}

func ShutdownTelemetry(ctx context.Context, exitCode int) {
if err := telemetry.Shutdown(ctx); err != nil {
logTelemetryError(fmt.Sprintf("unable to shutdown: %s", err))
}
}

func logTelemetryError(msg string) {
if !telemetry.IsLogsEnabled() {
return
telemetry.LogF("unable to shutdown: %s", err)
}
fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", msg)
}

func TelemetryPreRun(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

telemetry.GetTelemetryWerfIO().SetCommand(ctx, getTelemetryCommand(cmd))
command := getTelemetryCommand(cmd)

if projectID, err := getTelemetryProjectID(ctx); err != nil {
if telemetry.IsLogsEnabled() {
fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", err)
for _, c := range telemetryIgnoreCommands {
if command == c {
return nil
}
}

InitTelemetry(ctx)

telemetry.GetTelemetryWerfIO().SetCommand(ctx, command)

if projectID, err := getTelemetryProjectID(ctx); err != nil {
telemetry.LogF("error: %s", err)
} else {
telemetry.GetTelemetryWerfIO().SetProjectID(ctx, projectID)
}
@@ -84,9 +91,7 @@ func getTelemetryProjectID(ctx context.Context) (string, error) {
}

if repo, err := getTelemetryLocalRepo(ctx, workingDir, gitWorkTree); err != nil {
if telemetry.IsLogsEnabled() {
fmt.Fprintf(os.Stderr, "Telemetry: unable to open local repo: %s\n", err)
}
telemetry.LogF("unable to detect projectID: unable to open local repo: %s", err)
} else {
url, err := repo.RemoteOriginUrl(ctx)
if err != nil {
@@ -100,9 +105,8 @@ func getTelemetryProjectID(ctx context.Context) (string, error) {

hashParts := []string{ep.Protocol, ep.Host, fmt.Sprintf("%d", ep.Port), ep.Path}

if telemetry.IsLogsEnabled() {
fmt.Fprintf(os.Stderr, "Telemetry: calculate projectID based on repo origin url\n")
}
telemetry.LogF("calculate projectID based on repo origin url")

projectID = util.Sha256Hash(hashParts...)
}

@@ -56,11 +56,8 @@ import (
func main() {
ctx := common.GetContextWithLogger()

common.InitTelemetry(ctx)

shouldTerminate, err := common.ContainerBackendProcessStartupHook()
if err != nil {
common.ShutdownTelemetry(ctx, 1)
common.TerminateWithError(err.Error(), 1)
}
if shouldTerminate {
@@ -72,7 +69,6 @@ func main() {
logrus.StandardLogger().SetOutput(logboek.OutStream())

if err := process_exterminator.Init(); err != nil {
common.ShutdownTelemetry(ctx, 1)
common.TerminateWithError(fmt.Sprintf("process exterminator initialization failed: %s", err), 1)
}

@@ -284,9 +280,7 @@ func setupTelemetryInit(rootCmd *cobra.Command) {

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if err := common.TelemetryPreRun(cmd, args); err != nil {
if telemetry.IsLogsEnabled() {
fmt.Fprintf(os.Stderr, "Telemetry error: %s\n", err)
}
telemetry.LogF("error: %s\n", err)
}

if oldRunE != nil {
@@ -3,6 +3,8 @@ package telemetry
import (
"context"
"fmt"
"os"
"time"

"go.opentelemetry.io/otel"

@@ -13,7 +15,10 @@ const (
TracesURL = "https://telemetry.werf.io/v1/traces"
)

var telemetrywerfio *TelemetryWerfIO
var (
telemetrywerfio *TelemetryWerfIO
logFile *os.File
)

func GetTelemetryWerfIO() TelemetryWerfIOInterface {
if telemetrywerfio == nil {
@@ -31,6 +36,14 @@ func Init(ctx context.Context, opts TelemetryOptions) error {
return nil
}

if path := os.Getenv("WERF_TELEMETRY_LOG_FILE"); path != "" {
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644)
if err != nil {
return fmt.Errorf("unable to open log file %q: %w", path, err)
}
logFile = f
}

if t, err := NewTelemetryWerfIO(TracesURL, TelemetryWerfIOOptions{
HandleErrorFunc: opts.ErrorHandlerFunc,
}); err != nil {
@@ -60,13 +73,24 @@ func Shutdown(ctx context.Context) error {
if !IsEnabled() {
return nil
}
if telemetrywerfio == nil {
return nil
}

if logFile != nil {
defer logFile.Close()
}

return telemetrywerfio.Shutdown(ctx)
}

func IsEnabled() bool {
return util.GetBoolEnvironmentDefaultFalse("WERF_TELEMETRY")
}

func IsLogsEnabled() bool {
return util.GetBoolEnvironmentDefaultFalse("WERF_TELEMETRY_LOGS")
func LogF(f string, args ...interface{}) {
if logFile == nil {
return
}
fmt.Fprintf(logFile, "[%d][%s] Telemetry: %s\n", os.Getpid(), time.Now(), fmt.Sprintf(f, args...))
}
@@ -52,8 +52,8 @@ func NewTelemetryWerfIO(url string, opts TelemetryWerfIOOptions) (*TelemetryWerf
handleErrorFunc: opts.HandleErrorFunc,
tracerProvider: sdktrace.NewTracerProvider(
sdktrace.WithBatcher(e,
sdktrace.WithBatchTimeout(0),
sdktrace.WithExportTimeout(3*time.Second),
sdktrace.WithBatchTimeout(1*time.Millisecond), // send all available events immediately
sdktrace.WithExportTimeout(1000*time.Millisecond),
),
),
traceExporter: e,
@@ -62,25 +62,33 @@ func NewTelemetryWerfIO(url string, opts TelemetryWerfIOOptions) (*TelemetryWerf
}

func (t *TelemetryWerfIO) Start(ctx context.Context) error {
LogF("start trace exporter")
if err := t.traceExporter.Start(ctx); err != nil {
return fmt.Errorf("error starting telemetry trace exporter: %w", err)
}
return nil
}

func (t *TelemetryWerfIO) Shutdown(ctx context.Context) error {
LogF("start shutdown")

LogF("flush trace provider")
if err := t.tracerProvider.ForceFlush(ctx); err != nil {
return fmt.Errorf("unable to force flush tracer provider: %w", err)
}

LogF("shutdown trace exporter")
if err := t.traceExporter.Shutdown(ctx); err != nil {
return fmt.Errorf("unable to shutdown trace exporter: %w", err)
}

LogF("shutdown trace provider")
if err := t.tracerProvider.Shutdown(ctx); err != nil {
return fmt.Errorf("unable to shutdown trace provider: %w", err)
}

LogF("shutdown complete")

return nil
}

@@ -117,6 +125,8 @@ func (t *TelemetryWerfIO) sendEvent(ctx context.Context, eventType EventType, ev
trc := t.getTracer()
_, span := trc.Start(ctx, spanName)

LogF("start sending event")

ts := time.Now().UnixMilli()

span.SetAttributes(attribute.Key("ts").Int64(ts))
@@ -138,12 +148,9 @@ func (t *TelemetryWerfIO) sendEvent(ctx context.Context, eventType EventType, ev
}
span.SetAttributes(attribute.Key("eventData").String(string(rawEventData)))
span.SetAttributes(attribute.Key("schemaVersion").Int64(schemaVersion))

span.End()

if IsLogsEnabled() {
fmt.Printf("Telemetry: sent event: ts=%d executionID=%q projectID=%q command=%q attributes=%q eventType=%q eventData=%q schemaVersion=%d\n", ts, t.executionID, t.projectID, t.command, string(rawAttributes), string(eventType), string(rawEventData), schemaVersion)
}
LogF("sent event: ts=%d executionID=%q projectID=%q command=%q attributes=%q eventType=%q eventData=%q schemaVersion=%d", ts, t.executionID, t.projectID, t.command, string(rawAttributes), string(eventType), string(rawEventData), schemaVersion)

return nil
}

0 comments on commit 9a2310f

Please sign in to comment.