Skip to content
Permalink
Browse files

Rework PA again to not try and sort the passiveLog. Instead, we iterate and store "last" data in a new database table.
  • Loading branch information...
perlDreamer committed Oct 6, 2011
1 parent 5314530 commit 6ed275b13334092d227a11d27a75e21b222d2a58
@@ -31,10 +31,30 @@ BEGIN
my $session = start(); # this line required

# upgrade functions go here
addPALastLogTable($session);

finish($session); # this line required


#----------------------------------------------------------------------------
# Create the PA_lastLog table used by Passive Analytics.
#
# The table is keyed on (userId, sessionId) and stores an assetId, a
# timeStamp and a url alongside that key.
#
# Parameters:
#   $session - session object; its db handle is used to run the DDL.
#
# Prints a progress message to STDOUT unless the file-level $quiet flag is set.
sub addPALastLogTable {
    my $session = shift;
    print "\tAdd a table to keep track of additional Passive Analytics data... " unless $quiet;

    # IF NOT EXISTS keeps the upgrade re-runnable: a second run (for example
    # after a later step failed) must not die because the table is already there.
    # The heredoc is single-quoted so nothing in the SQL is ever interpolated.
    $session->db->write(<<'EOSQL');
CREATE TABLE IF NOT EXISTS `PA_lastLog` (
    `userId`    char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `assetId`   char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `sessionId` char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `timeStamp` bigint(20) DEFAULT NULL,
    `url`       char(255) NOT NULL,
    PRIMARY KEY (userId, sessionId)
) ENGINE=MyISAM DEFAULT CHARSET=utf8
EOSQL

    print "DONE!\n" unless $quiet;
}

#----------------------------------------------------------------------------
# Describe what our function does
#sub exampleFunction {
@@ -61,7 +61,7 @@ Return a statement handle at the desired offset.

# Return a statement handle over deltaLog, starting at the given row offset.
#
# Parameters:
#   $session  - session object; its db handle runs the query.
#   $logIndex - row offset to start reading from (undef is treated as 0).
#
# Returns the handle produced by $session->db->read. Each call covers at
# most 500000 rows; SQL_CALC_FOUND_ROWS is requested so the full row count
# is available to a following found_rows() query.
#
# NOTE(review): the query has no ORDER BY, so paging by offset relies on the
# server returning rows in the same order on every call -- confirm this
# holds for deltaLog.
sub get_statement {
    my ($session, $logIndex) = @_;

    # A single declaration only: an earlier, immediately shadowed copy of
    # this query (with an ORDER BY clause) was dead code and has been dropped.
    my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog limit ?, 500000};

    # "+0" forces the offset to be bound as a number.
    my $sth = $session->db->read($deltaSql, [$logIndex+0]);
    return $sth;
}
@@ -59,7 +59,7 @@ Return a statement handle at the desired offset.

# Return a statement handle over passiveLog, starting at the given row offset.
#
# Parameters:
#   $session - session object; its db handle runs the query.
#   $counter - row offset to start reading from (undef is treated as 0).
#
# Returns the handle produced by $session->db->read. Rows whose userId is
# '1' are excluded. Each call covers at most 500000 rows, and
# SQL_CALC_FOUND_ROWS is requested so the full row count is available to a
# following found_rows() query.
#
# NOTE(review): the query has no ORDER BY, so paging by offset relies on the
# server returning rows in the same order on every call -- confirm this
# holds for passiveLog.
sub get_statement {
    my ($session, $counter) = @_;

    # A single declaration only: an earlier, immediately shadowed copy of
    # this query (with an ORDER BY clause) was dead code and has been dropped.
    my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' limit ?, 500000};

    # "+0" forces the offset to be bound as a number.
    my $sth = $session->db->read($passive, [$counter+0]);
    return $sth;
}
@@ -93,68 +93,51 @@ sub execute {
my $lastUrl;
my $counter = $instance->getScratch('counter');
my $sth = get_statement($session, $counter);
if ($counter) {
$lastUserId = $instance->getScratch('lastUserId');
$lastSessionId = $instance->getScratch('lastSessionId');
$lastTimeStamp = $instance->getScratch('lastTimeStamp');
$lastAssetId = $instance->getScratch('lastAssetId');
$lastUrl = $instance->getScratch('lastUrl');
}
else {
my $logLine = $sth->hashRef();
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
$session->db->write('delete from deltaLog'); ##Only if we're starting out
if (! $counter) { #Clean up from last time, just in case
$session->db->write('delete from deltaLog');
$session->db->write('delete from PA_lastLog');
}

my $total_rows = $session->db->quickScalar('select found_rows()');

my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');
my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, timeStamp, url, delta) VALUES (?,?,?,?,?)');
my $recordLast = $session->db->prepare('REPLACE INTO PA_lastLog (userId, sessionId, timeStamp, url) VALUES (?,?,?,?)');
my $fetchLast = $session->db->prepare('select * from PA_lastLog where sessionId=? and userId=?');

my $expired = 0;
LOG_CHUNK: while (1) {
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
$counter++;
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
if ( $logLine->{userId} eq $lastUserId
&& $logLine->{sessionId} eq $lastSessionId
&& $delta < $deltaInterval ) {
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
$fetchLast->execute([@{$logLine}{qw/sessionId userId/}]);
my $lastLine = $fetchLast->hashRef();
$recordLast->execute([ (@{ $logLine }{qw/userId sessionId timeStamp url/}) ]);
if ($lastLine->{timeStamp}) {
my $delta = $logLine->{timeStamp} - $lastLine->{timeStamp};
$deltaLog->execute([ (@{ $lastLine }{qw/userId assetId timeStamp url/}), $delta]);
}
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;
}
}

$sth->finish;
if ($expired) {
$deltaLog->finish;
$recordLast->finish;
$fetchLast->finish;
$sth->finish;
return $self->WAITING(1);
}
last LOG_CHUNK if $counter >= $total_rows;
$sth = get_statement($session, $counter);
}

