Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Add Dataset#paged_each, for processing entire datasets without keeping all rows in memory

This seems to be a somewhat common need, and previously the
database-independent way to handle this has been to use the
pagination extension's Dataset#each_page method.  Unfortunately,
that yields datasets instead of rows, so if you just want to
process all rows, you end up with more complex code.

This new Dataset#paged_each method internalizes that complexity,
so the user can use it as a drop-in replacement for each.
  • Loading branch information...
commit 5d820c6dbe742bf304963e4034acbe40c17c2f3c 1 parent 9bca383
Jeremy Evans authored
2  CHANGELOG
... ... @@ -1,5 +1,7 @@
1 1 === HEAD
2 2
  3 +* Add Dataset#paged_each, for processing entire datasets without keeping all rows in memory (jeremyevans)
  4 +
3 5 * Add Sequel::ConstraintViolation exception class and subclasses for easier exception handling (jeremyevans)
4 6
5 7 * Fix use of identity_map plugin with many_to_many associations with right composite keys (chanks) (#603)
49 lib/sequel/dataset/actions.rb
@@ -461,6 +461,55 @@ def multi_insert(hashes, opts={})
461 461 import(columns, hashes.map{|h| columns.map{|c| h[c]}}, opts)
462 462 end
463 463
# Yields each row in the dataset, but internally uses multiple queries as needed with
# limit and offset to process the entire result set without keeping all
# rows in the dataset in memory, even if the underlying driver buffers all
# query results in memory.
#
# Because this uses multiple queries internally, in order to remain consistent,
# it also uses a transaction internally. Additionally, to make sure that all rows
# in the dataset are yielded and none are yielded twice, the dataset must have an
# unambiguous order. Sequel requires that datasets using this method have an
# order, but it cannot ensure that the order is unambiguous.
#
# Options:
# :rows_per_fetch :: The number of rows to fetch per query. Defaults to 1000.
def paged_each(opts={})
  unless @opts[:order]
    raise Sequel::Error, "Dataset#paged_each requires the dataset be ordered"
  end

  # Respect any limit/offset already set on the dataset: only rows inside
  # that window are yielded.
  total_limit = @opts[:limit]
  offset = @opts[:offset] || 0

  # Run the transaction (and therefore the paged queries) against the same
  # server the dataset is configured to use.
  if server = @opts[:server]
    opts = opts.merge(:server=>server)
  end

  rows_per_fetch = opts[:rows_per_fetch] || 1000
  # Seeded equal to rows_per_fetch so the first loop iteration runs.
  num_rows_yielded = rows_per_fetch
  total_rows = 0

  db.transaction(opts) do
    # Stop when a fetch returns fewer rows than requested (end of data),
    # or when an existing dataset limit has been satisfied.
    while num_rows_yielded == rows_per_fetch && (total_limit.nil? || total_rows < total_limit)
      # Shrink the final fetch so an existing limit is not exceeded.
      if total_limit && total_rows + rows_per_fetch > total_limit
        rows_per_fetch = total_limit - total_rows
      end

      num_rows_yielded = 0
      limit(rows_per_fetch, offset).each do |row|
        num_rows_yielded += 1
        # total_rows is only needed to enforce an existing limit.
        total_rows += 1 if total_limit
        yield row
      end

      offset += rows_per_fetch
    end
  end

  self
end
  512 +
464 513 # Returns a +Range+ instance made from the minimum and maximum values for the
465 514 # given column/expression. Uses a virtual row block if no argument is given.
466 515 #
86 spec/core/dataset_spec.rb
@@ -40,6 +40,14 @@
40 40 Sequel::Dataset.included_modules.should include(Enumerable)
41 41 end
42 42
# Sanity check of the mock dataset: rows configured via _fetch are yielded
# through #each, one hash per row.
specify "should yield rows to each" do
  ds = Sequel.mock[:t]
  ds._fetch = {:x=>1}
  called = false
  ds.each{|a| called = true; a.should == {:x=>1}}
  called.should be_true
end
  50 +
43 51 specify "should get quote_identifiers default from database" do
44 52 db = Sequel::Database.new(:quote_identifiers=>true)
45 53 db[:a].quote_identifiers?.should == true
@@ -4526,3 +4534,81 @@ class << Sequel
4526 4534 end
4527 4535 end
4528 4536
describe "Dataset#paged_each" do
  before do
    # An ordered mock dataset backed by 10 rows; @proc collects yielded rows
    # into @rows for later assertions.
    @ds = Sequel.mock[:test].order(:x)
    @db = (0...10).map{|i| {:x=>i}}
    @ds._fetch = @db
    @rows = []
    @proc = lambda{|row| @rows << row}
  end

  it "should yield rows to the passed block" do
    @ds.paged_each(&@proc)
    @rows.should == @db
  end

  it "should respect the row_proc" do
    @ds.row_proc = lambda{|row| {:x=>row[:x]*2}}
    @ds.paged_each(&@proc)
    @rows.should == @db.map{|row| {:x=>row[:x]*2}}
  end

  it "should use a transaction to ensure consistent results" do
    @ds.paged_each(&@proc)
    sqls = @ds.db.sqls
    # The paged SELECTs must be bracketed by BEGIN/COMMIT.
    sqls[0].should == 'BEGIN'
    sqls[-1].should == 'COMMIT'
  end

  it "should use a limit and offset to go through the dataset in chunks at a time" do
    @ds.paged_each(&@proc)
    # sqls[1...-1] strips the surrounding BEGIN/COMMIT.
    @ds.db.sqls[1...-1].should == ['SELECT * FROM test ORDER BY x LIMIT 1000 OFFSET 0']
  end

  it "should accept a :rows_per_fetch option to change the number of rows per fetch" do
    # each_slice simulates the server returning 3 rows per query.
    @ds._fetch = @db.each_slice(3).to_a
    @ds.paged_each(:rows_per_fetch=>3, &@proc)
    @rows.should == @db
    @ds.db.sqls[1...-1].should == ['SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 0',
      'SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 3',
      'SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 6',
      'SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 9']
  end

  it "should handle cases where the last query returns nothing" do
    # 10 rows at 5 per fetch: the third query returns an empty result set.
    @ds._fetch = @db.each_slice(5).to_a
    @ds.paged_each(:rows_per_fetch=>5, &@proc)
    @rows.should == @db
    @ds.db.sqls[1...-1].should == ['SELECT * FROM test ORDER BY x LIMIT 5 OFFSET 0',
      'SELECT * FROM test ORDER BY x LIMIT 5 OFFSET 5',
      'SELECT * FROM test ORDER BY x LIMIT 5 OFFSET 10']
  end

  it "should respect an existing server option to use" do
    # Both the transaction and the paged queries should run on :foo.
    @ds = Sequel.mock(:servers=>{:foo=>{}})[:test].order(:x)
    @ds._fetch = @db
    @ds.server(:foo).paged_each(&@proc)
    @rows.should == @db
    @ds.db.sqls.should == ["BEGIN -- foo", "SELECT * FROM test ORDER BY x LIMIT 1000 OFFSET 0 -- foo", "COMMIT -- foo"]
  end

  it "should require an order" do
    lambda{@ds.unordered.paged_each(&@proc)}.should raise_error(Sequel::Error)
  end

  it "should handle an existing limit and/or offset" do
    # Existing limit: final fetch is shrunk (LIMIT 2) so only 5 rows total.
    @ds._fetch = @db.each_slice(3).to_a
    @ds.limit(5).paged_each(:rows_per_fetch=>3, &@proc)
    @ds.db.sqls[1...-1].should == ["SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 0", "SELECT * FROM test ORDER BY x LIMIT 2 OFFSET 3"]

    # Existing limit and offset: paging starts at the dataset's offset.
    @ds._fetch = @db.each_slice(3).to_a
    @ds.limit(5, 2).paged_each(:rows_per_fetch=>3, &@proc)
    @ds.db.sqls[1...-1].should == ["SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 2", "SELECT * FROM test ORDER BY x LIMIT 2 OFFSET 5"]

    # Offset only: pages through from the offset until rows run out.
    @ds._fetch = @db.each_slice(3).to_a
    @ds.limit(nil, 2).paged_each(:rows_per_fetch=>3, &@proc)
    @ds.db.sqls[1...-1].should == ["SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 2", "SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 5", "SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 8", "SELECT * FROM test ORDER BY x LIMIT 3 OFFSET 11"]
  end
end
  4614 +
22 spec/integration/dataset_test.rb
@@ -98,6 +98,28 @@
98 98 @ds.all.should == [{:id=>1, :number=>10}]
99 99 end
100 100
# Rows should be yielded to the block as they are fetched, not only after
# the full result set is materialized.
specify "should iterate over records as they come in" do
  called = false
  @ds.each{|row| called = true; row.should == {:id=>1, :number=>10}}
  called.should == true
end
  106 +
# End-to-end check of paged_each against a real database: all rows are
# yielded exactly once regardless of page size, and an existing
# limit/offset window is respected.
specify "should support iterating over large numbers of records with paged_each" do
  # Grow the table to 100 rows (row 1 is inserted by the surrounding setup).
  (2..100).each{|i| @ds.insert(:number=>i*10)}

  rows = []
  @ds.order(:number).paged_each(:rows_per_fetch=>5){|row| rows << row}
  rows.should == (1..100).map{|i| {:id=>i, :number=>i*10}}

  # Page size that does not evenly divide the row count.
  rows = []
  @ds.order(:number).paged_each(:rows_per_fetch=>3){|row| rows << row}
  rows.should == (1..100).map{|i| {:id=>i, :number=>i*10}}

  # Existing limit (50) and offset (25): only rows 26..75 are yielded.
  rows = []
  @ds.order(:number).limit(50, 25).paged_each(:rows_per_fetch=>3){|row| rows << row}
  rows.should == (26..75).map{|i| {:id=>i, :number=>i*10}}
end
  122 +
101 123 specify "should fetch all results correctly" do
102 124 @ds.all.should == [{:id=>1, :number=>10}]
103 125 end

0 comments on commit 5d820c6

Please sign in to comment.
Something went wrong with that request. Please try again.