add new probability vectors system; misc other minor updates
saizai committed Mar 4, 2010
1 parent 00bf82a commit 5173ed0
Showing 16 changed files with 422 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -15,4 +15,5 @@ public/stylesheets/all*.css
 config/mail.yml
 db/top-1m.csv*
 tmp/*
+*.lock

6 changes: 4 additions & 2 deletions app/controllers/scrapings_controller.rb
@@ -4,7 +4,9 @@ def new
   end

   def create
-    if params[:cookie].blank?
+    if File.exist?(File.join(RAILS_ROOT, 'update.lock'))
+      @error_msg = "Our sites database is currently being updated. Please wait a few minutes and try again."
+    elsif params[:cookie].blank?
       @error_msg = "Please enter a unique code."
     else
       @current_user = User.find_by_cookie(params[:cookie])
@@ -77,7 +79,7 @@ def results
         end
       else
         render :update do |page|
-          page['status'].replace_html "Processing... #{@scraping.found_visitations_count} hits found. #{@scraping.visitations_count} processed so far of #{@scraping.served_urls} scraped. \
+          page['status'].replace_html "Processing ##{@scraping.id}... #{@scraping.found_visitations_count} hits found. #{@scraping.visitations_count} processed so far of #{@scraping.served_urls} scraped. \
             #{WORKLING_CLIENT.stats.first[1]['curr_items']} jobs in queue."
         end
       end
79 changes: 79 additions & 0 deletions app/models/probability_vector.rb
@@ -0,0 +1,79 @@
class ProbabilityVector < ActiveRecord::Base
belongs_to :user
belongs_to :site
# hits, tests, avg

# on create:
# 0. abort if negative and no other users have hit the site
# 1a. if visited_users_count > 0: update aggregate a = (a * [n-1]/n) + (value/n), users_count += 1; visited_users_count += 1; add pv
  # 1b. if not: create PVs for all other users' negative tests; update aggregate a = 1/n, users_count = n; visited_users_count = 1; add pv

# on update:
# 1. increment tests; increment hits if true
# 2. update aggregate a = a + (new avg - old avg)/n

  # site_results should be a hash of {site_id => visited, ...}
def self.report user_id, site_results
self.transaction do
prior_pvs = self.find(:all, :lock => true, :conditions => ["site_id in (?) and user_id = ?", site_results.keys, user_id]).inject({}){|m,v| m[v.site_id] = v; m }
new_hits = [] # site_id
new_pvs = [] # [site_id, user_id, tests, hits, avg]
sites = Site.find(:all, :lock => true, :conditions => ["id in (?)", site_results.keys], :select => "id, visited_users_count, users_count, avg_visited").inject([]) do |m,s|
visited_now = site_results[s.id]
new_avg_visited, new_users_count, new_visited_users_count = -s.avg_visited, s.users_count, s.visited_users_count
group = if prior_pvs[s.id] # already have a PV, just update it
pv = prior_pvs[s.id]
new_pv_hits = (visited_now ? pv.hits + 1 : pv.hits)
new_avg = (new_pv_hits.to_f / (pv.tests + 1))
new_avg_visited += ((new_avg - pv.avg) / new_users_count) # change the aggregate by the weighted delta of this user's PV
new_pvs << [s.id, user_id, pv.tests + 1, new_pv_hits, new_avg]
new_visited_users_count += 1 if visited_now and pv.hits == 0 # up the aggregated visited_users_count if this is our first *hit*
:old
else # new for this user
if new_visited_users_count > 0 # other users have hit this, no need to lazy-add their stuff
new_avg_visited = (new_avg_visited * (new_users_count - 1) / new_users_count) + ((visited_now ? 1 : 0).to_f / new_users_count)
new_visited_users_count += 1 if visited_now
new_users_count += 1
            new_pvs << [s.id, user_id, 1, (visited_now ? 1 : 0), (visited_now ? 1.0 : 0.0)] # avg = hits/tests, so 0.0 on a miss
:new_existing
else
if visited_now # lazy-add the other users' PVs
new_hits << s.id
new_pvs << [s.id, user_id, 1, 1, 1.0] # will also add ones for the others' down below
:new_hit
else # don't create new PVs unless it's a hit or someone else had a hit
:noop
end
end
end
m << {:id => s.id, :group => group, :avg_visited => -new_avg_visited, :users_count => new_users_count, :visited_users_count => new_visited_users_count}
m
end

if !new_hits.empty?
# Get all the users who "ought" to have PVs for these sites
nohit_counts = Hash.new(0)
Visitation.find(:all, :conditions => ['site_id IN (?)', new_hits], :joins => :scraping, :group => "site_id",
:select => "group_concat(user_id) as user_ids, site_id").map{|v|
users = v.user_ids.split(',').map(&:to_i).inject(Hash.new(0)){|mm,vv| mm[vv] += 1 ;mm } # hash {user_id => count, ...}
users.each{|user, count| new_pvs << [v.site_id, user, count, 0, 0.0]
nohit_counts[v.site_id] = users.count } }
sites.each do |s|
if s[:group] == :new_hit
n = nohit_counts[s[:id]] + 1
s[:users_count] = n
s[:visited_users_count] = 1
s[:avg_visited] = -1.0 / n
end
end
end

# Update everything, releasing their locks
Site.import [:id, :users_count, :visited_users_count, :avg_visited], sites.map{|v| [v[:id], v[:users_count], v[:visited_users_count], v[:avg_visited]] },
:validate => false, :on_duplicate_key_update => [:users_count, :visited_users_count, :avg_visited]
self.import [:site_id, :user_id, :tests, :hits, :avg], new_pvs, :validate => false, :on_duplicate_key_update => [:tests, :hits, :avg] if !new_pvs.empty?
end
end


end
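The comment block at the top of this file compresses a lot of arithmetic. Here is a quick standalone check (plain Ruby; the user ids and averages are made up) that the two incremental rules — `a += (new_avg - old_avg)/n` when an existing PV updates, and `a = a*(n-1)/n + value/n` when a new user joins, with `n` counting the new user — reproduce a full recompute of the mean:

```ruby
# Standalone sanity check of ProbabilityVector.report's aggregate math.
avgs = { 1 => 0.5, 2 => 1.0, 3 => 0.0 }   # per-user avg (hits/tests) for one site
n = avgs.size
a = avgs.values.inject(0.0){|s,x| s + x} / n            # aggregate mean = 0.5

# Existing user 3 reports again: their avg moves 0.0 -> 0.25.
old_avg, new_avg = avgs[3], 0.25
a += (new_avg - old_avg) / n                            # incremental update
avgs[3] = new_avg
full = avgs.values.inject(0.0){|s,x| s + x} / n
raise "drift" unless (a - full).abs < 1e-9              # 0.58333... both ways

# Brand-new user 4 joins with a hit (value 1.0); n now includes them.
n += 1
a = a * (n - 1) / n + 1.0 / n                           # weighted blend-in
avgs[4] = 1.0
full = avgs.values.inject(0.0){|s,x| s + x} / n
raise "drift" unless (a - full).abs < 1e-9              # 0.6875 both ways
puts "incremental updates match full recompute"
```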
51 changes: 51 additions & 0 deletions app/models/site.rb
@@ -4,5 +4,56 @@ class Site < ActiveRecord::Base
   has_many :found_visitations, :class_name => 'Visitation', :conditions => 'visited = 1'
   has_many :found_scrapings, :class_name => 'Scraping', :through => :found_visitations, :source => :scraping

+  # NOTE: avg_visited is stored NEGATIVE to the real value, because MySQL cannot use an index when ORDER BY mixes ASC and DESC directions. It's a kludge.
+
   validates_presence_of :url, :alexa_rank, :users_count

+  def self.avg_probability_vector site_ids = nil
+    if site_ids
+      self.find(:all, :conditions => ['id IN (?)', site_ids], :select => 'id, avg_visited').inject({}){|m,x| m[x.id] = -x.avg_visited; m}
+    else
+      self.find(:all, :select => 'id, avg_visited').inject({}){|m,x| m[x.id] = -x.avg_visited; m}
+    end
+  end
+
+  def self.avg_url_probabilities site_ids = nil
+    if site_ids
+      self.find(:all, :conditions => ['id IN (?)', site_ids], :select => 'url, avg_visited').inject({}){|m,x| m[x.url] = -x.avg_visited; m}
+    else
+      self.find(:all, :select => 'url, avg_visited').inject({}){|m,x| m[x.url] = -x.avg_visited; m}
+    end
+  end
+
+  def self.update_user_counts sites = nil
+    sites = sites.map(&:to_i) if sites
+    return if sites && sites.empty? # nil means update every site
+    ActiveRecord::Base.connection.execute "UPDATE sites INNER JOIN ( \
+      SELECT v.site_id, count(DISTINCT user_id) AS count_distinct_user_id FROM `scrapings` INNER JOIN ( \
+      SELECT site_id, scraping_id, visited FROM `visitations` WHERE visited = 1 #{ 'AND site_id IN (' + sites.join(',') + ')' if sites }) \
+      as v ON scrapings.id = v.scraping_id group by site_id) \
+      as s on s.site_id = sites.id \
+      SET users_count = count_distinct_user_id;"
+  end
+
+  def self.version
+    unless v = Rails.cache.increment('sites_version', 0)
+      Rails.cache.write 'sites_version', 0
+      self.version! # increment seems to mess with the cache format
+      v = 1
+    end
+    v
+  end
+
+  def self.version!
+    Rails.cache.increment 'sites_version', 1
+  end
+
+  def self.get offset, batch_size = 500
+    key = "sites_#{offset}_#{batch_size}_#{self.version}"
+    unless r = Rails.cache.read(key)
+      r = Site.find(:all, :limit => batch_size, :offset => offset, :order => 'avg_visited, alexa_rank', :select => 'id, url, alexa_rank, avg_visited')
+      Rails.cache.write key, r
+    end
+    r
+  end
 end
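`version` / `version!` / `get` together form a generation-numbered cache: every cached batch key embeds the current version, so a single increment invalidates all batches at once, with no key scanning or mass deletes. Stripped of the Site specifics, the pattern is roughly this (a sketch; `load_batch` and the `things` key names are placeholders, not app code):

```ruby
# Generation-numbered caching, the pattern behind Site.version/Site.get.
def things_version
  Rails.cache.read('things_version') || begin
    Rails.cache.write 'things_version', 1
    1
  end
end

def things_version!
  Rails.cache.increment 'things_version', 1
end

def cached_things(offset)
  key = "things_#{offset}_v#{things_version}"  # generation baked into the key
  Rails.cache.read(key) || begin
    r = load_batch(offset)                     # placeholder for the slow query
    Rails.cache.write key, r
    r
  end
end
```

After an import, one `things_version!` call makes every `cached_things` key miss on the next read; the stale entries simply age out of the cache. The warm-up passes elsewhere in this commit (`300.times{|i| Site.get 500 * i }`) then repopulate the new generation before users hit it.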
24 changes: 7 additions & 17 deletions app/models/user.rb
@@ -4,6 +4,7 @@ class User < ActiveRecord::Base
   has_many :visitations, :through => :successful_scrapings
   has_many :found_visitations, :through => :successful_scrapings
   has_many :unfound_visitations, :through => :successful_scrapings
+  has_many :probability_vectors

   validates_presence_of :cookie
   validates_uniqueness_of :cookie
@@ -15,28 +16,17 @@ def wipe_blanks
     self.email = nil if email.blank?
   end

-  def probability_vector
-    found_site_ids = found_visitations.find(:all, :select => 'site_id').map(&:site_id)
-    visitations.find(:all, :group => 'site_id', :select => 'site_id, AVG(visited) as prob',
-      :conditions => ["site_id IN (?)", found_site_ids]).inject({}){|m, x| m[x.site_id] = x.prob.to_f; m }
+  def probability_vector site_ids = nil
+    if site_ids
+      probability_vectors.find(:all, :conditions => ['site_id IN (?)', site_ids]).inject({}){|m,x| m[x.site_id] = x.avg; m}
+    else
+      probability_vectors.inject({}){|m,x| m[x.site_id] = x.avg; m}
+    end
   end

   def url_probabilities prob = nil
     prob ||= probability_vector
     Site.find(prob.keys).inject({}){|m,x| m[x.url] = prob[x.id]; m }
   end

-  # FIXME: Make this make each user weight 1, i.e. each visitation weighted (1 / # scrapings for this user-site)
-  def self.avg_probability_vector site_ids = nil
-    site_ids ||= Visitation.find(:all, :select => 'DISTINCT site_id', :conditions => 'visited = 1').map(&:site_id)
-    successful_scraping_ids = Scraping.find(:all, :select => 'id', :conditions => 'found_visitations_count > 0').map(&:id)
-    Visitation.find(:all, :group => 'site_id', :select => 'site_id, AVG(visited) as prob',
-      :conditions => ["site_id IN (?) AND scraping_id IN (?)", site_ids, successful_scraping_ids]).inject({}){|m, x|
-        m[x.site_id] = x.prob.to_f; m }
-  end
-
-  def self.avg_url_probabilities site_ids = nil
-    prob = avg_probability_vector(site_ids)
-    Site.find(prob.keys).inject({}){|m,x| m[x.url] = prob[x.id]; m }
-  end
 end
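The net effect on the read side: a user's vector is now a single indexed read of precomputed `probability_vectors` rows instead of an `AVG(visited)` aggregate run per request, and the site-wide averages live on `Site`. Hypothetical usage (the cookie value is made up):

```ruby
user = User.find_by_cookie('some-cookie')
pv      = user.probability_vector                 # => {site_id => avg, ...}
pv_some = user.probability_vector([1, 2, 3])      # same, restricted to three sites
probs   = user.url_probabilities(pv)              # => {'example.com' => 0.75, ...}
```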
2 changes: 1 addition & 1 deletion app/views/scrapings/error.js.erb
@@ -1 +1 @@
-<%= update_page {|page| page['status'].replace_html @error_msg; page['cookie_form'].show; } %>
+<%= update_page {|page| page['status_0'].replace_html @error_msg; page['cookie_form'].show; } %>
7 changes: 4 additions & 3 deletions app/views/scrapings/new.html.erb
@@ -60,9 +60,10 @@
 <p>Are you a geek? Please read the <%= link_to 'geektastic about page', about_url %> for technical details.</p>

 <p>CSS Fingerprint is a research project inspired by the EFF's <a href="http://panopticlick.eff.org">Panopticlick</a>.</p>

-<p>Its intent is to see how well the <a href="http://ha.ckers.org/weird/CSS-history-hack.html">CSS history hack</a> can be used with "fuzzy" AI techniques to uniquely fingerprint users
-<i>despite changes in their browsing history, even on new computers or new browsers</i>, and to tell how socially/culturally similar any two users are.</p>
-
+<p>Its intent is to see how well the <a href="http://ha.ckers.org/weird/CSS-history-hack.html">CSS history hack</a> can be used with "fuzzy" artificial intelligence
+techniques to uniquely fingerprint users <i>despite changes in their browsing history, even on new computers or new browsers</i>, and to tell how socially/culturally
+similar any two users are.</p>
+
 <p>At the moment, the AI component is not yet active. In order to write it, I need data.</p>
35 changes: 18 additions & 17 deletions app/workers/scraping_worker.rb
@@ -5,24 +5,25 @@ class ScrapingWorker < Workling::Base
   BG_LOGGER = Logger.new(logfile)
   BG_LOGGER.debug "#{Time.now.to_s}: Loading ScrapingWorker. Return store: #{Workling.return.inspect}"

-  def process_results(options)
-    Workling.return.set options[:uid], "Starting results calculation..."
-    scraping = Scraping.find(options[:scraping_id])
-    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Starting results for scraping #{scraping.id}"
-    sites = scraping.found_sites.find(:all, :select => :url).map(&:url)
-    Workling.return.set options[:uid], "Calculating results... 1/5"
-    unfound_sites = scraping.unfound_sites.find(:all, :select => :url).map(&:url)
-    Workling.return.set options[:uid], "Calculating results... 2/5"
-    pv = scraping.user.probability_vector
-    Workling.return.set options[:uid], "Calculating results... 3/5"
-    probabilities = scraping.user.url_probabilities(pv)
-    Workling.return.set options[:uid], "Calculating results... 4/5"
-    avg_up = User.avg_url_probabilities pv.keys
-    Workling.return.set options[:uid], "Calculating results... 5/5"
-    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Returning results for scraping #{scraping.id}"
+  def version_sites_once_idle!(options)
+    if Rails.cache.read 'version_sites_once_idle_lock'
+      BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: version_sites_once_idle already in queue"
+      return
+    else
+      Rails.cache.write 'version_sites_once_idle_lock', true
+    end

-    Workling.return.set options[:uid], :sites => sites, :unfound_sites => unfound_sites, :probabilities => probabilities, :avg_up => avg_up
-    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Processed results for scraping #{scraping.id}"
+    while Scraping.last.created_at > 2.minutes.ago
+      BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Not idle..."
+      sleep 10
+    end
+
+    Site.version!
+    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Versioned!"
+    300.times{|i| Site.get 500 * i }
+    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Warmed up!"
+
+    Rails.cache.delete 'version_sites_once_idle_lock'
   rescue => e
     BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: ERROR #{e}"
   end
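One caveat worth flagging in this worker: the `version_sites_once_idle_lock` guard is a separate `read` followed by a `write`, so two workers dequeued at the same moment can both see nil and both run; and because the `rescue` above logs without deleting the key, a crash strands the lock until someone clears it by hand. If the cache store is memcached-backed, an atomic add-style write closes both gaps — a sketch, assuming `Rails.cache.write`'s `:unless_exist` option is available and reports whether the add won:

```ruby
# Sketch only: atomic lock via memcached `add` semantics.
def with_cache_lock(name, ttl = 1.hour)
  # :unless_exist maps to an atomic add; only one caller can win the race.
  # The TTL bounds how long a crashed worker can strand the lock.
  return unless Rails.cache.write(name, true, :unless_exist => true, :expires_in => ttl)
  begin
    yield
  ensure
    Rails.cache.delete name   # released even when the block raises
  end
end

# with_cache_lock('version_sites_once_idle_lock') do
#   # wait for idle, Site.version!, warm the cache
# end
```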
15 changes: 8 additions & 7 deletions app/workers/visitation_worker.rb
@@ -11,26 +11,27 @@ def process_results(options)
     scraping_id, results = options[:scraping_id], options[:results]
     results = JSON.parse(results)
     sites = Site.find(:all, :conditions => ['url IN (?)', results.keys.map{|x| URI.decode x}], :select => 'id, url').map{|s| [s.id, s.url]}
+    site_results = results.inject({}){|m,v| m[sites.rassoc(URI.decode v[0])[0]] = v[1]; m}
+    found_site_ids = site_results.reject{|k,v| !v}.keys
     Visitation.import [:scraping_id, :site_id, :visited], results.map{|key,value| [scraping_id, sites.rassoc(URI.decode key)[0], value]}, :validate => false # save a bit of RAM

     # because we're using mass import, this isn't getting updated automagically
-    found_count = results.map{|k,v| v}.count(true)
+    found_count = found_site_ids.count
     Scraping.update_counters scraping_id, :visitations_count => results.size, :found_visitations_count => found_count
     scraping = Scraping.find(scraping_id) # AFTER the update

     # (almost) all done. Sometimes batches seem to get lost over the wire.
     # FIXME: why are they getting lost? Why are some threads not finishing?
-    if scraping.finished_threads <= THREADS - 1 and scraping.served_urls <= scraping.visitations_count + scraping.batch_size * THREADS
+    if scraping.created_at < 60.seconds.ago and scraping.served_urls <= scraping.visitations_count + scraping.batch_size * THREADS # and finished_threads <= THREADS - 1
+      ScrapingWorker.asynch_version_sites_once_idle!
       Workling.return.set options[:uid], "done"
       scraping.update_attribute :job_id, options[:uid] # ScrapingWorker.asynch_process_results(:scraping_id => scraping_id)
     end

+    ProbabilityVector.report scraping.user_id, site_results
+    # Site.update_user_counts found_site_ids if !found_site_ids.empty?

     # BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Updating scrapings count..."
     # there should be a faster way of doing this
     # sites.map{|s| s.update_attribute :users_count, x.found_scrapings.count('DISTINCT user_id')}

-    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Processed scraping #{scraping_id} offset #{sites.first[0]}; found #{found_count} / #{results.size}"
+    BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: Processed scraping #{scraping_id} offset #{sites.first[0]}; found #{found_count} / #{results.size}: #{found_site_ids.join(', ')}"
   rescue => e
     BG_LOGGER.debug "#{Time.now.to_s}: #{options[:uid]}: ERROR #{e}"
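A small efficiency note on the new `site_results` line: `sites` is an array of `[id, url]` pairs, so every `Array#rassoc` call is a linear scan, making the mapping O(n²) per batch. Inverting the pairs into a hash once makes each lookup constant-time; a sketch of the same computation, reusing `sites` and `results` from the worker above:

```ruby
# Sketch: one-time hash inversion instead of repeated Array#rassoc scans.
id_by_url = Hash[sites.map{|id, url| [url, id]}]    # url => id, built once

site_results = results.inject({}) do |m, (key, visited)|
  m[id_by_url[URI.decode(key)]] = visited           # O(1) per result
  m
end
found_site_ids = site_results.reject{|k,v| !v}.keys # unchanged
```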
23 changes: 23 additions & 0 deletions db/migrate/20100303004045_add_probability_vectors.rb
@@ -0,0 +1,23 @@
class AddProbabilityVectors < ActiveRecord::Migration
def self.up
add_column :sites, :avg_visited, :float, :default => 0
add_column :sites, :visited_users_count, :integer, :default => 0
create_table :probability_vectors do |t|
t.references :user, :site, :default => nil, :null => false
t.integer :hits, :tests, :default => 0
t.float :avg, :default => 0

t.timestamps
end

add_index :probability_vectors, [:user_id, :site_id], :unique => true
remove_index :sites, [:alexa_rank, :id, :url]
add_index :sites, [:avg_visited, :alexa_rank, :id, :url], :name => :by_popularity
end

def self.down
remove_column :sites, :avg_visited
remove_column :sites, :visited_users_count
drop_table :probability_vectors
end
end
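This migration is where the negative-`avg_visited` kludge noted in Site pays off. `Site.get` orders by `avg_visited, alexa_rank`, and MySQL of this era can only satisfy an `ORDER BY` from a composite index when every column sorts in the same direction; storing the average negated turns "most-visited first, then best Alexa rank" into a pure ascending scan of the new `by_popularity` index. Roughly:

```ruby
# Why avg_visited is stored negated (illustrative, not app code):
#
#   ORDER BY avg_visited ASC, alexa_rank ASC   -- one direction: index scan on by_popularity
#   ORDER BY avg_visited DESC, alexa_rank ASC  -- mixed directions: filesort
#
# Readers un-negate on the way out, as Site.avg_probability_vector does:
batch = Site.find(:all, :order => 'avg_visited, alexa_rank', :limit => 5,
                  :select => 'id, url, avg_visited')
batch.each{|s| puts "#{s.url}: #{-s.avg_visited}" }  # flip the sign to get the real average
```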
8 changes: 6 additions & 2 deletions lib/tasks/alexa.rake
@@ -7,19 +7,23 @@ namespace :alexa do
     # If RAM needs to be conserved, use FasterCSV.foreach and Site.create instead
     # In testing, single-item import was about ~600 items per second; batch import was ~1300

+    raise "Lockfile found" if File.exist?(File.join(RAILS_ROOT, 'update.lock'))
+    f = File.new(File.join(RAILS_ROOT, 'update.lock'), 'w')
+    f.close
     FileUtils.rm(File.join(RAILS_ROOT, 'db', 'top-1m.csv.zip')) rescue true
     `cd #{File.join(RAILS_ROOT, 'db')} && wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip`
     `cd #{File.join(RAILS_ROOT, 'db')} && unzip -o top-1m.csv.zip`
     puts "Parsing..."
     alexa = FasterCSV.read(File.join(RAILS_ROOT, 'db', 'top-1m.csv'))
     puts "Importing..."
     Site.import [:alexa_rank, :url], alexa, :validate => false, :on_duplicate_key_update => [:alexa_rank]
+    Site.version!
+    File.delete(File.join(RAILS_ROOT, 'update.lock'))
     puts "Done!"
   end

   desc "Warm up the database"
   task :warm_db => :environment do
-    300.times{|i| Site.find(:all, :order => 'alexa_rank', :limit => 500, :offset => 500 * i, :select => 'alexa_rank, id, url') } # This needs to be kept in sync with VisitationsController#create
-    true
+    300.times{|i| Site.get 500 * i }
   end
 end
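Both importer tasks and the controller now rendezvous on `RAILS_ROOT/update.lock`, but if an import raises between creating and deleting the file, scraping stays disabled until someone removes the lock by hand. A `begin/ensure` wrapper (a sketch; `with_update_lock` is a hypothetical helper, using the same file name as above) would make the cleanup unconditional:

```ruby
require 'fileutils'

# Hypothetical helper: guarantee the lockfile is removed on every exit path.
def with_update_lock
  lockfile = File.join(RAILS_ROOT, 'update.lock')
  raise "Lockfile found" if File.exist?(lockfile)
  FileUtils.touch lockfile        # equivalent to File.new(..., 'w').close above
  begin
    yield
  ensure
    File.delete lockfile          # runs even if the import raises mid-way
  end
end

# task :update => :environment do
#   with_update_lock do
#     # download, parse, Site.import, Site.version!
#   end
# end
```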
7 changes: 7 additions & 0 deletions lib/tasks/technorati.rake
@@ -4,6 +4,10 @@ require 'scrubyt'
 namespace :technorati do
   desc "Import top 100 Technorati blogs"
   task :update => :environment do
+    raise "Lockfile found" if File.exist?(File.join(RAILS_ROOT, 'update.lock'))
+    f = File.new(File.join(RAILS_ROOT, 'update.lock'), 'w')
+    f.close
+
     technorati = Scrubyt::Extractor.define do
       fetch 'http://technorati.com/blogs/top100/'

@@ -16,5 +20,8 @@
     Site.import [:alexa_rank, :url], technorati.to_hash.map{|x| [0, x[:link_url].sub('http://www.', '').sub('http://','').sub(/\/$/, '')]},
       :validate => false, :on_duplicate_key_update => [:alexa_rank]
+
+    Site.version!
+    File.delete(File.join(RAILS_ROOT, 'update.lock'))
   end
 end
1 change: 1 addition & 0 deletions public/index_offline.html
@@ -0,0 +1 @@
<html><body>Scraping temporarily disabled pending a database renovation. See <a href=/about>about page</a> or <a href=/results>results page</a> for now. Back up within a few hours.</body></html>
