Skip to content

Commit

Permalink
mvcc stuff, queues, sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
palvaro committed Dec 22, 2011
1 parent 928ab05 commit 26a37c9
Show file tree
Hide file tree
Showing 5 changed files with 555 additions and 4 deletions.
148 changes: 148 additions & 0 deletions kvs/mvcc.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Written by Max Johnson and Ryan Spore

require 'rubygems'
require 'bud'
require 'kvs/kvs'
require 'ordering/queue'
require 'ordering/sequences'

# NOTES:
# * We disallow writes from clients to transactions they don't own. Ideally, we would also enforce this for get and del, but the KVSProtocol doesn't support it.


# @abstract MVCCProtocol is an abstract interface to an MVCC-based key-value store.
# an MVCC implementation should subclass MVCCProtocol
module MVCCProtocol
include KVSProtocol
state do

# used to request a new transaction id and start said transaction
# @param [String] client a unique client id, e.g. an ip address
interface input, :new_transaction, [:client] => []

# returns a new transaction id to the client who requested it
# @param [Number] transaction_id a unique transaction id to be used for read and write requests
# @param [String] client a unique client id, e.g. an ip address
interface output, :transaction_id_response, [:transaction_id] => [:client]

# used to commit a completed transaction
# @param [Number] transaction_id a unique transaction id to be used for read and write requests; the transaction to be commited
# @param [String] client a unique client id, e.g. an ip address
interface input, :commit, [:transaction_id] => [:client]

# a tuple in this output indicates that a transaction has been aborted
# @param [Number] transaction_id a unique transaction id to be used for read and write requests; the transaction that was aborted
# @param [String] client a unique client id, e.g. an ip address
interface output, :aborted_transactions, [:transaction_id] => [:client]
end
end

module MVCC
include MVCCProtocol
import KVSProtocol => :kvs
include Counter
include FIFOQueue

state do
table :active_transactions, [:transaction_id] => [:client]
scratch :new_active_transaction, active_transactions.schema
scratch :signal_commit, [:transaction_id]
scratch :signal_abort, [:transaction_id] => [:client]
scratch :write_request, kvput.schema
scratch :perform_write, write_request.schema
table :write_log, [:transaction_id, :key] => [:client]
table :transaction_id_lookup, [:key, :transaction_id] => []
scratch :old_transactions, transaction_id_lookup.schema
scratch :older_transactions, transaction_id_lookup.schema
scratch :oldest_transaction, active_transactions.schema
scratch :garbage_collection_keys, [:key] => [:transaction_id]
table :snapshot_lookup, [:transaction_id, :key] => [:lookup_id]
end

bloom :new_transactions do
push <= new_transaction { |t| [t, 0] }
pop <= [[0]]
get_count <= [[:next_transaction_id]]
new_active_transaction <= (return_count * pop_response).pairs do |count, t|
[count.tally, t.item.client]
end
increment_count <= new_active_transaction { |t| [:next_transaction_id] }
active_transactions <= new_active_transaction
older_transactions <= (new_active_transaction * transaction_id_lookup).pairs() do |new_transaction, writes_for_key|
[writes_for_key.key, writes_for_key.transaction_id] if writes_for_key.transaction_id <= new_transaction.transaction_id
end
snapshot_lookup <= (new_active_transaction * older_transactions.argmax([:key], :transaction_id)).pairs {|new_tx, key| [new_tx.transaction_id, key.key, key.transaction_id]}
transaction_id_response <= new_active_transaction
end

bloom :finish_transaction_successfully do
signal_commit <= (commit * active_transactions).lefts(:client => :client)
active_transactions <- (active_transactions * signal_commit).lefts(:transaction_id => :transaction_id)
write_log <- (write_log * signal_commit).lefts(:transaction_id => :transaction_id)
transaction_id_lookup <+ (write_log * signal_commit).lefts(:transaction_id => :transaction_id) do |w|
[w.key, w.transaction_id]
end
snapshot_lookup <- (signal_commit * snapshot_lookup).rights(:transaction_id => :transaction_id)
end

bloom :kvs_put do
write_request <= (kvput * active_transactions).lefts(:client => :client, :reqid => :transaction_id)
signal_abort <= (write_request * write_log).pairs(:key => :key) do |this, that|
if that.transaction_id < this.reqid
[this.reqid, this.client]
else
[that.transaction_id, that.client]
end
end
perform_write <= write_request.notin(signal_abort, :reqid => :transaction_id)
write_log <+ perform_write { |w| [w.reqid, w.key, w.client] }
kvs.kvput <= perform_write { |w| [w.client, [w.key, w.reqid], w.reqid, w.value]}
snapshot_lookup <+- perform_write { |w| [w.reqid, w.key, w.reqid] }
end

bloom :kvs_get do
kvs.kvget <= (kvget * snapshot_lookup).pairs(:key => :key, :reqid => :transaction_id) do |get_request, snapshot|
[get_request.reqid, [get_request.key, snapshot.lookup_id]]
end
kvget_response <= kvs.kvget_response do |get_response|
[get_response.reqid, get_response.key[0], get_response.value]
end
end

