Skip to content

Commit

Permalink
GH-60: DataSink, DataSource, BorrowedSample. Better core docs
Browse files Browse the repository at this point in the history
  • Loading branch information
skipor committed Jan 22, 2018
1 parent 7bc8c92 commit 3cb4e71
Show file tree
Hide file tree
Showing 13 changed files with 433 additions and 66 deletions.
3 changes: 1 addition & 2 deletions components/phttp/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

// package phttp (pandora http) contains pandora extension points for HTTP
// related protocols.
// package phttp (pandora http) contains pandora extension points for HTTP related protocols.
package phttp
5 changes: 4 additions & 1 deletion core/aggregate/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/coreutil"
)

// NewDiscard returns Aggregator that just throws reported ammo away.
Expand All @@ -23,4 +24,6 @@ func (discard) Run(ctx context.Context, _ core.AggregatorDeps) error {
return nil
}

func (discard) Report(core.Sample) {}
func (discard) Report(s core.Sample) {
coreutil.ReturnSampleIfBorrowed(s)
}
197 changes: 134 additions & 63 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,49 @@
// Author: Vladimir Skipor <skipor@yandex-team.ru>

// package core defines pandora engine extension points.
// Core interfaces implementations can be used for manual engine creation and using as a library,
// or can be registered in pandora plugin system (look at core/plugin package), for creating engine
// Core interfaces implementations MAY be used for custom engine creation and using as a library,
// or MAY be registered in pandora plugin system (look at core/plugin package), for creating engine
// from abstract config.
//
// The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT",
// "RECOMMENDED", "MAY", and "OPTIONAL" in that package doc are to be interpreted as described in
// https://www.ietf.org/rfc/rfc2119.txt
package core

import (
"context"
"io"
"time"

"go.uber.org/zap"
)

// Ammo is data required for one shot. Usually it contains something that differs
// from one shot to another.
// Something like requested recourse indetificator, query params, meta information
// helpful for future shooting analysis.
// Ammo is data required for one shot. SHOULD contains something that differs
// from one shot to another. Something like requested recourse indetificator, query params,
// meta information helpful for future shooting analysis.
// Information common for all shoots SHOULD be passed via Provider configuration.
type Ammo interface{}

//go:generate mockery -name=Provider -case=underscore -outpkg=coremock

// Provider is routine that generates ammo for instance shoots.
// A Provider must be goroutine safe.
// Provider is routine that generates ammo for Instance shoots.
// A Provider MUST be goroutine safe.
type Provider interface {
// Run starts provider routine. Blocks until ammo finish, error or context cancel.
// Run must be called once before any Acquire or Release calls.
// In case of context cancel, return nil (recommended), ctx.Err(), or error caused ctx.Err()
// Run starts provider routine of ammo generation.
// Blocks until ammo finish, error or context cancel.
// Run MUST be called only once. Run SHOULD be called before Acquire or Release calls, but
// MAY NOT because of goroutine races.
// In case of ctx cancel, SHOULD return nil, but MAY ctx.Err(), or error caused ctx.Err()
// in terms of github.com/pkg/errors.Cause.
Run(ctx context.Context, deps ProviderDeps) error
// Acquire acquires ammo for shoot. Should be lightweight, so instance can shoot as
// soon as possible. That means ammo format parsing MAY be done in Provider Run goroutine,
// but acquire just takes ammo from ready pool.
// Ok false means that shooting should be stopped: ammo finished or shooting is canceled.
// Acquire may be called before start, but may block until start is called.
// Acquire acquires ammo for shoot. Acquire SHOULD be lightweight, so Instance can Shoot as
// soon as possible. That means ammo format parsing SHOULD be done in Provider Run goroutine,
// but acquire just takes ammo from ready queue.
// Ok false means that shooting MUST be stopped because ammo finished or shooting is canceled.
// Acquire MAY be called before Run, but MAY block until Run is called.
Acquire() (ammo Ammo, ok bool)
// Release notifies that ammo usage is finished, and it can be reused.
// Instance should not retain references to released ammo.
// Instance MUST NOT retain references to released ammo.
Release(ammo Ammo)
}

