Permalink
Browse files

Initial open-source release

  • Loading branch information...
0 parents commit 59cfcd317e4e71be75c37a01baffe42cffff1934 @nelhage nelhage committed Jan 14, 2013
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
@@ -0,0 +1,3 @@
+source 'https://rubygems.org'
+
+gemspec
22 LICENSE
@@ -0,0 +1,22 @@
+Copyright (c) 2012 Greg Brockman
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
@@ -0,0 +1,29 @@
+# Mongoriver
+
+TODO: Write a gem description
+
+## Installation
+
+Add this line to your application's Gemfile:
+
+ gem 'mongoriver'
+
+And then execute:
+
+ $ bundle
+
+Or install it yourself as:
+
+ $ gem install mongoriver
+
+## Usage
+
+TODO: Write usage instructions here
+
+## Contributing
+
+1. Fork it
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Pull Request
@@ -0,0 +1,2 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
@@ -0,0 +1,250 @@
+#!/usr/bin/env ruby
+require 'logger'
+require 'optparse'
+
+require 'rubygems'
+require 'bundler/setup'
+require 'mongoriver'
+
+module Mongoriver
+ class Mongocp < Streambed
+ include Mongoriver::Logging
+
+ def initialize(upstreams, type, downstream, prefix)
+ super(upstreams, type)
+ @downstream = downstream
+ @prefix = prefix
+ connect_downstream
+ end
+
+ def hook_optime
+ if optime = optime_collection.find_one(:_id => @prefix)
+ optime['ts']
+ else
+ nil
+ end
+ end
+
+ def hook_update_optime(ts, mandatory)
+ optime_collection.update({:_id => @prefix}, {'$set' => {:ts => ts}}, :upsert => true) if mandatory || rand(20) == 0
+ end
+
+ def hook_initial_sync_index(db_name, collection_name, index_key, options)
+ collection = downstream_collection(db_name, collection_name)
+ index_hash = BSON::OrderedHash.new
+ index_key.each {|k,v| index_hash[k] = v}
+ collection.send(:generate_indexes, index_hash, nil, options)
+ end
+
+ def hook_initial_sync_record_batch(db_name, collection_name, records)
+ collection = downstream_collection(db_name, collection_name)
+ bulk_insert(collection, records)
+ end
+
+ # TODO: should probably do the same key checking nonsense as the above
+ def hook_stream_insert(db_name, collection_name, object)
+ collection = downstream_collection(db_name, collection_name)
+ wrap_errors(collection, object['_id']) do
+ # Only needed if safe mode is set in the driver. Note that the
+ # argument here for oplog idempotency in the case of unique
+ # keys is kind of interesting. I believe I can prove
+ # idempotency as long as Mongo has no insert order-dependent
+ # unique indexes (which I believe is true) and that you do all
+ # your object updates as upserts.
+ allow_dupkeys do
+ collection.insert(object)
+ end
+ end
+ end
+
+ def hook_stream_update(db_name, collection_name, selector, update)
+ collection = downstream_collection(db_name, collection_name)
+ wrap_errors(collection, selector['_id']) do
+ collection.update(selector, update, :upsert => true)
+ end
+ end
+
+ def hook_stream_remove(db_name, collection_name, object)
+ collection = downstream_collection(db_name, collection_name)
+ wrap_errors(collection, object['_id']) do
+ collection.remove(object)
+ end
+ end
+
+ def hook_stream_create_collection(db_name, create)
+ db = downstream_db(db_name)
+ wrap_errors(db, create) do
+ db.create_collection(create)
+ end
+ end
+
+ # "Error renaming collection: #<BSON::OrderedHash:0x83869e34 {\"errmsg\"=>\"exception: source namespace does not exist\", \"code\"=>10026, \"ok\"=>0.0}>"
+ #
+ # Possibly need the same thing if the destination already exists
+ def hook_stream_rename_collection(db_name, source, target)
+ db = downstream_db(db_name)
+ wrap_errors(db, "#{source} -> #{target}") do
+ begin
+ db.rename_collection(source, target)
+ rescue Mongo::MongoDBError => e
+ if e.message =~ /Error renaming collection: .*exception: source namespace does not exist"/
+ log.warn("Ignoring rename of non-existent collection #{source} -> #{target}: #{e} (expected when replaying part of the oplog)")
+ elsif e.message =~ /Error renaming collection: .*exception: target namespace exists"/
+ log.warn("Ignoring rename of #{source} to existing collection #{target}: #{e} (expected when replaying part of the oplog)")
+ else
+ raise
+ end
+ end
+ end
+ end
+
+ def hook_stream_drop_index(db_name, collection_name, index_name)
+ collection = downstream_collection(db_name, collection_name)
+ wrap_errors(collection, index_name) do
+ begin
+ collection.drop_index(index_name)
+ rescue Mongo::MongoDBError => e
+ raise
+ if e.message =~ /index not found/
+ log.warn("Ignoring drop of non-existent index #{index_name.inspect}: #{e} (expected when replaying part of the oplog)")
+ else
+ raise
+ end
+ end
+ end
+ end
+
+ def hook_stream_drop_collection(db_name, dropped)
+ db = downstream_db(db_name)
+ wrap_errors(db, dropped) do
+ db.drop_collection(dropped)
+ end
+ end
+
+ def hook_stream_drop_database(db_name)
+ db = downstream_db(db_name)
+ wrap_errors(db, db_name) do
+ db.command(:dropDatabase => 1)
+ end
+ end
+
+ private
+
+ def allow_dupkeys(&blk)
+ begin
+ blk.call
+ rescue Mongo::OperationFailure => e
+ if e.error_code == 11000
+ log.warn("Ignoring unique index violation: #{e} (expected when replaying part of the oplog)")
+ else
+ raise
+ end
+ end
+ end
+
+ def bulk_insert(collection, docs)
+ begin
+ # Use the internal insert_documents method because it lets us
+ # disable key verification
+ collection.send(:insert_documents, docs, collection.name, false)
+ rescue Mongo::MongoRubyError => e
+ log.error("#{ns}: Caught error on batch insert", e)
+ docs.each do |doc|
+ wrap_errors(collection, doc['_id']) do
+ collection.send(:insert_documents, [doc], collection.name, false)
+ end
+ end
+ end
+ end
+
+ def wrap_errors(collection_or_db, object, &blk)
+ begin
+ blk.call
+ rescue Mongo::MongoRubyError => e
+ if collecton_or_db.kind_of?(Mongo::Collection)
+ ns = "#{collection_or_db.db.name}.#{collection_or_db.name}"
+ else
+ ns = collection_or_db.db.name
+ end
+ log.error("#{ns}: Unknown error for #{object}", e)
+ end
+ end
+
+ def downstream_db(db_name)
+ prefixed = "#{@prefix}_#{db_name}"
+ @downstream_conn.db(prefixed)
+ end
+
+ def downstream_collection(db_name, collection_name)
+ downstream_db(db_name).collection(collection_name)
+ end
+
+ def optime_collection
+ @optime_collection ||= @downstream_conn.db('_mongocp').collection('optime')
+ end
+
+ def connect_downstream
+ host, port = @tailer.parse_host_spec(@downstream)
+ @downstream_conn = Mongo::Connection.new(host, port, :safe => true)
+ end
+ end
+end
+
+def main
+ options = {:host => nil, :port => nil, :type => :slave, :verbose => 0}
+ optparse = OptionParser.new do |opts|
+ opts.banner = "Usage: #{$0} [options]"
+
+ opts.on('-v', '--verbosity', 'Verbosity of debugging output') do
+ options[:verbose] += 1
+ end
+
+ opts.on('-h', '--help', 'Display this message') do
+ puts opts
+ exit(1)
+ end
+
+ opts.on('--help', 'Display this message') do
+ puts opts
+ exit(1)
+ end
+
+ opts.on('-h HOST', '--host', 'Upstream host to connect to') do |host|
+ options[:host] = host
+ end
+
+ opts.on('-p PORT', '--port', 'Upstream host to connect to') do |port|
+ options[:port] = Integer(port)
+ end
+
+ opts.on('-a', '--all', 'Allow connections even directly to a primary') do
+ options[:type] = :direct
+ end
+ end
+ optparse.parse!
+
+ if ARGV.length != 0
+ puts optparse
+ return 1
+ end
+
+ log = Log4r::Logger.new('Stripe')
+ log.outputters = Log4r::StdoutOutputter.new(STDERR)
+ if options[:verbose] >= 1
+ log.level = Log4r::DEBUG
+ else
+ log.level = Log4r::INFO
+ end
+ runner = Mongoriver::Mongocp.new(["#{options[:host]}:#{options[:port]}"], options[:type], 'localhost:5001', 'test')
+ runner.run
+ return 0
+end
+
+if $0 == __FILE__
+ ret = main
+ begin
+ exit(ret)
+ rescue TypeError
+ exit(0)
+ end
+end
Oops, something went wrong.

0 comments on commit 59cfcd3

Please sign in to comment.