Permalink
Browse files

Merge pull request #2 from sorah/fetch2

bye bye haveread
  • Loading branch information...
sorah committed Oct 22, 2017
2 parents 1592194 + 051e737 commit 6891329f4230c3ac66cf63209a01f970bdfbb70e
Showing with 105 additions and 43 deletions.
  1. +1 −0 isubata/webapp/ruby/Gemfile
  2. +2 −0 isubata/webapp/ruby/Gemfile.lock
  3. +102 −43 isubata/webapp/ruby/app.rb
@@ -7,3 +7,4 @@ gem 'sinatra-contrib'
gem 'hiredis'
gem 'redis'
gem 'rack-ltsv_logger'
gem 'msgpack'
@@ -3,6 +3,7 @@ GEM
specs:
backports (3.9.1)
hiredis (0.6.1)
msgpack (1.1.0)
multi_json (1.12.2)
mustermann (1.0.1)
mysql2 (0.4.9)
@@ -32,6 +33,7 @@ PLATFORMS
DEPENDENCIES
hiredis
msgpack
mysql2
puma
rack-ltsv_logger
View
@@ -3,6 +3,67 @@
require 'sinatra/base'
require 'hiredis'
require 'redis'
require 'timeout'
class AwesomeFetch
STREAM_KEY = 'isubata:stream:message'
def self.instance
@instance ||= AwesomeFetch.new.tap(&:start)
end
def initialize()
@subscribers = {}
@lock = Mutex.new
end
def wait(timeout: 10)
@lock.synchronize do
@subscribers[Thread.current] = true
end
sleep timeout
ensure
@lock.synchronize do
@subscribers.delete(Thread.current)
end
end
def connect_redis
Redis.new(url: ENV.fetch('ISUBATA_REDIS_URL', 'redis://localhost:6379/0'))
end
def start
@thread = Thread.new do
redis = connect_redis()
redis.subscribe(STREAM_KEY) do |on|
on.subscribe do |ch, subs|
puts "AwesomeFetch subscribed to #{ch.inspect} (#{subs} subscriptions)"
end
on.message do |ch, message|
payload = message.empty? ? nil : MessagePack.unpack(message)
on_payload(payload)
end
end
rescue Exception => e
$stderr.puts "AwesomeFetch ERROR: #{e.inspect}\n\t#{e.backtrace.join("\n\t")}"
sleep 1
retry
end.tap do |th|
th.abort_on_exception = true
end
end
def on_payload(payload)
@subscribers.each_key do |th|
th.wakeup
end
end
end
AwesomeFetch.instance.start
# AwesomeFetch.
module MysqlMonkeyPatch
def logger
@@ -83,6 +144,14 @@ def user
db.query("DELETE FROM channel WHERE id > 10")
db.query("DELETE FROM message WHERE id > 10000")
db.query("DELETE FROM haveread")
channel_ids = db.query('SELECT id FROM channel').to_a.map { |_| _['id'] }
channel_ids.each do |ch|
statement = db.prepare('SELECT COUNT(*) as cnt FROM message WHERE channel_id = ?')
cnt = statement.execute(ch).first['cnt']
redis.hset(redis_key_total_messages, ch, cnt)
end
204
end
@@ -180,14 +249,8 @@ def user
end
response.reverse!
max_message_id = rows.empty? ? 0 : rows.map { |row| row['id'] }.max
statement = db.prepare([
'INSERT INTO haveread (user_id, channel_id, message_id, updated_at, created_at) ',
'VALUES (?, ?, ?, NOW(), NOW()) ',
'ON DUPLICATE KEY UPDATE message_id = ?, updated_at = NOW()',
].join)
statement.execute(user_id, channel_id, max_message_id, max_message_id)
statement.close
max_message_id = rows.empty? ? 0 : rows.map { |row| row['id'] }.max
redis.hset(redis_key_lastreads(user_id), channel_id, max_message_id)
content_type :json
response.to_json
@@ -199,31 +262,26 @@ def user
return 403
end
sleep 1.0
AwesomeFetch.instance.wait
rows = db.query('SELECT id FROM channel').to_a
channel_ids = rows.map { |row| row['id'] }
rs = redis.hgetall(redis_key_total_messages)
res = []
channel_ids.each do |channel_id|
statement = db.prepare('SELECT * FROM haveread WHERE user_id = ? AND channel_id = ?')
row = statement.execute(user_id, channel_id).first
statement.close
r = {}
r['channel_id'] = channel_id
r['unread'] = if row.nil?
statement = db.prepare('SELECT COUNT(*) as cnt FROM message WHERE channel_id = ?')
statement.execute(channel_id).first['cnt']
else
statement = db.prepare('SELECT COUNT(*) as cnt FROM message WHERE channel_id = ? AND ? < id')
statement.execute(channel_id, row['message_id']).first['cnt']
end
statement.close
res << r
redis.hgetall(redis_key_lastreads(user_id)).each do |ch, last|
statement = db.prepare('SELECT COUNT(*) as cnt FROM message WHERE channel_id = ? AND ? < id')
unread = statement.execute(ch, last).first['cnt']
rs[ch.to_s] = unread
end
r = rs.map do |ch, unread|
{
'channel_id' => ch.to_i,
'unread' => unread.to_i,
}
end
content_type :json
res.to_json
r.to_json
end
get '/history/:channel_id' do
@@ -314,6 +372,9 @@ def user
statement.execute(name, description)
channel_id = db.last_id
statement.close
redis.hset(redis_key_total_messages, channel_id, 0)
redis.publish(AwesomeFetch::STREAM_KEY, '')
redirect "/channel/#{channel_id}", 303
end
@@ -412,21 +473,17 @@ def redis
Thread.current[:isubata_redis] ||= Redis.new(url: ENV.fetch('ISUBATA_REDIS_URL', 'redis://localhost:6379/0'))
end
# def db
# return Thread.current[:isubata_db] if Thread.current[:isubata_db]
# client = Mysql2::Client.new(
# host: ENV.fetch('ISUBATA_DB_HOST') { 'localhost' },
# port: ENV.fetch('ISUBATA_DB_PORT') { '3306' },
# username: ENV.fetch('ISUBATA_DB_USER') { 'root' },
# password: ENV.fetch('ISUBATA_DB_PASSWORD') { '' },
# database: 'isubata',
# encoding: 'utf8mb4'
# )
# client.extend(MysqlMonkeyPatch) unless ENV['ISUCON7_DISABLE_LOGS'] == '1'
# client.query('SET SESSION sql_mode=\'TRADITIONAL,NO_AUTO_VALUE_ON_ZERO,ONLY_FULL_GROUP_BY\'')
# Thread.current[:isubata_db] = client
# client
# end
def redis_key_lastreads(user_id)
"isubata:lastreads:#{user_id}"
end
def redis_key_total_messages
"isubata:total_messages"
end
#def redis_key_unreads(user_id, channel_id)
# "isubata:unreads:#{user_id}:#{channel_id}"
#end
def db
Thread.current[:isubata_db] ||= Mysql2::Client.new(
host: ENV.fetch('ISUBATA_DB_HOST') { 'localhost' },
@@ -451,6 +508,8 @@ def db_add_message(channel_id, user_id, content)
statement = db.prepare('INSERT INTO message (channel_id, user_id, content, created_at) VALUES (?, ?, ?, NOW())')
messages = statement.execute(channel_id, user_id, content)
statement.close
redis.hincrby(redis_key_total_messages, channel_id, 1)
redis.publish(AwesomeFetch::STREAM_KEY, '')
messages
end

0 comments on commit 6891329

Please sign in to comment.