Commit

improvement
trecouvr committed Mar 15, 2012
1 parent 03b7169 commit 5912923
Showing 11 changed files with 171 additions and 162 deletions.
4 changes: 2 additions & 2 deletions crawler/config.py
@@ -9,10 +9,10 @@
PROXIES['https'] = 'sigma.utc.fr:3128'
"""

GEPHI_HOST = 'localhost'
GEPHI_HOST = '127.0.0.1'
GEPHI_PORT = 8081

MONGODB_HOST = 'localhost'
MONGODB_HOST = '127.0.0.1'
MONGODB_PORT = 8080
MONGODB_DBNAME = 'test_database'
MONGODB_COLLECTION = 'test'
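
Switching from 'localhost' to '127.0.0.1' pins both services to the IPv4 loopback address, presumably to avoid 'localhost' resolving to the IPv6 loopback (::1) on machines where Gephi or the MongoDB REST interface only listens on IPv4. A quick way to confirm both endpoints are actually reachable before starting a crawl is a small TCP probe; the sketch below is illustrative, not part of the project, and only assumes config.py is importable:

import socket

from config import GEPHI_HOST, GEPHI_PORT, MONGODB_HOST, MONGODB_PORT

def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except (OSError, socket.timeout):
        return False

if __name__ == "__main__":
    print("gephi reachable:", can_connect(GEPHI_HOST, GEPHI_PORT))
    print("mongodb reachable:", can_connect(MONGODB_HOST, MONGODB_PORT))
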
90 changes: 0 additions & 90 deletions crawler/controller.py

This file was deleted.

30 changes: 14 additions & 16 deletions crawler/crawler.py
@@ -9,8 +9,8 @@
from urlhandler import *
from extractor import *
from fetcher import *
from controller import *
from config import *
from tools import *
from robot import *


@@ -20,60 +20,58 @@ def __init__(self, n_threads_fetchers, max_depth, db_host, db_port, db_name, collection_name,
nb_ask_feeds=0):

self.queue_fetchers = LifoQueue()
self.queue_controller = Queue()
self.controller = Controller(self.queue_controller, self.queue_fetchers, max_depth, db_host, db_port, db_name, collection_name)
self.robot = Robot()
self.fetchers = [ Fetcher(self.robot, self.queue_fetchers, self.queue_controller, PROXIES) for _ in range(n_threads_fetchers) ]
self.fetchers = [ Fetcher(self.robot, self.queue_fetchers, self.queue_fetchers, max_depth, PROXIES) for _ in range(n_threads_fetchers) ]


if not feeds and nb_ask_feeds < 1:
nb_ask_feeds = 1

if nb_ask_feeds > 0:
feeds += self.controller.mongodbAPI.get_urls_to_visit(nb_ask_feeds)
feeds += self.fetchers[0].mongodbAPI.get_urls_to_visit(nb_ask_feeds)

print(feeds)

for feed in feeds:
x = {'url':self.controller.normalize_url("", feed), 'depth':0}
x = {'url':normalize_url("", feed), 'depth':0}
self.queue_fetchers.put(x)

for t in self.fetchers:
t.setDaemon(True)
t.start()
self.controller.start()

self.e_stop = threading.Event()


def loop(self):
n_inactivity = 0
while not self.e_stop.is_set():
nb_fetchers_working = 0
for fetcher in self.fetchers:
if fetcher.is_working():
nb_fetchers_working += 1
if nb_fetchers_working == 0 and not self.controller.is_working():
self.stop()
break
if nb_fetchers_working == 0:
n_inactivity += 1
if n_inactivity >= 3:
self.stop()
break
print("Nb Fetchers working : %s" % nb_fetchers_working)
print("Controller working : %s" % self.controller.is_working())
print("Queue Fetchers : %s" % self.queue_fetchers.qsize())
print("Queue Controller : %s" % self.queue_controller.qsize())
self.e_stop.wait(5)

def stop(self):
print("Closing all fetchers...")
for fetcher in self.fetchers:
fetcher.stop()
print("Closing Controller...")
self.controller.stop()
print("End")
self.e_stop.set()



if __name__ == "__main__":
c = Crawler(10, 4, MONGODB_HOST, MONGODB_PORT, MONGODB_DBNAME, MONGODB_COLLECTION,
feeds=[],
nb_ask_feeds=100)
feeds=['http://www.pornhub.com'],
nb_ask_feeds=0)
try:
c.loop()
except KeyboardInterrupt:
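
The separate Controller thread is gone: each Fetcher now pushes the links it discovers straight back into the same LifoQueue it consumes from, and the per-item depth field (together with the url_need_a_visit check in fetcher.py) is what bounds the crawl. Reduced to the standard library, the pattern looks like the sketch below; work and MAX_DEPTH are illustrative names, not part of the crawler's API:

import threading
from queue import Empty, LifoQueue

MAX_DEPTH = 2
queue = LifoQueue()

def work():
    while True:
        try:
            item = queue.get(timeout=1)
        except Empty:
            return
        url, depth = item['url'], item['depth']
        print("visiting", url, "at depth", depth)
        if depth + 1 < MAX_DEPTH:
            # In the real crawler, the links extracted from the page go here.
            queue.put({'url': url + '/child', 'depth': depth + 1})
        queue.task_done()

queue.put({'url': 'http://example.com', 'depth': 0})
threads = [threading.Thread(target=work) for _ in range(4)]
for t in threads:
    t.daemon = True
    t.start()
queue.join()
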
5 changes: 4 additions & 1 deletion crawler/extractor.py
@@ -8,6 +8,9 @@
from bs4 import BeautifulSoup
from unac import unac_string


from tools import *

class Extractor:
def __init__(self, url, html):
self.url = url
@@ -29,7 +32,7 @@ def __init__(self, url, html):


def get_links(self):
return [ link.get('href') for link in self.soup.find_all('a') if link.get('href') ]
return [ normalize_url(self.url, link.get('href')) for link in self.soup.find_all('a') if link.get('href') ]


def get_keywords(self):
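
get_links() now returns absolute URLs by passing every href through normalize_url from tools, which is not part of this diff. A plausible minimal reconstruction, assuming the helper only resolves relative hrefs against the page URL and drops fragments (the real implementation may do more):

from urllib.parse import urldefrag, urljoin

def normalize_url(base, href):
    """Resolve href against the page URL and drop any #fragment."""
    absolute = urljoin(base, href)
    url, _fragment = urldefrag(absolute)
    return url

# normalize_url("http://example.com/a/", "../b#top") -> "http://example.com/b"
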
107 changes: 81 additions & 26 deletions crawler/fetcher.py
@@ -6,14 +6,20 @@
from urlhandler import *
from extractor import *

from gephiAPI import GephiAPI
from mongodbapi import MongodbAPI

class Fetcher(threading.Thread):
def __init__(self, robot, queue_in, queue_out, proxies):
def __init__(self, robot, queue_in, queue_out, max_depth, proxies):
threading.Thread.__init__(self, name="Fetcher-%s"%id(self))
self.robot = robot
self.queue_in = queue_in
self.queue_out = queue_out
self.max_depth = max_depth
self.proxies = proxies

self.gephiAPI = GephiAPI(GEPHI_HOST, GEPHI_PORT)
self.mongodbAPI = MongodbAPI(MONGODB_HOST, MONGODB_PORT)

self.e_stop = threading.Event()

@@ -37,29 +43,78 @@ def run(self):
else:
self._is_working.set()
url = params['url']
depth = params['depth']
urlhandler = UrlHandler(self.robot, url, 5, self.proxies)
try:
urlhandler.open()
except ExceptionUrlForbid: pass
except ExceptionMaxTries: pass
except Exception as ex:
print(url,ex)
else:
print("OPENED", url)
html = urlhandler.html
try:
extractor = Extractor(url, html)
except Exception as ex:
print("ERROR", self.__class__.__name__, ex, url)
links = extractor.links
keywords = extractor.keywords
result = {
'url': url,
'links': links,
'keywords': keywords,
'depth': depth
}
self.queue_out.put(result)
if self.url_need_a_visit(url):
depth = params['depth']
html = self.get_html(url)
if html:
extractor = self.extract(html, url)
if extractor:
links = extractor.links
keywords = extractor.keywords
self.process_result(depth+1, url, links, keywords)
self._is_working.clear()


def process_result(self, depth, url, links, keywords):
#print("process gephi")
self.process_result_gephi(url, links, keywords)
#print("process db")
self.process_result_db(url, links, keywords)
#print("add links to queue")
if depth < self.max_depth:
for link in links:
result = {'url':link, 'depth':depth}
self.queue_out.put(result)

def process_result_gephi(self, url, links, keywords):
self.gephiAPI.add_node(url)
for link in links:
self.gephiAPI.add_node(link)
self.gephiAPI.add_edge(url, link)

def process_result_db(self, url, links, keywords):
self.mongodbAPI.add_page(url=url)
for link in links:
self.mongodbAPI.add_link(source=url, target=link)


def get_html(self, url):
"""
Fetch the content of a page
"""
urlhandler = UrlHandler(self.robot, self.proxies)
try:
stream = urlhandler.open(url, None, 5)
except ExceptionUrlForbid as ex:
print("ERROR", ex, "\n"+get_traceback())
except ExceptionMaxTries as ex:
print("ERROR", ex, "\n"+get_traceback())
except Exception as ex:
print(url, ex, "\n"+get_traceback())
else:
print("OPENED", url)
html = ""
try:
html = stream.read().decode()
except Exception as ex:
print(url, ex, "\n"+get_traceback())
finally:
stream.close()
return html

def extract(self, html, url):
"""
Extract the important parts of a page (links, keywords, ...)
"""
try:
extractor = Extractor(url, html)
except Exception as ex:
print("ERROR", self.__class__.__name__, "extract :", ex, url, "\n"+get_traceback())
else:
return extractor

def url_need_a_visit(self, url):
p = urllib.parse.urlparse(url)
if p.scheme in ('http','https'):
return self.mongodbAPI.url_need_a_visit(url)
else:
return False
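
run() is now split into small steps: url_need_a_visit (scheme filter plus a MongoDB lookup), get_html, extract, and process_result, which fans the result out to Gephi, MongoDB and the shared queue. Stripped of UrlHandler, Robot and the proxy support, the download step boils down to the standard-library sketch below (fetch_html is illustrative, not the project's API):

import urllib.error
import urllib.request

def fetch_html(url, timeout=5):
    """Return the decoded body of url, or None on any failure."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as stream:
            return stream.read().decode(errors='replace')
    except (urllib.error.URLError, OSError) as ex:
        print("ERROR fetching", url, ":", ex)
        return None
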
8 changes: 6 additions & 2 deletions crawler/mongodbapi.py
@@ -4,8 +4,12 @@
import urllib.error
import threading


from tools import *


def dict_to_json(d):
return str(d).replace("'", '"')
return str(d).replace("'", '"').replace('""', '"')

class MongodbAPI:
def __init__(self, host='localhost', port=8080):
Expand Down Expand Up @@ -46,7 +50,7 @@ def _f():
try:
r = urllib.request.urlopen(url, encoded_req)
except urllib.error.URLError as ex:
print("ERROR", self.__class__.__name__, ex, "url=", url, "req=", req)
print("ERROR", self.__class__.__name__, "send :", ex, "url=", url, "req=", req, "\n"+get_traceback())
else:
return r
if block:
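
dict_to_json builds JSON by string replacement, and the extra .replace('""', '"') only papers over one corner case; any value that itself contains a quote character still yields invalid JSON. The standard library's json module handles escaping correctly, so a drop-in alternative (assuming the documents only hold JSON-serialisable values) is:

import json

def dict_to_json(d):
    return json.dumps(d)

# dict_to_json({'url': 'http://example.com/?q="a"'})
# -> {"url": "http://example.com/?q=\"a\""}
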
16 changes: 15 additions & 1 deletion crawler/test.py
@@ -1,5 +1,6 @@



def test_alchemy(url):
import AlchemyAPI

@@ -24,8 +25,21 @@ def test_urllib(url):
s = stream.read()
print(s)

def test_redirection(url):
import urllib.request
opener = urllib.request.FancyURLopener()
opener.addheader('User-agent', 'Galopa')
try:
stream = opener.open(url)
except Exception as ex:
print(ex)
return
s = stream.read()
print(s)


url = "http://www.google.fr"

#test_alchemy(url)
test_urllib(url)
#test_urllib(url)
test_redirection("http://www.cr-picardie.fr/spip.php?article709")
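
test_redirection relies on urllib.request.FancyURLopener, a legacy API that was later deprecated in Python 3. The same check can be written with Request and urlopen, which also follow redirects by default; test_redirection_modern is an illustrative name:

def test_redirection_modern(url):
    import urllib.request
    request = urllib.request.Request(url, headers={'User-Agent': 'Galopa'})
    try:
        stream = urllib.request.urlopen(request)
    except Exception as ex:
        print(ex)
        return
    print(stream.read())
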
