Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Conflicts:
	whisper/whisper.go
  • Loading branch information
rafrombrc committed Aug 8, 2013
2 parents 319ef7e + 351d9ee commit 1863c0c
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 86 deletions.
13 changes: 11 additions & 2 deletions README.md
@@ -1,6 +1,15 @@
**WARNING** This library is still under development and largely untested. Do not attempt to use with production data.

whisper-go
==========

whisper-go is a library and set of utilities for working with the whisper database format used by the graphite project: https://github.com/graphite-project/
[![Build Status](https://drone.io/github.com/kisielk/whisper-go/status.png)](https://drone.io/github.com/kisielk/whisper-go/latest)

whisper-go is a library and set of utilities for working with the whisper database format used by the graphite project:
https://github.com/graphite-project/

Note that it is still in development so the API is subject to change.

Note that it is still in development so the API is subject to change, and it may corrupt your data at any moment. Use with caution.
API Documentation
-----------------
API documentation can be found at http://godoc.org/github.com/kisielk/whisper-go/whisper
37 changes: 33 additions & 4 deletions cmd/whisper-create/whisper-create.go
Expand Up @@ -6,19 +6,48 @@ import (
"github.com/kisielk/whisper-go/whisper"
"log"
"os"
"strings"
)

var aggregationMethod whisper.AggregationMethod = whisper.AGGREGATION_AVERAGE
type aggregationFlag whisper.AggregationMethod

func (f *aggregationFlag) String() string {
return f.String()
}

func (f *aggregationFlag) Set(s string) error {
var m whisper.AggregationMethod
s = strings.ToLower(s)
switch s {
case "average":
m = whisper.AggregationAverage
case "last":
m = whisper.AggregationLast
case "sum":
m = whisper.AggregationSum
case "max":
m = whisper.AggregationMax
case "min":
m = whisper.AggregationMin
default:
m = whisper.AggregationUnknown
}
*f = aggregationFlag(m)
return nil
}

var aggregationMethod = aggregationFlag(whisper.AggregationAverage)
var xFilesFactor float64

func main() {
flag.Var(&aggregationMethod, "aggregationMethod", "aggregation method to use")
flag.Float64Var(&xFilesFactor, "xFilesFactor", 0.5, "x-files factor")
flag.Float64Var(&xFilesFactor, "xFilesFactor", whisper.DefaultXFilesFactor, "x-files factor")
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "usage: %s [OPTION]... [FILE] [PRECISION:RETENTION]...\n", os.Args[0])
flag.PrintDefaults()
}
flag.Parse()
method := whisper.AggregationMethod(aggregationMethod)

log.SetFlags(0)

Expand All @@ -32,7 +61,7 @@ func main() {
log.Fatal("error: you must specify at least one PRECISION:RETENTION pair for the archive\n")
}

if aggregationMethod == whisper.AGGREGATION_UNKNOWN {
if method == whisper.AggregationUnknown {
flag.Usage()
log.Fatal(fmt.Sprintf("error: unknown aggregation method \"%v\"", aggregationMethod.String()))
}
Expand All @@ -50,7 +79,7 @@ func main() {
archives = append(archives, archive)
}

_, err := whisper.Create(path, archives, float32(xFilesFactor), aggregationMethod, false)
_, err := whisper.Create(path, archives, whisper.CreateOptions{XFilesFactor: float32(xFilesFactor), AggregationMethod: method})
if err != nil {
log.Fatal(err)
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/whisper-dump/main.go
@@ -0,0 +1,45 @@
package main

import (
"flag"
"fmt"
"github.com/kisielk/whisper-go/whisper"
"os"
)

func main() {
flag.Parse()
filename := flag.Arg(0)
if filename == "" {
flag.Usage()
os.Exit(1)
}

db, err := whisper.Open(filename)
if err != nil {
fmt.Println("could not open database:", err)
os.Exit(1)
}
defer db.Close()
dumpHeader(db)
dumpArchiveHeaders(db)
}

func dumpHeader(w *whisper.Whisper) {
fmt.Println("Meta data:")
fmt.Println(" aggregation method:", w.Header.Metadata.AggregationMethod)
fmt.Println(" max retention:", w.Header.Metadata.MaxRetention)
fmt.Println(" xFilesFactor:", w.Header.Metadata.XFilesFactor)
fmt.Println()
}

func dumpArchiveHeaders(w *whisper.Whisper) {
for i, archive := range w.Header.Archives {
fmt.Println("Archive", i, "info:")
fmt.Println(" offset:", archive.Offset)
fmt.Println(" seconds per point", archive.SecondsPerPoint)
fmt.Println(" points", archive.Points)
fmt.Println(" retention", archive.Retention())
fmt.Println()
}
}
112 changes: 66 additions & 46 deletions whisper/whisper.go
Expand Up @@ -19,10 +19,17 @@ import (

// Metadata holds metadata that's common to an entire whisper database
type Metadata struct {
AggregationMethod AggregationMethod // Aggregation method used. See the AGGREGATION_* constants
MaxRetention uint32 // The maximum retention period
XFilesFactor float32 // The minimum percentage of known values required to aggregate
ArchiveCount uint32 // The number of archives in the database
// Aggregation method used. See the Aggregation* constants.
AggregationMethod AggregationMethod

// The maximum retention period.
MaxRetention uint32

// The minimum percentage of known values required to aggregate.
XFilesFactor float32

// The number of archives in the database.
ArchiveCount uint32
}

// ArchiveInfo holds metadata about a single archive within a whisper database
Expand All @@ -32,6 +39,11 @@ type ArchiveInfo struct {
Points uint32 // The number of data points
}

// NewArchiveInfo returns a new ArchiveInfo with a zero offset
func NewArchiveInfo(secondsPerPoint, points uint32) ArchiveInfo {
return ArchiveInfo{SecondsPerPoint: secondsPerPoint, Points: points}
}

// Retention returns the retention period of the archive in seconds
func (a ArchiveInfo) Retention() uint32 {
return a.SecondsPerPoint * a.Points
Expand All @@ -47,54 +59,42 @@ func (a ArchiveInfo) end() uint32 {
return a.Offset + a.size()
}

// The AggregationMethod type describes how values are aggregated from one Whisper archive to another.
type AggregationMethod uint32

// Valid aggregation methods
const (
AGGREGATION_UNKNOWN AggregationMethod = 0 // Unknown aggregation method
AGGREGATION_AVERAGE AggregationMethod = 1 // Aggregate using averaging
AGGREGATION_SUM AggregationMethod = 2 // Aggregate using sum
AGGREGATION_LAST AggregationMethod = 3 // Aggregate using the last value
AGGREGATION_MAX AggregationMethod = 4 // Aggregate using the maximum value
AGGREGATION_MIN AggregationMethod = 5 // Aggregate using the minimum value
AggregationUnknown AggregationMethod = 0 // Unknown aggregation method
AggregationAverage AggregationMethod = 1 // Aggregate using averaging
AggregationSum AggregationMethod = 2 // Aggregate using sum
AggregationLast AggregationMethod = 3 // Aggregate using the last value
AggregationMax AggregationMethod = 4 // Aggregate using the maximum value
AggregationMin AggregationMethod = 5 // Aggregate using the minimum value
)

func (a *AggregationMethod) String() (s string) {
switch *a {
case AGGREGATION_AVERAGE:
const (
DefaultXFilesFactor = 0.5
DefaultAggregationMethod = AggregationAverage
)

func (m AggregationMethod) String() (s string) {
switch m {
case AggregationAverage:
s = "average"
case AGGREGATION_SUM:
case AggregationSum:
s = "sum"
case AGGREGATION_LAST:
case AggregationLast:
s = "last"
case AGGREGATION_MIN:
case AggregationMin:
s = "min"
case AGGREGATION_MAX:
case AggregationMax:
s = "max"
default:
s = "unknown"
}
return
}

func (a *AggregationMethod) Set(s string) error {
switch s {
case "average":
*a = AGGREGATION_AVERAGE
case "sum":
*a = AGGREGATION_SUM
case "last":
*a = AGGREGATION_LAST
case "min":
*a = AGGREGATION_MIN
case "max":
*a = AGGREGATION_MAX
default:
*a = AGGREGATION_UNKNOWN
}
return nil
}

// Header contains all the metadata about a whisper database.
type Header struct {
Metadata Metadata // General metadata about the database
Expand Down Expand Up @@ -287,13 +287,32 @@ func ValidateArchiveList(archives []ArchiveInfo) error {

}

// Create a new whisper database at a given file path
func Create(path string, archives []ArchiveInfo, xFilesFactor float32, aggregationMethod AggregationMethod, sparse bool) (*Whisper, error) {
// CreateOptions sets the option
type CreateOptions struct {
// The XFiles factor to use. DefaultXFilesFactor if not set.
XFilesFactor float32

// The archive aggregation method to use. DefaultAggregationMethod if not set.
AggregationMethod AggregationMethod

// If true, allocate a sparse archive.
Sparse bool
}

// Create a new database at the given filepath.
func Create(path string, archives []ArchiveInfo, options CreateOptions) (*Whisper, error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return nil, err
}

if options.XFilesFactor == 0.0 {
options.XFilesFactor = DefaultXFilesFactor
}
if options.AggregationMethod == 0 {
options.AggregationMethod = DefaultAggregationMethod
}

oldest := uint32(0)
for _, archive := range archives {
age := archive.SecondsPerPoint * archive.Points
Expand All @@ -303,8 +322,8 @@ func Create(path string, archives []ArchiveInfo, xFilesFactor float32, aggregati
}

metadata := Metadata{
AggregationMethod: aggregationMethod,
XFilesFactor: xFilesFactor,
AggregationMethod: options.AggregationMethod,
XFilesFactor: options.XFilesFactor,
ArchiveCount: uint32(len(archives)),
MaxRetention: oldest,
}
Expand All @@ -323,7 +342,7 @@ func Create(path string, archives []ArchiveInfo, xFilesFactor float32, aggregati
archiveOffsetPointer += archive.Points * pointSize
}

if sparse {
if options.Sparse {
file.Seek(int64(archiveOffsetPointer-headerSize-1), 0)
file.Write([]byte{0})
} else {
Expand Down Expand Up @@ -357,6 +376,7 @@ func Open(path string) (*Whisper, error) {
return openWhisper(file)
}

// Close closes a whisper database.
func (w *Whisper) Close() error {
return w.file.Close()
}
Expand Down Expand Up @@ -451,7 +471,7 @@ PointLoop:
}

// Fetch is equivalent to calling FetchUntil with until set to time.Now()
func (w *Whisper) Fetch(from uint32) (interval Interval, points []Point, err error) {
func (w *Whisper) Fetch(from uint32) (Interval, []Point, error) {
now := uint32(time.Now().Unix())
return w.FetchUntil(from, now)
}
Expand Down Expand Up @@ -845,25 +865,25 @@ func quantizeTimestamp(timestamp uint32, resolution uint32) (quantized uint32) {

func aggregate(aggregationMethod AggregationMethod, points []Point) (point Point, err error) {
switch aggregationMethod {
case AGGREGATION_AVERAGE:
case AggregationAverage:
for _, p := range points {
point.Value += p.Value
}
point.Value /= float64(len(points))
case AGGREGATION_SUM:
case AggregationSum:
for _, p := range points {
point.Value += p.Value
}
case AGGREGATION_LAST:
case AggregationLast:
point.Value = points[len(points)-1].Value
case AGGREGATION_MAX:
case AggregationMax:
point.Value = points[0].Value
for _, p := range points {
if p.Value > point.Value {
point.Value = p.Value
}
}
case AGGREGATION_MIN:
case AggregationMin:
point.Value = points[0].Value
for _, p := range points {
if p.Value < point.Value {
Expand Down

0 comments on commit 1863c0c

Please sign in to comment.