bloom :kvs_del do
kvput <= (kvdel * active_transactions).pairs(:reqid => :transaction_id) {|delete_request, active_transaction| [active_transaction.client, delete_request.key, active_transaction.transaction_id, nil]}
end

bloom :abort do
active_transactions <- (active_transactions * signal_abort).lefts(:transaction_id => :transaction_id)
write_log <- (write_log * signal_abort).lefts(:transaction_id => :transaction_id)
transaction_id_lookup <- (write_log * signal_abort).lefts(:transaction_id => :transaction_id) do |w|
[w.key, w.transaction_id]
end
snapshot_lookup <- (signal_abort * snapshot_lookup).rights(:transaction_id => :transaction_id)
kvs.kvdel <= (write_log * signal_abort).lefts(:transaction_id => :transaction_id) { |aborted_write| [aborted_write.key, aborted_write.transaction_id] }
aborted_transactions <= signal_abort
end

bloom :garbage_collection do
oldest_transaction <= (active_transactions.argmin([], :transaction_id) * signal_commit).lefts(:transaction_id => :transaction_id)
garbage_collection_keys <= (oldest_transaction * write_log).pairs(:transaction_id => :transaction_id) {|old, write| [write.key, old.transaction_id]}
old_transactions <= (garbage_collection_keys * transaction_id_lookup).pairs(:key=>:key) do |key_to_check, writes_for_key|
[writes_for_key.key, writes_for_key.transaction_id] if writes_for_key.transaction_id <= key_to_check.transaction_id
end
temp :x <= old_transactions.argmax([:key], :transaction_id)
temp :y <= old_transactions.notin(x)
transaction_id_lookup <- (y * oldest_transaction).lefts
kvs.kvdel <= (y * oldest_transaction).lefts
end
end

module BasicMVCC
include MVCC
import BasicKVS => :kvs
end

module ReplicatedMVCC
include MVCC
import ReplicatedKVS => :kvs
end
163 changes: 163 additions & 0 deletions ordering/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# @abstract PriorityQueueProtocol is the abstract interface for priority queues
# Any implementation of a queue should subclass PriorityQueueProtocol
module PriorityQueueProtocol
state do
# Push items into the queue.
# Useful Mnemonic: push "item" with priority "priority" into queue "queue."
# Note: queue is essentially optional - a single queue can be used without specifying queue because it will automatically be included as nil
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] priority specifies the priority of the item in the queue
# @param [Number] queue specifies which queue to push the item in
interface input, :push, [:item, :priority, :queue]

# Removes items out of the queue, regardless of priority.
# Useful Mnemonic: remove "item" from queue "queue"
# @param [Object] item specifies which item to remove
# @param [Number] queue specifies which queue to remove the item from
# @return [remove_response] upon successful removal.
interface input, :remove, [:item, :queue]

# Pop items out of the queue.
# Removes the top priority item in queue queue: outputs the item into pop_response.
# Useful Mnemonic: pop from queue "queue"
# @param [Number] queue specifies which queue to pop from
# @return [pop_response] when the pop request is successfully processed.
interface input, :pop, [:queue]

# Peek the top item in the queue.
# Like pop, but does not remove the item from the queue.
# Useful Mnemonic: peek from queue "queue"
# @param [Number] queue specifies which queue to peek at
# @return [peek_response] when the peek request is successfully processed.
interface input, :peek, [:queue]

# If there is a remove request, remove and return the item regardless of priority
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] priority specifies the priority of the item in the queue
# @param [Number] queue specifies which queue to push the item in
interface output, :remove_response, push.schema

# If there is a pop request, remove and return the top priority item from the queue
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] priority specifies the priority of the item in the queue
# @param [Number] queue specifies which queue to push the item in
interface output, :pop_response, push.schema

# If there is a peek request, return (but don't remove) the top priority item from the queue
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] priority specifies the priority of the item in the queue
# @param [Number] queue specifies which queue to push the item in
interface output, :peek_response, push.schema
end
end

# @abstract FIFOQueueProtocol is the abstract interface for fifo queues
module FIFOQueueProtocol
state do
# Push items into the queue.
# Note: queue is essentially optional - a single queue can be used without specifying queue because it will automatically be included as nil
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] queue specifies which queue to push the item in
interface input, :push, [:item, :queue]

# Pop items out of the queue.
# Removes the top priority item in queue queue: outputs the item into pop_response.
# @param [Number] queue specifies which queue to pop from
# @return [pop_response] when the pop request is successfully processed.
interface input, :pop, [:queue]

# Peek the top item in the queue.
# Like pop, but does not remove the item from the queue.
# @param [Number] queue specifies which queue to peek at
# @return [peek_response] when the peek request is successfully processed.
interface input, :peek, [:queue]

# If there is a pop request, remove and return the first item that was inserted into the queue
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] queue specifies which queue to push the item in
interface output, :pop_response, [:item, :queue]

