Skip to content

Commit

Permalink
schemaless selects via joins, ftw.
Browse files Browse the repository at this point in the history
  • Loading branch information
igrigorik committed Feb 22, 2010
1 parent 5dc8551 commit f64abc4
Showing 1 changed file with 58 additions and 19 deletions.
77 changes: 58 additions & 19 deletions examples/schemaless-mysql/mysql_interceptor.rb
@@ -1,11 +1,13 @@
require "lib/em-proxy"
require "em-mysql"
require "stringio"
require "fiber"

Proxy.start(:host => "0.0.0.0", :port => 3307) do |conn|
conn.server :mysql, :host => "127.0.0.1", :port => 3306, :relay_server => true

QUERY_CMD = 3
MAX_PACKET_LENGTH = 2**24-1

# open a direct connection to MySQL for the schema-free coordination logic
@mysql = EventMachine::MySQL.new(:host => 'localhost', :database => 'noschema')
Expand All @@ -25,9 +27,9 @@

case query.first
when "create" then
# allow schemaless table creation, ex: 'create table posts'
# by creating a table with a single id for key storage, aka
# rewrite to: 'create table posts (id varchar(255))'. all
# Allow schemaless table creation, ex: 'create table posts'
# By creating a table with a single id for key storage, aka
# rewrite to: 'create table posts (id varchar(255))'. All
# future attribute tables will be created on demand at
# insert time of a new record
overload = "(id varchar(255), UNIQUE(id));"
Expand All @@ -37,7 +39,7 @@
p [:create_new_schema_free_table, query, data]

when "insert" then
# overload the INSERT syntax to allow for nested parameters
# Overload the INSERT syntax to allow for nested parameters
# inside the statement. ex:
# INSERT INTO posts VALUE("post_id_1", (
# ("author", "Ilya Grigorik"),
Expand Down Expand Up @@ -74,7 +76,7 @@
q = @mysql.query(attr_sql)
q.errback { |res|
# if the attribute table for this model does not yet exist then create it!
# - yes, there is a race condition here, add a fiber later
# - yes, there is a race condition here, add fiber logic later
if res.is_a?(Mysql::Error) and res.message =~ /Table.*doesn\'t exist/

table_sql = "create table #{table}_#{value[1]} (id varchar(255), value varchar(255), UNIQUE(id))"
Expand All @@ -96,32 +98,65 @@
end

when "select" then
attrs = sql.match(/select(.*?)from/)[1].strip.split(',')
p [:select, attrs]

tables = @mysql.query("show tables like 'posts_%'")
tables.callback {|res| fiber.resume(res.all_hashes.collect(&:values).flatten) }
tables = Fiber.yield + [table]

p [:select_tables, tables]
# query = tables

# select posts.id as id, posts_author.value as author FROM posts
# LEFT OUTER JOIN posts_author ON posts_author.id = posts.id
# WHERE posts.id = "ilya";
# select posts.id as id, posts_author.value as author FROM posts LEFT OUTER JOIN posts_author ON posts_author.id = posts.id WHERE posts.id = "ilya";

select = sql.match(/select(.*?)from\s([^\s]+)/)
where = sql.match(/where\s([^=]+)\s?=\s?'?"?([^\s'"]+)'?"?/)
attrs, table = select[1].strip.split(','), select[2]
key = where[2] if where

p [:select, select, attrs, where]

tables = @mysql.query("show tables like '#{table}_%'")
tables.callback { |res|
fiber.resume(res.all_hashes.collect(&:values).flatten.collect{ |c|
c.split('_').last
})
}
tables = Fiber.yield

p [:select_tables, tables]

# build the select statements, hide the tables behind each attribute
join = "select #{table}.id as id "
tables.each do |column|
join += " , #{table}_#{column}.value as #{column} "
end

# add the joins to stich it all together
join += " FROM #{table} "
tables.each do |column|
join += " LEFT OUTER JOIN #{table}_#{column} ON #{table}_#{column}.id = #{table}.id "
end

join += " WHERE #{table}.id = '#{key}' " if key

query = [join]
overhead = join.size + 1

p [:join_query, join]
end

# repack the query data and forward to server
data = [overhead, chunks, seq].pack("CvC") + [type, query.join(" ")].pack("Ca*")
# - have to split message on packet boundaries

seq, data = 0, []
query = StringIO.new([type, query.join(" ")].pack("Ca*"))
while q = query.read(MAX_PACKET_LENGTH)
data.push [q.length % 256, q.length / 256, seq].pack("CvC") + q
seq = (seq + 1) % 256
end

p [:final_query, data]
p [:final_query, data, chunks, overhead]
puts "-" * 100
end

conn.relay_to_servers(data)
[data].flatten.each do |chunk|
conn.relay_to_servers(chunk)
end

:async # we will render results later
}
Expand All @@ -130,4 +165,8 @@
end
end

# create table #{table}_#{value[1]} (id varchar(255), value varchar(255), UNIQUE(id))
#
# INSERT INTO posts VALUE("igvita2", (('org', 'PostRank'),('nickname', 'igrigorik'),('title', 'hello world')));
# select * from posts;
# select * from posts where id = 'igvita';
#

0 comments on commit f64abc4

Please sign in to comment.