Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
juliusv committed Feb 15, 2017
1 parent 4632ce6 commit bd031fe
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 105 deletions.
2 changes: 1 addition & 1 deletion lib/prometheus/client/counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def increment(labels = {}, by = 1)
private

def default(labels)
ValueType.new(type, @name, @name, labels)
ValueClass.new(type, @name, @name, labels)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/prometheus/client/gauge.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,22 @@ module Client
# A Gauge is a metric that exposes merely an instantaneous value or some
# snapshot thereof.
class Gauge < Metric
def initialize(name, docstring, base_labels = {}, multiprocess_mode='all')
super(name, docstring, base_labels)
if ValueClass.multiprocess and !['min', 'max', 'livesum', 'liveall', 'all'].include?(multiprocess_mode)
raise ArgumentError, 'Invalid multiprocess mode: ' + multiprocess_mode
end
@multiprocess_mode = multiprocess_mode
end

def type
:gauge
end

def default(labels)
ValueClass.new(type, @name, @name, labels, @multiprocess_mode)
end

# Sets the value for the given label set
def set(labels, value)
@values[label_set_for(labels)].set(value)
Expand Down
6 changes: 3 additions & 3 deletions lib/prometheus/client/histogram.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ class Value < Hash
attr_accessor :sum, :total

def initialize(type, name, labels, buckets)
@sum = ValueType.new(type, name, name.to_s + '_sum', labels)
@sum = ValueClass.new(type, name, name.to_s + '_sum', labels)
# TODO: get rid of total and use +Inf bucket instead.
@total = ValueType.new(type, name, name.to_s + '_count', labels)
@total = ValueClass.new(type, name, name.to_s + '_count', labels)

buckets.each do |bucket|
self[bucket] = ValueType.new(type, name, name.to_s + '_count', labels)
self[bucket] = ValueClass.new(type, name, name.to_s + '_count', labels)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/prometheus/client/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def values
private

def default(labels)
ValueType.new(type, @name, @name, labels, nil)
ValueClass.new(type, @name, @name, labels)
end

def validate_name(name)
Expand Down
6 changes: 3 additions & 3 deletions lib/prometheus/client/summary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class Value < Hash
attr_accessor :sum, :total

def initialize(name, labels, estimator)
@sum = ValueType.new(name, name + '_sum', labels, estimator.sum)
@total = ValueType.new(name, name + '_count', labels, estimator.observations)
@sum = ValueClass.new(name, name + '_sum', labels, estimator.sum)
@total = ValueClass.new(name, name + '_count', labels, estimator.observations)

estimator.invariants.each do |invariant|
self[invariant.quantile] = ValueType.new(name, labels, estimator.query(invariant.quantile), nil)
self[invariant.quantile] = ValueClass.new(name, labels, estimator.query(invariant.quantile), nil)
end
end
end
Expand Down
216 changes: 120 additions & 96 deletions lib/prometheus/client/valuetype.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# encoding: UTF-8

require "json"
require "mmap"

module Prometheus
module Client
class SimpleValue
Expand All @@ -15,72 +18,81 @@ def increment(by = 1)
@value += by
end

def get()
def get
@value
end
end

def MultiProcessValue(pid=Process.pid)
files = {}
files_lock = Mutex.new
def multiprocess
false
end
end

# A float protected by a mutex backed by a per-process mmaped file.
class MmapedValue
def initialize(type, metric_name, name, labels, multiprocess_mode='')
file_prefix = typ
if type == :gauge
file_prefix += '_' + multiprocess_mode
end
# A float protected by a mutex backed by a per-process mmaped file.
class MmapedValue
@@files = {}
@@files_lock = Mutex.new
@@pid = Process.pid

files_lock.synchronize do
if files.has_key?(file_prefix)
filename = File.join(ENV['prometheus_multiproc_dir'], "#{file_prefix}_#{pid}.db")
files[file_prefix] = MmapedDict.new(filename)
end
end
def initialize(type, metric_name, name, labels, multiprocess_mode='')
file_prefix = type.to_s
if type == :gauge
file_prefix += '_' + multiprocess_mode
end

@file = files[file_prefix]
labelnames = []
labelvalues = []
labels.each do |k, v|
labelnames << k
labelvalues << v
@@files_lock.synchronize do
if !@@files.has_key?(file_prefix)
filename = File.join(ENV['prometheus_multiproc_dir'], "#{file_prefix}_#{@@pid}.db")
@@files[file_prefix] = MmapedDict.new(filename)
end

@key = [metric_name, name, labelnames, labelvalues)].to_json
@value = @file.read_value(@key)
@mutex = Mutex.new
end

def inc(self, amount):
@mutex.synchronize do
self._value += amount
self._file.write_value(self._key, self._value)
end
@file = @@files[file_prefix]
labelnames = []
labelvalues = []
labels.each do |k, v|
labelnames << k
labelvalues << v
end

def set(self, value):
@mutex.synchronize do
self._value = value
self._file.write_value(self._key, self._value)
end
@key = [metric_name, name, labelnames, labelvalues].to_json
@value = @file.read_value(@key)
@mutex = Mutex.new
end

def increment(amount=1)
@mutex.synchronize do
@value += amount
@file.write_value(@key, @value)
end
end

