Skip to content

Commit

Permalink
Merge branch 'release/0.0.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
Joel Baranick committed Sep 1, 2022
2 parents 50c42a5 + a8bed42 commit a0c452c
Show file tree
Hide file tree
Showing 24 changed files with 1,247 additions and 530 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ RUN GOOS=darwin GARCH=amd64 go build \
-o /dist/promutil \
-a \
-ldflags "-s -w -extldflags \"-fno-PIC -static\" -X github.com/kadaan/promutil/version.Version=$VERSION -X github.com/kadaan/promutil/version.Revision=$REVISION -X github.com/kadaan/promutil/version.Branch=$BRANCH -X github.com/kadaan/promutil/version.BuildUser=$USER -X github.com/kadaan/promutil/version.BuildHost=$HOST -X github.com/kadaan/promutil/version.BuildDate=$BUILD_DATE" \
-tags 'osusergo netgo' \
-tags 'osusergo netgo jsoniter' \
-installsuffix netgo && \
tar -czf "/archives/promutil_darwin.tar.gz" -C "/dist" .

Expand All @@ -39,7 +39,7 @@ RUN GOOS=linux GARCH=amd64 go build \
-o /dist/promutil \
-a \
-ldflags "-s -w -X github.com/kadaan/promutil/version.Version=$VERSION -X github.com/kadaan/promutil/version.Revision=$REVISION -X github.com/kadaan/promutil/version.Branch=$BRANCH -X github.com/kadaan/promutil/version.BuildUser=$USER -X github.com/kadaan/promutil/version.BuildHost=$HOST -X github.com/kadaan/promutil/version.BuildDate=$BUILD_DATE" \
-tags 'osusergo netgo static_build' \
-tags 'osusergo netgo jsoniter static_build' \
-installsuffix netgo && \
tar -czf "/archives/promutil_linux.tar.gz" -C "/dist" .

Expand Down
2 changes: 2 additions & 0 deletions cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"github.com/kadaan/promutil/config"
"github.com/kadaan/promutil/lib/block"
"github.com/kadaan/promutil/lib/command"
"github.com/kadaan/promutil/lib/web"
)
Expand All @@ -17,5 +18,6 @@ func init() {
fb.ListenAddress(&cfg.ListenAddress, "the listen address")
fb.SampleInterval(&cfg.SampleInterval, "interval at which samples will be taken within a range")
fb.Host(&cfg.Host, "remote prometheus host")
fb.Parallelism(&cfg.Parallelism, block.MaxParallelism, "parallelism for backfill")
})
}
3 changes: 2 additions & 1 deletion config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
defaultRuleGroupFilters = []*regexp.Regexp{regexp.MustCompile(".+")}
defaultRuleNameFilters = []*regexp.Regexp{regexp.MustCompile(".+")}
yamlFileExtensions = []string{"yml", "yaml"}
defaultListenAddress = ListenAddress{Host: "", Port: 8080}
)

