This repository was archived by the owner on Dec 22, 2020. It is now read-only.
Comparing changes

base repository: stripe-archive/mosql (base: master)
head repository: devcolor/mosql (compare: master)

These branches can be automatically merged.

Commits on Mar 22, 2017

  1. Update gitignore (oss92, 572509f)

Commits on Mar 23, 2017

  1. 3a3d591
  2. Clean up and resolve warnings (oss92, 152f8b4)
  3. 5336ac6
  4. 33f6dea
  5. Add new timestamps test (oss92, fa79978)
  6. 90d5ec8
  7. Add Ruby 2.4.0 to travis (oss92, 51b2d69)
  8. 1989572
  9. Adjust variable names (oss92, 9d111fc)
  10. 9e00ec2
  11. Bump version to 0.5.0 (oss92, d1e55d4)
  12. Add total count (oss92, 651d608)

Commits on Mar 24, 2017

  1. 71eab08
  2. Bump version to 0.5.1 (oss92, d296c19)
  3. Merge branch 'release/0.5.1' (oss92, 21195ba)
  4. Adds auto_increment_id_pkey (oss92, 49da2bf)
  5. Add byebug 9.0 (oss92, d9eea9d)
  6. 80e20cd
  7. Handle BSON::ObjectID error (oss92, 8c67787)
  8. bdc70db
  9. da9f2b7
  10. 7336eb4
  11. d70ac8a

Commits on Mar 28, 2017

  1. How to specify array defaults (oss92, 54813ff)
  2. 84c2cc3

Commits on Mar 29, 2017

  1. a363101
  2. 1fa73c4

Commits on Mar 30, 2017

  1. Add timeout opt (oss92, c8c3af2)
  2. Add batch-size option to cli (oss92, d3bc1fe)
  3. Bump version to 0.5.3 (oss92, 0bdecef)
  4. Merge branch 'release/0.5.3' (oss92, f72dbd1)

Commits on Mar 31, 2017

  1. a8891d7

Commits on Jun 26, 2017

  1. 7fc0abd
  2. Add sources to list of seen (oss92, 11f0e07)

Commits on Jun 30, 2017

  1. Merge pull request #1 from FindHotel/multiple-keys-and-sources: FEATURE: Multiple keys and sources (oss92, aee0670)
  2. Convert seen set to array (oss92, 28ab62f)
  3. Remove buggy condition (oss92, 564e72d)

Showing with 506 additions and 97 deletions.
  1. +2 −1 .gitignore
  2. +1 −1 .travis.yml
  3. +53 −0 Gemfile.lock
  4. +83 −3 README.md
  5. +1 −0 Rakefile
  6. +17 −7 lib/mosql/cli.rb
  7. +166 −58 lib/mosql/schema.rb
  8. +13 −1 lib/mosql/sql.rb
  9. +14 −6 lib/mosql/streamer.rb
  10. +1 −1 lib/mosql/version.rb
  11. +22 −15 mosql.gemspec
  12. +1 −0 test/_lib.rb
  13. +1 −0 test/functional/_lib.rb
  14. +127 −0 test/functional/schema.rb
  15. +4 −4 test/unit/lib/mosql/schema.rb
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
collections.yml
/.bundle/
Gemfile.lock
*.gem
.byebug_history
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
language: ruby
rvm:
- 1.9.3
- 2.1.2
- 2.4.0
services:
- mongodb
- postgresql
53 changes: 53 additions & 0 deletions Gemfile.lock
@@ -0,0 +1,53 @@
PATH
  remote: .
  specs:
    mosql (0.5.3)
      bson (~> 1.12)
      bson_ext (~> 1.12)
      json (~> 2.0)
      log4r (~> 1.1)
      mongo (~> 1.12)
      mongoriver (= 0.4)
      pg (~> 0.20)
      rake (~> 12.0)
      sequel (~> 4.44)

GEM
  remote: https://rubygems.org/
  specs:
    bson (1.12.5)
    bson_ext (1.12.5)
      bson (~> 1.12.5)
    byebug (9.0.6)
    colored (1.2)
    json (2.0.3)
    log4r (1.1.10)
    metaclass (0.0.4)
    minitest (5.10.1)
    mocha (1.2.1)
      metaclass (~> 0.0.1)
    mongo (1.12.5)
      bson (= 1.12.5)
    mongoriver (0.4.0)
      bson_ext
      log4r
      mongo (>= 1.7)
    pg (0.20.0)
    rake (12.0.0)
    rake-notes (0.2.0)
      colored
      rake
    sequel (4.44.0)

PLATFORMS
  ruby

DEPENDENCIES
  byebug (~> 9.0)
  minitest (~> 5.10)
  mocha (~> 1.2)
  mosql!
  rake-notes (~> 0.2)

BUNDLED WITH
   1.14.6
86 changes: 83 additions & 3 deletions README.md
@@ -1,7 +1,6 @@
# MoSQL: a MongoDB → SQL streaming translator

> _**MoSQL is no longer being actively maintained.**_
> _If you are interested in helping maintain this repository, please let us know. We would love for it to find a forever home with someone who can give it the love it needs!_
[![Development](https://travis-ci.org/FindHotel/mosql.svg?branch=master)](https://travis-ci.org/FindHotel/mosql)

At Stripe, we love MongoDB. We love the flexibility it gives us in
changing data schemas as we grow and learn, and we love its
@@ -29,6 +28,11 @@ up-to-date. This lets you run production services against a MongoDB
database, and then run offline analytics or reporting using the full
power of SQL.

## Requirements
* Ruby >= 2.0.0
* MongoDB
* PostgreSQL

## Installation

Install from Rubygems as:
@@ -41,6 +45,30 @@ Or build from source by:

And then install the built gem.

## CLI

```
-h, --help Display this message
-v Increase verbosity
-c, --collections [collections.yml]   Collection map YAML file
--sql [sqluri] SQL server to connect to
--mongo [mongouri] Mongo connection string
--schema [schema] PostgreSQL 'schema' to namespace tables
--ignore-delete Ignore delete operations when tailing
--only-db [dbname] Don't scan for mongo dbs, just use the one specified
--timeout [timeout] Specify a timeout for mongo operations
--batch-size [batchsize] Specify a batch size for mongo collections
--tail-from [timestamp] Start tailing from the specified UNIX timestamp
--service [service] Service name to use when storing tailing state
--skip-tail Don't tail the oplog, just do the initial import
--skip-import Don't import data before tailing oplog
--reimport Force a data re-import
--no-drop-tables Don't drop the table if it exists during the initial import
--unsafe Ignore rows that cause errors on insert
--oplog-filter [filter] An additional JSON filter for the oplog query
```
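
For example, a full initial import followed by oplog tailing against a local server might look like this (the connection URIs here are illustrative, not from the original README):

```
mosql -c collections.yml \
      --sql postgres://localhost/analytics \
      --mongo mongodb://localhost:27017 \
      --timeout 120 \
      --batch-size 5000
```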

## The Collection Map file

In order to define a SQL schema and import your data, MoSQL needs a
@@ -65,11 +93,42 @@ types. An example collection map might be:
    - author_bio:
      :source: author.bio
      :type: TEXT
    - category:
      :value: travel
      :type: TEXT
    - title: TEXT
    - first_comment:
      :source: $elem.comments.0
      :type: TEXT
    - domain:
      :source: domain
      :type: INTEGER
      :default: 0
      :conversions:
        blog.findhotel.net: 0
        company.findhotel.net: 1
    - readers:
      :sources:
        - AM
        - EU
        - AF
        - AS
        - AU
      :keys:
        - Americas
        - Europe
        - Africa
        - Asia
        - Australia
      :type: JSONB
    - created: DOUBLE PRECISION
    :meta:
      :table: blog_posts
      :timestamps: true
      # :auto_increment_id_pkey: true
      :extra_props: true
      :filter:
        category: travel

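To make the map concrete, here is a hypothetical document (not part of the original README) and the values it would produce under the map above:

    { "author":   { "bio": "Traveler" },
      "comments": [ { "text": "First!" } ],
      "domain":   "company.findhotel.net",
      "AM": 120, "EU": 80 }

This maps to `author_bio = 'Traveler'`, `category = 'travel'` (the hardcoded `:value`), `first_comment = '{"text":"First!"}'` (element 0 of `comments`, serialized to JSON), `domain = 1` (via `:conversions`), and `readers = '{"Americas":120,"Europe":80}'` (the `:sources` fields relabeled by `:keys`).
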
Said another way, the collection map is a YAML file containing a hash
mapping
@@ -83,7 +142,12 @@ Where a `<Collection Definition>` is a hash with `:columns` and
describing that column. This hash may contain the following fields:

* `:source`: The name of the attribute inside of MongoDB.
* `:value`: Assign a static value to the column (overrides `:source` if present).
* `:type`: (Mandatory) The SQL type.
* `:default`: Set a default value to use when the source value is `null`.
* `:conversions`: Can be used to map values to other values; for example,
  when changing a string field to an integer enumeration.
* `:sources:` and `:keys:` are used together to convert multiple mongo fields to one JSON, JSONB or HSTORE value as described above.


Use of the `:source` attribute allows for renaming attributes, and
@@ -105,9 +169,25 @@ attribute.

`:meta` contains metadata about this collection/table. It is
required to include at least `:table`, naming the SQL table this
collection will be mapped to. `extra_props` determines the handling of
collection will be mapped to.

`:timestamps` adds the current timestamps to inserted data.

`:auto_increment_id_pkey:` adds an auto-increment `id` primary key, but COMPROMISES
the uniqueness of the data.

`:extra_props` determines the handling of
unknown fields in MongoDB objects -- more about that later.

`:filter` is a MongoDB-style query used to import only a subset of the
data. For example, to import only the blog posts matching
`db.blog_posts.find({ category: 'travel' })`, add the filter as follows.

    :meta:
      :filter:
        category: travel

By default, `mosql` looks for a collection map in a file named
`collections.yml` in your current working directory, but you can
specify a different one with `-c` or `--collections`.
1 change: 1 addition & 0 deletions Rakefile
@@ -1,4 +1,5 @@
require 'rake/testtask'
require 'rake/notes/rake_task'

task :default => [:test]
task :build
24 changes: 17 additions & 7 deletions lib/mosql/cli.rb
@@ -7,8 +7,6 @@ module MoSQL
class CLI
include MoSQL::Logging

BATCH = 1000

attr_reader :args, :options, :tailer

def self.run(args)
@@ -34,11 +32,14 @@ def setup_signal_handlers

def parse_args
@options = {
:collections => 'collections.yml',
:sql => 'postgres:///',
:mongo => 'mongodb://localhost',
:verbose => 0
collections: 'collections.yml',
sql: 'postgres:///',
mongo: 'mongodb://localhost',
verbose: 0,
op_timeout: 60,
batch_size: Streamer::DEFAULT_BATCH_SIZE
}

optparse = OptionParser.new do |opts|
opts.banner = "Usage: #{$0} [options] "

@@ -75,6 +76,15 @@ def parse_args
@options[:dbname] = dbname
end

opts.on("--timeout [timeout]", "Specify a timeout for mongo operations") do |timeout|
@options[:op_timeout] = timeout.to_i
end

opts.on("--batch-size [batchsize]", "Specify a batch size for mongo collections") do |batch_size|
abort('zero/invalid batch size') if batch_size.to_i.zero?
@options[:batch_size] = batch_size.to_i
end

opts.on("--tail-from [timestamp]", "Start tailing from the specified UNIX timestamp") do |ts|
@options[:tail_from] = ts
end
@@ -121,7 +131,7 @@ def parse_args
end

def connect_mongo
@mongo = Mongo::MongoClient.from_uri(options[:mongo])
@mongo = Mongo::MongoClient.from_uri(options[:mongo], op_timeout: options[:op_timeout])
config = @mongo['admin'].command(:ismaster => 1)
if !config['setName'] && !options[:skip_tail]
log.warn("`#{options[:mongo]}' is not a replset.")
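
As a side note, the new `--timeout` value is handed straight to the Mongo driver; in isolation the call added above amounts to this sketch (URI illustrative):

    require 'mongo'  # mongo gem ~> 1.12, per the Gemfile.lock above

    # op_timeout bounds how long individual Mongo operations may take (seconds).
    client = Mongo::MongoClient.from_uri('mongodb://localhost:27017', op_timeout: 60)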
224 changes: 166 additions & 58 deletions lib/mosql/schema.rb
@@ -4,21 +4,69 @@ class SchemaError < StandardError; end;
class Schema
include MoSQL::Logging

def initialize(map)
@map = {}
map.each do |dbname, db|
@map[dbname] = { :meta => parse_meta(db[:meta]) }
db.each do |cname, spec|
next unless cname.is_a?(String)
begin
@map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
rescue KeyError => e
raise SchemaError.new("In spec for #{dbname}.#{cname}: #{e}")
end
end
end

# Lurky way to force Sequel to use UTC for all timestamps.
Sequel.default_timezone = :utc
end

def to_array(lst)
lst.map do |ent|
col = nil
if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
raise SchemaError.new("Invalid ordered hash entry #{ent.inspect}") unless ent.is_a?(Hash)
if ent[:sources].is_a?(Array) && ent[:keys].is_a?(Array) && ent[:type].is_a?(String)
col = {
:sources => ent.fetch(:sources),
:keys => ent.fetch(:keys),
:value => ent[:value],
:type => ent.fetch(:type),
:name => (ent.keys - [:source, :type]).first,
:default => ent[:default],
:conversions => ent[:conversions],
:eval => ent[:eval],
}
elsif ent[:source].is_a?(String) && ent[:type].is_a?(String)
# new configuration format
col = {
:source => ent.fetch(:source),
:type => ent.fetch(:type),
:name => (ent.keys - [:source, :type]).first,
:source => ent.fetch(:source),
:value => ent[:value],
:type => ent.fetch(:type),
:name => (ent.keys - [:source, :type]).first,
:default => ent[:default],
:conversions => ent[:conversions],
:eval => ent[:eval],
}
elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
elsif ent.keys.length == 1 && ent.values.first.is_a?(String)
col = {
:source => ent.first.first,
:name => ent.first.first,
:type => ent.first.last
:source => ent.first.first,
:name => ent.first.first,
:type => ent.first.last,
:default => ent[:default],
:conversions => ent[:conversions],
:eval => ent[:eval],
}
elsif !ent[:value].nil? && ent[:type].is_a?(String)
# hardcoded value format
col = {
:source => nil,
:value => ent.fetch(:value),
:type => ent.fetch(:type),
:name => (ent.keys - [:source, :type]).first,
:default => ent[:default],
:conversions => ent[:conversions],
:eval => ent[:eval],
}
else
raise SchemaError.new("Invalid ordered hash entry #{ent.inspect}")
@@ -35,10 +83,12 @@ def to_array(lst)
def check_columns!(ns, spec)
seen = Set.new
spec[:columns].each do |col|
if seen.include?(col[:source])
if seen.include?(col[:source]) && col[:value].nil? && col[:sources].nil?
raise SchemaError.new("Duplicate source #{col[:source]} in column definition #{col[:name]} for #{ns}.")
end
seen.add(col[:source])

seen.add(col[:source]) if col[:source]
col[:sources].each { |source| seen.add(source) } if col[:sources]
end
end

@@ -57,33 +107,15 @@ def parse_meta(meta)
meta
end

def initialize(map)
@map = {}
map.each do |dbname, db|
@map[dbname] = { :meta => parse_meta(db[:meta]) }
db.each do |cname, spec|
next unless cname.is_a?(String)
begin
@map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
rescue KeyError => e
raise SchemaError.new("In spec for #{dbname}.#{cname}: #{e}")
end
end
end

# Lurky way to force Sequel force all timestamps to use UTC.
Sequel.default_timezone = :utc
end

def create_schema(db, clobber=false)
def create_schema(db, drop_table=false)
@map.values.each do |dbspec|
dbspec.each do |n, collection|
next unless n.is_a?(String)
meta = collection[:meta]
composite_key = meta[:composite_key]
keys = []
log.info("Creating table '#{meta[:table]}'...")
db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
log.info("Dropping and creating table '#{meta[:table]}'...") if drop_table
db.send(drop_table ? :create_table! : :create_table?, meta[:table]) do
collection[:columns].each do |col|
opts = {}
if col[:source] == '$timestamp'
@@ -93,12 +125,22 @@ def create_schema(db, clobber=false)

if composite_key and composite_key.include?(col[:name])
keys << col[:name].to_sym
elsif not composite_key and col[:source].to_sym == :_id
elsif not composite_key and col[:source] and col[:source].to_sym == :_id
keys << col[:name].to_sym
end
end

primary_key keys
if meta[:auto_increment_id_pkey]
primary_key :id
else
primary_key keys
end

if meta[:timestamps]
column 'created_at', 'TIMESTAMP'
column 'updated_at', 'TIMESTAMP'
end

if meta[:extra_props]
type =
case meta[:extra_props]
@@ -143,25 +185,6 @@ def find_ns!(ns)
schema
end

def fetch_and_delete_dotted(obj, dotted)
pieces = dotted.split(".")
breadcrumbs = []
while pieces.length > 1
key = pieces.shift
breadcrumbs << [obj, key]
obj = obj[key]
return nil unless obj.is_a?(Hash)
end

val = obj.delete(pieces.first)

breadcrumbs.reverse.each do |obj, key|
obj.delete(key) if obj[key].empty?
end

val
end

def fetch_exists(obj, dotted)
pieces = dotted.split(".")
while pieces.length > 1
@@ -172,6 +195,19 @@ def fetch_exists(obj, dotted)
obj.has_key?(pieces.first)
end

def fetch_elem(obj, field_name, array_index)
field_value = obj[field_name]
return nil unless field_value
element = field_value[array_index.to_i]
if element.is_a?(Hash)
JSON.dump(Hash[element.map { |k, primitive_value|
[k, transform_primitive(primitive_value)]
} ])
else
element
end
end

def fetch_special_source(obj, source, original)
case source
when "$timestamp"
@@ -180,11 +216,44 @@ def fetch_special_source(obj, source, original)
# We need to look in the cloned original object, not in the version that
# has had some fields deleted.
fetch_exists(original, $1)
when /^\$elem.([a-zA-Z_]+).(\d+)/
# To fetch one element from an array, e.g. :source: $elem.comments.0
fetch_elem(original, $1, $2)
else
raise SchemaError.new("Unknown source: #{source}")
end
end

def fetch_and_delete_dotted(obj, dotted)
pieces = dotted.split(".")
breadcrumbs = []
while pieces.length > 1
key = pieces.shift
breadcrumbs << [obj, key]
obj = obj[key]
return nil unless obj.is_a?(Hash)
end

val = obj.delete(pieces.first)

breadcrumbs.reverse.each do |value, k|
value.delete(k) if value[k].empty?
end

val
end

def fetch_multiple_and_convert_to_hash(obj, sources, keys)
val = {}
sources.each_with_index do |source, idx|
val[keys[idx]] = obj[source] if obj[source]
end

JSON.dump(Hash[val.map { |k, primitive_value|
[k, transform_primitive(primitive_value)]
} ])
end

def transform_primitive(v, type=nil)
case v
when BSON::ObjectId, Symbol
@@ -213,17 +282,28 @@ def transform(ns, obj, schema=nil)

row = []
schema[:columns].each do |col|

source = col[:source]
sources = col[:sources]
keys = col[:keys]
value = col[:value]
type = col[:type]

if source.start_with?("$")
conversions = col[:conversions]
default_v = col[:default]

if value
v = value
elsif sources && keys
v = fetch_multiple_and_convert_to_hash(obj, sources, keys)
elsif source.start_with?("$")
v = fetch_special_source(obj, source, original)
else
v = fetch_and_delete_dotted(obj, source)
v = eval(v) if col[:eval] && v
case v
when Hash
v = JSON.dump(Hash[v.map { |k,v| [k, transform_primitive(v)] }])
v = JSON.dump(Hash[v.map { |k, primitive_value|
[k, transform_primitive(primitive_value)]
} ])
when Array
v = v.map { |it| transform_primitive(it) }
if col[:array_type]
@@ -235,7 +315,26 @@ def transform(ns, obj, schema=nil)
v = transform_primitive(v, type)
end
end
row << v

if conversions
previous_v = v
v = conversions[v]
if v.nil?
v = previous_v
end
end

if !default_v.nil? && v.nil?
row << default_v
else
row << v
end
end

if schema[:meta][:timestamps]
utc_time = Time.now.getutc
row << utc_time
row << utc_time
end

if schema[:meta][:extra_props]
@@ -277,6 +376,10 @@ def all_columns(schema, copy=false)
schema[:columns].each do |col|
cols << col[:name] unless copy && !copy_column?(col)
end
if schema[:meta][:timestamps]
cols << "created_at"
cols << "updated_at"
end
if schema[:meta][:extra_props]
cols << "_extra_props"
end
@@ -346,7 +449,12 @@ def primary_sql_key_for_ns(ns)
if ns[:meta][:composite_key]
keys = ns[:meta][:composite_key]
else
keys << ns[:columns].find {|c| c[:source] == '_id'}[:name]
key_fetcher = ns[:columns].find {|c| c[:source] == '_id'}
if key_fetcher
keys << key_fetcher[:name]
else
keys = [:id]
end
end

return keys
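
As a standalone sketch of the new `:sources`/`:keys` handling (mirroring `fetch_multiple_and_convert_to_hash` above; the sample document is made up):

    require 'json'

    sources = %w[AM EU AF AS AU]                          # the map's :sources
    keys    = %w[Americas Europe Africa Asia Australia]   # the map's :keys
    obj     = { 'AM' => 120, 'EU' => 80 }                 # fields absent from the doc are skipped

    val = {}
    sources.each_with_index do |source, idx|
      val[keys[idx]] = obj[source] if obj[source]
    end

    puts JSON.dump(val)  # => {"Americas":120,"Europe":80}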
14 changes: 13 additions & 1 deletion lib/mosql/sql.rb
@@ -69,6 +69,19 @@ def upsert!(table, table_primary_keys, item)
end
end

def update_primary_key_sequence(table, table_primary_keys)
if table_primary_keys == ["id"]
last_row = table.order(:id).last
return if last_row.nil? || !last_row[:id].is_a?(Integer)
next_id = last_row[:id]
next_id += 1
table_name = table.first_source_table
# TODO check if sequence exists altogether
sequence_altering_query = "ALTER SEQUENCE #{table_name}_id_seq RESTART WITH #{next_id}"
log.info("Updating primary key sequence for #{table_name} with #{next_id}")
table.db.run(sequence_altering_query)
end
end

def self.duplicate_key_error?(e)
# c.f. http://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
# for the list of error codes.
@@ -83,4 +96,3 @@ def self.duplicate_column_error?(e)
end
end
end

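For reference, `update_primary_key_sequence` above builds a plain `ALTER SEQUENCE` statement; for a table named `blog_posts` whose highest `id` is 41, it would run (assuming the default `<table>_id_seq` sequence name Postgres creates for serial columns):

    ALTER SEQUENCE blog_posts_id_seq RESTART WITH 42
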
20 changes: 14 additions & 6 deletions lib/mosql/streamer.rb
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ module MoSQL
class Streamer
include MoSQL::Logging

BATCH = 1000
DEFAULT_BATCH_SIZE = 1000

attr_reader :options, :tailer

@@ -61,6 +61,8 @@ def bulk_upsert(table, ns, items)
@sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
end
end
ensure
@sql.update_primary_key_sequence(table, @schema.primary_sql_key_for_ns(ns))
end
end

@@ -131,7 +133,9 @@ def did_truncate; @did_truncate ||= {}; end

def import_collection(ns, collection, filter)
log.info("Importing for #{ns}...")
log.info("Filter applied #{filter} on #{collection.name}") if filter
count = 0
batch_size = options[:batch_size] || DEFAULT_BATCH_SIZE
batch = []
table = @sql.table_for_ns(ns)
unless options[:no_drop_tables] || did_truncate[table.first_source]
@@ -141,28 +145,32 @@ def import_collection(ns, collection, filter)

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
total_count = collection.count(query: filter)
collection.find(filter, batch_size: batch_size) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
if batch.length >= batch_size
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
elapsed = Time.now - start
log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
percent_done = ((count.to_f / total_count.to_f) * 100).round(1)
log.info("[#{collection.name} #{percent_done}%] Imported #{count}/#{total_count} rows (#{elapsed.round(1)}s, #{sql_time.round(1)}s SQL time)")
batch.clear
exit(0) if @done
end
end
end
end


unless batch.empty?
bulk_upsert(table, ns, batch)
end
log.info("[#{collection.name} (#{filter}) DONE] Imported #{total_count} rows")
end

def optail
@@ -201,7 +209,7 @@ def handle_op(op)
# The oplog format of applyOps commands can be viewed here:
# https://groups.google.com/forum/#!topic/mongodb-user/dTf5VEJJWvY
if op['op'] == 'c' && (ops = op['o']['applyOps'])
ops.each { |op| handle_op(op) }
ops.each { |operation| handle_op(operation) }
return
end

@@ -211,7 +219,7 @@ def handle_op(op)
end

ns = op['ns']
dbname, collection_name = ns.split(".", 2)
_, collection_name = ns.split(".", 2)

case op['op']
when 'n'
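
The reworked progress logging above produces one line per flushed batch, of roughly this shape (numbers illustrative):

    [blog_posts 50.0%] Imported 500/1000 rows (2.1s, 0.8s SQL time)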
2 changes: 1 addition & 1 deletion lib/mosql/version.rb
@@ -1,3 +1,3 @@
module MoSQL
VERSION = "0.4.3"
VERSION = "0.5.3"
end
37 changes: 22 additions & 15 deletions mosql.gemspec
@@ -3,11 +3,16 @@ $:.unshift(File.expand_path("lib", File.dirname(__FILE__)))
require 'mosql/version'

Gem::Specification.new do |gem|
gem.authors = ["Nelson Elhage"]
gem.email = ["nelhage@stripe.com"]
gem.description = %q{A library for streaming MongoDB to SQL}
gem.summary = %q{MongoDB -> SQL streaming bridge}
gem.homepage = "https://github.com/stripe/mosql"
gem.authors = ["Nelson Elhage", "Mohamed Osama"]
gem.email = ["nelhage@stripe.com", "oss@findhotel.net"]
gem.description = %q{A library for streaming MongoDB to PostgreSQL}
gem.summary = %q{MongoDB -> PostgreSQL streaming bridge}
# Forked from: https://github.com/stripe/mosql
gem.homepage = "https://github.com/FindHotel/mosql"

gem.required_ruby_version = ">= 2.0.0"

gem.license = "MIT"

gem.files = `git ls-files`.split($\)
gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
@@ -16,18 +21,20 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = MoSQL::VERSION

gem.add_runtime_dependency "sequel"
gem.add_runtime_dependency "pg"
gem.add_runtime_dependency "rake"
gem.add_runtime_dependency "log4r"
gem.add_runtime_dependency "json"
gem.add_runtime_dependency "sequel", "~> 4.44"
gem.add_runtime_dependency "pg", "~> 0.20"
gem.add_runtime_dependency "rake", "~> 12.0"
gem.add_runtime_dependency "log4r", "~> 1.1"
gem.add_runtime_dependency "json", "~> 2.0"

gem.add_runtime_dependency "mongoriver", "0.4"

gem.add_runtime_dependency "mongo", "~> 1.10"
gem.add_runtime_dependency "bson", "~> 1.10"
gem.add_runtime_dependency "bson_ext", "~> 1.10"
gem.add_runtime_dependency "mongo", "~> 1.12"
gem.add_runtime_dependency "bson", "~> 1.12"
gem.add_runtime_dependency "bson_ext", "~> 1.12"

gem.add_development_dependency "minitest"
gem.add_development_dependency "mocha"
gem.add_development_dependency "minitest", "~> 5.10"
gem.add_development_dependency "mocha", "~> 1.2"
gem.add_development_dependency "rake-notes", "~> 0.2"
gem.add_development_dependency "byebug", "~> 9.0"
end
1 change: 1 addition & 0 deletions test/_lib.rb
@@ -4,6 +4,7 @@
require 'minitest/autorun'
require 'minitest/spec'
require 'mocha'
require 'byebug'

$:.unshift(File.expand_path(File.join(File.dirname(__FILE__), '../lib')))

1 change: 1 addition & 0 deletions test/functional/_lib.rb
@@ -58,6 +58,7 @@ def mongo_db
end

def setup
$VERBOSE = nil
Sequel.default_timezone = :utc
@sequel = connect_sql
@mongo = connect_mongo
127 changes: 127 additions & 0 deletions test/functional/schema.rb
@@ -29,6 +29,48 @@ class MoSQL::Test::Functional::SchemaTest < MoSQL::Test::Functional
      - var_b:
        :source: vars.b
        :type: TEXT
  with_timestamps:
    :meta:
      :table: sqltable4
      :timestamps: true
      :extra_props: true
    :columns:
      - _id: TEXT
      - var_a:
        :source: vars.a
        :type: TEXT
      - var_b:
        :source: vars.b
        :type: TEXT
  with_hardcoded_value:
    :meta:
      :table: sqltable5
    :columns:
      - _id: TEXT
      - var_a:
        :source: vars.a
        :value: harcoded_value_a
        :type: TEXT
      - var_b:
        :value: 'harcoded_value::b'
        :type: TEXT
  with_elem_source:
    :meta:
      :table: sqltable6
    :columns:
      - _id: TEXT
      - var_0:
        :source: $elem.vars.0
        :type: TEXT
  with_default:
    :meta:
      :table: sqltable7
    :columns:
      - _id: TEXT
      - var_0:
        :source: $elem.vars.0
        :default: "{ 1, 2 }"
        :type: INTEGER ARRAY
EOF

before do
@@ -37,12 +79,20 @@ class MoSQL::Test::Functional::SchemaTest < MoSQL::Test::Functional
@sequel.drop_table?(:sqltable)
@sequel.drop_table?(:sqltable2)
@sequel.drop_table?(:sqltable3)
@sequel.drop_table?(:sqltable4)
@sequel.drop_table?(:sqltable5)
@sequel.drop_table?(:sqltable6)
@sequel.drop_table?(:sqltable7)
@map.create_schema(@sequel)
end

def table; @sequel[:sqltable]; end
def table2; @sequel[:sqltable2]; end
def table3; @sequel[:sqltable3]; end
def table4; @sequel[:sqltable4]; end
def table5; @sequel[:sqltable5]; end
def table6; @sequel[:sqltable6]; end
def table7; @sequel[:sqltable7]; end

it 'Creates the tables with the right columns' do
assert_equal(Set.new([:_id, :var, :arry]),
@@ -97,6 +147,83 @@ def table3; @sequel[:sqltable3]; end
assert_equal(o['_id'].to_s, table.select.first[:_id])
end

it 'Can ASSIGN timestamps' do
objects = [
{'_id' => "a", 'vars' => {'a' => 1, 'b' => 2}},
{'_id' => "b", 'vars' => {}},
{'_id' => "c", 'vars' => {'a' => 2, 'c' => 6}},
]
@map.copy_data(@sequel, 'db.with_timestamps', objects.map { |o| @map.transform('db.with_timestamps', o) } )
assert_equal(3, table4.count)
o = table4.first(:_id => 'a')
assert_equal("1", o[:var_a])
assert_equal("2", o[:var_b])
assert_equal(Time, o[:created_at].class)
assert_equal(Time, o[:updated_at].class)

o = table4.first(:_id => 'b')
assert_equal({}, JSON.parse(o[:_extra_props]))
assert_equal(Time, o[:created_at].class)
assert_equal(Time, o[:updated_at].class)

o = table4.first(:_id => 'c')
assert_equal({'vars' => { 'c' => 6} }, JSON.parse(o[:_extra_props]))
assert_equal(Time, o[:created_at].class)
assert_equal(Time, o[:updated_at].class)
end


it 'Can ASSIGN hardcoded values' do
objects = [
{'_id' => "a", 'vars' => {'a' => 1, 'b' => 2}},
{'_id' => "b", 'vars' => {}},
{'_id' => "c", 'vars' => {'a' => 2, 'c' => 6}},
]
@map.copy_data(@sequel, 'db.with_hardcoded_value', objects.map { |o| @map.transform('db.with_hardcoded_value', o) } )
assert_equal(3, table5.count)
o = table5.first(:_id => 'a')
assert_equal('harcoded_value_a', o[:var_a])
assert_equal('harcoded_value::b', o[:var_b])

o = table5.first(:_id => 'b')
assert_equal('harcoded_value_a', o[:var_a])
assert_equal('harcoded_value::b', o[:var_b])

o = table5.first(:_id => 'c')
assert_equal('harcoded_value_a', o[:var_a])
assert_equal('harcoded_value::b', o[:var_b])
end

it 'Can FETCH elements by array index' do
objects = [
{'_id' => "a", 'vars' => ["hello", "world"]},
{'_id' => "b", 'vars' => ["life", "is", "good"]}
]

@map.copy_data(@sequel, 'db.with_elem_source', objects.map { |o| @map.transform('db.with_elem_source', o) } )
assert_equal(2, table6.count)
o = table6.first(:_id => 'a')
assert_equal('hello', o[:var_0])

o = table6.first(:_id => 'b')
assert_equal('life', o[:var_0])
end

it 'Can ASSIGN array defaults' do
objects = [
{'_id' => "a", 'vars' => ["{1}"]},
{'_id' => "b", 'vars' => []}
]

@map.copy_data(@sequel, 'db.with_default', objects.map { |o| @map.transform('db.with_default', o) } )
assert_equal(2, table7.count)
o = table7.first(:_id => 'a')
assert_equal([1], o[:var_0])

o = table7.first(:_id => 'b')
assert_equal([1, 2], o[:var_0])
end

it 'Can transform BSON::ObjectIDs' do
o = {'_id' => BSON::ObjectId.new, 'var' => 0}
row = @map.transform('db.collection', o)
8 changes: 4 additions & 4 deletions test/unit/lib/mosql/schema.rb
@@ -206,14 +206,14 @@ class MoSQL::Test::SchemaTest < MoSQL::Test
oid = [ BSON::ObjectId.new, BSON::ObjectId.new]
o = {'_id' => "row 1", "str" => [ BSON::DBRef.new('db.otherns', oid[0]), BSON::DBRef.new('db.otherns', oid[1]) ] }
out = @map.transform('db.collection', o)
assert_equal(["row 1", nil, JSON.dump(oid.map! {|o| o.to_s}), nil ], out)
assert_equal(["row 1", nil, JSON.dump(oid.map! {|val| val.to_s}), nil ], out)
end

it 'changes NaN to null in extra_props' do
out = @map.transform('db.with_extra_props', {'_id' => 7, 'nancy' => 0.0/0.0})
extra = JSON.parse(out[1])
assert(extra.key?('nancy'))
assert_equal(nil, extra['nancy'])
assert_nil(extra['nancy'])
end

it 'base64-encodes BSON::Binary blobs in extra_props' do
@@ -321,7 +321,7 @@ def check(orig, path, expect, result)
end

it 'caches negative lookups' do
assert_equal(nil, @map.find_ns("nosuchdb.foo"))
assert_nil(@map.find_ns("nosuchdb.foo"))
assert(@map.instance_variable_get(:@map).key?("nosuchdb"))
end

@@ -406,7 +406,7 @@ def check(orig, path, expect, result)

it 'rejects unknown specials' do
assert_raises(MoSQL::SchemaError) do
r = @othermap.transform('db.invalid', { '_id' => 'a' })
@othermap.transform('db.invalid', { '_id' => 'a' })
end
end
end