Add python 3 support to scrapy-cluster.

We use the decode_responses option on the Redis client and the
value_deserializer/value_serializer options on the Kafka client to handle
unicode problems. We also fix several syntax errors and update several test
cases.
gas1121 committed Jul 16, 2017
1 parent dcbd36d commit 12a9da5
Showing 30 changed files with 195 additions and 38 deletions.
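Three idioms recur throughout the diff below: decode_responses=True on every Redis connection, a value_serializer on every Kafka producer, and a value_deserializer on every Kafka consumer. A minimal sketch of the pattern, assuming redis-py and kafka-python, with illustrative host, port, and topic values:

    import json

    import redis
    from kafka import KafkaConsumer, KafkaProducer

    # Redis: decode_responses=True makes redis-py return str instead of
    # bytes, so existing string-handling code behaves the same on Python 3.
    redis_conn = redis.Redis(host='localhost', port=6379, db=0,
                             decode_responses=True)

    # Kafka carries raw bytes on the wire: encode outgoing values to utf-8
    # and decode incoming values back to str at the client boundary.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda m: json.dumps(m).encode('utf-8'))
    consumer = KafkaConsumer('demo.incoming',
                             bootstrap_servers='localhost:9092',
                             value_deserializer=lambda m: m.decode('utf-8'))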
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
+.vscode
+
 # Python binaries
 *.pyc

3 changes: 3 additions & 0 deletions .travis.yml
@@ -20,6 +20,9 @@ env:
   - docker: 1
     dockerfile_name: Dockerfile.py2alpine
     docker_tag_suffix: dev-alpine
+  - docker: 1
+    dockerfile_name: Dockerfile.py3
+    docker_tag_suffix: dev-py3
 
 install: true

5 changes: 3 additions & 2 deletions crawler/crawling/distributed_scheduler.py
@@ -282,7 +282,7 @@ def update_ipaddress(self):
         try:
             obj = urllib.request.urlopen(settings.get('PUBLIC_IP_URL',
                                          'http://ip.42.pl/raw'))
-            results = self.ip_regex.findall(obj.read())
+            results = self.ip_regex.findall(obj.read().decode('utf-8'))
             if len(results) > 0:
                 self.my_ip = results[0]
             else:
@@ -313,7 +313,8 @@ def report_self(self):
     def from_settings(cls, settings):
         server = redis.Redis(host=settings.get('REDIS_HOST'),
                              port=settings.get('REDIS_PORT'),
-                             db=settings.get('REDIS_DB'))
+                             db=settings.get('REDIS_DB'),
+                             decode_responses=True)
         persist = settings.get('SCHEDULER_PERSIST', True)
         up_int = settings.get('SCHEDULER_QUEUE_REFRESH', 10)
         hits = settings.get('QUEUE_HITS', 10)
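The first hunk is needed because urllib.request.urlopen(...).read() returns bytes on Python 3, and re will not apply a str pattern to a bytes object. A standalone sketch; the regex here is an illustrative stand-in for the scheduler's ip_regex:

    import re
    import urllib.request

    ip_regex = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')

    obj = urllib.request.urlopen('http://ip.42.pl/raw')
    # read() yields bytes on Python 3; decode before matching a str pattern,
    # or re raises "cannot use a string pattern on a bytes-like object".
    results = ip_regex.findall(obj.read().decode('utf-8'))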
3 changes: 2 additions & 1 deletion crawler/crawling/log_retry_middleware.py
@@ -61,7 +61,8 @@ def setup(self, settings):
         if self.settings['STATS_STATUS_CODES']:
             self.redis_conn = redis.Redis(host=self.settings.get('REDIS_HOST'),
                                           port=self.settings.get('REDIS_PORT'),
-                                          db=settings.get('REDIS_DB'))
+                                          db=settings.get('REDIS_DB'),
+                                          decode_responses=True)
 
             try:
                 self.redis_conn.info()
3 changes: 2 additions & 1 deletion crawler/crawling/pipelines.py
@@ -111,7 +111,8 @@ def from_settings(cls, settings):
             producer = KafkaProducer(bootstrap_servers=settings['KAFKA_HOSTS'],
                                      retries=3,
                                      linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
-                                     buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
+                                     buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'],
+                                     value_serializer=lambda m: m.encode('utf-8'))
         except Exception as e:
             logger.error("Unable to connect to Kafka in Pipeline"\
                 ", raising exit flag.")
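The pipeline hands its producer values that are already JSON-encoded strings, so the serializer only needs to turn str into utf-8 bytes. A sketch under that assumption; broker address, topic name, and payload are illustrative:

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             retries=3,
                             value_serializer=lambda m: m.encode('utf-8'))
    # The value is already a JSON string; the serializer handles encoding.
    producer.send('demo.crawled_firehose', '{"url": "http://example.com"}')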
3 changes: 2 additions & 1 deletion crawler/crawling/redis_stats_middleware.py
@@ -41,7 +41,8 @@ def setup(self, settings):
         # set up redis
         self.redis_conn = redis.Redis(host=settings.get('REDIS_HOST'),
                                       port=settings.get('REDIS_PORT'),
-                                      db=settings.get('REDIS_DB'))
+                                      db=settings.get('REDIS_DB'),
+                                      decode_responses=True)
 
         try:
             self.redis_conn.info()
6 changes: 4 additions & 2 deletions crawler/tests/online.py
@@ -46,7 +46,8 @@ def setUp(self):
         # set up redis
         self.redis_conn = redis.Redis(host=self.settings['REDIS_HOST'],
                                       port=self.settings['REDIS_PORT'],
-                                      db=self.settings['REDIS_DB'])
+                                      db=self.settings['REDIS_DB'],
+                                      decode_responses=True)
         try:
             self.redis_conn.info()
         except ConnectionError:
@@ -66,7 +67,8 @@ def setUp(self):
             group_id="demo-id",
             auto_commit_interval_ms=10,
             consumer_timeout_ms=5000,
-            auto_offset_reset='earliest'
+            auto_offset_reset='earliest',
+            value_deserializer=lambda m: m.decode('utf-8')
         )
         time.sleep(1)

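With the deserializer in place, the test can treat each message value as text and parse it directly. A sketch with illustrative topic and broker values:

    import json

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('demo.crawled_firehose',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=5000,
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: m.decode('utf-8'))
    for message in consumer:
        item = json.loads(message.value)  # message.value is str, not bytes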
35 changes: 35 additions & 0 deletions docker/crawler/Dockerfile.py3
@@ -0,0 +1,35 @@
FROM python:3.6
MAINTAINER Madison Bahmer <madison.bahmer@istresearch.com>

# os setup
RUN apt-get update && apt-get -y install \
python-lxml \
build-essential \
libssl-dev \
libffi-dev \
python-dev \
libxml2-dev \
libxslt1-dev \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

# install requirements
COPY utils /usr/src/utils
COPY crawler/requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
RUN rm -rf /usr/src/utils

# move codebase over
COPY crawler /usr/src/app

# override settings via localsettings.py
COPY docker/crawler/settings.py /usr/src/app/crawling/localsettings.py

# copy testing script into container
COPY docker/run_docker_tests.sh /usr/src/app/run_docker_tests.sh

# set up environment variables

# run the spider
CMD ["scrapy", "runspider", "crawling/spiders/link_spider.py"]
27 changes: 27 additions & 0 deletions docker/kafka-monitor/Dockerfile.py3
@@ -0,0 +1,27 @@
FROM python:3.6
MAINTAINER Madison Bahmer <madison.bahmer@istresearch.com>

# os setup
RUN apt-get update
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

# install requirements
COPY utils /usr/src/utils
COPY kafka-monitor/requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
RUN rm -rf /usr/src/utils

# move codebase over
COPY kafka-monitor /usr/src/app

# override settings via localsettings.py
COPY docker/kafka-monitor/settings.py /usr/src/app/localsettings.py

# copy testing script into container
COPY docker/run_docker_tests.sh /usr/src/app/run_docker_tests.sh

# set up environment variables

# run command
CMD ["python", "kafka_monitor.py", "run"]
27 changes: 27 additions & 0 deletions docker/redis-monitor/Dockerfile.py3
@@ -0,0 +1,27 @@
FROM python:3.6
MAINTAINER Madison Bahmer <madison.bahmer@istresearch.com>

# os setup
RUN apt-get update
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

# install requirements
COPY utils /usr/src/utils
COPY redis-monitor/requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
RUN rm -rf /usr/src/utils

# move codebase over
COPY redis-monitor /usr/src/app

# override settings via localsettings.py
COPY docker/redis-monitor/settings.py /usr/src/app/localsettings.py

# copy testing script into container
COPY docker/run_docker_tests.sh /usr/src/app/run_docker_tests.sh

# set up environment variables

# run command
CMD ["python", "redis_monitor.py"]
27 changes: 27 additions & 0 deletions docker/rest/Dockerfile.py3
@@ -0,0 +1,27 @@
FROM python:3.6
MAINTAINER Madison Bahmer <madison.bahmer@istresearch.com>

# os setup
RUN apt-get update
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

# install requirements
COPY utils /usr/src/utils
COPY rest/requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt
RUN rm -rf /usr/src/utils

# move codebase over
COPY rest /usr/src/app

# override settings via localsettings.py
COPY docker/rest/settings.py /usr/src/app/localsettings.py

# copy testing script into container
COPY docker/run_docker_tests.sh /usr/src/app/run_docker_tests.sh

# set up environment variables

# run command
CMD ["python", "rest_service.py"]
6 changes: 4 additions & 2 deletions kafka-monitor/kafka_monitor.py
@@ -123,7 +123,8 @@ def _setup_stats(self):
 
         redis_conn = redis.Redis(host=self.settings['REDIS_HOST'],
                                  port=self.settings['REDIS_PORT'],
-                                 db=self.settings.get('REDIS_DB'))
+                                 db=self.settings.get('REDIS_DB'),
+                                 decode_responses=True)
 
         try:
             redis_conn.info()
@@ -455,6 +456,7 @@ def _create_consumer(self):
             self.settings['KAFKA_INCOMING_TOPIC'],
             group_id=self.settings['KAFKA_GROUP'],
             bootstrap_servers=brokers,
+            value_deserializer=lambda m: m.decode('utf-8'),
             consumer_timeout_ms=self.settings['KAFKA_CONSUMER_TIMEOUT'],
             auto_offset_reset=self.settings['KAFKA_CONSUMER_AUTO_OFFSET_RESET'],
             auto_commit_interval_ms=self.settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
@@ -478,7 +480,7 @@ def _create_producer(self):
                           str(brokers))
 
         return KafkaProducer(bootstrap_servers=brokers,
-                             value_serializer=lambda m: json.dumps(m),
+                             value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                              retries=3,
                              linger_ms=self.settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                              buffer_memory=self.settings['KAFKA_PRODUCER_BUFFER_BYTES'])
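The producer change matters because json.dumps returns str on Python 3, while kafka-python expects the serializer to produce bytes. The distinction in isolation:

    import json

    payload = {"uuid": "mytestid"}

    serialized = json.dumps(payload)         # str on Python 3
    wire_value = serialized.encode('utf-8')  # bytes, what Kafka transports

    assert isinstance(serialized, str)
    assert isinstance(wire_value, bytes)
    assert json.loads(wire_value.decode('utf-8')) == payload  # round trip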
1 change: 1 addition & 0 deletions kafka-monitor/kafkadump.py
@@ -102,6 +102,7 @@ def main():
         topic,
         group_id=consumer_id,
         bootstrap_servers=kafka_host,
+        value_deserializer=lambda m: m.decode('utf-8'),
         consumer_timeout_ms=settings['KAFKA_CONSUMER_TIMEOUT'],
         auto_offset_reset=offset,
         auto_commit_interval_ms=settings['KAFKA_CONSUMER_COMMIT_INTERVAL_MS'],
3 changes: 2 additions & 1 deletion kafka-monitor/plugins/action_handler.py
@@ -17,7 +17,8 @@ def setup(self, settings):
         self.extract = tldextract.TLDExtract()
         self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                       port=settings['REDIS_PORT'],
-                                      db=settings.get('REDIS_DB'))
+                                      db=settings.get('REDIS_DB'),
+                                      decode_responses=True)
 
         try:
             self.redis_conn.info()
3 changes: 2 additions & 1 deletion kafka-monitor/plugins/scraper_handler.py
@@ -18,7 +18,8 @@ def setup(self, settings):
         self.extract = tldextract.TLDExtract()
         self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                       port=settings['REDIS_PORT'],
-                                      db=settings.get('REDIS_DB'))
+                                      db=settings.get('REDIS_DB'),
+                                      decode_responses=True)
 
         try:
             self.redis_conn.info()
3 changes: 2 additions & 1 deletion kafka-monitor/plugins/stats_handler.py
@@ -15,7 +15,8 @@ def setup(self, settings):
         '''
         self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                       port=settings['REDIS_PORT'],
-                                      db=settings.get('REDIS_DB'))
+                                      db=settings.get('REDIS_DB'),
+                                      decode_responses=True)
 
         try:
             self.redis_conn.info()
3 changes: 2 additions & 1 deletion kafka-monitor/plugins/zookeeper_handler.py
@@ -18,7 +18,8 @@ def setup(self, settings):
         self.extract = tldextract.TLDExtract()
         self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                       port=settings['REDIS_PORT'],
-                                      db=settings.get('REDIS_DB'))
+                                      db=settings.get('REDIS_DB'),
+                                      decode_responses=True)
 
         try:
             self.redis_conn.info()
3 changes: 2 additions & 1 deletion kafka-monitor/tests/online.py
@@ -59,7 +59,8 @@ def timer():
         self.redis_conn = redis.Redis(
             host=self.kafka_monitor.settings['REDIS_HOST'],
             port=self.kafka_monitor.settings['REDIS_PORT'],
-            db=self.kafka_monitor.settings['REDIS_DB'])
+            db=self.kafka_monitor.settings['REDIS_DB'],
+            decode_responses=True)
 
     def test_feed(self):
         json_req = "{\"uuid\":\"mytestid\"," \
2 changes: 1 addition & 1 deletion redis-monitor/plugins/kafka_base_monitor.py
@@ -39,7 +39,7 @@ def _create_producer(self, settings):
                           str(brokers))
 
         return KafkaProducer(bootstrap_servers=brokers,
-                             value_serializer=lambda m: json.dumps(m),
+                             value_serializer=lambda m: json.dumps(m).encode('utf-8'),
                              retries=3,
                              linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                              buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
10 changes: 8 additions & 2 deletions redis-monitor/redis_monitor.py
@@ -60,7 +60,13 @@ def setup(self, level=None, log_file=None, json=None):
 
         self.redis_conn = redis.StrictRedis(host=self.settings['REDIS_HOST'],
                                             port=self.settings['REDIS_PORT'],
-                                            db=self.settings['REDIS_DB'])
+                                            db=self.settings['REDIS_DB'],
+                                            decode_responses=True)
+        # redis_lock needs a redis connection without setting decode_responses
+        # to True
+        self.lock_redis_conn = redis.StrictRedis(host=self.settings['REDIS_HOST'],
+                                                 port=self.settings['REDIS_PORT'],
+                                                 db=self.settings['REDIS_DB'])
 
         try:
             self.redis_conn.info()
@@ -182,7 +188,7 @@ def _create_lock_object(self, key):
         '''
         Returns a lock object, split for testing
         '''
-        return redis_lock.Lock(self.redis_conn, key,
+        return redis_lock.Lock(self.lock_redis_conn, key,
                                expire=self.settings['REDIS_LOCK_EXPIRATION'],
                                auto_renewal=True)

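As the added comment notes, redis_lock needs a connection that has not set decode_responses, since python-redis-lock works with the raw byte tokens it stores in Redis; the monitor therefore keeps a second, undecoded connection just for locks. A minimal sketch of the split, with illustrative host, port, db, and key values:

    import redis
    import redis_lock

    # Application data: decoded str responses.
    redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0,
                                   decode_responses=True)
    # Locks: leave responses as bytes for redis_lock's token handling.
    lock_redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)

    lock = redis_lock.Lock(lock_redis_conn, "some-key", expire=6,
                           auto_renewal=True)
    if lock.acquire(blocking=False):
        try:
            pass  # critical section
        finally:
            lock.release()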
8 changes: 7 additions & 1 deletion redis-monitor/tests/online.py
@@ -59,6 +59,11 @@ def setUp(self):
             'tests.online.CustomMonitor': 100,
         }
         self.redis_monitor.redis_conn = redis.Redis(
+            host=self.redis_monitor.settings['REDIS_HOST'],
+            port=self.redis_monitor.settings['REDIS_PORT'],
+            db=self.redis_monitor.settings['REDIS_DB'],
+            decode_responses=True)
+        self.redis_monitor.lock_redis_conn = redis.Redis(
             host=self.redis_monitor.settings['REDIS_HOST'],
             port=self.redis_monitor.settings['REDIS_PORT'],
             db=self.redis_monitor.settings['REDIS_DB'])
@@ -72,7 +77,8 @@ def setUp(self):
             group_id="demo-id",
             auto_commit_interval_ms=10,
             consumer_timeout_ms=5000,
-            auto_offset_reset='earliest'
+            auto_offset_reset='earliest',
+            value_deserializer=lambda m: m.decode('utf-8')
         )
         sleep(1)

7 changes: 4 additions & 3 deletions rest/rest_service.py
@@ -63,7 +63,7 @@ def wrapper(*args, **kw):
                                                       True,
                                                       instance.UNKNOWN_ERROR)
                log_dict = deepcopy(ret_dict)
-               log_dict['error']['cause'] = e.message
+               log_dict['error']['cause'] = ""
                log_dict['error']['exception'] = str(e)
                log_dict['error']['ex'] = traceback.format_exc()
                instance.logger.error("Uncaught Exception Thrown", log_dict)
@@ -102,7 +102,7 @@ def wrapper(*args, **kw):
            instance = args[0]
            try:
                instance.validator(instance.schemas[schema_name]).validate(request.get_json())
-           except ValidationError, e:
+           except ValidationError as e:
                ret_dict = instance._create_ret_object(instance.FAILURE,
                                                       None, True,
                                                       instance.BAD_SCHEMA,
@@ -352,7 +352,8 @@ def _setup_redis(self):
                          str(self.settings['REDIS_HOST']))
        self.redis_conn = redis.StrictRedis(host=self.settings['REDIS_HOST'],
                                            port=self.settings['REDIS_PORT'],
-                                           db=self.settings['REDIS_DB'])
+                                           db=self.settings['REDIS_DB'],
+                                           decode_responses=True)
        self.redis_conn.info()
        self.redis_connected = True
        self.logger.info("Successfully connected to redis")
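The first two hunks in this file are plain Python 3 fixes: "except ValidationError, e:" is Python 2-only syntax, and exceptions no longer carry a .message attribute. The modern form of both:

    try:
        raise ValueError("bad input")
    except ValueError as e:  # py3 syntax, also valid since Python 2.6
        cause = str(e)       # instead of the removed e.message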
