added indexer worker, stemming logic and db persistence logic.

started a link extractor job in the hope that it can be used to give the
scheduler more work to do. hope it doesn't pull down the whole internet.

moved model Base to meta so it can be shared among other modules that
need db access and can be initialized more easily.

added new dependencies: nltk and beautifulsoup
commit 2425b234465dceb94ec38bf205df5cdc69b27c5d (1 parent: 0205a07), authored by @twillis
src/FeederEngine/feederengine/indexer.py (new file, 62 lines)
@@ -0,0 +1,62 @@
+"""
+very very crude page indexer
+
+the goal is to get stems and the frequency on the page to result in a
+db row that looks like
+
+[stem] | [url] | domain | frequency | index_date
+"""
+import nltk
+from nltk.stem import snowball
+import urlparse
+import meta
+from sqlalchemy import PrimaryKeyConstraint, Table, Column, Unicode, UnicodeText, String, DateTime, Integer, or_, and_
+import datetime
+
+
+def index_page_iter(url, body):
+    body = nltk.clean_html(body)
+    tokens = nltk.word_tokenize(body)
+    domain = urlparse.urlparse(url).hostname
+    stems = [get_stemmer(body).stem(w.decode("utf-8").lower()) for w in tokens]
+    stem_freq = {}
+    for stem in stems:
+        if stem not in stem_freq:
+            stem_freq[stem] = 1
+        else:
+            stem_freq[stem] += 1
+
+    for k, v in stem_freq.items():
+        yield dict(stem=k, url=url, domain=domain, frequency=v)
+
+
+def get_stemmer(body):
+    return snowball.EnglishStemmer(ignore_stopwords=True)
+
+
+class IndexEntry(meta.Base):
+    __tablename__ = "index_entries"
+    domain = Column(Unicode)
+    frequency = Column(Integer)
+    index_date = Column(DateTime, default=datetime.datetime.now)
+    stem = Column(Unicode)
+    url = Column(Unicode)
+    __table_args__ = (PrimaryKeyConstraint("stem", "url"), {})
+
+
+def add_entries(entries):
+    ENTRY_COLUMNS = ["domain", "frequency", "stem", "url"]
+    DBSession = meta.Session()
+
+    for entry in entries:
+        args = {k.lower(): v for k, v in entry.items()
+                if k.lower() in ENTRY_COLUMNS}
+        DBSession.add(IndexEntry(**args))
+
+
+def needs_indexed(url, threshold=10):
+    dt_threshold = datetime.datetime.now() - datetime.timedelta(minutes=threshold)
+    DBSession = meta.Session()
+    return DBSession.query(IndexEntry)\
+        .filter(IndexEntry.url == url)\
+        .filter(IndexEntry.index_date >= dt_threshold).count() == 0
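
Rough usage sketch for the new module (not part of the commit; it assumes meta.Session has already been initialized via meta.session_factory and that the installed NLTK version still provides clean_html):

# hypothetical usage sketch for indexer.py
from feederengine import indexer

url = "http://example.com/"
body = "<html><body><p>cats and more cats</p></body></html>"

if indexer.needs_indexed(url):
    # each entry is a dict: {"stem": ..., "url": ..., "domain": ..., "frequency": ...}
    entries = list(indexer.index_page_iter(url, body))
    indexer.add_entries(entries)
    # rows land in index_entries once the surrounding transaction commits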
src/FeederEngine/feederengine/link_extractor.py (new file, 10 lines)
@@ -0,0 +1,10 @@
+"""
+use beautifulsoup to extract urls
+"""
+from BeautifulSoup import BeautifulSoup, SoupStrainer
+
+
+def extract_links_iter(body):
+    for link in BeautifulSoup(body, parseOnlyThese=SoupStrainer('a')):
+        if link.get('href'):
+            yield link['href']
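
A quick sketch of how the extractor might feed urls back into the system (illustrative only; only extract_links_iter is defined in this commit, and any scheduling step is an assumption):

# hypothetical sketch: pull hrefs out of a page body
from feederengine import link_extractor

body = '<html><body><a href="http://example.com/feed">feed</a><a name="anchor-only">no href</a></body></html>'

for href in link_extractor.extract_links_iter(body):
    # a real version would de-duplicate and filter by domain before handing
    # urls to the scheduler, or risk crawling the whole internet
    print href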
src/FeederEngine/feederengine/meta.py (4 changed lines)
@@ -4,6 +4,8 @@
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from zope.sqlalchemy import ZopeTransactionExtension
+from sqlalchemy.ext.declarative import declarative_base
+
import logging
log = logging.getLogger(__name__)
@@ -13,6 +15,8 @@
# set this with the result of session_factory to initialize
Session = None
+Base = declarative_base()
+
def engine_factory():
    global db_url
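
The intent of moving Base into meta, sketched below: every model module subclasses meta.Base, so a single create_all covers all tables (ExampleModel and the sqlite URL are illustrative; Base, Session and session_factory are the real names from meta.py):

# hypothetical sketch of the shared-Base pattern
from sqlalchemy import create_engine, Column, Unicode
from feederengine import meta


class ExampleModel(meta.Base):          # illustrative; the real models are CrawlJobModel and IndexEntry
    __tablename__ = "examples"
    name = Column(Unicode, primary_key=True)


# one-time initialization, e.g. in a test setUp or an app entry point
engine = create_engine("sqlite://")
meta.Session = meta.session_factory(engine)
meta.Base.metadata.create_all(engine)   # creates tables for every model that subclasses meta.Base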
src/FeederEngine/feederengine/scheduler.py (8 changed lines)
@@ -2,20 +2,14 @@
given a table of urls and other information, decides when to schedule
a crawler to crawl for updates
"""
-from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, UnicodeText, String, DateTime, or_, and_
-from sqlalchemy.orm import scoped_session
import meta
import datetime
DEFAULT_JOB_COUNT = 10
-Base = declarative_base()
-
-
-
-class CrawlJobModel(Base):
+class CrawlJobModel(meta.Base):
"""
let's start with models just being the representation of a row and
not too much behavior and see how that goes.
src/FeederEngine/feederengine/workers.py (46 changed lines)
@@ -8,7 +8,8 @@
import json
import time
from crawler import crawl_url
-
+import indexer
+import datetime
log = logging.getLogger(__name__)
@@ -76,6 +77,7 @@ def work(self, should_continue):
scheduler.mark_job_scheduled(r.url)
else:
self.log.info("nothing to do, waiting......")
+
else:
self.log.debug("cleaning up....")
@@ -136,12 +138,50 @@ def work(self, should_continue):
self.log.info([presults, source in presults, destination in presults])
-class IndexWorker(object):
+class IndexWorker(KillableProcess):
    """
    pulls job off queue
    indexes response body [tokenize, stem, other]
+    saves to db
    """
-    pass
+    def __init__(self, from_bind, db_url):
+        KillableProcess.__init__(self)
+        self._from = from_bind
+        self._db_url = db_url
+        self.log = logging.getLogger(self.__class__.__name__)
+
+    def work(self, should_continue):
+        self.log.debug("ready to work")
+        if not should_continue():
+            return
+
+        import meta
+        import transaction
+        meta.db_url = self._db_url
+        context = zmq.Context()
+        poller = zmq.Poller()
+
+        with pull_socket(context, self._from) as source:
+            self.log.debug("has socket")
+            poller = zmq.Poller()
+            poller.register(source, zmq.POLLIN)
+
+            while should_continue():
+                self.log.debug("polling socket")
+                presults = dict(poller.poll(timeout=10))
+                if source in presults:
+                    self.log.debug("got work to do")
+                    url, response = source.recv_multipart(zmq.NOBLOCK)
+                    if indexer.needs_indexed(url):
+                        self.log.debug("indexing url %s" % url)
+                        index_list = list(indexer.index_page_iter(url, response))
+                        # persist to database
+                        if self.log.isEnabledFor(logging.DEBUG):
+                            self.log.debug("indexing complete for url %s" % url)
+                            for entry in index_list:
+                                self.log.debug("url: %(url)s\n\nstem: %(stem)s (%(frequency)s)" % entry)
+                        with transaction.manager:
+                            indexer.add_entries(index_list)
class UpdateWorker(object):
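
Wiring the worker up looks roughly like the testIndexer test further down (a sketch, not part of the commit; the ipc endpoint, db url and page body are placeholders, while IndexWorker and push_socket come from feederengine.workers):

# hypothetical wiring sketch, mirroring tests/test_workers.py testIndexer
import time
import zmq
from feederengine.workers import IndexWorker, push_socket

bind = "ipc:///tmp/crawler_socket"            # placeholder endpoint
iw = IndexWorker(bind, "sqlite:///feeds.db")  # placeholder db url
iw.start()                                    # KillableProcess runs work() in a child process

context = zmq.Context()
with push_socket(context, bind) as source:
    # the worker expects [url, response_body] multipart messages
    source.send_multipart(["http://fictional.com", "<html><body>this and that</body></html>"])

time.sleep(1)                                 # give the worker time to index and commit
iw.terminate()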
src/FeederEngine/setup.py (4 changed lines)
@@ -12,7 +12,9 @@
"pyzmq-static",
"sqlalchemy",
"zope.sqlalchemy",
- "transaction"]
+ "transaction",
+ "nltk",
+ "beautifulsoup"]
version = '0.0'
src/FeederEngine/tests/test_scheduler.py (8 changed lines)
@@ -2,7 +2,7 @@
scheduler tests
"""
import unittest
-from feederengine import scheduler, meta
+from feederengine import scheduler, indexer, meta
from sqlalchemy import create_engine
import transaction
import datetime
@@ -15,15 +15,15 @@
    def setUp(self):
        """db setup"""
-        self.engine = create_engine(meta.db_url, echo=False)
+        self.engine = create_engine(meta.db_url, echo=True)
        meta.Session = meta.session_factory(self.engine)
        # scheduler.DBSession.configure(bind=self.engine)
-        scheduler.Base.metadata.create_all(self.engine)
+        meta.Base.metadata.create_all(self.engine)

    def tearDown(self):
        """db teardown"""
-        scheduler.Base.metadata.drop_all(self.engine)
+        meta.Base.metadata.drop_all(self.engine)
class TestCrawlJobModel(unittest.TestCase):
src/FeederEngine/tests/test_workers.py (27 changed lines)
@@ -2,9 +2,10 @@
trying to test these things
"""
import unittest
-from feederengine.workers import SchedulerWorker, CrawlWorker, pull_socket
+from feederengine.workers import SchedulerWorker, CrawlWorker, IndexWorker, pull_socket, push_socket
from feederengine.process.base import KillableProcess
from feederengine.scheduler import CrawlJobModel, get_crawl_jobs
+from feederengine import indexer
from feederengine import meta, scheduler
from sqlalchemy import create_engine
import time
@@ -14,6 +15,7 @@
import logging
import zmq
from utils import mock, mock_rss_server
+from webob import Response
__here__ = os.path.abspath(os.path.dirname(__name__))
logging.basicConfig(level="INFO")
@@ -43,14 +45,31 @@ def setUp(self):
        self.engine = create_engine(db_url,
                                    echo=False)
        meta.Session = meta.session_factory(self.engine)
-        scheduler.Base.metadata.create_all(self.engine)
+        meta.Base.metadata.create_all(self.engine)
        self.db_url = db_url

    def tearDown(self):
-        scheduler.Base.metadata.drop_all(self.engine)
+        meta.Base.metadata.drop_all(self.engine)
        if os.path.isfile(self.db_path):
            os.remove(self.db_path)
-
+
+    def testIndexer(self):
+        bind = "ipc:///tmp/crawler_socket"
+        context = zmq.Context()
+        iw = IndexWorker(bind, self.db_url)
+        iw.start()
+        self.assert_(iw.is_alive())
+        URL = "http://fictional.com"
+        with push_socket(context, bind) as source:
+            source.send_multipart([URL,
+                                   str(Response("this and that"))])
+
+        time.sleep(1)
+        self.assert_(not indexer.needs_indexed(URL))
+        iw.terminate()
+        time.sleep(.1)
+        self.assert_(not iw.is_alive())
+
    def testSchedulerAndCrawler(self):
        urls = [u"http://feeds.feedburner.com/43folders",
                u"http://advocacy.python.org/podcasts/littlebit.rss",