Downloads CRX list, yields IDs from generator
mmabey committed Oct 12, 2016
1 parent 1583356 commit 0b12679
Showing 17 changed files with 326 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -4,3 +4,6 @@
  images
  src/dbling_conf.json
  src/chrome_ext.db
+ *.merl
+ creds/
+ .vagrant/
1 change: 1 addition & 0 deletions crawl/__init__.py
@@ -0,0 +1 @@
# *-* coding: utf-8 *-*
9 changes: 9 additions & 0 deletions crawl/celery.py
@@ -0,0 +1,9 @@
# *-* coding: utf-8 *-*
from celery import Celery

app = Celery('crawl', broker='amqp://', backend='amqp://', include=['crawl.tasks'])
app.config_from_object('crawl.celeryconfig')


if __name__ == '__main__':
    app.start()
31 changes: 31 additions & 0 deletions crawl/celeryconfig.py
@@ -0,0 +1,31 @@
# *-* coding: utf-8 *-*
# ## To use Eventlet concurrency, start the worker with -P eventlet.
# Never use the worker_pool setting for this, as that would patch
# the worker too late.
#
# The default concurrency is the number of CPUs on the machine (including cores). You can specify a custom
# number with the -c option. There is no recommended value, since the optimal number depends on many factors,
# but if your tasks are mostly I/O-bound you can try increasing it. Experimentation has shown that adding more
# than twice the number of CPUs is rarely effective and is likely to degrade performance instead.

from celery.schedules import crontab
from datetime import timedelta


BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']


# A "beat" service can be started with `celery -A proj beat` that uses the time information to periodically start
# the specified task. See http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html for infor on daemonizing
# this kind of service.
CELERYBEAT_SCHEDULE = {
    'download-every-12-hrs': {
        'task': 'crawl.tasks.start_list_download',
        # 'schedule': crontab(minute=42, hour='9,21'),
        'schedule': timedelta(seconds=20),
    },
}
1 change: 1 addition & 0 deletions crawl/chrome_db.py
1 change: 1 addition & 0 deletions crawl/crx_conf.py
26 changes: 26 additions & 0 deletions crawl/db.py
@@ -0,0 +1,26 @@
# *-* coding: utf-8 *-*

# Some code borrowed from http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html


from celery import Task
from sqlalchemy.orm import scoped_session, sessionmaker

from crawl.chrome_db import DB_ENGINE

db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=DB_ENGINE))


class SqlAlchemyTask(Task):
    """An abstract Celery Task that ensures that the connection to the
    database is closed on task completion."""

    abstract = True

    def run(self, *args, **kwargs):
        """The body of the task executed by workers."""
        raise NotImplementedError('Tasks must define the run method.')

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        db_session.remove()
57 changes: 57 additions & 0 deletions crawl/serius.py
@@ -0,0 +1,57 @@
# *-* coding: utf-8 *-*
from json import dumps, loads
from json.decoder import JSONDecodeError
from unicodedata import category as unicat

from pymemcache.client.base import Client

__all__ = ['Cacher', 'DeserializeError', 'make_keyable']

HOST = 'localhost'
PORT = 12321


class DeserializeError(ValueError):
"""Raised when deserializer finds an invalid format flag."""


def json_serializer(key, value):
    """Serialize value to a string, flagging whether JSON encoding was used.

    Based on the code in the documentation at:
    https://pymemcache.readthedocs.io/en/latest/apidoc/pymemcache.client.base.html

    :param key: The key. Not used in this function.
    :param value: Any JSON-serializable object.
    :return: A tuple of the (possibly JSON-encoded) value and a format flag:
        1 if the value was already a string, 2 if it was serialized to JSON.
    :rtype: tuple
    """
    if isinstance(value, str):
        return value, 1
    return dumps(value), 2


def json_deserializer(key, value, flag):
    """Deserialize value back to its original format.

    :param key: The key. Not used in this function.
    :param value: The stored (possibly JSON-encoded) value.
    :type value: str
    :param flag: The flag set in the serializer function. 1 means it was
        already a string. 2 means it was serialized to JSON.
    :return: The original value stored at the memcache key ``key``.
    """
    opts = {1: lambda v: v,
            2: lambda v: loads(v)}
    try:
        return opts[flag](value)
    except (KeyError, JSONDecodeError):
        raise DeserializeError('Unknown serialization format.')


def make_keyable(key):
    """Remove all whitespace and control characters and truncate to 250 characters (the memcached key limit)."""
    return (''.join(ch for ch in key if unicat(ch)[0] not in 'CZ'))[:250]


Cacher = Client((HOST, PORT), serializer=json_serializer, deserializer=json_deserializer)
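
The serializer/deserializer pair above lets the memcached client round-trip both plain strings (flag 1) and JSON-serializable objects (flag 2) transparently. A minimal usage sketch of the Cacher client, not part of this commit, assuming a memcached instance is actually listening on localhost:12321 as configured above (the example key and values are placeholders):

    from crawl.serius import Cacher, make_keyable

    key = make_keyable('dbling: example stat')                # whitespace/control chars stripped, max 250 chars
    Cacher.set(key, {'downloads': 3}, noreply=False)          # dict -> JSON string, stored with flag 2
    Cacher.set(key + ':note', 'plain string', noreply=False)  # str stored as-is, flag 1

    print(Cacher.get(key))            # decoded through the flag-2 (JSON) path of json_deserializer
    print(Cacher.get(key + ':note'))  # passed through the flag-1 (plain string) path unchanged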
79 changes: 79 additions & 0 deletions crawl/tasks.py
@@ -0,0 +1,79 @@
# *-* coding: utf-8 *-*

import logging
from uuid import uuid4 as uuid

from crawl.celery import app
from crawl.serius import Cacher, make_keyable
# from crawl.db import db_session,SqlAlchemyTask
from crawl.webstore_iface import *
from crawl.crx_conf import conf as _conf
from crawl.util import calc_chrome_version

STAT_PREFIX = 'dbling:'
CHROME_VERSION = calc_chrome_version(_conf['version'], _conf['release_date'])
DOWNLOAD_URL = _conf['url']


@app.task
def start_list_download(show_progress=False):
    _run_id = uuid()
    count = 0
    for crx in download_crx_list(_conf['extension_list_url'], show_progress=show_progress):
        # Add to database
        # Add to download queue
        logging.info(crx)
        count += 1
        if count > 10:
            logging.warning('Wahoo! I\'m gettin outta here!')
            break

    # email that _run_id has completed
    pass


@app.task
def log_it(action, crx_id, lvl=logging.DEBUG):
    logging.log(lvl, '%s %s complete' % (crx_id, action))


@app.task
def stat(stat_type):
    """Use memcached to store/retrieve the current stats dictionary.

    See https://www.tutorialspoint.com/memcached/index.htm for good documentation.

    :param stat_type: The string describing the statistic.
    :type stat_type: str
    """
    cache_key = make_keyable('{}{}'.format(STAT_PREFIX, stat_type))
    ret = Cacher.incr(cache_key, 1, noreply=False)

    # incr returns None if the key wasn't found, so let's add it
    if ret is None:
        # add() will be False if the key already exists, meaning we hit a race condition
        if not Cacher.add(cache_key, 1, noreply=False):
            stat.retry(args=(stat_type,))
        else:
            # Add this key to the list of keys
            if not Cacher.append(STAT_PREFIX + 'KEYS', '{}\n'.format(stat_type), noreply=False):
                # Key must not have already existed... Here we go again.
                if not Cacher.add(STAT_PREFIX + 'KEYS', '{}\n'.format(stat_type)):
                    # At this point, why even check for race conditions?
                    Cacher.append(STAT_PREFIX + 'KEYS', '{}\n'.format(stat_type), noreply=False)


def save_stats():
    """Retrieve stats from memcached and save them to disk."""
    # TODO: Retrieve stats
    # TODO: Delete caches
    raise NotImplementedError




# # Example database-using task
# @app.task(base=SqlAlchemyTask)
# def get_from_db(user_id):
#     user = db_session.query(User).filter_by(id=user_id).one()
#     # do something with the user
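
Since stat() appends every new statistic name to a 'dbling:KEYS' entry, a later save_stats() implementation could walk that list to collect the counters. A rough sketch of that idea, not part of this commit; the function name, output path, and JSON layout are placeholders:

    from json import dump

    from crawl.serius import Cacher, make_keyable

    STAT_PREFIX = 'dbling:'  # mirrors the constant defined in crawl/tasks.py


    def save_stats_sketch(out_path='stats.json'):
        """Hypothetical: collect every counter named in the KEYS entry and dump them to disk as JSON."""
        raw = Cacher.get(STAT_PREFIX + 'KEYS') or b''
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        stats = {}
        for stat_type in filter(None, raw.split('\n')):
            cache_key = make_keyable('{}{}'.format(STAT_PREFIX, stat_type))
            stats[stat_type] = Cacher.get(cache_key)
        with open(out_path, 'w') as fout:
            dump(stats, fout)
        return stats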
1 change: 1 addition & 0 deletions crawl/util.py
92 changes: 92 additions & 0 deletions crawl/webstore_iface.py
@@ -0,0 +1,92 @@
#!/usr/bin/env python3
# *-* coding: utf-8 *-*


import logging
from os import path
from lxml import etree
import requests
from requests.exceptions import ChunkedEncodingError
from time import sleep

__all__ = ['download_crx_list', 'ListDownloadFailedError']

LOG_PATH = path.join(path.dirname(path.realpath(__file__)), '../log', 'crx.log')
DBLING_DIR = path.abspath(path.join(path.dirname(path.realpath(__file__)), '..'))
DONT_OVERWRITE_DOWNLOADED_CRX = False
CHUNK_SIZE = 512
NUM_HTTP_RETRIES = 5


class ListDownloadFailedError(ConnectionError):
"""Raised when the list download fails."""


def download_crx_list(ext_url, session=None, show_progress=False):
    """Generate the list of extension IDs downloaded from Google.

    :param ext_url: URL of the Chrome Web Store sitemap that lists all extensions.
    :param session: Optional session to use for the download.
    :type session: requests.Session
    :param show_progress: Whether to print progress dots while parsing the list.
    :return: Generator yielding CRX IDs (one 32-character ID per extension).
    """
    logging.info('Downloading list of extensions from Google.')
    resp = _http_get(ext_url, session, stream=False)
    if resp is None:
        logging.critical('Failed to download list of extensions.')
        raise ListDownloadFailedError('Unable to download list of extensions.')

    # Save the list
    local_sitemap = path.join(DBLING_DIR, 'src', 'chrome_sitemap.xml')
    with open(local_sitemap, 'wb') as fout:
        for chunk in resp.iter_content(chunk_size=None):
            fout.write(chunk)
    del resp

    logging.debug('Download finished. Parsing XML and yielding extension IDs.')
    xml_tree_root = etree.parse(local_sitemap).getroot()  # Parse the locally saved copy of the sitemap
    ns = '{http://www.sitemaps.org/schemas/sitemap/0.9}'

    count = 0
    for url_elm in xml_tree_root.iterfind(ns + 'url'):
        yield path.basename(url_elm.findtext(ns + 'loc'))[:32]  # This is the CRX ID
        count += 1
        if show_progress and not count % 1000:
            print('.', end='', flush=True)

    if show_progress:
        print(count, flush=True)
    logging.debug('Done parsing list of extensions.')


class RetryRequest(object):
    """Wraps functions that make HTTP requests, retrying on failure."""

    def __init__(self, f):
        self.f = f

    def __call__(self, *args, **kwargs):
        resp = None
        for i in range(NUM_HTTP_RETRIES):
            try:
                resp = self.f(*args, **kwargs)
            except ChunkedEncodingError:
                sleep(10 * (i+1))
            else:
                break
        return resp


@RetryRequest
def _http_get(url, session=None, stream=True):
    """Make a GET request for ``url``, optionally through an existing session.

    :param url: The URL to request.
    :param session: Optional requests session to use for the request.
    :param stream: Whether to stream the response content.
    :return: The response object, or None if every retry failed.
    """
    if session is None:
        return requests.get(url, stream=stream)
    elif isinstance(session, requests.Session):
        return session.get(url, stream=stream)
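
Because download_crx_list() is a generator, callers can consume extension IDs lazily; the Celery task above does exactly that. A short usage sketch, not part of this commit, assuming the sitemap URL is the extension_list_url entry from the crx_conf configuration:

    import requests

    from crawl.crx_conf import conf
    from crawl.webstore_iface import download_crx_list, ListDownloadFailedError

    try:
        with requests.Session() as session:
            for crx_id in download_crx_list(conf['extension_list_url'], session=session, show_progress=True):
                print(crx_id)  # each yielded value is a 32-character extension ID
                break          # stop after the first ID; normally the whole list would be consumed
    except ListDownloadFailedError:
        print('Could not fetch the extension sitemap.')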
17 changes: 7 additions & 10 deletions requirements.txt
@@ -1,13 +1,10 @@
- netifaces
- docopt
- pexpect
- beautifulsoup4
  requests
- matplotlib
- networkx
- pyparsing==1.5.7
- colorama
  lxml
  sqlalchemy
- plotly
- selenium
+ crx_unpack>=0.0.3
+ celery
+ eventlet
+ pymemcache
+ ansible
+ pyuefi>=0.1
+ aiodns
2 changes: 2 additions & 0 deletions run_crawler.sh
@@ -0,0 +1,2 @@
#!/bin/sh
celery -A crawl worker -P eventlet -l info
2 changes: 2 additions & 0 deletions run_download_start.sh
@@ -0,0 +1,2 @@
#!/bin/sh
celery -A crawl beat
9 changes: 7 additions & 2 deletions src/chrome_db.py
@@ -16,10 +16,15 @@
  db_conf = json.load(fin)['db']


- if uname().nodename != db_conf['nodename']:
+ try:
+     _nodename = uname().nodename
+ except AttributeError:
+     # Python < 3.3 doesn't return a named tuple
+     _nodename = uname()[1]
+ if _nodename != db_conf['nodename']:
      create_str = '{}://{}:{}@{}'.format(db_conf['type'], db_conf['user'], db_conf['pass'],
                                          path.join(db_conf['full_url'], db_conf['name']))
-     DB_ENGINE = create_engine(create_str)
+     DB_ENGINE = create_engine(create_str, convert_unicode=True, pool_recycle=3600, pool_size=10)
  elif 'sqlalchemy.url' not in db_conf:
      create_str = db_conf['type'] + '://'
      if len(db_conf['user']):
2 changes: 1 addition & 1 deletion src/crx_conf.json → src/crx_conf.py
@@ -1,4 +1,4 @@
- {
+ conf = {
      "version": "49.0",
      "release_date": [2016, 3, 9],
      "url": "https://clients2.google.com/service/update2/crx?response=redirect&prodversion={}&x=id%3D{}%26installsource%3Dondemand%26uc",