func NewFlagBuilder(cmd *cobra.Command) FlagBuilder {
Expand Down Expand Up @@ -241,6 +242,6 @@ func (fb *flagBuilder) Matchers(dest *map[string][]*labels.Matcher, usage string

func (fb *flagBuilder) ListenAddress(dest *ListenAddress, usage string) Flag {
return fb.newFlag(listenAddressKey, func(flagSet *pflag.FlagSet) {
flagSet.Var(NewListenAddressValue(dest, ListenAddress{Host: "", Port: 8080}), listenAddressKey, usage)
flagSet.Var(NewListenAddressValue(dest, defaultListenAddress), listenAddressKey, usage)
})
}
5 changes: 1 addition & 4 deletions config/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import (
"time"
)

const (
DefaultListenAddress = ":8080"
)

// WebConfig represents the configuration of the web command.
type WebConfig struct {
ListenAddress ListenAddress
Host *url.URL
SampleInterval time.Duration
Parallelism uint8
}
20 changes: 3 additions & 17 deletions lib/block/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ func (p *planner[V]) Plan(transform func(int64, int64, int64) []PlanEntry[V]) []
stepDuration := int64(p.config.SampleInterval() / (time.Millisecond / time.Nanosecond))
for ; blockStart <= endInMs; blockStart = blockStart + p.config.BlockDuration() {
blockEnd := blockStart + p.config.BlockDuration() - 1
currStart := p.max(blockStart/int64(time.Second/time.Millisecond), p.config.StartTime().Unix())
currStart := common.MaxInt64(blockStart/int64(time.Second/time.Millisecond), p.config.StartTime().Unix())
startWithAlignment := p.evalTimestamp(time.Unix(currStart, 0).UTC().UnixNano(), stepDuration)
for startWithAlignment.Unix() < currStart {
startWithAlignment = startWithAlignment.Add(p.config.SampleInterval())
}
end := time.Unix(p.min(blockEnd/int64(time.Second/time.Millisecond), p.config.EndTime().Unix()), 0).UTC()
end := time.Unix(common.MinInt64(blockEnd/int64(time.Second/time.Millisecond), p.config.EndTime().Unix()), 0).UTC()
if end.Equal(startWithAlignment) || end.Before(startWithAlignment) {
break
}
Expand Down Expand Up @@ -181,20 +181,6 @@ func (p *planner[V]) planBlock(blockStart time.Time, blockEnd time.Time, stepDur
return plan
}

func (p *planner[V]) max(x, y int64) int64 {
if x > y {
return x
}
return y
}

func (p *planner[V]) min(x, y int64) int64 {
if x < y {
return x
}
return y
}

func (p *planner[V]) evalTimestamp(startTime int64, stepDuration int64) time.Time {
var (
offset = stepDuration
Expand Down Expand Up @@ -437,7 +423,7 @@ func (p *plannedBlockWriter[V]) Run() error {

go func() {
select {
case <-s.C:
case <-s.C():
klog.V(0).Infof("Stopping producer, consumers, and db")
cancel()
}
Expand Down
99 changes: 95 additions & 4 deletions lib/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
"k8s.io/klog/v2"
"log"
"os"
"runtime"
"runtime/pprof"
"runtime/trace"
)

var (
Expand All @@ -22,6 +26,7 @@ var (
type RootCommand interface {
Execute()
addCommand(cmd *cobra.Command)
startProfiler() (io.Closer, error)
}

func NewRootCommand(short string, long string) RootCommand {
Expand All @@ -42,15 +47,68 @@ func NewRootCommand(short string, long string) RootCommand {
r.addVersionCommand(r.cmd)
r.addCompletionCommand(r.cmd)
cobra.OnInitialize(r.initConfig)
r.cmd.PersistentFlags().StringVar(&r.cpuProfile, "cpuProfile", "", "Cpu profile result file")
r.cmd.PersistentFlags().StringVar(&r.memoryProfile, "memoryProfile", "", "Memory profile result file")
r.cmd.PersistentFlags().StringVar(&r.traceProfile, "traceProfile", "", "Trace profile result file")
r.cmd.PersistentFlags().CountVarP(&r.verbosity, "verbose", "v", "enables verbose logging (multiple times increases verbosity)")
r.cmd.PersistentFlags().StringVar(&r.cfgFile, "config", "", "config file (default is ."+version.Name+".config)")
return r
}

type rootCommand struct {
verbosity int
cfgFile string
cmd *cobra.Command
cpuProfile string
memoryProfile string
traceProfile string
verbosity int
cfgFile string
cmd *cobra.Command
}

func (r *rootCommand) startProfiler() (io.Closer, error) {
p := profiler{}
if r.traceProfile != "" {
f, err := os.Create(r.traceProfile)
if err != nil {
return &p, errors.Wrap(err, "could not create Trace profile")
}
p.fileClosers = append(p.fileClosers, f)
if err = trace.Start(f); err != nil {
return &p, errors.Wrap(err, "could not start Trace profile")
}
p.methodClosers = append(p.methodClosers, func() error {
trace.Stop()
return nil
})
}
if r.cpuProfile != "" {
f, err := os.Create(r.cpuProfile)
if err != nil {
return &p, errors.Wrap(err, "could not create CPU profile")
}
p.fileClosers = append(p.fileClosers, f)
if err = pprof.StartCPUProfile(f); err != nil {
return &p, errors.Wrap(err, "could not start CPU profile")
}
p.methodClosers = append(p.methodClosers, func() error {
pprof.StopCPUProfile()
return nil
})
}
if r.memoryProfile != "" {
f, err := os.Create(r.memoryProfile)
if err != nil {
return &p, errors.Wrap(err, "could not create memory profile")
}
p.fileClosers = append(p.fileClosers, f)
p.methodClosers = append(p.methodClosers, func() error {
runtime.GC()
if err = pprof.WriteHeapProfile(f); err != nil {
return errors.Wrap(err, "could not write memory profile")
}
return nil
})
}
return &p, nil
}

func (r *rootCommand) addCommand(cmd *cobra.Command) {
Expand Down Expand Up @@ -188,7 +246,17 @@ func NewCommand[C any](root RootCommand, use string, short string, long string,
Short: short,
Long: long,
RunE: func(cmd *cobra.Command, args []string) error {
if err := task.Run(cfg); err != nil {
p, err := root.startProfiler()
defer func(profiler io.Closer) {
errP := profiler.Close()
if errP != nil {
klog.Errorf("failed to close profiler: %w", errP)
}
}(p)
if err != nil {
return errors.Wrap(err, "failed to start profiler")
}
if err = task.Run(cfg); err != nil {
return errors.Wrap(err, "%s failed", use)
}
return nil
Expand All @@ -204,3 +272,26 @@ func NewCommand[C any](root RootCommand, use string, short string, long string,
type Task[C any] interface {
Run(cfg *C) error
}

type profiler struct {
fileClosers []io.Closer
methodClosers []func() error
}

func (p *profiler) Close() error {
var errs []error
for _, methodCloser := range p.methodClosers {
if err := methodCloser(); err != nil {
errs = append(errs, err)
}
}
for _, fileCloser := range p.fileClosers {
if err := fileCloser.Close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.NewMulti(errs, "failed to close profiler")
}
return nil
}
21 changes: 17 additions & 4 deletions lib/common/canceller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,35 @@ package common
import "sync"

type Canceller interface {
C() chan struct{}
Cancelled() bool
Cancel()
}

type canceller struct {
C chan struct{}
once sync.Once
cancelled bool
c chan struct{}
once sync.Once
}

func NewCanceller() *canceller {
return &canceller{
C: make(chan struct{}),
cancelled: false,
c: make(chan struct{}),
}
}

func (s *canceller) C() chan struct{} {
return s.c
}

func (s *canceller) Cancelled() bool {
return s.cancelled
}

func (s *canceller) Cancel() {
s.once.Do(func() {
close(s.C)
s.cancelled = true
close(s.c)
})
}
28 changes: 28 additions & 0 deletions lib/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,31 @@ func JoinUrl(base *url.URL, paths ...string) (*url.URL, error) {
p := path.Join(paths...)
return url.Parse(fmt.Sprintf("%s/%s", strings.TrimRight(s, "/"), strings.TrimLeft(p, "/")))
}

func MaxInt64(x int64, y int64) int64 {
if x > y {
return x
}
return y
}

func MinInt64(x int64, y int64) int64 {
if x < y {
return x
}
return y
}

func MaxUInt8(x uint8, y uint8) uint8 {
if x > y {
return x
}
return y
}

func MinTime(x time.Time, y time.Time) time.Time {
if x.After(y) {
return y
}
return x
}
Loading

0 comments on commit a0c452c

Please sign in to comment.