Skip to content

Commit

Permalink
Module import now handles scrolling directly.
Browse files Browse the repository at this point in the history
  • Loading branch information
oalders committed Jun 3, 2011
1 parent f087fad commit af12f9b
Showing 1 changed file with 74 additions and 41 deletions.
115 changes: 74 additions & 41 deletions perl/lib/iCPAN.pm
Expand Up @@ -18,13 +18,16 @@ has 'es' => ( is => 'rw', isa => 'ElasticSearch', lazy_build => 1 );
has 'index' => ( is => 'rw', default => 'v0' );
has 'mech' =>
( is => 'rw', isa => 'WWW::Mechanize::Cached', lazy_build => 1 );
has 'server' => ( is => 'rw', default => 'api.beta.metacpan.org:80' );

sub _build_es {

my $self = shift;

return ElasticSearch->new(
servers => 'api.beta.metacpan.org:80',
servers => $self->server,
transport => 'httplite',
max_requests => 0, # default 10_000
max_requests => 0, # default 10_000
trace_calls => 'log_file',
no_refresh => 1,
);
Expand All @@ -46,16 +49,16 @@ sub _build_mech {

sub scroll {

my $self = shift;
my $self = shift;
my $scroller = shift;
my $limit = shift;
my @hits = ();
my $limit = shift || 100;
my @hits = ();

while ( my $result = $scroller->next ) {

#say dump $result;

push @hits, exists $result->{'_source'} ? $result->{'_source'} : $result->{fields};
push @hits, exists $result->{'_source'}
? $result->{'_source'}
: $result->{fields};
say dump $hits[-1];

last if scalar @hits > $limit;
Expand All @@ -71,15 +74,15 @@ sub insert_authors {
my $author_rs = $self->schema->resultset( 'Zauthor' );
$author_rs->delete;

my $result = $self->es->search(
my $scroller = $self->es->scrolled_search(
index => $self->index,
type => 'author',
query => { match_all => {}, },
scroll => '5m',
size => 100,
);

my $hits = $self->scroll( $result );
my $hits = $self->scroll( $scroller, 10000 );
my @authors = ();

say "found " . scalar @{$hits} . " hits";
Expand All @@ -104,33 +107,46 @@ sub insert_distributions {
my $rs = $self->schema->resultset( 'Zdistribution' );
$rs->delete;

my $result = $self->es->search(
my $scroller = $self->es->scrolled_search(
index => $self->index,
type => ['release'],
query => {
term => { status => 'latest' },

#match_all => {},
},
filter => { prefix => { distribution => "DBIx" } },
fields => [
'author', 'distribution', 'abstract', 'version_numified',
'name', 'date'
],
scroll => '5m',
size => 100,
explain => 0,
);

my $hits = $self->scroll( $result );
my $hits = $self->scroll( $scroller, 1000 );
my @rows = ();

say "found " . scalar @{$hits} . " hits";

foreach my $src ( @{$hits} ) {
say dump $src;

#return;
my $author = $self->schema->resultset( 'Zauthor' )
->find( { zpauseid => $src->{author} } );
if ( !$author ) {
say "cannot find $src->{author}. skipping!!!";
next;
}

push @rows,
{
zabstract => $src->{abstract},
zversion => $src->{version_numified},
zname => $src->{name},
zabstract => $src->{abstract},
zauthor => $author->z_pk,
zrelease_date => $src->{date},
zversion => $src->{version_numified},
zname => $src->{distribution},
};
}

Expand All @@ -143,6 +159,8 @@ sub insert_modules {
my $self = shift;
my $rs = $self->schema->resultset( 'Zmodule' );
$rs->delete;

my $size = 250;

my $scroller = $self->es->scrolled_search(
index => $self->index,
Expand All @@ -159,22 +177,18 @@ sub insert_modules {
},

"fields" => [ "abstract.analyzed", "documentation", "distribution" ],
scroll => '5m',
size => 100,
explain => 0,
scroll => '5m',
size => $size,
explain => 0,
);

my $hits = $self->scroll( $scroller, 500 );
my @rows = ();

say "found " . scalar @{$hits} . " hits";

foreach my $src ( @{$hits} ) {
my @rows = ( );
while ( my $result = $scroller->next ) {

my $src = $self->extract_hit( $result );
next if !$src;
say dump $src;

#return;
#exit;
my $pod_url = "http://metacpan.org:5001/pod/" . $src->{documentation};
say "GETting: $pod_url";

Expand All @@ -188,26 +202,45 @@ sub insert_modules {
next;
}

my %row = (
zabstract => $src->{'abstract.analyzed'},
zname => $src->{documentation},
zpod => $pod,
);

my $dist = $self->schema->resultset( 'Zdistribution' )
->find_or_create( { zname => $src->{distribution} } );

if ( $dist ) {
my $module = $dist->create_related( 'Modules',
{ zname => $src->{documentation}, } );
$module->update( \%row );
->find( { zname => $src->{distribution} } );

if ( !$dist ) {
say "cannot find $src->{distribution}. skipping!!!";
next;
}

say "dumping last row: " . dump \%row;
push @rows,
{
zabstract => $src->{'abstract.analyzed'},
zdistribution => $dist->z_pk,
zname => $src->{documentation},
zpod => $pod,
};

if ( scalar @rows >= $size ) {
say "inserting " . scalar @rows . " rows";
$rs->populate( \@rows );
@rows = ( );
}
}

#return $rs->populate( \@rows );
say dump \@rows;

return $rs->populate( \@rows ) if scalar @rows;
return;

}

sub extract_hit {

my $self = shift;
my $result = shift;

return exists $result->{'_source'}
? $result->{'_source'}
: $result->{fields};

}

1;

0 comments on commit af12f9b

Please sign in to comment.