$instance->deleteScratch('lastUserId');
$instance->deleteScratch('lastSessionId');
$instance->deleteScratch('lastTimeStamp');
$instance->deleteScratch('lastAssetId');
$instance->deleteScratch('lastUrl');
$instance->deleteScratch('counter');
$deltaLog->finish;
$recordLast->finish;
$fetchLast->finish;
$sth->finish;
$session->db->write('delete from PA_lastLog');
return $self->COMPLETE;
}

@@ -5,14 +5,13 @@ use lib "$FindBin::Bin/../../lib";
#use DB;

use WebGUI::Test;
use WebGUI::Asset;
use WebGUI::PassiveAnalytics::Rule;
use WebGUI::Workflow::Activity::BucketPassiveAnalytics;
use WebGUI::Text;

use Test::More;
use Test::Deep;
use Data::Dumper;

plan tests => 1; # increment this value for each test you create
plan tests => 2; # increment this value for each test you create

my $session = WebGUI::Test->session;
$session->user({userId => 3});
@@ -21,6 +20,7 @@ WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');
WebGUI::Test->addToCleanup(SQL => 'delete from PA_lastLog');

my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');
my $activities = $workflow->getActivities();
@@ -67,7 +67,8 @@ while (my $spec = shift @url2) {
}

my @urls = map {$_->[1]} @ruleSets;
loadLogData($session, @urls);
#loadLogData($session, @urls);
repeatableLogData($session, 'passiveAnalyticsLog');

##Build rulesets

@@ -80,7 +81,28 @@ PAUSE: while (my $retval = $instance->run()) {
}
#DB::disable_profile();

ok(1, 'One test');
cmp_ok $counter, '<', 16, 'Successful completion of PA';

# Read back every (userId, Bucket, duration) row from bucketLog and compare
# the set, ignoring row order, with the values expected for the fixture data.
my $get_line = $session->db->read('select userId, Bucket, duration from bucketLog');

# The loop ends as soon as array() hands back an empty list.
my @database_dump;
while (my @datum = $get_line->array()) {
    push @database_dump, [ @datum ];
}

my @expected_buckets = (
    ['user1', 'one', 10],
    ['user1', 'two', 15],
    ['user2', 'zero', 2],
    ['user2', 'uno', 3],
    ['user2', 'Other', 5],
);

cmp_bag(
    \@database_dump,
    \@expected_buckets,
    'PA analysis completed, and calculated correctly'
) or diag Dumper(\@database_dump);

sub loadLogData {
my ($session, @urls) = @_;
@@ -100,4 +122,24 @@ sub loadLogData {
}
}

# Replace the contents of passiveLog with a fixed data set read from a test
# collateral file, so test runs are repeatable.
#
# Parameters:
#   $session     - session object; its db handle runs the delete and inserts.
#   $dataLogName - name of the collateral file to load. Falls back to
#                  'passiveAnalyticsLog' when not supplied.
#
# File format: one row per line, whitespace-separated fields bound in order
# to (userId, sessionId, timeStamp, url); assetId is always the literal
# string 'assetId'. A '#' starts a comment that runs to the end of the line.
# Comment-only and blank lines are skipped.
#
# Dies if the collateral file cannot be opened.
sub repeatableLogData {
    my ($session, $dataLogName) = @_;

    # Honour the caller's file name instead of a hard-coded one.
    $dataLogName = 'passiveAnalyticsLog' unless defined $dataLogName;

    $session->db->write('delete from passiveLog');
    my $insert = $session->db->prepare(
        q!insert into passiveLog (userId, sessionId, timeStamp, url, assetId) VALUES (?,?,?,?,'assetId')!
    );

    my $data_name = WebGUI::Test::collateral($dataLogName);
    open my $log_data, '<', $data_name or
        die "Unable to open $data_name for reading: $!";

    LINE: while (my $line = <$log_data>) {
        next LINE if $line =~ /^\s*#/;

        # Strip a trailing comment. The pattern must be '.*' (any text),
        # not '\.*' (a run of literal dots).
        $line =~ s/#.*$//;
        chomp $line;

        my @data = split ' ', $line;

        # A blank line yields no fields; executing the insert with no bind
        # values would fail, so skip it.
        next LINE unless @data;

        $insert->execute([@data]);
    }

    close $log_data;
    $insert->finish;
}

#vim:ft=perl
@@ -0,0 +1,8 @@
#user session timestamp url
user1 session11 100 /one
user1 session11 110 /two
user1 session11 125 /three
user2 session21 200 /yelnats
user2 session21 202 /one/uno
user2 session21 205 /whatever
user2 session21 210 /something_else

0 comments on commit 6ed275b

Please sign in to comment.
You can’t perform that action at this time.