Browse files

Process documents in batches wrapped in transactions

  • Loading branch information...
1 parent 29bb6b4 commit 965065c14e102b52815caec9209786d92c76a9d8 Laszlo Bacsi committed Feb 20, 2012
Showing with 32 additions and 8 deletions.
  1. +32 −8 lib/squealer/database.rb
View
40 lib/squealer/database.rb
@@ -7,6 +7,7 @@ module Squealer
class Database
include Singleton
+ attr_accessor :default_batch_size
attr_reader :import, :export
def import_from(uri, options={})
@@ -30,10 +31,10 @@ def initialize(dbc)
@collections = {}
end
- def source(collection, conditions = {}, &block)
+ def source(collection, selector = {}, opts = {}, &block)
source = Source.new(@dbc, collection)
@collections[collection] = source
- source.source(conditions, &block)
+ source.source(selector, opts, &block)
end
def eval(string)
@@ -49,22 +50,45 @@ def initialize(dbc, collection)
@collection = dbc.collection(collection)
end
- def source(conditions)
- @cursor = block_given? ? yield(@collection) : @collection.find(conditions)
+ def source(selector, opts)
+ opts[:batch_size] ||= Squealer::Database.instance.default_batch_size || 0
+ @cursor = block_given? ? yield(@collection) : @collection.find(selector, opts)
@counts[:total] = cursor.count
@progress_bar = Squealer::ProgressBar.new(cursor.count)
self
end
- def each
+ def each(&block)
@progress_bar.start if @progress_bar
- @cursor.each do |row|
+ if @cursor.batch_size == 0
+ process_batch(@cursor, &block)
+ else
+ batch = []
+ @cursor.each do |doc|
+ batch << doc
+ if batch.size >= @cursor.batch_size
+ process_batch_in_transaction(batch, &block)
+ batch = []
+ end
+ end
+ process_batch_in_transaction(batch, &block)
+ end
+ @progress_bar.finish if @progress_bar
+ end
+
+ def process_batch(batch)
+ batch.each do |doc|
@counts[:imported] += 1
- yield row
+ yield doc
@progress_bar.tick if @progress_bar
@counts[:exported] += 1
end
- @progress_bar.finish if @progress_bar
+ end
+
+ def process_batch_in_transaction(batch, &block)
+ Squealer::Database.instance.export.transaction do
+ process_batch(batch, &block)
+ end
end
end # Source

0 comments on commit 965065c

Please sign in to comment.