# If there is a peek request, return (but don't remove) the first item that was inserted into the queue
# @param [Object] item is the item that will be pushed into the queue
# @param [Number] queue specifies which queue to push the item in
interface output, :peek_response, [:item, :queue]
end
end

# PriorityQueue is the basic implementation of a priority queue.
# The front of the queue is always the lowest priority item.
# @see PriorityQueue implements PriorityQueueProtocol
module PriorityQueue
include PriorityQueueProtocol

state do
# The items that are currently in the queue
table :items, [:item, :priority, :queue]

# The lowest priority item for each queue.
# Does not necessarily contain one item per queue (contains all items with the current lowest priority)
scratch :lowest, [:item, :priority, :queue]

# Temporary collection to contain the pop response.
# Does not necessarily contain one item per queue (contains all items with the current lowest priority)
# An interposition for breaking ties
scratch :lowest_popr, [:item, :priority, :queue]

# Temporary collection to contain the peek response.
# Does not necessarily contain one item per queue (contains all items with the current lowest priority)
# An interposition for breaking ties
scratch :lowest_peekr, [:item, :priority, :queue]
end

bloom :remember do
items <= push
end

bloom :calc_lowest do
# Users can override method of choosing best priority
# By default it is based on the ruby min
lowest <= items.argmin([:queue], :priority)
lowest_popr <= (pop * lowest).rights(:queue => :queue)
lowest_peekr <= (peek * lowest).rights(:queue => :queue)
end

bloom :break_tie do
# Users can override method of breaking ties
# By default it is chosen arbitrarily
pop_response <= lowest_popr.argagg(:choose, [:queue, :priority], :item)
peek_response <= lowest_peekr.argagg(:choose, [:queue, :priority], :item)
end

bloom :remove_item do
remove_response <= (remove * items).rights(:queue => :queue, :item => :item)
end

bloom :drop do
items <- remove_response
items <- pop_response
end

bloom :debug do
# stdio <~ lowest.inspected
# stdio <~ pop_response.inspected
end
end

# FIFOQueue is the basic implementation of a fifo queue.
# The front of the queue is always the earliest item that was inserted out of the items in the queue.
# Uses budtime to order the items.
# @see FIFOQueue implements FIFOQueueProtocol
# @see FIFOQueue imports PriorityQueue
module FIFOQueue
include FIFOQueueProtocol
import PriorityQueue => :pq

bloom do
pq.push <= push {|p| [p.item, budtime, p.queue]}
pq.pop <= pop
pq.peek <= peek

pop_response <= pq.pop_response {|p| [p.item, p.queue]}
peek_response <= pq.peek_response {|p| [p.item, p.queue]}
end
end
70 changes: 70 additions & 0 deletions ordering/sequences.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
require 'rubygems'
require 'bud'

# @abstract SequencesProtocol is the abstract interface for updating and accessing a collection of counters.
# A module that requires counters should subclass SequencesProtocol.
module SequencesProtocol
state do
# increment the counter for an id. If the id does not exist, initialize it.
# @param [String] ident the unique identifier of a row in a collection of counts
interface input, :increment_count, [:ident]

# reset the counter for an id. In any implementation, the row identified by ident should either be set to
# its initial value again, or no longer be in the collection of counters.
# @param [String] ident the unique identifier of a row in a collection of counts
interface input, :clear_ident, [:ident]

# request the count of an id. If the id is non-existant, then get_count will initialize the given id.
# @param [String] ident the unique identifier of a row in a collection of counts
interface input, :get_count, [:ident]

# output the count of an id. A get_count invocation should never result in return_count being empty.
# @param [String] ident the unique identifier of a row in a collection of counts
# @param [Number] the count associated with the ident
interface output, :return_count, [:ident]=>[:tally]
end
end

# Counter is a simple implementation of SequencesProtocol
# @see SequencesProtocol implements SequencesProtocol
module Counter
include SequencesProtocol

state do
# used to keep state for all counters in Counter
table :total_counts, [:ident] => [:tally]
end

bloom do
# when first count for an ident comes in, set up new count tuple for ident
total_counts <+ increment_count do |u|
[u.ident, 1] if not total_counts.exists? do |t|
u.ident==t.ident
end
end

# when get count for nonexistent ident comes in, set up new count tuple for ident
total_counts <+ get_count do |u|
[u.ident, 0] if not total_counts.exists? do |t|
u.ident==t.ident
end
end

return_count <= get_count do |u|
[u.ident, 0] if not total_counts.exists? do |t|
u.ident==t.ident
end
end

# increment an existing count by 1
total_counts <+- (total_counts * increment_count).pairs(:ident=>:ident) do |l,r|
[l.ident, l.tally+1]
end

# return count when get request comes in
return_count <= (get_count*total_counts).rights(:ident=>:ident)

# clear count when clear request comes in
total_counts <- (clear_ident*total_counts).rights(:ident=>:ident)
end
end
Loading

0 comments on commit 26a37c9

Please sign in to comment.