Expand All @@ -53,71 +60,82 @@ type ProviderDeps struct {
//go:generate mockery -name=Gun -case=underscore -outpkg=coremock

// Gun represents logic of making shoots sequentially.
// A Gun is owned by only instance that uses it for shooting in cycle: acquire ammo from provider ->
// wait for next shoot schedule event -> shoot with gun.
// Guns that also implements io.Closer will be closed after instance finish.
// Actually, Guns that create resources which should be closed after instance finish,
// SHOULD also implement io.Closer
// A Gun is owned by only Instance that uses it for shooting in cycle: Acquire Ammo from Provider ->
// wait for next shoot schedule event -> Shoot with Gun.
// Guns that also implements io.Closer will be Closed after Instance finish.
// Rule of thumb: Guns that create resources which SHOULD be closed after Instance finish,
// SHOULD implement io.Closer.
// Example: Gun that makes HTTP requests through keep alive connection SHOULD close it in Close.
type Gun interface {
// Bind passes dependencies required for shooting. Called once before shooting start.
// Bind passes dependencies required for shooting. MUST be called once before any Shoot call.
Bind(aggr Aggregator, deps GunDeps) error
// Shoot makes one shoot. Shoot means some abstract load operation: web service or database request, for example.
// During shoot Gun acquires one or more samples and report them to bound Aggregator.
// Shoot error SHOULD be reported to Aggregator in sample and SHOULD be logged to Log.
// In case of error, that should cancel shooting for all instances (configuration problem
// or unexpected behaviour for example) Shoot should panic with error value.
// http.Request fail is not error for panic, but error for reporting to aggregator.
// Shoot makes one shoot. Shoot means some abstract load operation: web service or database
// request, for example.
// During shoot Gun SHOULD Acquire one or more Samples and Report them to bound Aggregator.
// Shoot error that MAY mean service under load fail SHOULD be reported to Aggregator in sample
// and SHOULD be logged to deps.Log at zap.WarnLevel.
// For example, HTTP request fail SHOULD be Reported and logged,.
// In case of error, that SHOULD cancel shooting for all Instances Shoot MUST panic using error
// value describing the problem. That could be configuration error, unsupported Ammo type,
// situation when service under load doesn't support required protocol,
Shoot(ammo Ammo)

// io.Closer // Optional. See Gun doc for details.
// io.Closer // OPTIONAL to implement. See Gun doc for details.
}

// GunDeps are passed to Gun before instance shoot start.
// GunDeps are passed to Gun before Instance Run.
// WARN: another fields could be added in next MINOR versions.
// That is NOT considered as a breaking compatibility change.
type GunDeps struct {
// Ctx is canceled on shoot cancel or finish.
// Context passed engine.Engine is ancestor to Contexts passed to Provider, Gun and Aggregator.
Ctx context.Context
// Log fields already contains Id's of Pool and Instance.
Log *zap.Logger
// Unique of gun owning instance. Can be used
// Pool set's ids to instances from 0, incrementing it after instance start.
// There is a race between instances for ammo acquire, so it's not guaranteed, that
// instance with lower id gets it's ammo earlier.
// Unique of Gun owning Instance. MAY be used for tagging Samples.
// Pool set's ids to Instances from 0, incrementing it after Instance Run.
// There is a race between Instances for Ammo Acquire, so it's not guaranteed, that
// Instance with lower InstanceId gets it's Ammo earlier.
InstanceId int
// TODO(skipor): https://github.com/yandex/pandora/issues/71
// Pass parallelism value. InstanceId should be -1 if parallelism > 1.
// Pass parallelism value. InstanceId MUST be -1 if parallelism > 1.
}

// Sample is data containing shoot report. Return code, timings, shoot meta information.
type Sample interface{}

//go:generate mockery -name=BorrowedSample -case=underscore -outpkg=coremock

// BorrowedSample is Sample that was borrowed from pool, and SHOULD be returned by Aggregator,
// after it will handle Sample.
type BorrowedSample interface {
Return()
}

//go:generate mockery -name=Aggregator -case=underscore -outpkg=coremock

// Aggregator is routine that aggregates samples from all instances.
// Usually aggregator is shooting result reporter, that writes released samples
// to file in machine readable format for future analysis.
// Aggregator is routine that aggregates Samples from all Pool Instances.
// Usually aggregator is shooting result reporter, that writes Reported Samples
// to DataSink in machine readable format for future analysis.
// An Aggregator MUST be goroutine safe.
// GunDeps are passed to gun before instance shoot start.
// WARN: another fields could be added in next MINOR versions.
// That is NOT considered as a breaking compatibility change.
// GunDeps are passed to Gun before Instance Run.
type Aggregator interface {
// Run starts aggregator routine. Blocks until error or context cancel.
// In case of context cancel, return nil, ctx.Err(), or error caused ctx.Err()
// in terms of github.com/pkg/errors.Cause in case of successful run, or other error
// if failed.
// Context passed engine.Engine is ancestor to Contexts passed to Provider, Gun and Aggregator.
// Run starts aggregator routine of handling Samples. Blocks until fail or context cancel.
// Run MUST be called only once. Run SHOULD be called before Report calls, but MAY NOT because
// of goroutine races.
// In case of ctx cancel, SHOULD return nil, but MAY ctx.Err(), or error caused ctx.Err()
// in terms of github.com/pkg/errors.Cause.
// In case of any dropped Sample (unhandled because of Sample queue overflow) Run SHOULD return
// error describing how many samples were dropped.
Run(ctx context.Context, deps AggregatorDeps) error
// Report reports sample to aggregator. SHOULD be lightweight and not blocking,
// so instance can shoot as soon as possible.
// That means, that sample encode and reporting SHOULD NOT be done in caller goroutine,
// but MAY in Aggregator Run goroutine, for example.
// If Aggregator can't process reported sample without blocking, it SHOULD just throw it away.
// If any reported samples were thrown away, after context cancel,
// Run SHOULD return error describing how many samples were thrown away.
// Reported sample can be reused for efficiency, so caller SHOULD NOT retain reference to Sample.
// so Instance can Shoot as soon as possible.
// That means, that Sample encode and reporting SHOULD NOT be done in caller goroutine,
// but SHOULD in Aggregator Run goroutine.
// If Aggregator can't handle Reported Sample without blocking, it SHOULD just drop it.
// Reported Samples MAY just be dropped, after context cancel.
// Reported Sample MAY be reused for efficiency, so caller MUST NOT retain reference to Sample.
// Report MAY be called before Aggregator Run.
// Aggregator SHOULD Return Sample if it implements BorrowedSample.
Report(s Sample)
}

Expand All @@ -132,20 +150,73 @@ type AggregatorDeps struct {

// Schedule represents operation schedule. Schedule MUST be goroutine safe.
type Schedule interface {
// Run starts schedule at passed time.
// Run may be called once, before any Next call. (Before, means not concurrently too.)
// If start was not called, schedule is started at first Next call.
// Start starts schedule at passed time.
// Start SHOULD be called once, before any Next call.
// Start MUST NOT be called more than once or after Next call.
// If Start was not called, Schedule MUST be started on first Next call.
Start(startAt time.Time)

// Next withdraw one operation token and returns next operation time and
// ok equal true, when schedule is not finished.
// If there is no operation tokens left, Next returns Schedule
// finish time and ok equals false.
// When Next returns ok == true first time, tx SHOULD be start time.
// ok equal true, when Schedule is not finished.
// If there is no operation tokens left, Next returns Schedule finish time and ok equals false.
// If Next called first time and Start was not called, Schedule MUST start and return tx
// equal to start time.
// Returned ts values MUST increase monotonically. That is, ts returned on next Next call MUST
// be greater or equal than returned on previous.
Next() (ts time.Time, ok bool)

// Left returns n >= 0 number operation token left, if it is known exactly.
// Returns n < 0, if number of operation tokens is unknown.
// It's OK to call Left before start.
// Left MAY be called before Start.
Left() int
}

//go:generate mockery -name=DataSource -case=underscore -outpkg=coremock

// DataSource is abstract, ready to only open, source of data.
// Returned source MUST implement io.ReadCloser at least, but can implement more wide interface,
// and this interface methods MAY be used. For example, returned source can be afero.File,
// and can be seeked in such case.
// Examples:
// Dummy os.Stdin wrapper.
// File DataSource that contains filename and afero.Fs, and returns afero.File on OpenSource.
// HTTP DataSource that contains URL and headers used on OpenSource to download content to file,
// and return afero.File, that will be deleted on rc Close.
// String DataSource returns just wrapped *bytes.Buffer with string content.
type DataSource interface {
// OpenSource opens source for read. OpenSource MUST NOT be called more than once.
// Returned rc SHOULD have low latency and good enough throughput for Read.
// rc MAY be afero.File but SHOULD NOT be TCP connection for example.
// DataSource MAY be some remote resource, but OpenSource SHOULD download all necessary data to
// local temporary file and return it as rc.
// Rule of thumb: returned rc SHOULD be afero.File or wrapped *bytes.Buffer.
// Returned rc SHOULD cleanup all created temporary resources on Close.
// rc owner SHOULD NOT try cast it to concrete types. For example, rc can be
// wrapped temporary *os.File, that will be deleted on Close, so it can't be casted to *os.File,
// because has type of wrapper, but can be used as afero.File.
// rc Reads SHOULD be buffered for better performance if it doesn't implement io.ByteReader.
// That usually means that short Reads are efficient enough. It is implemented by *bytes.Buffer
// and *bufio.Reader, for example. rc Reads MAY be buffered regardless.
OpenSource() (rc io.ReadCloser, err error)
}

//go:generate mockery -name=DataSink -case=underscore -outpkg=coremock

// DataSink is abstract ready to open sink of data.
//
// Examples:
// Dummy os.Stdout wrapper.
// File DataSink that contains filename and afero.Fs, and returns afero.File on OpenSource.
// HTTP DataSink caches Written data to temporary file on wc Writes,
// and POST it using contained URL and headers on wc Close.
type DataSink interface {
// OpenSink opens sink for writing. OpenSink MUST NOT be called more than once.
// Returned wc SHOULD have low latency and good enough throughput for Write.
// wc MAY be afero.File but SHOULD NOT be TCP connection for example.
// DataSink MAY upload Wrote data somewhere but SHOULD do it on wc Close or in background
// goroutine.
// wc Writes SHOULD be buffered for better performance if it doesn't implement io.ByteWriter.
// That usually means that short Writes are efficient enough. It is implemented by *bytes.Buffer
// and *bufio.Writer, for example. wc Writes MAY be buffered regardless.
OpenSink() (wc io.WriteCloser, err error)
}
28 changes: 28 additions & 0 deletions core/coreutil/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package coreutil

import (
"io"

"github.com/yandex/pandora/core"
)

type DataSinkFunc func() (wc io.WriteCloser, err error)

func (f DataSinkFunc) OpenSink() (wc io.WriteCloser, err error) {
return f()
}

var _ core.DataSink = DataSinkFunc(nil)

type DataSourceFunc func() (wc io.ReadCloser, err error)

func (f DataSourceFunc) OpenSource() (rc io.ReadCloser, err error) {
return f()
}

var _ core.DataSource = DataSourceFunc(nil)
16 changes: 16 additions & 0 deletions core/coreutil/sample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package coreutil

import "github.com/yandex/pandora/core"

func ReturnSampleIfBorrowed(s core.Sample) {
borrowed, ok := s.(core.BorrowedSample)
if !ok {
return
}
borrowed.Return()
}
File renamed without changes.
File renamed without changes.
48 changes: 48 additions & 0 deletions core/datasink/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <skipor@yandex-team.ru>

package datasink

import (
"io"
"os"

"github.com/spf13/afero"

"github.com/yandex/pandora/core"
)

type FileSinkConfig struct {
Path string
}

func NewFileSink(fs afero.Fs, conf FileSinkConfig) core.DataSink {
return &fileSink{afero.Afero{fs}, conf}
}

type fileSink struct {
fs afero.Afero
conf FileSinkConfig
}

func (s *fileSink) OpenSink() (wc io.WriteCloser, err error) {
return s.fs.OpenFile(s.conf.Path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
}

func NewStdoutSink() core.DataSink {
return hideCloseFileSink{os.Stdout}
}

func NewStderrSink() core.DataSink {
return hideCloseFileSink{os.Stderr}
}

type hideCloseFileSink struct{ *os.File }

func (f hideCloseFileSink) OpenSink() (wc io.WriteCloser, err error) {
return f, nil
}

func (f hideCloseFileSink) Close() error { return nil }
Loading

0 comments on commit 3cb4e71

Please sign in to comment.