Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions newsfeeds/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from django.conf import settings

FANOUT_BATCH_SIZE = 1000 if not settings.TESTING else 3
22 changes: 3 additions & 19 deletions newsfeeds/services.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,12 @@
from friendships.services import FriendshipService
from newsfeeds.models import NewsFeed
from twitter.cache import USER_NEWSFEEDS_PATTERN
from utils.redis_helper import RedisHelper
from newsfeeds.tasks import fanout_newsfeeds_tasks
class NewsFeedService(object):

@classmethod
def fanout_to_follower(cls, tweet):
# followers = FriendshipService.get_followers(tweet.user)
# we can not use forloop + query
# for follower in followers:
# NewsFeed.objects.create(user=follower, tweet=tweet)

newsfeeds = [
# NewsFeed(user=follower, tweet=tweet)
# this clause does not call save()
# does not cause query
NewsFeed(user=follower, tweet=tweet)
for follower in FriendshipService.get_followers(tweet.user)
]
newsfeeds.append(NewsFeed(user=tweet.user, tweet=tweet))
NewsFeed.objects.bulk_create(newsfeeds)

for newsfeed in newsfeeds:
cls.push_newsfeeds_to_cache(newsfeed)
def fanout_to_followers(cls, tweet):
result = fanout_newsfeeds_tasks.delay(tweet.id)

@classmethod
def get_cached_newsfeeds(cls, user_id):
Expand Down
21 changes: 21 additions & 0 deletions newsfeeds/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from celery import shared_task
from friendships.services import FriendshipService
from newsfeeds.models import NewsFeed
from tweets.models import Tweet
from utils.time_constants import ONE_HOUR


@shared_task(time_limit=ONE_HOUR)
def fanout_newsfeeds_tasks(tweet_id):
from newsfeeds.services import NewsFeedService
tweet = Tweet.objects.get(id=tweet_id)
newsfeeds = [
NewsFeed(user=follower, tweet=tweet)
for follower in FriendshipService.get_followers(tweet.user)
]
newsfeeds.append(NewsFeed(user=tweet.user, tweet=tweet))
NewsFeed.objects.bulk_create(newsfeeds)

#bulk create 不会触发 post save 的 signal,需要手动 push 到 cache 里面
for newsfeed in newsfeeds:
NewsFeedService.push_newsfeeds_to_cache(newsfeed)
12 changes: 12 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
amqp==5.2.0
asgiref==3.4.1
asn1crypto==0.24.0
async-timeout==4.0.2
billiard==3.6.4.0
boto3==1.23.10
botocore==1.26.10
cached-property==1.5.2
celery==5.1.2
certifi==2018.1.18
chardet==3.0.4
click==7.1.2
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
cryptography==2.1.4
Django==3.2.25
django-debug-toolbar==3.2.4
Expand All @@ -19,8 +27,10 @@ jmespath==0.10.0
jsonfield==3.1.0
keyring==10.6.0
keyrings.alt==3.0
kombu==5.1.0
mysqlclient==2.0.3
packaging==21.3
prompt-toolkit==3.0.36
pycrypto==2.6.1
PyGObject==3.26.1
pyparsing==3.1.2
Expand All @@ -38,5 +48,7 @@ ssh-import-id==5.7
swapper==1.3.0
typing_extensions==4.1.1
urllib3==1.26.18
vine==5.1.0
wcwidth==0.2.13
wrapt==1.16.0
zipp==3.6.0
2 changes: 1 addition & 1 deletion tweets/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def create(self, request):
}, status=400)
#save will call create method in TweetSerializerForCreate
tweet = serializer.save()
NewsFeedService.fanout_to_follower(tweet)
NewsFeedService.fanout_to_followers(tweet)
return Response(
TweetSerializer(tweet, context={'request' : request}).data,
status=201
Expand Down
5 changes: 5 additions & 0 deletions twitter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
22 changes: 22 additions & 0 deletions twitter/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'twitter.settings')

app = Celery('twitter')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.`
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
8 changes: 8 additions & 0 deletions twitter/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@
REDIS_KEY_EXPIRE_TIME = 7 * 86400
REDIS_LIST_LENGTH_LIMIT = 200 if not TESTING else 20

# Celery Configuration Options
# 使用如下命令把 worker 进程(只执行异步任务的进程,可以在不同的机器上)单独跑起来
# celery -A twitter worker -l INFO
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2' if not TESTING else 'redis://127.0.0.1:6379/0'
CELERY_TIMEZONE = "UTC"
CELERY_TASK_ALWAYS_EAGER = TESTING



try:
from .local_settings import *
Expand Down
5 changes: 5 additions & 0 deletions utils/time_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# in seconds
ONE_HOUR = 60 * 60

# in micro seconds
MAX_TIMESTAMP = 9999999999999999
4 changes: 2 additions & 2 deletions utils/time_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
# python time zone
import pytz


def utc_now():
return datetime.now().replace(tzinfo=pytz.utc)
return datetime.now().replace(tzinfo=pytz.utc)