/
online.py
117 lines (98 loc) · 3.62 KB
/
online.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
'''
Online link spider test
'''
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
from builtins import next
import unittest
from unittest import TestCase
import time
import sys
from os import path
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
import scrapy
import redis
from redis.exceptions import ConnectionError
import json
import threading, time
from crawling.spiders.link_spider import LinkSpider
from scrapy.utils.project import get_project_settings
from twisted.internet import reactor
from scrapy.crawler import CrawlerRunner
from kafka import KafkaConsumer
class CustomSpider(LinkSpider):
    """LinkSpider subclass used by the online test.

    The only override is the spider ``name``, so the test runs under its
    own "test-spider" identifier.
    """

    name = "test-spider"
class TestLinkSpider(TestCase):
example_feed = "{\"allowed_domains\":null,\"allow_regex\":null,\""\
"crawlid\":\"abc12345\",\"url\":\"http://dmoztools.net/\",\"expires\":0,\""\
"ts\":1461549923.7956631184,\"priority\":1,\"deny_regex\":null,\""\
"cookie\":null,\"attrs\":null,\"appid\":\"test\",\"spiderid\":\""\
"test-link\",\"useragent\":null,\"deny_extensions\":null,\"maxdepth\":0}"
def setUp(self):
self.settings = get_project_settings()
self.settings.set('KAFKA_TOPIC_PREFIX', "demo_test")
# set up redis
self.redis_conn = redis.Redis(host=self.settings['REDIS_HOST'],
port=self.settings['REDIS_PORT'],
db=self.settings['REDIS_DB'])
try:
self.redis_conn.info()
except ConnectionError:
print("Could not connect to Redis")
# plugin is essential to functionality
sys.exit(1)
# clear out older test keys if any
keys = self.redis_conn.keys("test-spider:*")
for key in keys:
self.redis_conn.delete(key)
# set up kafka to consumer potential result
self.consumer = KafkaConsumer(
"demo_test.crawled_firehose",
bootstrap_servers=self.settings['KAFKA_HOSTS'],
group_id="demo-id",
auto_commit_interval_ms=10,
consumer_timeout_ms=5000,
auto_offset_reset='earliest'
)
time.sleep(1)
def test_crawler_process(self):
runner = CrawlerRunner(self.settings)
d = runner.crawl(CustomSpider)
d.addBoth(lambda _: reactor.stop())
# add crawl to redis
key = "test-spider:dmoztools.net:queue"
self.redis_conn.zadd(key, self.example_feed, -99)
# run the spider, give 20 seconds to see the url, crawl it,
# and send to kafka. Then we kill the reactor
def thread_func():
time.sleep(20)
reactor.stop()
thread = threading.Thread(target=thread_func)
thread.start()
reactor.run()
message_count = 0
m = next(self.consumer)
if m is None:
pass
else:
the_dict = json.loads(m.value)
if the_dict is not None and the_dict['appid'] == 'test' \
and the_dict['crawlid'] == 'abc12345':
message_count += 1
self.assertEquals(message_count, 1)
def tearDown(self):
keys = self.redis_conn.keys('stats:crawler:*:test-spider:*')
keys = keys + self.redis_conn.keys('test-spider:*')
for key in keys:
self.redis_conn.delete(key)
# if for some reason the tests fail, we end up falling behind on
# the consumer
for m in self.consumer:
pass
self.consumer.close()
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()