Replace deprecated lastUpdate field in whisper data format with a customizable aggregationMethod

tmm1 committed Jul 25, 2011
1 parent 4a5f4ea commit 75340fb
Showing 6 changed files with 64 additions and 31 deletions.
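In short: the 16-byte whisper metadata block keeps its size and layout, but the leading 4-byte slot that used to hold the (already deprecated) lastUpdate timestamp now holds an integer aggregation type. A minimal sketch of the reinterpretation, not part of the commit, assuming whisper.py's metadataFormat of "!2LfL":

```python
import struct

metadataFormat = "!2LfL"  # two unsigned longs, a float, an unsigned long
metadataSize = struct.calcsize(metadataFormat)  # 16 bytes

def read_metadata(fh):
    fields = struct.unpack(metadataFormat, fh.read(metadataSize))
    # before this commit: (lastUpdate,      maxRetention, xFilesFactor, archiveCount)
    # after this commit:  (aggregationType, maxRetention, xFilesFactor, archiveCount)
    return fields
```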
2 changes: 2 additions & 0 deletions carbon/conf/storage-schemas.conf.example
@@ -3,8 +3,10 @@
#
# [name]
# pattern = regex
# aggregate = (average|sum|last|min|max)
# retentions = timePerPoint:timeToStore, timePerPoint:timeToStore, ...

[default_1min_for_1day]
pattern = .*
aggregate = average
retentions = 60s:1d
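The new aggregate option is optional per schema section. A small sketch of how such a section reads back, using the stdlib ConfigParser in place of carbon's own config loader and a hypothetical file path:

```python
from ConfigParser import ConfigParser  # Python 2 stdlib, matching the codebase

parser = ConfigParser()
parser.read('storage-schemas.conf')  # hypothetical path

for section in parser.sections():
    options = dict(parser.items(section))
    # mirrors options.get('aggregate') in loadStorageSchemas():
    # sections without the option yield None rather than raising
    print section, options.get('pattern'), options.get('aggregate')
```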
18 changes: 11 additions & 7 deletions carbon/lib/carbon/storage.py
@@ -79,31 +79,34 @@ def matches(self, metric):


class DefaultSchema(Schema):
def __init__(self, name, archives):
def __init__(self, name, archives, aggregate):
self.name = name
self.archives = archives
self.aggregate = aggregate

def test(self, metric):
return True


class PatternSchema(Schema):
def __init__(self, name, pattern, archives):
def __init__(self, name, pattern, archives, aggregate):
self.name = name
self.pattern = pattern
self.regex = re.compile(pattern)
self.archives = archives
self.aggregate = aggregate

def test(self, metric):
return self.regex.search(metric)


class ListSchema(Schema):
def __init__(self, name, listName, archives):
def __init__(self, name, listName, archives, aggregate):
self.name = name
self.listName = listName
self.archives = archives
self.path = join(settings.WHITELISTS_DIR, listName)
self.aggregate = aggregate

if exists(self.path):
self.mtime = os.stat(self.path).st_mtime
@@ -156,18 +159,19 @@ def loadStorageSchemas():
matchAll = options.get('match-all')
pattern = options.get('pattern')
listName = options.get('list')
aggregate = options.get('aggregate')

retentions = options['retentions'].split(',')
archives = [ Archive.fromString(s) for s in retentions ]

if matchAll:
mySchema = DefaultSchema(section, archives)
mySchema = DefaultSchema(section, archives, aggregate)

elif pattern:
mySchema = PatternSchema(section, pattern, archives)
mySchema = PatternSchema(section, pattern, archives, aggregate)

elif listName:
mySchema = ListSchema(section, listName, archives)
mySchema = ListSchema(section, listName, archives, aggregate)

else:
raise ValueError('schema "%s" has no pattern or list parameter configured' % section)
@@ -179,4 +183,4 @@ def loadStorageSchemas():


defaultArchive = Archive(60, 60 * 24 * 7) #default retention for unclassified data (7 days of minutely data)
defaultSchema = DefaultSchema('default', [defaultArchive])
defaultSchema = DefaultSchema('default', [defaultArchive], 'average')
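A quick usage sketch (metric name and retention hypothetical, and assuming carbon is importable): whichever schema matches a metric now supplies the aggregation method alongside its archives.

```python
from carbon.storage import PatternSchema, Archive

# 1-minute points for a day, consolidated by summing
schema = PatternSchema('counters', r'^stats\.counts\.',
                       [Archive(60, 1440)], 'sum')
if schema.test('stats.counts.requests'):
    print schema.name, schema.aggregate  # -> counters sum
```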
4 changes: 3 additions & 1 deletion carbon/lib/carbon/writer.py
@@ -94,11 +94,13 @@ def writeCachedDataPoints():

if not dbFileExists:
archiveConfig = None
aggregationMethod = None

for schema in schemas:
if schema.matches(metric):
log.creates('new metric %s matched schema %s' % (metric, schema.name))
archiveConfig = [archive.getTuple() for archive in schema.archives]
aggregationMethod = schema.aggregate
break

if not archiveConfig:
@@ -108,7 +110,7 @@ def writeCachedDataPoints():
os.system("mkdir -p -m 755 '%s'" % dbDir)

log.creates("creating database file %s" % dbFilePath)
whisper.create(dbFilePath, archiveConfig)
whisper.create(dbFilePath, archiveConfig, aggregationMethod=aggregationMethod)
os.chmod(dbFilePath, 0755)
increment('creates')
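Stripped of the cache machinery, the new call amounts to the following (path and retention hypothetical); the method recorded at creation decides how the file rolls data into its lower-precision archives for its whole lifetime.

```python
import whisper

# one archive of 1-minute points for a day, consolidated by taking the max
whisper.create('/opt/graphite/storage/whisper/stats/foo.wsp',
               [(60, 1440)], aggregationMethod='max')
```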

3 changes: 2 additions & 1 deletion whisper/bin/whisper-create.py
@@ -45,6 +45,7 @@ def parseRetentionDef(retentionDef):

option_parser = OptionParser(usage='''%prog path secondsPerPoint:pointsToStore [secondsPerPoint:pointsToStore]* ''')
option_parser.add_option('--xFilesFactor', default=0.5, type='float')
option_parser.add_option('--aggregationMethod', default='average', type='string', help="Method to use when aggregating values into lower precisions (average, sum, last, min, max)")
option_parser.add_option('--overwrite', default=False, action='store_true')

(options, args) = option_parser.parse_args()
@@ -60,7 +61,7 @@ def parseRetentionDef(retentionDef):
print 'Overwriting existing file: %s' % path
os.unlink(path)

whisper.create(path, archives, xFilesFactor=options.xFilesFactor)
whisper.create(path, archives, xFilesFactor=options.xFilesFactor, aggregationMethod=options.aggregationMethod)

size = os.stat(path).st_size
print 'Created: %s (%d bytes)' % (path,size)
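With the new flag, a file that should roll up by summing can be created as, for example, `whisper-create.py counts.wsp 60:1440 --aggregationMethod=sum` (file name hypothetical); omitting the flag keeps the old behavior, since the default is average.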
8 changes: 7 additions & 1 deletion whisper/bin/whisper-resize.py
@@ -56,6 +56,7 @@ def parseRetentionDef(retentionDef):
12h:2y 12 hours per datapoint, 2 years of retention
''')
option_parser.add_option('--xFilesFactor', default=None, type='float', help="Change the xFilesFactor")
option_parser.add_option('--aggregationMethod', default=None, type='string', help="Change the aggregation method used (average, sum, last, min, max)")
option_parser.add_option('--force', default=False, action='store_true', help="Perform a destructive change")
option_parser.add_option('--newfile', default=None, action='store', help="Create a new database file without removing the existing one")
option_parser.add_option('--nobackup', action='store_true', help='Delete the .bak file after successful execution')
@@ -78,6 +79,11 @@ def parseRetentionDef(retentionDef):
else:
xff = options.xFilesFactor

if options.aggregationMethod is None:
aggregationMethod = info['aggregationMethod']
else:
aggregationMethod = options.aggregationMethod

print 'Retrieving all data from the archives'
for archive in old_archives:
fromTime = now - archive['retention'] + archive['secondsPerPoint']
@@ -95,7 +101,7 @@ def parseRetentionDef(retentionDef):
newfile = options.newfile

print 'Creating new whisper database: %s' % newfile
whisper.create(newfile, new_archives, xFilesFactor=xff)
whisper.create(newfile, new_archives, xFilesFactor=xff, aggregationMethod=aggregationMethod)
size = os.stat(newfile).st_size
print 'Created: %s (%d bytes)' % (newfile,size)
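The method can also be changed after the fact, for example `whisper-resize.py counts.wsp 60s:1d --aggregationMethod=max` (file name hypothetical); when the flag is omitted, the method already recorded in the file header is carried over unchanged.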

60 changes: 39 additions & 21 deletions whisper/whisper.py
@@ -19,13 +19,11 @@
#
# File = Header,Data
# Header = Metadata,ArchiveInfo+
# Metadata = lastUpdate,maxRetention,xFilesFactor,archiveCount
# Metadata = aggregationType,maxRetention,xFilesFactor,archiveCount
# ArchiveInfo = Offset,SecondsPerPoint,Points
# Data = Archive+
# Archive = Point+
# Point = timestamp,value
#
# NOTE: the lastUpdate field is deprecated, do not use it!

import os, struct, time
try:
@@ -43,8 +41,6 @@
longSize = struct.calcsize(longFormat)
floatFormat = "!f"
floatSize = struct.calcsize(floatFormat)
timestampFormat = "!L"
timestampSize = struct.calcsize(timestampFormat)
valueFormat = "!d"
valueSize = struct.calcsize(valueFormat)
pointFormat = "!Ld"
@@ -54,6 +50,15 @@
archiveInfoFormat = "!3L"
archiveInfoSize = struct.calcsize(archiveInfoFormat)

aggregationTypeToMethod = dict({
1: 'average',
2: 'sum',
3: 'last',
4: 'max',
5: 'min'
})
aggregationMethodToType = dict([[v,k] for k,v in aggregationTypeToMethod.items()])

debug = startBlock = endBlock = lambda *a,**k: None


@@ -65,6 +70,10 @@ class InvalidConfiguration(WhisperException):
"""Invalid configuration."""


class InvalidAggregationMethod(WhisperException):
"""Invalid aggregation method."""


class InvalidTimeInterval(WhisperException):
"""Invalid time interval."""

@@ -122,7 +131,7 @@ def __readHeader(fh):
packedMetadata = fh.read(metadataSize)

try:
(lastUpdate,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
(aggregationType,maxRetention,xff,archiveCount) = struct.unpack(metadataFormat,packedMetadata)
except:
raise CorruptWhisperFile("Unable to read header", fh.name)

@@ -146,7 +155,7 @@ def __readHeader(fh):

fh.seek(originalOffset)
info = {
#'lastUpdate' : lastUpdate, # Deprecated
'aggregationMethod' : aggregationTypeToMethod.get(aggregationType, 'average'),
'maxRetention' : maxRetention,
'xFilesFactor' : xff,
'archives' : archives,
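Note the .get(aggregationType, 'average') fallback: in a pre-existing file the first header field still holds an old lastUpdate timestamp, which maps to no known type, so legacy files read back as average, the only behavior they could have had before this commit.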
@@ -159,20 +168,15 @@

def __changeLastUpdate(fh):
return #XXX Make this a NOP, use os.stat(filename).st_mtime instead
originalOffset = fh.tell()
fh.seek(0) #Based on assumption that first field is lastUpdate
now = int( time.time() )
packedTime = struct.pack(timestampFormat,now)
fh.write(packedTime)
fh.seek(originalOffset)


def create(path,archiveList,xFilesFactor=0.5):
"""create(path,archiveList,xFilesFactor=0.5)
def create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average'):
"""create(path,archiveList,xFilesFactor=0.5,aggregationMethod='average')
path is a string
archiveList is a list of archives, each of which is of the form (secondsPerPoint,numberOfPoints)
xFilesFactor specifies the fraction of data points in a propagation interval that must have known values for a propagation to occur
aggregationMethod specifies the method to use when propagating data (average, sum, last, min, max)
"""
#Validate archive configurations...
if not archiveList:
@@ -210,12 +214,12 @@ def create(path,archiveList,xFilesFactor=0.5):
if LOCK:
fcntl.flock( fh.fileno(), fcntl.LOCK_EX )

lastUpdate = struct.pack( timestampFormat, int(time.time()) )
aggregationType = struct.pack( longFormat, aggregationMethodToType[aggregationMethod] )
oldest = sorted([secondsPerPoint * points for secondsPerPoint,points in archiveList])[-1]
maxRetention = struct.pack( longFormat, oldest )
xFilesFactor = struct.pack( floatFormat, float(xFilesFactor) )
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = lastUpdate + maxRetention + xFilesFactor + archiveCount
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)
headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize
Expand All @@ -234,8 +238,22 @@ def create(path,archiveList,xFilesFactor=0.5):

fh.close()

def __aggregate(aggregationMethod, knownValues):
if aggregationMethod == 'average':
return float(sum(knownValues)) / float(len(knownValues))
elif aggregationMethod == 'sum':
return float(sum(knownValues))
elif aggregationMethod == 'last':
return knownValues[len(knownValues)-1]
elif aggregationMethod == 'max':
return max(knownValues)
elif aggregationMethod == 'min':
return min(knownValues)
else:
raise InvalidAggregationMethod("Unrecognized aggregation method")
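Concretely, for one propagation window whose known values are [1.0, 2.0, 6.0], the five methods give 3.0, 9.0, 6.0, 6.0 and 1.0 respectively. A standalone worked example, mirroring the helper above rather than importing the private function:

```python
# Worked example of the five methods over one propagation window.
knownValues = [1.0, 2.0, 6.0]
results = {
    'average': float(sum(knownValues)) / len(knownValues),  # 3.0
    'sum':     float(sum(knownValues)),                     # 9.0
    'last':    knownValues[-1],                             # 6.0
    'max':     max(knownValues),                            # 6.0
    'min':     min(knownValues),                            # 1.0
}
print results
```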


def __propagate(fh,timestamp,xff,higher,lower):
def __propagate(fh,timestamp,aggregationMethod,xff,higher,lower):
lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']

@@ -290,7 +308,7 @@ def __propagate(fh,timestamp,xff,higher,lower):

knownPercent = float(len(knownValues)) / float(len(neighborValues))
if knownPercent >= xff: #we have enough data to propagate a value!
aggregateValue = float(sum(knownValues)) / float(len(knownValues)) #TODO another CF besides average?
aggregateValue = __aggregate(aggregationMethod, knownValues)
myPackedPoint = struct.pack(pointFormat,lowerIntervalStart,aggregateValue)
fh.seek(lower['offset'])
packedPoint = fh.read(pointSize)
@@ -367,7 +385,7 @@ def file_update(fh, value, timestamp):
#Now we propagate the update to lower-precision archives
higher = archive
for lower in lowerArchives:
if not __propagate(fh, myInterval, header['xFilesFactor'], higher, lower):
if not __propagate(fh, myInterval, header['aggregationMethod'], header['xFilesFactor'], higher, lower):
break
higher = lower

@@ -493,7 +511,7 @@ def __archive_update_many(fh,header,archive,points):
uniqueLowerIntervals = set(lowerIntervals)
propagateFurther = False
for interval in uniqueLowerIntervals:
if __propagate(fh,interval,header['xFilesFactor'],higher,lower):
if __propagate(fh,interval,header['aggregationMethod'],header['xFilesFactor'],higher,lower):
propagateFurther = True

if not propagateFurther:
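Pulling the pieces together, the new field survives a round trip through the public API (path hypothetical; info() is whisper's existing header-reading helper):

```python
import whisper

path = '/tmp/example.wsp'
# 1-second points for a minute, then 1-minute points for an hour
whisper.create(path, [(1, 60), (60, 60)], aggregationMethod='sum')
print whisper.info(path)['aggregationMethod']  # -> 'sum'
```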