
Non-Serial ("Snowflake") IDs #4801

Merged Oct 4, 2017 · 20 commits (showing changes from 17 commits)
5 changes: 4 additions & 1 deletion app/controllers/api/v1/accounts/relationships_controller.rb
@@ -7,7 +7,10 @@ class Api::V1::Accounts::RelationshipsController < Api::BaseController
respond_to :json

def index
@accounts = Account.where(id: account_ids).select('id')
accounts = Account.where(id: account_ids).select('id')
# .where doesn't guarantee that our results are in the same order
# we requested them, so return the "right" order to the requestor.
@accounts = accounts.index_by(&:id).values_at(*account_ids)
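The reordering idiom in the new lines can be checked standalone. This sketch uses a plain Struct in place of Account records (hypothetical data), and core Ruby's `to_h` with a block as the equivalent of ActiveSupport's `index_by`:

```ruby
# Plain-Ruby sketch of the index_by/values_at reordering idiom.
Account = Struct.new(:id)

accounts    = [Account.new(3), Account.new(1), Account.new(2)] # arbitrary DB order
account_ids = [2, 3, 1]                                        # order the client requested

# Build an id => record map, then pull records out in the requested order.
ordered = accounts.to_h { |a| [a.id, a] }.values_at(*account_ids)
ordered.map(&:id) # => [2, 3, 1]
```

Note that `values_at` returns `nil` for any requested ID that was not found, so a caller that might receive unknown IDs would want to `compact` the result.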
Member:
We should be ordering in SQL as much as possible.

Contributor Author:
In general, I agree. However, the API for this function allows users to supply an arbitrarily-ordered set of IDs, and the tests imply they're expected to be returned in the same order (although I can't find any docs saying one way or the other). Doing this ordering in SQL would require synthesizing a CASE to sort by, which seems unlikely to be cleaner or faster.

In practice, this should be roughly O(n) (to the extent that you believe hashes are really O(1) insertion/lookup), with very small values of n. (Also in practice, the endpoint currently doesn't actually guarantee you get entries back in the same order you requested them, so we could just toss this change.)

If you think we should generate the CASE and do it in SQL, I can write that code, it just felt unnecessary here.
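For reference, the CASE the author mentions could be synthesized roughly like this. This is a hedged sketch, not code from the PR, and `order_by_ids_sql` is a hypothetical helper name:

```ruby
# Build an ORDER BY expression that sorts rows into the requested ID order.
# Coercing each id with to_i guards against SQL injection from the list.
def order_by_ids_sql(ids)
  whens = ids.each_with_index.map { |id, i| "WHEN #{id.to_i} THEN #{i}" }
  "CASE id #{whens.join(' ')} END"
end

order_by_ids_sql([30, 10]) # => "CASE id WHEN 30 THEN 0 WHEN 10 THEN 1 END"
```

The resulting string could then be handed to ActiveRecord's `.order`, at the cost of shipping an O(n)-sized expression to the database for each request.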

Member:
My understanding is that our code doesn't depend on this anywhere, and it was just ease of use for the tests. I can ask app developers though.

render json: @accounts, each_serializer: REST::RelationshipSerializer, relationships: relationships
end

128 changes: 103 additions & 25 deletions app/lib/feed_manager.rb
@@ -7,8 +7,13 @@ class FeedManager

MAX_ITEMS = 400

def key(type, id)
"feed:#{type}:#{id}"
# Must be <= MAX_ITEMS or the tracking sets will grow forever
REBLOG_FALLOFF = 40

def key(type, id, subtype = nil)
return "feed:#{type}:#{id}" unless subtype

"feed:#{type}:#{id}:#{subtype}"
end
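As a quick illustration of the new optional `subtype` argument, a standalone copy of the helper behaves like this:

```ruby
# Standalone copy of the key helper above, for illustration only.
def key(type, id, subtype = nil)
  return "feed:#{type}:#{id}" unless subtype

  "feed:#{type}:#{id}:#{subtype}"
end

key(:home, 1)            # => "feed:home:1"
key(:home, 1, 'reblogs') # => "feed:home:1:reblogs"
```

Existing callers that pass two arguments keep getting the old key format, so no Redis keys need migrating.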

def filter?(timeline_type, status, receiver_id)
@@ -22,23 +27,36 @@ def filter?(timeline_type, status, receiver_id)
end

def push(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
return false unless add_to_feed(timeline_type, account, status)

if status.reblog?
# If the original status is within 40 statuses from top, do not re-insert it into the feed
rank = redis.zrevrank(timeline_key, status.reblog_of_id)
return if !rank.nil? && rank < 40
redis.zadd(timeline_key, status.id, status.reblog_of_id)
else
redis.zadd(timeline_key, status.id, status.id)
trim(timeline_type, account.id)
end
trim(timeline_type, account.id)

PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id)

true
end

def unpush(timeline_type, account, status)
return false unless remove_from_feed(timeline_type, account, status)

payload = Oj.dump(event: :delete, payload: status.id.to_s)
Redis.current.publish("timeline:#{account.id}", payload)

true
end

def trim(type, account_id)
redis.zremrangebyrank(key(type, account_id), '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)
timeline_key = key(type, account_id)
reblog_key = key(type, account_id, 'reblogs')
# Remove any items past the MAX_ITEMS'th entry in our feed
redis.zremrangebyrank(timeline_key, '0', (-(FeedManager::MAX_ITEMS + 1)).to_s)

# Get the score of the REBLOG_FALLOFF'th item in our feed, and stop
# tracking anything after it for deduplication purposes.
falloff_rank = FeedManager::REBLOG_FALLOFF - 1
falloff_range = redis.zrevrange(timeline_key, falloff_rank, falloff_rank, with_scores: true)
falloff_score = falloff_range&.first&.last&.to_i || 0
redis.zremrangebyscore(reblog_key, 0, falloff_score)
end
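The `zrevrange(..., with_scores: true)` call above returns an array of `[member, score]` pairs, which is why the nil-safe chain works for both populated and short feeds. A minimal sketch with hypothetical data:

```ruby
# A populated feed: the REBLOG_FALLOFF'th entry exists, so we get its score.
falloff_range = [['12345', 12345.0]] # [member, score] pairs from Redis
falloff_score = falloff_range&.first&.last&.to_i || 0
# falloff_score == 12345

# A short feed: zrevrange returns [], each &. step yields nil, and the
# fallback of 0 means nothing is cleared from the reblogs tracking zset.
empty_score = []&.first&.last&.to_i || 0
# empty_score == 0
```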

def push_update_required?(timeline_type, account_id)
@@ -54,11 +72,9 @@ def merge_into_timeline(from_account, into_account)
query = query.where('id > ?', oldest_home_score)
end

redis.pipelined do
query.each do |status|
next if status.direct_visibility? || filter?(:home, status, into_account)
redis.zadd(timeline_key, status.id, status.id)
end
query.each do |status|
next if status.direct_visibility? || filter?(:home, status, into_account)
add_to_feed(:home, into_account, status)
end

trim(:home, into_account.id)
@@ -69,21 +85,29 @@ def unmerge_from_timeline(from_account, into_account)
oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0

from_account.statuses.select('id').where('id > ?', oldest_home_score).reorder(nil).find_in_batches do |statuses|
redis.pipelined do
statuses.each do |status|
redis.zrem(timeline_key, status.id)
redis.zremrangebyscore(timeline_key, status.id, status.id)
end
statuses.each do |status|
unpush(:home, into_account, status)
end
end
end

def clear_from_timeline(account, target_account)
timeline_key = key(:home, account.id)
timeline_status_ids = redis.zrange(timeline_key, 0, -1)
target_status_ids = Status.where(id: timeline_status_ids, account: target_account).ids
target_statuses = Status.where(id: timeline_status_ids, account: target_account)

redis.zrem(timeline_key, target_status_ids) if target_status_ids.present?
target_statuses.each do |status|
unpush(:home, account, status)
end
end

def populate_feed(account)
prepopulate_limit = FeedManager::MAX_ITEMS / 4
statuses = Status.as_home_timeline(account).order(account_id: :desc).limit(prepopulate_limit)
statuses.reverse_each do |status|
next if filter_from_home?(status, account)
add_to_feed(:home, account, status)
end
end

private
@@ -131,4 +155,58 @@ def filter_from_mentions?(status, receiver_id)

should_filter
end

# Adds a status to an account's feed, returning true if a status was
# added, and false if it was not added to the feed. Note that this is
# an internal helper: callers must call trim or push updates if
# either action is appropriate.
def add_to_feed(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
reblog_key = key(timeline_type, account.id, 'reblogs')

if status.reblog?
# If the original status or a reblog of it is within
# REBLOG_FALLOFF statuses from the top, do not re-insert it into
# the feed
rank = redis.zrevrank(timeline_key, status.reblog_of_id)
return false if !rank.nil? && rank < FeedManager::REBLOG_FALLOFF

reblog_rank = redis.zrevrank(reblog_key, status.reblog_of_id)
return false unless reblog_rank.nil?

redis.zadd(timeline_key, status.id, status.id)
redis.zadd(reblog_key, status.id, status.reblog_of_id)
else
redis.zadd(timeline_key, status.id, status.id)
end

true
end

# Removes an individual status from a feed, correctly handling cases
# with reblogs, and returning true if a status was removed. As with
# `add_to_feed`, this does not trigger push updates, so callers must
# do so if appropriate.
def remove_from_feed(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
reblog_key = key(timeline_type, account.id, 'reblogs')

if status.reblog?
# 1. If the reblogging status is not in the feed, stop.
status_rank = redis.zrevrank(timeline_key, status.id)
return false if status_rank.nil?

# 2. Remove the reblogged status from the `:reblogs` zset.
redis.zrem(reblog_key, status.reblog_of_id)

# 3. Add the reblogged status to the feed using the reblogging
# status' ID as its score, and the reblogged status' ID as its
# value.
redis.zadd(timeline_key, status.id, status.reblog_of_id)
Contributor:

I wonder why this line is needed.
On the new Redis timeline, the score always equals the member, so the original status ID does not vanish when a reblog is added.
Re-inserting the original status ID therefore seems unnecessary to me.

Contributor:

Updating the score prevents the reblogged status from being reinserted after the reblog has been deleted.

Contributor (@clworld, Sep 21, 2017):

To prevent a status from being re-inserted, reblog_key should be used instead of timeline_key. And what condition is supposed to prevent re-insertion? I feel steps 2 and 3 are both unneeded for that purpose.
I think reblog_key is used only to prevent re-insertion, so leaving a ghost reblog_key entry is safe, and the ghost entry still properly prevents re-insertion.

Contributor:

reblog_key is used to track existing reblogs, so you'd see removed reblogs if you did that.

Contributor:

Really? As far as I can see, Feed#from_redis reads only from timeline_key, not from reblog_key.
And reblog_key only stores reblog data for the most recent REBLOG_FALLOFF items in timeline_key (see FeedManager#trim).

Contributor Author:

I can offer two things that will hopefully explain. The first, a long description of how these different zsets work, as I wrote it up while making the changes:

FeedManager maintains two sets of data in Redis for each feed. These feeds are keyed by FeedManager#key, here just [key]. The two sets of data we manage are:

  1. [key]: a zset of statuses that show up in the feed
  2. [key]:reblogs: a zset of statuses that are reblogs in the feed

When trimming the feed, we trim the feed to the last MAX_ITEMS statuses, and then trim the :reblogs zset to only include items with scores that indicate they would be in the last REBLOG_FALLOFF statuses.

When we add a status to a feed, we normally just add it to the feed zset (keyed by and with a value of the status ID). If the status is a reblog, instead we do the following:

  1. If the status it's reblogging is in the last REBLOG_FALLOFF statuses on the feed, we ignore the reblog.
  2. If the status it's reblogging is already reblogged in the :reblogs zset, we ignore the reblog, because this implies that the reblog is already in the last REBLOG_FALLOFF messages.
  3. If we haven't given up yet, we add the reblog to the feed as usual, and also add it to the :reblogs zset with the value of the reblogged status' ID, and a score of the reblogging status' ID*.

*: Note that this is similar to the old behavior, except that step 2 was skipped, and step 3 simply added an entry with a score of the reblogging status's ID and a value of the reblogged status's ID. We can't necessarily rely on the score being usable this way, because status IDs are 8-byte integers and may not be fully representable in an IEEE double-precision float, which is what Redis uses for scores.
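The precision concern is easy to demonstrate in Ruby, whose Float is the same IEEE double that Redis uses for scores:

```ruby
# 2**53 is the largest integer at which doubles are still exact;
# one past it silently rounds back down, so two distinct 8-byte
# status IDs could collapse to the same score.
exact = (2**53).to_f.to_i     # unchanged: 9007199254740992
lossy = (2**53 + 1).to_f.to_i # rounds back to 9007199254740992
```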

When we remove a status from a feed, we are unfortunately in the same position we had been before: because we may discard some legitimate reblogs, we cannot tell if removing the reblogged status would incorrectly remove it from our feed. Instead, if a removed status is a reblog, we do the following:

  1. If the reblogging status is not in the feed, stop.
  2. Remove the reblogged status from the :reblogs zset.
  3. Add the reblogged status to the feed using the reblogging status' ID as its score, and the reblogged status' ID as its value.
  4. Remove the reblogging status from the feed.
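The four removal steps above can be simulated in plain Ruby, with Hashes of member => score standing in for the two zsets (hypothetical IDs: status 100 was reblogged by status 200):

```ruby
timeline = { 200 => 200 } # the reblog is what's visible in the feed
reblogs  = { 100 => 200 } # reblogged id => reblogging id

# 1. If the reblogging status is not in the feed, stop.
if timeline.key?(200)
  # 2. Stop tracking the reblog in the :reblogs zset.
  reblogs.delete(100)
  # 3. Re-insert the original at the reblog's old position (its score).
  timeline[100] = 200
  # 4. Remove the reblogging status itself.
  timeline.delete(200)
end

timeline # => {100 => 200}: the original stays visible, sorted where the reblog was
```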

Contributor Author:

The second thing I'll say is more handwavey but may provide a better intuition: the reblog_key zset stores data that are used to keep a status from being reblogged more than once in the general case. It is not expected to be read from in the case of a feed being read.

In the current (pre-this-PR) state of affairs, a reblog is stored with a score of the reblogging status and a value of the reblogged status. When a reblog is deleted, that entry is removed, and one with both the score and value set to the reblogged status is added. This avoids the status randomly disappearing when it should already have appeared in the feed but got a reblog attached to it first somehow. (This would happen most often when a user views a reblogged status, follows the link, and follows the author: that author's old posts should still show up in the timeline regardless of whether they have been reblogged.)

With this change, by default we store the reblogging status's ID as both the score and the value. The score is stored this way to allow sorting without running into the precision issues of Redis storing scores as doubles. However, as now, when we remove a reblog we can't tell whether the status should still be shown, so we continue to err on the side of showing it. In that case, we store an entry with a score of the old reblogging status's ID (to keep it in the same sort position) and a value of the reblogged status's ID. That's what this line is doing.

(In the future, I'd like to store a list of reasons a reblogged status should be shown to users (reblogs by which users, largely), and only re-insert a status if it has been reblogged by other users. However, that's an entirely different discussion for a different pull request.)

Contributor (@akihikodaki, Sep 21, 2017):

A different concern came up for me in this discussion, so let me describe it here even though it is a bit off-topic. You say:

When trimming the feed, we trim the feed to the last MAX_ITEMS statuses, and then trim the :reblogs zset to only include items with scores that indicate they would be in the last REBLOG_FALLOFF statuses.

And the code would actually work as you describe. However, the behavior sounds bad when considering from_redis. When a user reblogs an old status:

we add the reblog to the feed as usual

So the reblogged, old status would get a small score. Such a status would be trimmed quickly, which effectively makes it invisible to from_redis and to /api/v1/timelines/*, the controller that uses that method.
I think such statuses should be persistent on the feed, and they are in the current implementation.

Contributor:

Never mind, I got the idea. Sorry for the confusion.

Contributor:

Thanks. I had misunderstood that this PR could perfectly solve the problem in #2817 (comment). It reduces the problem but does not solve it completely, so steps 2 and 3 are still needed. OK.


# 4. Remove the reblogging status from the feed (as normal)
end

redis.zrem(timeline_key, status.id)
end
end
2 changes: 1 addition & 1 deletion app/models/feed.rb
@@ -19,7 +19,7 @@ def get(limit, max_id = nil, since_id = nil)
def from_redis(limit, max_id, since_id)
max_id = '+inf' if max_id.blank?
since_id = '-inf' if since_id.blank?
unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:last).map(&:to_i)
unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
Status.where(id: unhydrated).cache_ids
end
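This `map(&:last)` to `map(&:first)` change matters because `zrevrangebyscore(..., with_scores: true)` yields `[member, score]` pairs, and after this PR a re-inserted original shares its reblog's old score, so scores no longer identify statuses. A sketch with hypothetical IDs:

```ruby
# A reblog (id 200) was deleted, so the original (id 100) was re-added
# with the reblog's old score of 200.
pairs = [['200', 200.0], ['100', 200.0]] # [member, score] pairs from Redis

pairs.map(&:first).map(&:to_i) # => [200, 100], the correct status IDs
pairs.map(&:last).map(&:to_i)  # => [200, 200], scores: wrong after re-insertion
```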

37 changes: 8 additions & 29 deletions app/services/batched_remove_status_service.rb
@@ -29,7 +29,7 @@ def call(statuses)
statuses.group_by(&:account_id).each do |_, account_statuses|
account = account_statuses.first.account

unpush_from_home_timelines(account_statuses)
unpush_from_home_timelines(account, account_statuses)

if account.local?
batch_stream_entries(account, account_statuses)
@@ -72,14 +72,15 @@ def batch_activity_json(account, statuses)
end
end

def unpush_from_home_timelines(statuses)
account = statuses.first.account
recipients = account.followers.local.pluck(:id)
def unpush_from_home_timelines(account, statuses)
recipients = account.followers.local.to_a

recipients << account.id if account.local?
recipients << account if account.local?

recipients.each do |follower_id|
unpush(follower_id, statuses)
recipients.each do |follower|
statuses.each do |status|
FeedManager.instance.unpush(:home, follower, status)
end
end
end

@@ -109,28 +110,6 @@ def batch_salmon_slaps(status)
end
end

def unpush(follower_id, statuses)
key = FeedManager.instance.key(:home, follower_id)

originals = statuses.reject(&:reblog?)
reblogs = statuses.select(&:reblog?)

# Quickly remove all originals
redis.pipelined do
originals.each do |status|
redis.zremrangebyscore(key, status.id, status.id)
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
end
end

# For reblogs, re-add original status to feed, unless the reblog
# was not in the feed in the first place
reblogs.each do |status|
redis.zadd(key, status.reblog_of_id, status.reblog_of_id) unless redis.zscore(key, status.reblog_of_id).nil?
redis.publish("timeline:#{follower_id}", @json_payloads[status.id])
end
end

def redis
Redis.current
end
38 changes: 1 addition & 37 deletions app/services/precompute_feed_service.rb
@@ -1,43 +1,7 @@
# frozen_string_literal: true

class PrecomputeFeedService < BaseService
LIMIT = FeedManager::MAX_ITEMS / 4

def call(account)
@account = account
populate_feed
end

private

attr_reader :account

def populate_feed
pairs = statuses.reverse_each.lazy.reject(&method(:status_filtered?)).map(&method(:process_status)).to_a

redis.pipelined do
redis.zadd(account_home_key, pairs) if pairs.any?
redis.del("account:#{@account.id}:regeneration")
end
end

def process_status(status)
[status.id, status.reblog? ? status.reblog_of_id : status.id]
end

def status_filtered?(status)
FeedManager.instance.filter?(:home, status, account.id)
end

def account_home_key
FeedManager.instance.key(:home, account.id)
end

def statuses
Status.as_home_timeline(account).order(account_id: :desc).limit(LIMIT)
end

def redis
Redis.current
FeedManager.instance.populate_feed(account)
end
end
8 changes: 1 addition & 7 deletions app/services/remove_status_service.rb
@@ -102,13 +102,7 @@ def remove_reblogs
end

def unpush(type, receiver, status)
if status.reblog? && !redis.zscore(FeedManager.instance.key(type, receiver.id), status.reblog_of_id).nil?
redis.zadd(FeedManager.instance.key(type, receiver.id), status.reblog_of_id, status.reblog_of_id)
else
redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id)
end

Redis.current.publish("timeline:#{receiver.id}", @payload)
FeedManager.instance.unpush(type, receiver, status)
end

def remove_from_hashtags
36 changes: 36 additions & 0 deletions db/migrate/20170920024819_status_ids_to_timestamp_ids.rb
@@ -0,0 +1,36 @@
class StatusIdsToTimestampIds < ActiveRecord::Migration[5.1]
def up
# Prepare the function we will use to generate IDs.
Rake::Task['db:define_timestamp_id'].execute

# Set up the statuses.id column to use our timestamp-based IDs.
ActiveRecord::Base.connection.execute(
<<~SQL
ALTER TABLE statuses
ALTER COLUMN id
SET DEFAULT timestamp_id('statuses')
SQL
)

# Make sure we have a sequence to use.
Rake::Task['db:ensure_id_sequences_exist'].execute
end

def down
# Revert the column to the old method of just using the sequence
# value for new IDs. Set the current ID sequence to the maximum
# existing ID, such that the next sequence will be one higher.

# We lock the table during this so that the ID won't get clobbered,
# but ID is indexed, so this should be a fast operation.
ActiveRecord::Base.connection.execute(
<<~SQL
LOCK statuses;
SELECT setval('statuses_id_seq', (SELECT MAX(id) FROM statuses));
ALTER TABLE statuses
ALTER COLUMN id
SET DEFAULT nextval('statuses_id_seq');
SQL
)
end
end
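The actual ID layout is defined in SQL by the db:define_timestamp_id rake task; conceptually, a timestamp-prefixed ("snowflake"-style) ID packs milliseconds into the high bits and a sequence value into the low bits. A hypothetical Ruby sketch of the idea (the field widths here are illustrative, not the exact layout of the Postgres function):

```ruby
# Hypothetical sketch: high bits carry milliseconds, low 16 bits a
# per-table sequence, so IDs sort by creation time first.
def timestamp_id(now_ms, sequence)
  (now_ms << 16) | (sequence & 0xFFFF)
end

# Any ID from a later millisecond sorts after every ID from an earlier one,
# which is what makes these usable as feed scores and pagination cursors.
timestamp_id(1, 0) > timestamp_id(0, 0xFFFF) # => true
```

This is also why the migration keeps a sequence around: the low bits still need a tie-breaker for statuses created within the same millisecond.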