Skip to content
Permalink
Browse files

Allow Passive Analytics to scale to analyzing millions of log entries…

… without nuking MySQL in the process. More thorough test cleanup.
  • Loading branch information...
perlDreamer committed Sep 26, 2011
1 parent ec00e86 commit 45b5370469d87a29dd108d9aa4a719d652a44617
@@ -51,6 +51,20 @@ sub definition {
return $class->SUPER::definition($session,$definition);
}

#-------------------------------------------------------------------

=head2 get_statement( session, counter )
Return a statement handle at the desired offset.
=cut

sub get_statement {
my ($session, $logIndex) = @_;
my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog order by timestamp limit ?, 500000};
my $sth = $session->db->read($deltaSql, [$logIndex+0]);
return $sth;
}

#-------------------------------------------------------------------

@@ -85,47 +99,49 @@ sub execute {
my %bucketCache = ();

##Configure all the SQL
my $deltaSql = <<"EOSQL1";
select userId, assetId, url, delta, from_unixtime(timeStamp) as stamp
from deltaLog order by timestamp limit $logIndex, 1234567890
EOSQL1
my $deltaSth = $session->db->read($deltaSql);
my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
my $deltaSth = get_statement($session, $logIndex);
my $total_rows = $session->db->quickScalar('select found_rows()');

my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');

##Walk through the log file entries, one by one. Run each entry against
##all the rules until 1 matches. If it doesn't match any rule, then bin it
##into the "Other" bucket.
DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
++$logIndex;
my $bucketFound = 0;
my $url = $entry->{url};
if (exists $bucketCache{$url}) {
$bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
}
else {
RULE: foreach my $rule (@rules) {
next RULE unless $url =~ $rule->[1];

# Into the bucket she goes..
$bucketCache{$url} = $rule->[0];
$bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
$bucketFound = 1;
last RULE;
DELTA_CHUNK: while (1) {
DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
++$logIndex;
my $bucketFound = 0;
my $url = $entry->{url};
if (exists $bucketCache{$url}) {
$bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
}
if (!$bucketFound) {
$bucketCache{$url} = 'Other';
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
else {
RULE: foreach my $rule (@rules) {
next RULE unless $url =~ $rule->[1];

# Into the bucket she goes..
$bucketCache{$url} = $rule->[0];
$bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
$bucketFound = 1;
last RULE;
}
if (!$bucketFound) {
$bucketCache{$url} = 'Other';
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
}
}
if (time() > $endTime) {
$expired = 1;
last DELTA_ENTRY;
}
}
if (time() > $endTime) {
$expired = 1;
last DELTA_ENTRY;
}
}

if ($expired) {
$instance->setScratch('logIndex', $logIndex);
return $self->WAITING(1);
if ($expired) {
$instance->setScratch('logIndex', $logIndex);
return $self->WAITING(1);
}
last DELTA_CHUNK if $logIndex >= $total_rows;
$deltaSth = get_statement($session, $logIndex);
}
my $message = 'Passive analytics is done.';
if ($session->setting->get('passiveAnalyticsDeleteDelta')) {
@@ -49,6 +49,20 @@ sub definition {
return $class->SUPER::definition($session,$definition);
}

#-------------------------------------------------------------------

=head2 get_statement( session, counter )
Return a statement handle at the desired offset.
=cut

sub get_statement {
my ($session, $counter) = @_;
my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp limit ?, 500000};
my $sth = $session->db->read($passive, [$counter+0]);
return $sth;
}

#-------------------------------------------------------------------

@@ -72,64 +86,67 @@ sub execute {
my $endTime = time() + $self->getTTL;
my $deltaInterval = $self->get('deltaInterval');

my $passive = q{select * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp};
my $sth;
my $lastUserId;
my $lastSessionId;
my $lastTimeStamp;
my $lastAssetId;
my $lastUrl;
my $counter = $instance->getScratch('counter');
my $sth = get_statement($session, $counter);
if ($counter) {
$passive .= ' limit '. $counter .', 1234567890';
$sth = $session->db->read($passive);
$lastUserId = $instance->getScratch('lastUserId');
$lastSessionId = $instance->getScratch('lastSessionId');
$lastTimeStamp = $instance->getScratch('lastTimeStamp');
$lastAssetId = $instance->getScratch('lastAssetId');
$lastUrl = $instance->getScratch('lastUrl');
}
else {
$sth = $session->db->read($passive);
my $logLine = $sth->hashRef();
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
$session->db->write('delete from deltaLog'); ##Only if we're starting out
}

$session->db->write('delete from deltaLog'); ##Only if we're starting out
my $total_rows = $session->db->quickScalar('select found_rows()');

my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');

my $expired = 0;
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
$counter++;
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
if ( $logLine->{userId} eq $lastUserId
&& $logLine->{sessionId} eq $lastSessionId
&& $delta < $deltaInterval ) {
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
LOG_CHUNK: while (1) {
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
$counter++;
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
if ( $logLine->{userId} eq $lastUserId
&& $logLine->{sessionId} eq $lastSessionId
&& $delta < $deltaInterval ) {
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
}
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;
}
}
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;

$sth->finish;
if ($expired) {
return $self->WAITING(1);
}
}

if ($expired) {
return $self->WAITING(1);
last LOG_CHUNK if $counter >= $total_rows;
$sth = get_statement($session, $counter);
}

$instance->deleteScratch('lastUserId');
@@ -18,6 +18,8 @@ my $session = WebGUI::Test->session;
$session->user({userId => 3});

WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');

my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');

0 comments on commit 45b5370

Please sign in to comment.
You can’t perform that action at this time.