diff --git a/examples/schemaless-mysql/mysql_interceptor.rb b/examples/schemaless-mysql/mysql_interceptor.rb index 184470d..d7ea1e5 100644 --- a/examples/schemaless-mysql/mysql_interceptor.rb +++ b/examples/schemaless-mysql/mysql_interceptor.rb @@ -1,11 +1,13 @@ require "lib/em-proxy" require "em-mysql" +require "stringio" require "fiber" Proxy.start(:host => "0.0.0.0", :port => 3307) do |conn| conn.server :mysql, :host => "127.0.0.1", :port => 3306, :relay_server => true QUERY_CMD = 3 + MAX_PACKET_LENGTH = 2**24-1 # open a direct connection to MySQL for the schema-free coordination logic @mysql = EventMachine::MySQL.new(:host => 'localhost', :database => 'noschema') @@ -25,9 +27,9 @@ case query.first when "create" then - # allow schemaless table creation, ex: 'create table posts' - # by creating a table with a single id for key storage, aka - # rewrite to: 'create table posts (id varchar(255))'. all + # Allow schemaless table creation, ex: 'create table posts' + # By creating a table with a single id for key storage, aka + # rewrite to: 'create table posts (id varchar(255))'. All # future attribute tables will be created on demand at # insert time of a new record overload = "(id varchar(255), UNIQUE(id));" @@ -37,7 +39,7 @@ p [:create_new_schema_free_table, query, data] when "insert" then - # overload the INSERT syntax to allow for nested parameters + # Overload the INSERT syntax to allow for nested parameters # inside the statement. ex: # INSERT INTO posts VALUE("post_id_1", ( # ("author", "Ilya Grigorik"), @@ -74,7 +76,7 @@ q = @mysql.query(attr_sql) q.errback { |res| # if the attribute table for this model does not yet exist then create it! - # - yes, there is a race condition here, add a fiber later + # - yes, there is a race condition here, add fiber logic later if res.is_a?(Mysql::Error) and res.message =~ /Table.*doesn\'t exist/ table_sql = "create table #{table}_#{value[1]} (id varchar(255), value varchar(255), UNIQUE(id))" @@ -96,32 +98,65 @@ end when "select" then - attrs = sql.match(/select(.*?)from/)[1].strip.split(',') - p [:select, attrs] - - tables = @mysql.query("show tables like 'posts_%'") - tables.callback {|res| fiber.resume(res.all_hashes.collect(&:values).flatten) } - tables = Fiber.yield + [table] - - p [:select_tables, tables] -# query = tables - # select posts.id as id, posts_author.value as author FROM posts # LEFT OUTER JOIN posts_author ON posts_author.id = posts.id # WHERE posts.id = "ilya"; # select posts.id as id, posts_author.value as author FROM posts LEFT OUTER JOIN posts_author ON posts_author.id = posts.id WHERE posts.id = "ilya"; + select = sql.match(/select(.*?)from\s([^\s]+)/) + where = sql.match(/where\s([^=]+)\s?=\s?'?"?([^\s'"]+)'?"?/) + attrs, table = select[1].strip.split(','), select[2] + key = where[2] if where + + p [:select, select, attrs, where] + + tables = @mysql.query("show tables like '#{table}_%'") + tables.callback { |res| + fiber.resume(res.all_hashes.collect(&:values).flatten.collect{ |c| + c.split('_').last + }) + } + tables = Fiber.yield + + p [:select_tables, tables] + + # build the select statements, hide the tables behind each attribute + join = "select #{table}.id as id " + tables.each do |column| + join += " , #{table}_#{column}.value as #{column} " + end + + # add the joins to stich it all together + join += " FROM #{table} " + tables.each do |column| + join += " LEFT OUTER JOIN #{table}_#{column} ON #{table}_#{column}.id = #{table}.id " + end + + join += " WHERE #{table}.id = '#{key}' " if key + + query = [join] + overhead = join.size + 1 + p [:join_query, join] end # repack the query data and forward to server - data = [overhead, chunks, seq].pack("CvC") + [type, query.join(" ")].pack("Ca*") + # - have to split message on packet boundaries + + seq, data = 0, [] + query = StringIO.new([type, query.join(" ")].pack("Ca*")) + while q = query.read(MAX_PACKET_LENGTH) + data.push [q.length % 256, q.length / 256, seq].pack("CvC") + q + seq = (seq + 1) % 256 + end - p [:final_query, data] + p [:final_query, data, chunks, overhead] puts "-" * 100 end - conn.relay_to_servers(data) + [data].flatten.each do |chunk| + conn.relay_to_servers(chunk) + end :async # we will render results later } @@ -130,4 +165,8 @@ end end -# create table #{table}_#{value[1]} (id varchar(255), value varchar(255), UNIQUE(id)) \ No newline at end of file +# +# INSERT INTO posts VALUE("igvita2", (('org', 'PostRank'),('nickname', 'igrigorik'),('title', 'hello world'))); +# select * from posts; +# select * from posts where id = 'igvita'; +# \ No newline at end of file