From 5dd962cce7be99e03ad52b4bb93481ec9f4cbc38 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Agust=C3=ADn=20M=C3=A9ndez?=
Date: Sun, 19 Apr 2015 17:37:30 -0300
Subject: [PATCH] Use celery to add and update feeds asynchronously when
 possible.

Based on the work by @MissFilly for http://github.com/PyAr/pyarweb

This closes #33.
---
 planet/management/commands/__init__.py        | 312 ----------------
 planet/management/commands/planet_add_feed.py |  11 +-
 .../commands/planet_update_all_feeds.py       |  13 +-
 .../management/commands/planet_update_feed.py |   4 +-
 planet/tasks.py                               | 349 ++++++++++++++++++
 requirements.txt                              |   2 +
 setup.py                                      |  13 +-
 7 files changed, 374 insertions(+), 330 deletions(-)
 create mode 100644 planet/tasks.py

diff --git a/planet/management/commands/__init__.py b/planet/management/commands/__init__.py
index 7ddfd3c..40a96af 100644
--- a/planet/management/commands/__init__.py
+++ b/planet/management/commands/__init__.py
@@ -1,313 +1 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
-from datetime import datetime
-from hashlib import md5
-try:
-    from urlparse import urlparse
-except ImportError:
-    from urllib.parse import urlparse
-
-import feedparser
-import time
-import mimetypes
-from bs4 import BeautifulSoup
-
-from django.conf import settings
-from django.contrib.sites.models import Site
-
-from tagging.models import Tag
-
-from planet.models import (Blog, Generator, Feed, FeedLink, Post, PostLink,
-    Author, PostAuthorData, Enclosure, Category)
-from planet.signals import post_created
-
-
-class PostAlreadyExists(Exception):
-    pass
-
-
-def process_feed(feed_url, owner=None, create=False, category_title=None):
-    """
-    Stores a feed, its related data, its entries and their related data.
-    If create=True then it creates the feed, otherwise it only stores new
-    entries and their related data.
- """ - - def normalize_tag(tag): - """ - converts things like "-noise-" to "noise" and "- noise -" to "noise" - """ - if tag.startswith("-"): - tag = tag[1:] - if tag.endswith("-"): - tag = tag[:-1] - - ## fix for HTML entities - tag = BeautifulSoup(tag).prettify(formatter="html") - tag = tag.strip().lower() - return tag - - try: - USER_AGENT = settings.PLANET["USER_AGENT"] - except (KeyError, AttributeError): - print("""Please set PLANET = {" USER_AGENT": } in your settings.py""") - exit(0) - - feed_url = str(feed_url).strip() - - try: - planet_feed = Feed.objects.get(url=feed_url) - except Feed.DoesNotExist: - planet_feed = None - - print("*" * 20) - print("Feed: {}".format(feed_url)) - - if create and planet_feed: - # can't create it due to it already exists - print("This feed already exists!") - exit(0) - - if not create and not planet_feed: - # can't update it due to it does not exist - print("This feed does not exist!") - exit(0) - - # retrieve and parse feed using conditional GET method - if not create: - modified = datetime.timetuple(planet_feed.last_modified) - etag = planet_feed.etag - # update last checked datetime - planet_feed.last_checked = datetime.now() - planet_feed.save() - else: - modified = etag = None - - document = feedparser.parse(feed_url, agent=USER_AGENT, - modified=modified, etag=etag) - - current_site = Site.objects.get(pk=settings.SITE_ID) - - if create: - # then create blog, feed, generator, feed links and feed tags - title = document.feed.get("title", "--") - subtitle = document.feed.get("subtitle") - blog_url = document.feed.get("link") - rights = document.feed.get("rights") or document.feed.get("license") - info = document.feed.get("info") - try: - guid = unicode(md5(document.feed.get("link")).hexdigest()) - except NameError: - guid = md5(document.feed.get("link").encode('utf-8')).hexdigest() - image_url = document.feed.get("image", {}).get("href") - icon_url = document.feed.get("icon") - language = document.feed.get("language") - etag = document.get("etag", '') - - updated_parsed = document.get("updated_parsed") - if updated_parsed: - last_modified = datetime.fromtimestamp(time.mktime(updated_parsed)) - else: - last_modified = datetime.now() - - feed_links = document.feed.get("links", []) - if not blog_url: - link = [item for item in feed_links if item["rel"]=="alternate"] - if link: - blog_url = link[0]["href"] - - blog, created = Blog.objects.get_or_create( - url=blog_url, defaults={"title": title}, owner=owner) - - generator_dict = document.feed.get("generator_detail", {}) - - if generator_dict: - generator, created = Generator.objects.get_or_create( - name=generator_dict.get("name", "--"), - link=generator_dict.get("link"), - version=generator_dict.get("version")) - else: - generator = None - - if category_title: - ##TODO: site_objects! 
- category = Category.objects.get(title=category_title) - else: - category = None - - planet_feed = Feed(title=title, subtitle=subtitle, blog=blog, - url=feed_url, rights=rights, info=info, guid=guid, - image_url=image_url, icon_url=icon_url, language=language, - etag=etag, last_modified=last_modified, generator=generator, - is_active=True, last_checked=datetime.now(), - site=current_site, category=category - ) - planet_feed.save() - - for tag_dict in document.feed.get("tags", []): - name = tag_dict.get("term") - - for link_dict in feed_links: - feed_link, created = FeedLink.objects.get_or_create( - feed=planet_feed, - rel=link_dict.get("rel", "--"), - mime_type=link_dict.get("type", "text/html"), - link=link_dict.get("href", blog_url) - ) - - entries = [] - - total_results = int(document.feed.get("opensearch_totalresults", len(document.entries))) - items_per_page = int(document.feed.get("opensearch_itemsperpage", 25)) - new_posts_count = 0 - - if total_results == 0: - print("No entries to store. status: {} {}".format(document.get("status"), document.get("debug_message"))) - else: - print("Entries total count: {}".format(total_results)) - stop_retrieving = False - while (total_results > len(entries)) and not stop_retrieving: - - # retrieve and store feed posts - entries.extend(document.entries) - print("Processing {} entries".format(len(document.entries))) - - for entry in document.entries: - title = entry.get("title", "") - url = entry.get("link") - try: - guid = unicode(md5(entry.get("link")).hexdigest()) - except NameError: - guid = md5(entry.get("link").encode('utf-8')).hexdigest() - content = entry.get('description') or entry.get("content", [{"value": ""}])[0]["value"] - comments_url = entry.get("comments") - date_modified = entry.get("updated_parsed") or\ - entry.get("published_parsed") - try: - date_modified = datetime.fromtimestamp( - time.mktime(date_modified)) - except Exception: - date_modified = None - - try: - if len(Post.objects.filter(url=url, guid=guid)): - raise PostAlreadyExists - post = Post(title=title, url=url, guid=guid, content=content, - comments_url=comments_url, date_modified=date_modified, - feed=planet_feed) - # To have the feed entry in the pre_save signal - post.entry = entry - post.save() - except PostAlreadyExists: - print("Skipping post {} ({}) because already exists"\ - .format(guid, url)) - if not create: - # if it is in update-mode then stop retrieving when - # it finds repeated posts - stop_retrieving = True - else: - new_posts_count += 1 - # create post tags... - for tag_dict in entry.get("tags", []): - tag_name = tag_dict.get("term") or tag_dict.get("label") - tag_name = normalize_tag(tag_name) - - if len(tag_name) > 50: continue - - try: - if "/" in tag_name: - # For path based categories - for subtag in tag_name.split("/"): - if subtag: - # empty string if starts/ends with slash - Tag.objects.add_tag(post, '"%s"' % subtag) - else: - Tag.objects.add_tag(post, '"%s"' % tag_name) - except AttributeError as e: - print("Ignoring tag error: {}".format(e)) - # create post links... - for link_dict in entry.get("links", []): - post_link, created = PostLink.objects.get_or_create( - post=post, - rel=link_dict.get("rel", "--"), - mime_type=link_dict.get("type", "text/html"), - link=link_dict.get("href", "--"), - title=link_dict.get("title", "--") - ) - - # create and store enclosures... 
- if entry.get('media_thumbnail', False): - try: - media_url = entry.get('media_thumbnail').href - media_list = [{"url": media_url}] - except AttributeError: - media_list = entry.get('media_thumbnail', [{"url": None}]) - - for media in media_list: - media_url = media["url"] - mime_type, enc = mimetypes.guess_type(urlparse(media_url).path) - - post_enclosure, created = Enclosure.objects.get_or_create( - post=post, - length=0, - mime_type=mime_type, - link=media_url - ) - - for enclosure_dict in entry.get("enclosures", []): - post_enclosure = Enclosure( - post=post, - length=enclosure_dict.get("length", 0), - mime_type=enclosure_dict.get("type", ""), - link=enclosure_dict.get("href") - ) - post_enclosure.save() - - # create and store author... - author_dict = entry.get("author_detail") - - if author_dict: - author, created = Author.objects.get_or_create( - name=author_dict.get("name", ""), - email=author_dict.get("email", ""), - profile_url=author_dict.get("href") - ) - try: - PostAuthorData.objects.get(author=author, post=post) - except PostAuthorData.DoesNotExist: - pad = PostAuthorData(author=author, post=post) - pad.save() - - # create and store contributors... - for contributor_dict in entry.get("contributors", []): - contributor, created = Author.objects.get_or_create( - name=author_dict.get("name", ""), - email=author_dict.get("email", ""), - profile_url=contributor_dict.get("href") - ) - try: - PostAuthorData.objects.get(author=contributor, post=post) - except PostAuthorData.DoesNotExist: - pad = PostAuthorData(author=contributor, post=post, - is_contributor=True) - pad.save() - - # We send a post_created signal - print('post_created.send(sender=post)', post) - post_created.send(sender=post, instance=post) - - if not stop_retrieving: - opensearch_url = "{}?start-index={}&max-results={}".format(\ - feed_url, len(entries) + 1, items_per_page) - - print("retrieving {}...".format(opensearch_url)) - document = feedparser.parse(opensearch_url, agent=USER_AGENT) - - if new_posts_count: - # update last modified datetime - planet_feed.last_modified = datetime.now() - planet_feed.save() - print("{} posts were created. Done.".format(new_posts_count)) - - print() - return new_posts_count diff --git a/planet/management/commands/planet_add_feed.py b/planet/management/commands/planet_add_feed.py index 6d0b82e..12baad2 100644 --- a/planet/management/commands/planet_add_feed.py +++ b/planet/management/commands/planet_add_feed.py @@ -1,9 +1,11 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from optparse import make_option from django.core.management.base import BaseCommand -from planet.management.commands import process_feed -from optparse import make_option + +from planet.tasks import process_feed + class Command(BaseCommand): help = "Add a complete blog feed to our db." @@ -19,9 +21,10 @@ class Command(BaseCommand): def handle(self, *args, **options): if not len(args): - print("You must provide the feed url as parameter") + self.stderr.write("You must provide the feed url as parameter") exit(0) feed_url = args[0] # process feed in create-mode - process_feed(feed_url, create=True, category_title=options['category']) + process_feed.delay(feed_url, create=True, category_title=options['category']) + self.stdout.write("Feed created. 
Posts scheduled to be retrieved soon.")
diff --git a/planet/management/commands/planet_update_all_feeds.py b/planet/management/commands/planet_update_all_feeds.py
index e7b3fff..692d571 100644
--- a/planet/management/commands/planet_update_all_feeds.py
+++ b/planet/management/commands/planet_update_all_feeds.py
@@ -1,9 +1,7 @@
-#!/usr/bin/env python
 # -*- coding: utf-8 -*-
-from datetime import datetime
 
 from django.core.management.base import NoArgsCommand
 
-from planet.management.commands import process_feed
+from planet.tasks import process_feed
 from planet.models import Feed
 from planet.signals import feeds_updated
 
@@ -12,12 +10,9 @@ class Command(NoArgsCommand):
     help = "Update all feeds"
 
     def handle(self, *args, **options):
-        new_posts_count = 0
-        start = datetime.now()
 
         for feed_url in Feed.site_objects.all().values_list("url", flat=True):
             # process feed in create-mode
-            new_posts_count += process_feed(feed_url, create=False)
-        delta = datetime.now() - start
-        print("Added {} posts in {} seconds".format(new_posts_count, delta.seconds))
-        feeds_updated.send(sender=self, instance=self)
+            self.stdout.write("Scheduling feed with URL=%s..." % feed_url)
+            process_feed.delay(feed_url, create=False)
+        feeds_updated.send(sender=self, instance=self)
diff --git a/planet/management/commands/planet_update_feed.py b/planet/management/commands/planet_update_feed.py
index 52e9010..11c0d32 100644
--- a/planet/management/commands/planet_update_feed.py
+++ b/planet/management/commands/planet_update_feed.py
@@ -3,7 +3,7 @@
 
 from django.core.management.base import BaseCommand
 
-from planet.management.commands import process_feed
+from planet.tasks import process_feed
 
 
 class Command(BaseCommand):
@@ -17,4 +17,4 @@ def handle(self, *args, **options):
         feed_url = args[0]
 
         # process feed in create-mode
-        process_feed(feed_url, create=False)
+        process_feed.delay(feed_url, create=False)
diff --git a/planet/tasks.py b/planet/tasks.py
new file mode 100644
index 0000000..7931d4c
--- /dev/null
+++ b/planet/tasks.py
@@ -0,0 +1,349 @@
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+from celery import task
+from datetime import datetime
+from hashlib import md5
+try:
+    from urlparse import urlparse
+except ImportError:
+    from urllib.parse import urlparse
+
+import feedparser
+import time
+import mimetypes
+from bs4 import BeautifulSoup
+
+from django.conf import settings
+from django.contrib.sites.models import Site
+from django.contrib.auth import get_user_model
+
+from tagging.models import Tag
+
+from planet.models import (Blog, Generator, Feed, FeedLink, Post, PostLink,
+                           Author, PostAuthorData, Enclosure, Category)
+from planet.signals import feeds_updated
+from planet.signals import post_created
+
+
+class PostAlreadyExists(Exception):
+    pass
+
+
+@task
+def process_feed(feed_url, owner_id=None, create=False, category_title=None):
+    """
+    Stores a feed, its related data, its entries and their related data.
+    If create=True then it creates the feed, otherwise it only stores new
+    entries and their related data.
+ """ + + def normalize_tag(tag): + """ + converts things like "-noise-" to "noise" and "- noise -" to "noise" + """ + if tag.startswith("-"): + tag = tag[1:] + if tag.endswith("-"): + tag = tag[:-1] + + # fix for HTML entities + tag = BeautifulSoup(tag).prettify(formatter="html") + tag = tag.strip().lower() + return tag + + try: + USER_AGENT = settings.PLANET["USER_AGENT"] + except (KeyError, AttributeError): + print( + """Please set PLANET = {" USER_AGENT": } in your settings.py""") + exit(0) + + feed_url = str(feed_url).strip() + + try: + planet_feed = Feed.objects.get(url=feed_url) + except Feed.DoesNotExist: + planet_feed = None + + print("*" * 20) + print("Feed: {}".format(feed_url)) + + if create and planet_feed: + # can't create it due to it already exists + print("This feed already exists!") + exit(0) + + if not create and not planet_feed: + # can't update it due to it does not exist + print("This feed does not exist!") + exit(0) + + # retrieve and parse feed using conditional GET method + if not create: + modified = datetime.timetuple(planet_feed.last_modified) + etag = planet_feed.etag + # update last checked datetime + planet_feed.last_checked = datetime.now() + planet_feed.save() + else: + modified = etag = None + + document = feedparser.parse(feed_url, agent=USER_AGENT, + modified=modified, etag=etag) + + current_site = Site.objects.get(pk=settings.SITE_ID) + + if create: + # then create blog, feed, generator, feed links and feed tags + title = document.feed.get("title", "--") + subtitle = document.feed.get("subtitle") + blog_url = document.feed.get("link") + rights = document.feed.get("rights") or document.feed.get("license") + info = document.feed.get("info") + try: + guid = unicode(md5(document.feed.get("link")).hexdigest()) + except NameError: + guid = md5(document.feed.get("link").encode('utf-8')).hexdigest() + image_url = document.feed.get("image", {}).get("href") + icon_url = document.feed.get("icon") + language = document.feed.get("language") + etag = document.get("etag", '') + + updated_parsed = document.get("updated_parsed") + if updated_parsed: + last_modified = datetime.fromtimestamp(time.mktime(updated_parsed)) + else: + last_modified = datetime.now() + + feed_links = document.feed.get("links", []) + if not blog_url: + link = [item for item in feed_links if item["rel"] == "alternate"] + if link: + blog_url = link[0]["href"] + + User = get_user_model() + try: + owner = User.objects.get(pk=owner_id) + except User.DoesNotExist: + owner = None + + blog, created = Blog.objects.get_or_create( + url=blog_url, defaults={"title": title}, owner=owner) + + generator_dict = document.feed.get("generator_detail", {}) + + if generator_dict: + generator, created = Generator.objects.get_or_create( + name=generator_dict.get("name", "--"), + link=generator_dict.get("link"), + version=generator_dict.get("version")) + else: + generator = None + + if category_title: + # TODO: site_objects! 
+ category = Category.objects.get(title=category_title) + else: + category = None + + planet_feed = Feed(title=title, subtitle=subtitle, blog=blog, + url=feed_url, rights=rights, info=info, guid=guid, + image_url=image_url, icon_url=icon_url, language=language, + etag=etag, last_modified=last_modified, generator=generator, + is_active=True, last_checked=datetime.now(), + site=current_site, category=category + ) + planet_feed.save() + + for tag_dict in document.feed.get("tags", []): + name = tag_dict.get("term") + + for link_dict in feed_links: + feed_link, created = FeedLink.objects.get_or_create( + feed=planet_feed, + rel=link_dict.get("rel", "--"), + mime_type=link_dict.get("type", "text/html"), + link=link_dict.get("href", blog_url) + ) + + entries = [] + + total_results = int( + document.feed.get("opensearch_totalresults", len(document.entries))) + items_per_page = int(document.feed.get("opensearch_itemsperpage", 25)) + new_posts_count = 0 + + if total_results == 0: + print("No entries to store. status: {} {}".format( + document.get("status"), document.get("debug_message"))) + else: + print("Entries total count: {}".format(total_results)) + stop_retrieving = False + while (total_results > len(entries)) and not stop_retrieving: + + # retrieve and store feed posts + entries.extend(document.entries) + print("Processing {} entries".format(len(document.entries))) + + for entry in document.entries: + title = entry.get("title", "") + url = entry.get("link") + try: + guid = unicode(md5(entry.get("link")).hexdigest()) + except NameError: + guid = md5(entry.get("link").encode('utf-8')).hexdigest() + content = entry.get('description') or entry.get( + "content", [{"value": ""}])[0]["value"] + comments_url = entry.get("comments") + date_modified = entry.get("updated_parsed") or\ + entry.get("published_parsed") + try: + date_modified = datetime.fromtimestamp( + time.mktime(date_modified)) + except Exception: + date_modified = None + + try: + if len(Post.objects.filter(url=url, guid=guid)): + raise PostAlreadyExists + post = Post(title=title, url=url, guid=guid, content=content, + comments_url=comments_url, date_modified=date_modified, + feed=planet_feed) + # To have the feed entry in the pre_save signal + post.entry = entry + post.save() + except PostAlreadyExists: + print("Skipping post {} ({}) because already exists" + .format(guid, url)) + if not create: + # if it is in update-mode then stop retrieving when + # it finds repeated posts + stop_retrieving = True + else: + new_posts_count += 1 + # create post tags... + for tag_dict in entry.get("tags", []): + tag_name = tag_dict.get( + "term") or tag_dict.get("label") + tag_name = normalize_tag(tag_name) + + if len(tag_name) > 50: + continue + + try: + if "/" in tag_name: + # For path based categories + for subtag in tag_name.split("/"): + if subtag: + # empty string if starts/ends with + # slash + Tag.objects.add_tag( + post, '"%s"' % subtag) + else: + Tag.objects.add_tag(post, '"%s"' % tag_name) + except AttributeError as e: + print("Ignoring tag error: {}".format(e)) + # create post links... + for link_dict in entry.get("links", []): + post_link, created = PostLink.objects.get_or_create( + post=post, + rel=link_dict.get("rel", "--"), + mime_type=link_dict.get("type", "text/html"), + link=link_dict.get("href", "--"), + title=link_dict.get("title", "--") + ) + + # create and store enclosures... 
+                        if entry.get('media_thumbnail', False):
+                            try:
+                                media_url = entry.get('media_thumbnail').href
+                                media_list = [{"url": media_url}]
+                            except AttributeError:
+                                media_list = entry.get(
+                                    'media_thumbnail', [{"url": None}])
+
+                            for media in media_list:
+                                media_url = media["url"]
+                                mime_type, enc = mimetypes.guess_type(
+                                    urlparse(media_url).path)
+
+                                post_enclosure, created = Enclosure.objects.get_or_create(
+                                    post=post,
+                                    length=0,
+                                    mime_type=mime_type,
+                                    link=media_url
+                                )
+
+                        for enclosure_dict in entry.get("enclosures", []):
+                            post_enclosure = Enclosure(
+                                post=post,
+                                length=enclosure_dict.get("length", 0),
+                                mime_type=enclosure_dict.get("type", ""),
+                                link=enclosure_dict.get("href")
+                            )
+                            post_enclosure.save()
+
+                        # create and store author...
+                        author_dict = entry.get("author_detail")
+
+                        if author_dict:
+                            author, created = Author.objects.get_or_create(
+                                name=author_dict.get("name", ""),
+                                email=author_dict.get("email", ""),
+                                profile_url=author_dict.get("href")
+                            )
+                            try:
+                                PostAuthorData.objects.get(
+                                    author=author, post=post)
+                            except PostAuthorData.DoesNotExist:
+                                pad = PostAuthorData(author=author, post=post)
+                                pad.save()
+
+                        # create and store contributors...
+                        for contributor_dict in entry.get("contributors", []):
+                            contributor, created = Author.objects.get_or_create(
+                                name=contributor_dict.get("name", ""),
+                                email=contributor_dict.get("email", ""),
+                                profile_url=contributor_dict.get("href")
+                            )
+                            try:
+                                PostAuthorData.objects.get(
+                                    author=contributor, post=post)
+                            except PostAuthorData.DoesNotExist:
+                                pad = PostAuthorData(author=contributor, post=post,
+                                                     is_contributor=True)
+                                pad.save()
+
+                        # We send a post_created signal
+                        print("Sending post_created signal for {}".format(post))
+                        post_created.send(sender=post, instance=post)
+
+            if not stop_retrieving:
+                opensearch_url = "{}?start-index={}&max-results={}".format(
+                    feed_url, len(entries) + 1, items_per_page)
+
+                print("retrieving {}...".format(opensearch_url))
+                document = feedparser.parse(opensearch_url, agent=USER_AGENT)
+
+    if new_posts_count:
+        # update last modified datetime
+        planet_feed.last_modified = datetime.now()
+        planet_feed.save()
+        print("{} posts were created. Done.".format(new_posts_count))
+
+    print()
+    return new_posts_count
+
+
+@task
+def update_feeds():
+    new_posts_count = 0
+    start = datetime.now()
+
+    for feed_url in Feed.site_objects.all().values_list("url", flat=True):
+        # process feed in update-mode
+        new_posts_count += process_feed(feed_url, create=False)
+
+    delta = datetime.now() - start
+    print("Added {} posts in {} seconds".format(new_posts_count, delta.seconds))
+    feeds_updated.send(sender=None, instance=None)
diff --git a/requirements.txt b/requirements.txt
index d4c795d..57b5086 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,5 @@ feedparser
 south
 pinax-theme-bootstrap==3.0a4
 beautifulsoup4
+celery>=3.0.0
+django-celery
diff --git a/setup.py b/setup.py
index be340d0..7e3fd71 100644
--- a/setup.py
+++ b/setup.py
@@ -12,9 +12,16 @@
     author="Matias Agustin Mendez",
     author_email="me@matagus.com.ar",
-    install_requires=["feedparser", "south", "django-tagging>=0.3.1",
-                      "django-pagination-py3", "Django>=1.4", "beautifulsoup4"],
-    # "pinax-theme-bootstrap==3.0a4"],
+    install_requires=[
+        "feedparser",
+        "south",
+        "django-tagging>=0.3.1",
+        "django-pagination-py3",
+        "Django>=1.5",
+        "beautifulsoup4",
+        "celery>=3.0.0",
+        "django-celery"
+    ],
     packages=find_packages(),
     package_dir={"planet": "planet"},
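
Note: nothing in this patch actually schedules the new update_feeds task. The
sketch below shows one way it could be wired up, assuming django-celery and a
Redis broker at its default URL; BROKER_URL, the interval, and the schedule
entry name are illustrative, not part of the patch:

    # settings.py (sketch, assumes django-celery is installed)
    from datetime import timedelta

    import djcelery
    djcelery.setup_loader()

    BROKER_URL = "redis://localhost:6379/0"

    CELERYBEAT_SCHEDULE = {
        # run planet.tasks.update_feeds periodically; it walks every
        # registered feed and processes each one in update mode
        "update-all-feeds": {
            "task": "planet.tasks.update_feeds",
            "schedule": timedelta(hours=1),
        },
    }

With that in place, a worker plus the beat scheduler can be started with
"python manage.py celery worker -B", and planet_add_feed / planet_update_feed
will enqueue their work instead of blocking the management command.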