def get(self):
@mutex.synchronize do
return self._value
end
def set(value)
@mutex.synchronize do
@value = value
@file.write_value(@key, @value)
end
end

def multiprocess
true
def get
@mutex.synchronize do
return @value
end
end

return MmapedValue
def multiprocess
true
end
end

ValueType = SimpleValue
# Should we enable multi-process mode?
# This needs to be chosen before the first metric is constructed,
# and as that may be in some arbitrary library the user/admin has
# no control over we use an enviroment variable.
if ENV.has_key?('prometheus_multiproc_dir')
ValueClass = MmapedValue
else
ValueClass = SimpleValue
end
end
end

Expand All @@ -92,88 +104,100 @@ def multiprocess
# size of the next field, a utf-8 encoded string key, padding to a 8 byte
#alignment, and then a 8 byte float which is the value.
#
# TODO(julius): do Mmap.new, truncate, etc., raise exceptions on failure?
class MmapedDict(object):
@@_INITIAL_MMAP_SIZE = 1024*1024
# TODO(julius): dealing with Mmap.new, truncate etc. errors?
class MmapedDict
@@INITIAL_MMAP_SIZE = 1024*1024

attr_reader :m, :capacity, :used, :positions

def initialize(filename):
def initialize(filename)
@mutex = Mutex.new
@m = Mmap.new(filename)
if @m.empty?
@m.extend(@@INITIAL_MMAP_SIZE)
@f = File.open(filename, 'a+b')
if @f.size == 0
@f.truncate(@@INITIAL_MMAP_SIZE)
end
@capacity = @m.size
@m.mlock
@capacity = @f.size
@m = Mmap.new(filename, 'rw', Mmap::MAP_SHARED)
#@m.mlock

@positions = {}
@used = @m.unpack('l')[0]
if @used == 0:
@used = @m[0..3].unpack('l')[0]
if @used == 0
@used = 8
@m[0..3] = [@used].pack('l')
else
read_all_values.each do |key, _, pos|
@positions[key] = pos
end
end
end

# Yield (key, value, pos). No locking is performed.
def all_values
read_all_values.map { |k, v, p| [k, v] }
end

def read_value(key)
@mutex.synchronize do
if !@positions.has_key?(key)
init_value(key)
end
end
pos = @positions[key]
# We assume that reading from an 8 byte aligned value is atomic.
@m[pos..pos+7].unpack('d')[0]
end

def write_value(key, value)
@mutex.synchronize do
if !@positions.has_key?(key)
init_value(key)
end
end
pos = @positions[key]
# We assume that writing to an 8 byte aligned value is atomic.
@m[pos..pos+7] = [value].pack('d')
end

def close()
@m.munmap
@f.close
end

private

# Initialize a value. Lock must be held by caller.
def init_value(self, key):
encoded = key.encode('utf-8')
def init_value(key)
# Pad to be 8-byte aligned.
padded = encoded + (b' ' * (8 - (encoded.length + 4) % 8))
value = [encoded.length, padded, 0.0].pack('lA#{padded.length}d')
while @used + value.length > @capacity:
@m.extend(@capacity)
padded = key + (' ' * (8 - (key.length + 4) % 8))
value = [key.length, padded, 0.0].pack("lA#{padded.length}d")
while @used + value.length > @capacity
@capacity *= 2
####@m = mmap.mmap(self._f.fileno(), self._capacity)
@m[@used:@used + value.length] = value
@f.truncate(@capacity)
@m = Mmap.new(@f.path, 'rw', Mmap::MAP_SHARED)
end
@m[@used..@used + value.length] = value

# Update how much space we've used.
@used += value.length
@m[0..3] = [@used].pack('l')
@positions[key] = @used - 8
end

# Yield (key, value, pos). No locking is performed.
def read_all_values(self):
def read_all_values
pos = 8
values = []
while pos < @used:
while pos < @used
encoded_len = @m[pos..-1].unpack('l')[0]
pos += 4
encoded = @m[pos..-1].unpack('A#{encoded_len}')[0]
encoded = @m[pos..-1].unpack("A#{encoded_len}")[0]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = @m[pos..-1].unpack('d')[0]
values << encoded.decode('utf-8'), value, pos
values << [encoded, value, pos]
pos += 8
end
values
end

# Yield (key, value, pos). No locking is performed.
def all_values():
read_all_values.map { |k, v, p| [k, v] }

def read_value(key):
@mutex.synchronize do
if !@positions.has_key?(key)
init_value(key)
end
end
pos = @positions[key]
# We assume that reading from an 8 byte aligned value is atomic.
@m[pos..-1].unpack('d')[0]

def write_value(key, value):
@mutex.synchronize do
if !@positions.has_key?(key)
init_value(key)
end
end
pos = @positions[key]
# We assume that writing to an 8 byte aligned value is atomic.
@m[pos..-1] = [value].pack('d')

def close():
@m.munmap
end
2 changes: 1 addition & 1 deletion spec/prometheus/client/formats/text_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
]
metrics.each do |m|
m.values.each do |k, v|
m.values[k] = Prometheus::Client::ValueType.new(m.type, m.name, m.name, k, v)
m.values[k] = Prometheus::Client::ValueClass.new(m.type, m.name, m.name, k, v)
end
end
double(metrics: metrics)
Expand Down

0 comments on commit bd031fe

Please sign in to comment.