Feature web crawler #51

Merged
merged 8 commits into master

4 participants

@rkononov
Collaborator

No description provided.

@frommww
Owner
  • the web_spider name is a bit weird to me; I had to think about it for a bit. Any reason we don't use web_crawler?

  • return if in crawl_domain forces you to concentrate to understand the logic. Is there another way to do this?

  • could use some line spacing around blocks within the procedures to make it easier to scan

  • @icache and @client should probably be closer in naming, maybe?

  • where's run_crawler?

  • Can we put the page in IronCache, then put the item id onto MQ and have those be processed? (A rough sketch follows this list.)

  • Want to include comments for using multiple queues and having multiple workers processing a page (look for images, look for products).

  • We could point the web_crawler to meetup.com/sfrails and have the page processor grab the image.
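
A minimal sketch of the cache-plus-queue handoff being suggested here, using only the iron_cache / iron_mq calls that appear elsewhere in this PR (items.put / items.get, messages.post / messages.get). YOUR_TOKEN and YOUR_PROJECT_ID are placeholders, and storing the full page body in the cache is an assumption about how the handoff might look, not what the merged code does (it queues the URL itself):

```ruby
require 'open-uri'
require 'cgi'
require 'json'
require 'iron_cache'
require 'iron_mq'

cache = IronCache::Client.new("token" => 'YOUR_TOKEN', "project_id" => 'YOUR_PROJECT_ID')
queue = IronMQ::Client.new(:token => 'YOUR_TOKEN', :project_id => 'YOUR_PROJECT_ID')

# Crawler side: store the fetched page in IronCache under an item id
# (the escaped URL) and push only that id onto IronMQ.
url     = 'http://www.meetup.com/sfrails/'
item_id = CGI::escape(url)
cache.items.put(item_id, {:status => 'found', :body => open(url).read}.to_json)
queue.messages.post(item_id)

# Page-processor side: pop the id off the queue, look the page up in the cache,
# then do the heavy lifting (images, products, ...) before deleting the message.
messages = queue.messages.get(:n => 1, :timeout => 120)
Array(messages).each do |message|
  item = cache.items.get(message.body)
  page = JSON.parse(item.value)['body'] if item
  # ... process `page` here ...
  message.delete
end
```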

@rkononov
Collaborator

k thanks, will refactor

@rkononov
Collaborator
  • renamed to 'web crawler' and 'page processor'
  • I don't see another way to do this; we could ask @manveru to review it :)
  • added (actually removed all the secondary stuff in url_utils)
  • renamed
  • it was a draft commit; run_crawler is in the second one. Please use this URL to review: https://github.com/iron-io/iron_worker_examples/pull/51/files
  • done
  • made separate workers: one crawler and one page processor. They use iron_cache and iron_mq (right now a single iron_mq queue; I don't see why we need more than one)
  • done
@frommww
Owner

Readme needs to be updated to take into account the changed worker name.

@frommww frommww commented on the diff
ruby_ng/web_crawler/README.md
@@ -0,0 +1,19 @@
+# WebCrawler Worker
+
+This is an example of a web crawler that collects all links on a given site and follows them (recursively queuing new workers where possible) to find new links, limited to a maximum depth and to the given domain.
+After collecting links, the crawler puts each link into iron_cache and into iron_mq so it can be processed by PageProcessor.
+The page processor does simple processing: it extracts all links, counts the number of images/CSS includes, finds the largest image on the page, and calculates the frequency of each word on the page.
+
@frommww Owner
frommww added a note

Additional page processing could be handled within a single worker, or other workers could be used (to keep the workers
task specific).

To orchestrate this, you could fire up workers from the page processor or use multiple message queues in IronMQ and
have the workers run off of these queues.
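
A hedged sketch of the fan-out variant, reusing the tasks.create call that web_crawler.rb already uses; the ImageProcessor and ProductProcessor worker names are hypothetical, and YOUR_TOKEN / YOUR_PROJECT_ID are placeholders:

```ruby
require 'iron_worker_ng'

# Placeholders; in the real workers these come from params['iw_token'] / params['iw_project_id'].
token, project_id = 'YOUR_TOKEN', 'YOUR_PROJECT_ID'
url = 'http://www.meetup.com/sfrails/'

iw = IronWorkerNG::Client.new(:token => token, :project_id => project_id)

# Instead of doing every kind of processing inline, the page processor could fan
# the page out to task-specific workers. The worker names below are hypothetical.
payload = {'url' => url, 'iw_token' => token, 'iw_project_id' => project_id}
iw.tasks.create('ImageProcessor', payload)
iw.tasks.create('ProductProcessor', payload)

# The alternative mentioned above is one IronMQ queue per processing step,
# with each worker type draining only its own queue.
```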

@rkononov
Collaborator

updated with Ken's comments

ruby_ng/web_crawler/page_processor.rb
@@ -0,0 +1,96 @@
+require 'net/http'
+require 'uri'
+require 'open-uri'
@manveru
manveru added a note

that requires both net/http and uri already.

ruby_ng/web_crawler/page_processor.rb
@@ -0,0 +1,96 @@
+require 'net/http'
+require 'uri'
+require 'open-uri'
+require 'hpricot'
@manveru
manveru added a note

why not nokogiri?

@frommww
Owner

Roman
Can you modify this to use nokogiri? It's much more popular than hpricot. We'd like to pick up on that aspect as we publicize this.

http://stackoverflow.com/questions/2888587/nokogiri-vs-hpricot

@frommww
Owner

Also, Travis had the thought of using IronCache to record the links that are being crawled and then checking them to make sure that they haven't been crawled previously. Would probably add a counter to the links so that you can tell how many times a link came up for crawling.

Not sure if we're printing out the stats of the links crawled in the log of a master worker but if we could also add that without much trouble, that would be great.
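
A minimal sketch of that dedupe-plus-counter idea, using the same items.get / items.put calls as the workers in this PR; the seen_count key is an assumption for illustration (the merged code ends up using a processed_counter field instead):

```ruby
require 'cgi'
require 'json'
require 'iron_cache'

cache = IronCache::Client.new("token" => 'YOUR_TOKEN', "project_id" => 'YOUR_PROJECT_ID')

# Returns true if the url has not been seen before (and records it);
# otherwise bumps a counter so we can tell how often the link came up.
def should_crawl?(cache, url)
  key  = CGI::escape(url)
  item = cache.items.get(key)
  if item.nil?
    cache.items.put(key, {:status => 'found', :seen_count => 1}.to_json)
    true
  else
    data = JSON.parse(item.value)
    data['seen_count'] = data['seen_count'].to_i + 1
    cache.items.put(key, data.to_json)
    false
  end
end

puts should_crawl?(cache, 'http://www.meetup.com/sfrails/')  # => true the first time
puts should_crawl?(cache, 'http://www.meetup.com/sfrails/')  # => false afterwards
```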

@rkononov
Collaborator

k, will move to nokogiri. About IronCache - I'm already using it for checking page status (crawled, or found but not crawled), and I also have a counter of crawled pages (in IronCache)

@rkononov
Collaborator

done, moved to nokogiri

@frommww
Owner

Roman
This is looking really good.

  • Can you modify it to delete the message from the queue after the page has been successfully processed? I want to highlight that it can be put back on the queue if the process fails.

  • Can you have the page processor routine check to see if the page is in the cache prior to processing it (so as to reduce reprocessing)?

  • Related to the above, it could store, as the value or as part of the value, the number of times the page's URL came up in the process.

  • Not sure if there's a timestamp included with the processed page details, but we should probably add that. (minor thing)

@frommww
Owner

Also, you could do a mass get of the messages, put them in an array, and then cycle through them, deleting each message after the page link is processed.

Note that if we do delete after it's processed, we'll need to set the timeout to be longer than the default (60 sec).
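
A sketch of that batch-get-then-delete pattern, using the messages.get(:n => ..., :timeout => ...) call the merged page_processor.rb uses; the batch size and timeout values here are illustrative:

```ruby
require 'cgi'
require 'iron_mq'

mq = IronMQ::Client.new(:token => 'YOUR_TOKEN', :project_id => 'YOUR_PROJECT_ID')

# Reserve a batch of messages in one call. The :timeout is the reservation
# window: it should exceed the time needed to process the whole batch,
# otherwise unfinished messages become visible to other workers again
# (the default is 60 seconds).
messages = mq.messages.get(:n => 100, :timeout => 600)

Array(messages).each do |message|
  url = CGI::unescape(message.body)
  # ... process the page at `url` here ...
  # Delete only after successful processing, so a failed run simply lets the
  # message reappear on the queue once the reservation expires.
  message.delete
end
```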

@rkononov rkononov deleting messages from queue after processing
checking page in cache before processing
counting number of reprocessing
adding timestamp
getting messages with batches
9c8a909
@rkononov
Collaborator

done

@frommww commented on the diff

puts "Getting messages from IronMQ"

@frommww
Owner

Can we get Nokogiri in the example title and description? It will get lost if it's just WebCrawler.

@treeder treeder merged commit fb27bb7 into master
@rkononov
Collaborator

@frommww sorry for the late response, updated the description 50bd9be

Commits on Jun 27, 2012
  1. @rkononov

    web crawling/processing draft

    rkononov authored
Commits on Jun 28, 2012
  1. @rkononov
Commits on Jun 29, 2012
  1. @rkononov
Commits on Jul 3, 2012
  1. @rkononov

    added Ken's comment

    rkononov authored
  2. @rkononov
Commits on Jul 8, 2012
  1. @rkononov
Commits on Jul 27, 2012
  1. @rkononov

    deleting messages from queue after processing

    rkononov authored
    checking page in cache before processing
    counting number of reprocessing
    adding timestamp
    getting messages with batches
Commits on Jul 28, 2012
  1. @rkononov

    fixed typo

    rkononov authored
23 ruby_ng/web_crawler/README.md
@@ -0,0 +1,23 @@
+# WebCrawler Worker
+
+This is an example of a web crawler that collects all links on a given site and follows them (recursively queuing new workers where possible) to find new links, limited to a maximum depth and to the given domain.
+After collecting links, the crawler puts each link into iron_cache and into iron_mq so it can be processed by PageProcessor.
+The page processor does simple processing: it extracts all links, counts the number of images/CSS includes, finds the largest image on the page, and calculates the frequency of each word on the page.
+Additional page processing could be handled within a single worker, or other workers could be used (to keep the workers
@frommww Owner
frommww added a note

Additional page processing could be handled within a single worker, or other workers could be used (to keep the workers
task specific).

To orchestrate this, you could fire up workers from the page processor or use multiple message queues in IronMQ and
have the workers run off of these queues.

+task specific).
+To orchestrate this, you could fire up workers from the page processor or use multiple message queues in IronMQ and
+have the workers run off of these queues.
+
+## Getting Started
+
+### Configure crawler
+- url = 'http://sample.com' # url of the domain you want to crawl
+- page_limit = 1000 # maximum number of links to collect
+- depth = 3 # maximum crawl depth
+- max_workers = 2 # maximum number of concurrent workers; workers are fully recursive, so when possible a worker queues another worker
+- iw_token = your Iron.io token
+- iw_project_id = your Iron.io project id
+
+### Start crawler/page processor
+- upload crawler/page processor: iron_worker upload web_crawler; iron_worker upload page_processor
+- queue crawler: ruby run_crawler.rb
107 ruby_ng/web_crawler/page_processor.rb
@@ -0,0 +1,107 @@
+require 'open-uri'
+require 'nokogiri'
+require 'cgi'
+require 'json'
+require 'iron_cache'
+require 'iron_mq'
+
+def make_absolute(href, root)
+ return unless href
+ puts "Making absolute:#{href} with root:#{root}"
+ URI.parse(root).merge(URI.parse(href)).to_s rescue nil
+end
+
+def process_images(doc)
+ #get all images
+ images = doc.css("img")
+ #get image with highest height on page
+ largest_image = doc.search("img").sort_by { |img| img["height"].to_i }[-1]
+ largest_image = largest_image ? largest_image['src'] : 'none'
+ list_of_images = doc.search("img").map { |img| img["src"] }
+ return images, largest_image, list_of_images
+end
+
+def process_links(doc)
+ #get all links
+ links = doc.css("a")
+end
+
+def process_css(doc)
+ #find all css includes
+ css = doc.search("[@type='text/css']")
+end
+
+def process_words(doc)
+ #converting to plain text and removing tags
+ text = doc.text
+ #splitting by words
+ words = text.split(/[^a-zA-Z]/)
+ #removing empty string
+ words.delete_if { |e| e.empty? }
+ #creating hash
+ freqs = Hash.new(0)
+ #calculating stats
+ words.each { |word| freqs[word] += 1 }
+ freqs.sort_by { |x, y| y }
+end
+
+def process_page(url)
+ puts "Processing page #{url}"
+ doc = Nokogiri(open(url))
+ images, largest_image, list_of_images = process_images(doc)
+ #processing links and making them absolute
+ links = process_links(doc).map { |link| make_absolute(link['href'], url) }.compact
+ css = process_css(doc)
+ words_stat = process_words(doc)
+ puts "Number of images on page:#{images.count}"
+ puts "Number of css on page:#{css.count}"
+ puts "Number of links on page:#{links.count}"
+ puts "Largest image on page:#{largest_image}"
+ puts "Words frequency:#{words_stat.inspect}"
+ #putting all in cache
+ @iron_cache_client.items.put(CGI::escape(url), {:status => "processed",
+ :number_of_images => images.count,
+ :largest_image => CGI::escape(largest_image),
+ :number_of_css => css.count,
+ :number_of_links => links.count,
+ :list_of_images => list_of_images,
+ :words_stat => words_stat,
+ :timestamp => Time.now,
+ :processed_counter => 1}.to_json)
+
+end
+
+def get_list_of_messages
+ #100 pages per worker at max
+ max_number_of_urls = 100
+ puts "Getting messages from IronMQ"
+ messages = @iron_mq_client.messages.get(:n => max_number_of_urls, :timeout => 100)
+ puts "Got messages from queue - #{messages.count}"
+ messages
+end
+
+def increment_counter(url, cache_item)
+ puts "Page already processed, so bypassing it and incrementing counter"
+ item = JSON.parse(cache_item.value)
+ item["processed_counter"] += 1 if item["processed_counter"]
+ @iron_cache_client.items.put(CGI::escape(url), item.to_json)
+end
+
+
+#initializing IronCache and IronMQ clients
+@iron_cache_client = IronCache::Client.new({"token" => params['iw_token'], "project_id" => params['iw_project_id']})
+@iron_mq_client = IronMQ::Client.new(:token => params['iw_token'], :project_id => params['iw_project_id'])
+
+#getting list of urls
+messages = get_list_of_messages
+
+#processing each url
+messages.each do |message|
+ url = CGI::unescape(message.body)
+ #checking the cache first so already-processed pages are not reprocessed
+ cache_item = @iron_cache_client.items.get(CGI::escape(url))
+ if cache_item && JSON.parse(cache_item.value)["status"] == "processed"
+ increment_counter(url, cache_item)
+ else
+ process_page(url)
+ end
+ message.delete
+end
5 ruby_ng/web_crawler/page_processor.worker
@@ -0,0 +1,5 @@
+merge_gem 'iron_worker_ng'
+merge_gem 'iron_cache'
+merge_gem 'iron_mq'
+exec "page_processor.rb"
+name 'PageProcessor'
24 ruby_ng/web_crawler/run_crawler.rb
@@ -0,0 +1,24 @@
+require 'iron_worker_ng'
+require 'iron_cache'
+require "yaml"
+
+@config_data = YAML.load_file("../_config.yml")
+
+def params
+ {'url' => 'http://www.meetup.com/sfrails/',
+ 'page_limit' => 1000,
+ 'depth' => 3,
+ 'max_workers' => 50,
+ 'iw_token' => @config_data['iw']['token'],
+ 'iw_project_id' => @config_data['iw']['project_id']}
+end
+
+
+ng_client = IronWorkerNG::Client.new(:token => params['iw_token'], :project_id => params['iw_project_id'])
+#resetting the pages counter in the cache
+cache = IronCache::Client.new({"token" => params['iw_token'], "project_id" => params['iw_project_id']})
+cache.items.put('pages_count', 0)
+#launching worker
+puts "Launching crawler"
+ng_client.tasks.create("WebCrawler", params)
+puts "Crawler launched! now open http://hud.iron.io"
101 ruby_ng/web_crawler/url_utils.rb
@@ -0,0 +1,101 @@
+module UrlUtils
+ def relative?(url)
+ url.match(/^http/) ? false : true
+ end
+
+ def make_absolute(potential_base, relative_url)
+ if relative_url.match(/^\//)
+ create_absolute_url_from_base(potential_base, relative_url)
+ else
+ create_absolute_url_from_context(potential_base, relative_url)
+ end
+ end
+
+ def urls_on_same_domain?(url1, url2)
+ get_domain(url1) == get_domain(url2)
+ end
+
+ def get_domain(url)
+ remove_extra_paths(url)
+ end
+
+ private
+
+ def create_absolute_url_from_base(potential_base, relative_url)
+ remove_extra_paths(potential_base) + relative_url
+ end
+
+ def remove_extra_paths(potential_base)
+ index_to_start_slash_search = potential_base.index('://')+3
+ index_of_first_relevant_slash = potential_base.index('/', index_to_start_slash_search)
+ if index_of_first_relevant_slash != nil
+ return potential_base[0, index_of_first_relevant_slash]
+ end
+ potential_base
+ end
+
+ def create_absolute_url_from_context(potential_base, relative_url)
+ if potential_base.match(/\/$/)
+ absolute_url = potential_base+relative_url
+ else
+ last_index_of_slash = potential_base.rindex('/')
+ if potential_base[last_index_of_slash-2, 2] == ':/'
+ absolute_url = potential_base+'/'+relative_url
+ else
+ last_index_of_dot = potential_base.rindex('.')
+ if last_index_of_dot < last_index_of_slash
+ absolute_url = potential_base+'/'+relative_url
+ else
+ absolute_url = potential_base[0, last_index_of_slash+1] + relative_url
+ end
+ end
+ end
+ absolute_url
+ end
+ def open_url(url)
+ url_object = nil
+ begin
+ url_object = open(url)
+ rescue
+ puts "Unable to open url: " + url
+ end
+ url_object
+ end
+
+ def update_url_if_redirected(url, url_object)
+ if url != url_object.base_uri.to_s
+ return url_object.base_uri.to_s
+ end
+ url
+ end
+
+ def parse_url(url_object)
+ doc = nil
+ begin
+ doc = Nokogiri::HTML(url_object)
+ rescue
+ puts 'Could not parse url: ' + url_object.base_uri.to_s
+ end
+ puts 'Crawling url ' + url_object.base_uri.to_s
+ doc
+ end
+
+ def find_urls_on_page(parsed_url, current_url)
+ urls_list = []
+ begin
+ parsed_url.search('a[@href]').map do |x|
+ new_url = x['href'].split('#')[0]
+ unless new_url == nil
+ if relative?(new_url)
+ new_url = make_absolute(current_url, new_url)
+ end
+ urls_list.push(new_url)
+ end
+ end
+ rescue
+ puts "could not find links"
+ end
+ urls_list
+ end
+
+end
94 ruby_ng/web_crawler/web_crawler.rb
@@ -0,0 +1,94 @@
+require 'open-uri'
+require 'nokogiri'
+require 'cgi'
+require 'json'
+require 'iron_worker_ng'
+require 'iron_cache'
+require 'iron_mq'
+
+load 'url_utils.rb'
+
+include UrlUtils
+
+def process_page(url)
+ puts "Processing page #{url}"
+ #adding url to cache
+ @iron_cache_client.items.put(CGI::escape(url), {:status => "found"}.to_json)
+ #pushing url to iron_mq to process page
+ result = @iron_mq_client.messages.post(CGI::escape(url))
+ puts "Message put in queue #{result}"
+end
+
+def crawl_domain(url, depth)
+ url_object = open_url(url)
+ #returning if the url could not be opened
+ return if url_object == nil
+ parsed_url = parse_url(url_object)
+ #trying to parse url and returning if parsed url is nil
+ return if parsed_url == nil
+ #all good, scanning url for links
+ puts "Scanning URL:#{url}"
+ page_urls = find_urls_on_page(parsed_url, url)
+ puts "FOUND links:#{page_urls.count}"
+
+ page_urls.each_with_index do |page_url, index|
+ if urls_on_same_domain?(url, page_url)
+ pages_count = @iron_cache_client.items.get('pages_count').value
+ puts "Pages scanned:#{pages_count}"
+ puts "Page url #{page_url},index:#{index}"
+
+ #incrementing page counts
+ @iron_cache_client.items.put('pages_count', pages_count + 1)
+
+ return if pages_count >= params['page_limit']
+ puts "current depth:#{depth}"
+ #getting page from cache
+ page_from_cache = @iron_cache_client.items.get(CGI::escape(page_url))
+
+ if page_from_cache.nil?
+ #page not processed yet so lets process it and queue worker if possible
+ process_page(page_url) if open_url(page_url)
+ queue_worker(depth, page_url) if depth > 1
+ else
+ puts "Link #{page_url} already processed, bypassing"
+ #page_from_cache.delete
+ end
+ end
+ end
+end
+
+def queue_worker(depth, page_url)
+ p = {:url => page_url,
+ :page_limit => params["page_limit"],
+ :depth => depth - 1,
+ :max_workers => params["max_workers"],
+ :iw_token => params["iw_token"],
+ :iw_project_id => params["iw_project_id"]
+ }
+ #queueing child worker or processing page in same worker
+ workers_count = @iron_cache_client.items.get('workers_count')
+ count = workers_count ? workers_count.value : 0
+ puts "Number of workers:#{count}"
+ if count < params['max_workers'] - 1
+ #launching new worker
+ @iron_cache_client.items.put('workers_count', count+1)
+ @iron_worker_client.tasks.create("WebCrawler", p)
+ else
+ #processing in same worker - too many workers running
+ crawl_domain(page_url, depth-1)
+ end
+ @iron_worker_client.tasks.create("PageProcessor", p)
+end
+
+#initializing IronWorker, IronCache and IronMQ clients
+@iron_cache_client = IronCache::Client.new({"token" => params['iw_token'], "project_id" => params['iw_project_id']})
+@iron_worker_client = IronWorkerNG::Client.new(:token => params['iw_token'], :project_id => params['iw_project_id'])
+@iron_mq_client = IronMQ::Client.new(:token => params['iw_token'], :project_id => params['iw_project_id'])
+
+#start crawling
+crawl_domain(params['url'], params['depth']||1)
+
+#decreasing the number of workers - child workers need this to signal that they have finished
+# so the system can queue a new one
+
+workers_count = @iron_cache_client.items.get('workers_count')
+count = workers_count ? workers_count.value : 0
+@iron_cache_client.items.put('workers_count', count-1) if count > 0
6 ruby_ng/web_crawler/web_crawler.worker
@@ -0,0 +1,6 @@
+merge_gem 'iron_worker_ng'
+merge_gem 'iron_cache'
+merge_gem 'iron_mq'
+file 'url_utils.rb'
+exec "web_crawler.rb"
+name 'WebCrawler'