Skip to content

Commit

Permalink
adding "filter_domains" to streamcorpus_pipeline, see examples/filter…
Browse files Browse the repository at this point in the history
…_domains.yaml
  • Loading branch information
John R. Frank committed May 13, 2014
1 parent f8a2ff8 commit 7844aa7
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 0 deletions.
55 changes: 55 additions & 0 deletions examples/filter_domains.yaml
@@ -0,0 +1,55 @@
kvlayer: &kvlayer
app_name: diffeo
namespace: kba2014
storage_type: redis
storage_addresses: [ "redis.diffeo.com:6379" ]
username: root
password: secret

#logging:
# root:
# level: INFO

## This causes logs to flow into kvlayer, so the `dblogger`
## command-line tool can query them.
logging:
handlers:
dblogger:
class: dblogger.DatabaseLogHandler
storage_config: *kvlayer
root:
handlers: [ console, dblogger ]
level: DEBUG

streamcorpus_pipeline:
level: DEBUG

streamcorpus_pipeline:
root_path: /data/trec-kba

tmp_dir_path: /data/trec-kba/tmp
cleanup_tmp_files: true

reader: from_s3_chunks

incremental_transforms:
- filter_domains

filter_domains:
include_domains_path: bc-wa-domains.txt

batch_transforms: []

## to_local_* must be last, because *moves* the tmp file away
writers: [to_kvlayer]

from_s3_chunks:
aws_access_key_id_path: keys/trec-aws-s3.aws_access_key_id
aws_secret_access_key_path: keys/trec-aws-s3.aws_secret_access_key
bucket: aws-publicdatasets
s3_path_prefix: trec/kba/kba-streamcorpus-2014-v0_3_0
tries: 10
input_format: streamitem
streamcorpus_version: v0_3_0
gpg_decryption_key_path: keys/trec-kba-rsa.gpg-key.private

86 changes: 86 additions & 0 deletions streamcorpus_pipeline/_filters.py
Expand Up @@ -6,8 +6,17 @@
'''
from __future__ import absolute_import
from itertools import imap, chain
import logging
import re
from urlparse import urlparse

from streamcorpus_pipeline.stages import Configured


logger = logging.getLogger(__name__)


class debug_filter(Configured):
'''Remove all stream items except specified ones.
Expand Down Expand Up @@ -118,3 +127,80 @@ def __call__(self, si, context):

return si


def domain_name_cleanse(raw_string):
'''extract a lower-case, no-slashes domain name from a raw string
that might be a URL
'''
try:
parts = urlparse(raw_string)
domain = parts.netloc.split(':')[0]
except:
domain = ''
if not domain:
domain = raw_string
if not domain:
return ''
domain = re.sub('\/', '', domain.strip().lower())
return domain

def domain_name_left_cuts(domain):
'''returns a list of strings created by splitting the domain on
'.' and successively cutting off the left most portion
'''
cuts = []
if domain:
parts = domain.split('.')
for i in range(len(parts)):
cuts.append( '.'.join(parts[i:]))
return cuts

class filter_domains(Configured):
'''Remove stream items that are not from a specific domain by
inspecting first :attr:`~StreamItem.abs_url` and then
:attr:`~StreamItem.schost`
.. code-block:: yaml
filter_domains:
include_domains: [example.com]
include_domains_path:
- path-to-file-with-one-domain-per-line.txt
- path-to-file-with-one-domain-per-line2.txt
'''
config_name = 'filter_domains'
default_config = { 'include_domains': [] }
def __init__(self, config, *args, **kwargs):
super(filter_domains, self).__init__(config, *args, **kwargs)

## cleanse the input domains lists
self.domains = set()

include_domains = self.config.get('include_domains', [])
map(self.domains.add, imap(domain_name_cleanse, include_domains))

include_domains_path = self.config.get('include_domains_path', [])
if not isinstance(include_domains_path, list):
include_domains_path = [include_domains_path]
map(self.domains.add, imap(
domain_name_cleanse,
chain(*imap(open, include_domains_path))))

logger.info('filter_domains configured with %d domain names',
len(self.domains))
logger.info('filter_domains.domains = %r', self.domains)

def __call__(self, si, context=None):
url_things = [si.abs_url, si.schost]
for thing in url_things:
if not thing:
continue
for cut in domain_name_left_cuts(domain_name_cleanse(thing)):
if cut in self.domains:
logger.info('found: %r', cut)
return si

## otherwise return None, which excludes the stream item
logger.critical('rejecting: %r %r', si.schost, si.abs_url)
return None
2 changes: 2 additions & 0 deletions streamcorpus_pipeline/stages.py
Expand Up @@ -29,6 +29,7 @@
.. autoclass:: streamcorpus_pipeline._docx_to_text.docx_to_text
.. autoclass:: streamcorpus_pipeline._title.title
.. autoclass:: streamcorpus_pipeline._filters.debug_filter
.. autoclass:: streamcorpus_pipeline._filters.filter_domains
.. autoclass:: streamcorpus_pipeline._dedup.dedup
.. autoclass:: streamcorpus_pipeline._dump_label_stats.dump_label_stats
.. autoclass:: streamcorpus_pipeline._filters.exclusion_filter
Expand Down Expand Up @@ -348,6 +349,7 @@ def __init__(self, *args, **kwargs):
self.tryload_stage('_docx_to_text', 'docx_to_text')
self.tryload_stage('_title', 'title')
self.tryload_stage('_filters', 'debug_filter')
self.tryload_stage('_filters', 'filter_domains')
self.tryload_stage('_dedup', 'dedup')
self.tryload_stage('_dump_label_stats', 'dump_label_stats')
self.tryload_stage('_filters', 'exclusion_filter')
Expand Down
35 changes: 35 additions & 0 deletions streamcorpus_pipeline/tests/test_filters.py
@@ -0,0 +1,35 @@

from streamcorpus import make_stream_item
from streamcorpus_pipeline._filters import filter_domains, domain_name_cleanse, domain_name_left_cuts

def test_filter_domains(tmpdir):

domains_path = tmpdir.join('domains_path.txt')
domains_path.write('cats.com\nhttp://birds.com/')

stage = filter_domains(dict(
include_domains = ['dogs.com'],
include_domains_path = str(domains_path),
))

assert stage.domains == set(['dogs.com', 'cats.com', 'birds.com'])

si = make_stream_item(0, 'http://dogs.com/')
assert stage(si) is not None

si = make_stream_item(0, 'http://cats.com/')
assert stage(si) is not None

si = make_stream_item(0, 'http://birds.com/')
assert stage(si) is not None

si = make_stream_item(0, 'http://things.com/')
assert stage(si) is None

si = make_stream_item(0, 'http://things.com/')
si.schost = 'https://birds.com'
assert domain_name_cleanse(si.schost) == 'birds.com'
assert stage(si) is not None

def test_domain_name_left_cuts():
assert domain_name_left_cuts('www.5.cars.com') == ['www.5.cars.com', '5.cars.com', 'cars.com', 'com']

0 comments on commit 7844aa7

Please sign in to comment.