Permalink
Browse files

messagebin

  • Loading branch information...
sorah committed Oct 22, 2017
1 parent a8c8b62 commit 2ac14b8f0b01f9027811c5237913837ed82811b2
Showing with 133 additions and 26 deletions.
  1. +119 −26 isubata/webapp/ruby/app.rb
  2. +14 −0 isubata/webapp/ruby/stream.rb
View
@@ -4,6 +4,31 @@
require 'hiredis'
require 'redis'
require 'timeout'
require 'msgpack'
class MessageBin
LIMIT = 100
def initialize
@messages = []
end
def push(row)
@messages << row
@messages.shift([@messages.size - LIMIT,0].max)
self
end
def [](*args)
@messages[*args]
end
def since(id, limit: 100)
idx = @messages.index { |_| _['id'] > id }
if idx
@messages[idx + 1 .. idx + limit]
end
end
end
class AwesomeFetch
STREAM_KEY = 'isubata:stream:message'
@@ -15,6 +40,17 @@ def self.instance
def initialize()
@subscribers = {}
@lock = Mutex.new
reset
end
def reset
@lock.synchronize do
@messages = {}
end
end
def channel(id)
@messages[id]
end
def wait(timeout: 5)
@@ -58,6 +94,22 @@ def on_payload(payload)
@subscribers.dup.each_key do |th|
th.wakeup
end
case payload['type']
when 'message'
on_message payload
when 'reset'
puts "AwesomeFetch Reset #{$$}"
reset
when 'init'
puts "AwesomeFetch Init #{$$}"
init
end
end
def on_message(message)
bin = (@messages[message['channel_id']] ||= MessageBin.new)
bin.push message
end
end
@@ -112,10 +164,10 @@ class App < Sinatra::Base
enable :sessions
end
configure :development do
require 'sinatra/reloader'
register Sinatra::Reloader
end
#configure :development do
# require 'sinatra/reloader'
# register Sinatra::Reloader
#end
helpers do
def user
@@ -153,6 +205,30 @@ def user
redis.hset(redis_key_total_messages, ch, cnt)
end
redis.publish('isubata:stream:message', {'type' => 'reset'}.to_msgpack)
redis.publish('isubata:stream:message', {'type' => 'load'}.to_msgpack)
all_users = {}
db.query("SELECT id, name, display_name, avatar_icon FROM user").each do |row|
all_users[row['id']] = row.select { |k, v| %w[name display_name avatar_icon].include?(k) }
end
db.query('SELECT id FROM channel').each do |ch|
db.query("SELECT * FROM message WHERE channel_id = #{ch['id']} ORDER BY id DESC LIMIT #{MessageBin::LIMIT}").to_a.reverse_each do |row|
user = all_users.fetch row['user_id']
redis.publish('isubata:stream:message',
{
'type' => 'message',
'id' => row['id'],
'channel_id' => row['channel_id'],
'user' => { 'name' => user['name'], 'display_name' => user['display_name'], 'avatar_icon' => user['avatar_icon'] },
'date' => row['created_at'].strftime("%Y/%m/%d %H:%M:%S"),
'content' => row['content'],
}.to_msgpack,
)
end
end
204
end
@@ -235,28 +311,35 @@ def user
channel_id = params[:channel_id].to_i
last_message_id = params[:last_message_id].to_i
statement = db.prepare('SELECT * FROM message WHERE id > ? AND channel_id = ? ORDER BY id DESC LIMIT 100')
rows = statement.execute(last_message_id, channel_id).to_a
statement.close
users = get_users(rows.map { |r| r['user_id'] }.uniq)
response = []
rows.each do |row|
r = {}
r['id'] = row['id']
# statement = db.prepare('SELECT name, display_name, avatar_icon FROM user WHERE id = ?')
r['user'] = users[row['user_id']] # statement.execute(row['user_id']).first
r['date'] = row['created_at'].strftime("%Y/%m/%d %H:%M:%S")
r['content'] = row['content']
response << r
# statement.close
bin = AwesomeFetch.instance.channel(channel_id)
res= bin&.since(last_message_id)
unless res
statement = db.prepare('SELECT * FROM message WHERE id > ? AND channel_id = ? ORDER BY id DESC LIMIT 100')
rows = statement.execute(last_message_id, channel_id).to_a
statement.close
users = get_users(rows.map { |r| r['user_id'] }.uniq)
res= []
rows.each do |row|
r = {}
r['id'] = row['id']
# statement = db.prepare('SELECT name, display_name, avatar_icon FROM user WHERE id = ?')
r['user'] = users[row['user_id']] # statement.execute(row['user_id']).first
r['date'] = row['created_at'].strftime("%Y/%m/%d %H:%M:%S")
r['content'] = row['content']
res<< r
# statement.close
end
response.headers['X-Awesome-Fetch'] = 'miss'
end
response.reverse!
res.reverse! # TODO:
max_message_id = rows.empty? ? 0 : rows.map { |row| row['id'] }.max
max_message_id = res.empty? ? 0 : res.map { |_| _['id'] }.max
redis.hset(redis_key_lastreads(user_id), channel_id, max_message_id)
content_type :json
response.to_json
res.to_json
end
get '/fetch' do
@@ -530,11 +613,22 @@ def db_get_user(user_id)
end
def db_add_message(channel_id, user_id, content)
statement = db.prepare('INSERT INTO message (channel_id, user_id, content, created_at) VALUES (?, ?, ?, NOW())')
messages = statement.execute(channel_id, user_id, content)
statement = db.prepare('INSERT INTO message (channel_id, user_id, content, created_at) VALUES (?, ?, ?, ?)')
time = Time.now
messages = statement.execute(channel_id, user_id, content, time)
statement.close
redis.hincrby(redis_key_total_messages, channel_id, 1)
redis.publish(AwesomeFetch::STREAM_KEY, '')
redis.publish(AwesomeFetch::STREAM_KEY,
{
'type' => 'message',
'id' => db.last_id,
'channel_id' => channel_id,
'user' => { 'name' => user['name'], 'display_name' => user['display_name'], 'avatar_icon' => user['avatar_icon'] },
'date' => time.strftime("%Y/%m/%d %H:%M:%S"),
'content' => content,
}.to_msgpack
)
session[:bakusoku] = true
messages
end
@@ -548,10 +642,9 @@ def register(user, password)
pass_digest = Digest::SHA1.hexdigest(salt + password)
statement = db.prepare('INSERT INTO user (name, salt, password, display_name, avatar_icon, created_at) VALUES (?, ?, ?, ?, ?, NOW())')
statement.execute(user, salt, pass_digest, user, 'default.png')
row = db.query('SELECT LAST_INSERT_ID() AS last_insert_id').first
statement.close
session[:bakusoku] = true
row['last_insert_id']
db.last_id
end
def get_channel_list_info(focus_channel_id = nil)
@@ -0,0 +1,14 @@
require 'redis'
require 'msgpack'
Redis.new.subscribe('isubata:stream:message') do |on|
on.subscribe do |ch, subs|
puts "AwesomeFetch subscribed to #{ch.inspect} (#{subs} subscriptions)"
end
on.message do |ch, message|
payload = message.empty? ? nil : MessagePack.unpack(message)
p payload
end
end

0 comments on commit 2ac14b8

Please sign in to comment.