Stop parsing after too many reports are already in the DB
kremio committed Nov 26, 2018
1 parent 28d90d3 commit 2c74f81
Showing 4 changed files with 132 additions and 26 deletions.
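In short: scrape() now takes the raw sqlite3 handle as its first argument, the request queue filters each index page's report URLs against the data table before fetching, and the stream ends once alreadyScrapedLimit already-stored reports (default 10) have been encountered. The pipeline's previous approach of deriving stopAtReportURI from the newest stored report is commented out in favor of this check.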
11 changes: 6 additions & 5 deletions pipeline.js
@@ -17,16 +17,17 @@ const pipeline = async () => getDB( path.resolve('./config/database.json'), fals
.then( ({DB, migrations}) => new Promise( (s,f) => {
dbHandle = DB
//Get the newest inserted report, if any
- DB.db.get('SELECT uri FROM data ORDER BY createdDate ASC LIMIT 1', (err, row) => {
+ /*DB.db.get('SELECT uri FROM data ORDER BY createdDate ASC LIMIT 1', (err, row) => {
if(err){
f(err)
return
}
insert = new InsertStream({}, DB)
s( {stopAtReportURI: row ? row.uri : false} )
- })
- })
- )
+ })*/
+ insert = new InsertStream({}, DB)
+ s( {} )
+ }) )
.then( async (scraperOptions) => {

//Parameters passed by env variables have priority
@@ -52,7 +53,7 @@ const pipeline = async () => getDB( path.resolve('./config/database.json'), fals
startAtPageURL: error.pageURL
})
})
- .then( (scraperOptions) => scrape(scraperOptions, true) )
+ .then( (scraperOptions) => scrape(dbHandle.db, scraperOptions, true) )
.then( (scraperStream) => {
source = scraperStream
})
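Condensed, the new wiring in pipeline.js looks like the sketch below, assembled from the hunks above (dbHandle and insert are module-level variables in the original file; the env-var override and error-recovery steps are elided):

const pipeline = async () => getDB( path.resolve('./config/database.json'), false )
  .then( ({DB}) => new Promise( (s) => {
    dbHandle = DB
    insert = new InsertStream({}, DB)
    s( {} ) //previously: s( {stopAtReportURI: row ? row.uri : false} )
  }) )
  .then( (scraperOptions) => scrape(dbHandle.db, scraperOptions, true) )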
41 changes: 36 additions & 5 deletions scrape.js
@@ -1,3 +1,4 @@
+ const { promisify } = require('util')
const { Readable } = require('stream')

const scrapeIndex = require('./lib/index')
@@ -62,7 +63,8 @@ class ReportStream extends Readable{


class RequestsQueue {
- constructor( pageCount, reportsURLs = [], groupSize, groupInterval, startPageURL, startAtReportURI = false, stopAtReportURI ){
+ constructor( db, pageCount, reportsURLs = [], groupSize, groupInterval, startPageURL, startAtReportURI = false, stopAtReportURI, alreadyScrapedLimit = 10 ){
+   this.db = db
this.pageCount = pageCount
this.currentPage = pageNumberOfURL(startPageURL)
this.initialReportsURLs = reportsURLs
@@ -75,13 +77,17 @@ class RequestsQueue {

this.startAtReportURI = startAtReportURI
this.stopAtReportURI = stopAtReportURI
+ this.alreadyScrapedLimit = alreadyScrapedLimit
+ this.howManyAlreadyScraped = 0
this.timeout
this.started = false
this.done = false

this.onData = () => true
this.onEnd = () => {}
this.onError = (error) => { console.error(error) }


}

get reportsURLs(){
@@ -113,10 +119,28 @@
this.started = true
//Sets the initial reportURLs
this.reportsURLs = this.initialReportsURLs
- this.next()
+ this.filterAlreadyScraped().then( () => this.next() )
}
}

+ /*
+ * Check if the database already contains some of the reports in reportsURLs;
+ * if any are found, remove them from reportsURLs and add their count to
+ * howManyAlreadyScraped.
+ */
+ async filterAlreadyScraped(){
+   //Get the existing reports' URIs
+   const countStatement = this.db.prepare( `SELECT uri FROM data WHERE uri IN (${this.reportsURLs.map(() => '?').join(',')})`, this.reportsURLs )
+   const asyncAll = promisify(countStatement.all).bind(countStatement)
+   const alreadyInDb = (await asyncAll()).map((row) => row.uri)
+   countStatement.finalize()
+
+   //Filter out the existing reports
+   this.reportsURLs = this.reportsURLs.filter((uri) => !alreadyInDb.includes( uri ))
+   //Update the count
+   this.howManyAlreadyScraped += alreadyInDb.length
+ }

async next(){
try{
if( this.timeout ){
@@ -130,10 +154,16 @@
return
}
if( this.reportsURLs.length == 0 ){ //all the reports of the page have been processed
+ if(this.howManyAlreadyScraped >= this.alreadyScrapedLimit){
+   //looks like there is nothing new, let's call it a day
+   this.onEnd() //done
+   return
+ }
//Load next page
this.currentPage++
const {reportsURLs} = await scrapeIndex( urlOfPage( this.currentPage ) )
this.reportsURLs = reportsURLs
+ await this.filterAlreadyScraped()
return this.next()
}

@@ -181,14 +211,15 @@
}
}

- const scrape = async (options, verbose = false) => {
+ const scrape = async (db, options, verbose = false) => {
//Override defaults with given options
const opts = Object.assign({
groupSize: 5,
groupInterval: 30000, //in ms
stopAtReportURI: false,
startAtReportURI: false,
- startAtPageURL: DEFAULT_INDEX_URL
+ startAtPageURL: DEFAULT_INDEX_URL,
+ alreadyScrapedLimit: 10
}, options)

if(verbose){
@@ -199,7 +230,7 @@ const scrape = async (options, verbose = false) => {

const {reportsURLs, pageCount} = await scrapeIndex( opts.startAtPageURL )

- const queue = new RequestsQueue( pageCount, reportsURLs, opts.groupSize, opts.groupInterval, opts.startAtPageURL, opts.startAtReportURI, opts.stopAtReportURI )
+ const queue = new RequestsQueue( db, pageCount, reportsURLs, opts.groupSize, opts.groupInterval, opts.startAtPageURL, opts.startAtReportURI, opts.stopAtReportURI, opts.alreadyScrapedLimit )
return new ReportStream(queue)
}

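The heart of the change is the uri IN (?, ..., ?) lookup combined with promisify over node-sqlite3's callback-style Statement#all. Below is a standalone sketch of that pattern; the alreadyScraped helper name is hypothetical, and the empty-list guard is an addition here (SQLite rejects IN ()), not something the committed method does:

const { promisify } = require('util')

//Hypothetical helper isolating the pattern used by filterAlreadyScraped():
//one placeholder per URL, a single prepared query, the hits returned as a Set.
async function alreadyScraped( db, urls ){
  if( urls.length == 0 ){ return new Set() }
  const statement = db.prepare( `SELECT uri FROM data WHERE uri IN (${urls.map(() => '?').join(',')})`, urls )
  const asyncAll = promisify(statement.all).bind(statement)
  const rows = await asyncAll()
  statement.finalize()
  return new Set( rows.map( (row) => row.uri ) )
}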
10 changes: 6 additions & 4 deletions tests/pipeline.test.js
@@ -138,7 +138,7 @@ describe( 'Scraper pipeline', () => {
await pipeline()
}catch(e){}

- expect( scrape ).toBeCalledWith( expect.objectContaining({
+ expect( scrape ).toBeCalledWith( db.db, expect.objectContaining({
startAtPageURL: errorPageURL,
startAtReportURI: errorReportURI
}), expect.anything() )
@@ -162,7 +162,7 @@ describe( 'Scraper pipeline', () => {
await pipeline()
}catch(e){}

- expect( scrape ).toBeCalledWith( expect.objectContaining({
+ expect( scrape ).toBeCalledWith( db.db, expect.objectContaining({
startAtPageURL: errorPageURL,
startAtReportURI: false
}), expect.anything() )
@@ -179,12 +179,14 @@ describe( 'Scraper pipeline', () => {

const _pipeline = require('../pipeline')
const _scrape = require('../scrape')
+ const {DB} = await require('rwv-sqlite/lib/db')()
+ db = DB

try{
await _pipeline()
}catch(e){}

- expect( _scrape ).toBeCalledWith( expect.objectContaining({
+ expect( _scrape ).toBeCalledWith( db.db, expect.objectContaining({
startAtPageURL: process.env.MORPH_START_PAGE,
startAtReportURI: process.env.MORPH_START_REPORT
}), expect.anything() )
@@ -211,7 +213,7 @@ describe( 'Scraper pipeline', () => {
await _pipeline()
}catch(e){}

- expect( _scrape ).toBeCalledWith( expect.objectContaining({
+ expect( _scrape ).toBeCalledWith( db.db, expect.objectContaining({
startAtPageURL: process.env.MORPH_START_PAGE,
startAtReportURI: process.env.MORPH_START_REPORT
}), expect.anything() )
96 changes: 84 additions & 12 deletions tests/scrape.test.js
@@ -1,8 +1,11 @@
//const fs = require('fs')
const scrapeIndex = require('../lib/index')
const scrapeReport = require('../lib/report')
+ const getDB = require('rwv-sqlite/lib/db')
+ const reportTable = require('rwv-sqlite/lib/report')
const scrape = require('../scrape')


const { DEFAULT_INDEX_URL, FIRST_PAGE, urlOfPage } = require('../lib/constants')
//const reportJson = require('../samples/report.json')
//Mock HTTP requests
@@ -14,20 +17,36 @@ jest.useFakeTimers()

describe('Scraper stream', () => {

- beforeEach(() => {
+ let db
+
+ beforeEach( async (done) => {
setTimeout.mockClear()
scrapeIndex.mockReset()
scrapeReport.mockReset()

+ getDB().then( ({DB, migrations}) => {
+   db = DB
+   //Start with a clean database
+   migrations.reset( () => migrations.up( done ) )
+ })
})

+ afterEach( () => {
+   if(db){
+     db.close()
+     db = undefined
+   }
+ })


test( 'No URLs, one page', async (done) => {

scrapeIndex.mockImplementationOnce(() => ({
reportURLs: [],
pageCount: 1
}))

- const reportStream = await scrape()
+ const reportStream = await scrape( db.db )
expect( scrapeIndex ).toHaveBeenCalledTimes(1)

reportStream.resume()//pipe( process.stdout )
@@ -47,7 +66,7 @@

let c = 0
const longWait = 999999
- const reportStream = await scrape( { groupSize: 2, groupInterval: longWait } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: longWait } )

reportStream.on('data', async(chunk) => {
if( c % 2 == 0 ){
@@ -84,7 +103,7 @@
reportsURLs: [4,5],
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1000 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1000 } )

let c = 0
reportStream.on('data', async(chunk) => {
@@ -122,7 +141,7 @@
reportsURLs: [4,5],
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1 } )

let c = 0
reportStream.on('data', async(chunk) => {
@@ -158,7 +177,7 @@
reportsURLs: [4],
}))

- const reportStream = await scrape( { startAtPageURL, startAtReportURI, groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )
+ const reportStream = await scrape( db.db, { startAtPageURL, startAtReportURI, groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )

let c = 0
reportStream.on('data', async(chunk) => {
@@ -192,7 +211,7 @@
reportsURLs: [4],
}))

- const reportStream = await scrape( { startAtPageURL, startAtReportURI, groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )
+ const reportStream = await scrape( db.db, { startAtPageURL, startAtReportURI, groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )

reportStream.on('data', async(chunk) => {
throw new Error("Should not have scraped this report.")
@@ -216,7 +235,7 @@
reportsURLs: [4,5],
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1, stopAtReportURI: 3 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1, stopAtReportURI: 3 } )


let c = 0
Expand Down Expand Up @@ -255,7 +274,7 @@ describe('Scraper stream', () => {
reportsURLs: [5,6],
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1, startAtReportURI: 3, stopAtReportURI: 6 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1, startAtReportURI: 3, stopAtReportURI: 6 } )


let c = 0
Expand Down Expand Up @@ -290,7 +309,7 @@ describe('Scraper stream', () => {
reportsURLs: [3],
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1 } )

let c = 1
scrapeReport.mockImplementation((d) => d)
Expand Down Expand Up @@ -320,7 +339,7 @@ describe('Scraper stream', () => {
pageCount: 3
}))

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1, stopAtReportURI: 1 } )


reportStream.on('data', (chunk) => {
Expand Down Expand Up @@ -353,7 +372,7 @@ describe('Scraper stream', () => {
}
})

- const reportStream = await scrape( { groupSize: 2, groupInterval: 1 } )
+ const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1 } )


reportStream.on('error', (err) => {
@@ -374,4 +393,57 @@

})

+ test( 'Stop parsing after already scraped limit reached', async (done) => {
+   scrapeIndex.mockImplementationOnce(() => ({ //page 1
+     reportsURLs: ['http://1','http://2'],
+     pageCount: 3
+   })).mockImplementationOnce(() => ({ //page 2
+     reportsURLs: ['http://3','http://4'],
+   })).mockImplementationOnce(() => ({ //page 3
+     reportsURLs: ['http://5','http://6'],
+   }))
+
+   //Add reports to the DB
+   const baseReport = {
+     description:'description',
+     startDate: new Date().toISOString(),
+     iso3166_2: 'DE-BE',
+     locations: [{
+       subdivisions:['somewhere','over the rainbow'],
+     }],
+     sources: [{
+       name:'source'
+     }]
+   }
+   await reportTable( {...baseReport, uri:'http://1' }, db )
+   await reportTable( {...baseReport, uri:'http://2' }, db )
+   await reportTable( {...baseReport, uri:'http://4' }, db )
+
+   const reportStream = await scrape( db.db, { groupSize: 2, groupInterval: 1, alreadyScrapedLimit: 3 } )
+
+   reportStream.on('data', async(chunk) => {
+     switch( chunk ){ //fail if a filtered-out report is ever emitted
+       case 'http://1':
+       case 'http://2':
+       case 'http://4':
+       case 'http://5':
+       case 'http://6':
+         throw new Error("Should not have scraped this report.")
+       default:
+         jest.runOnlyPendingTimers()
+     }
+   })
+
+   reportStream.on('end', () => {
+     expect( scrapeIndex ).toHaveBeenCalledTimes(2)
+     expect( scrapeReport ).toHaveBeenCalledTimes(1)
+     expect( scrapeReport ).toHaveBeenCalledWith( 'http://3' )
+     done()
+   })
+
+ })

})
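For reference, a hypothetical end-to-end use of the new signature, assuming rwv-sqlite's getDB() resolves to {DB, migrations} as in the tests above:

const getDB = require('rwv-sqlite/lib/db')
const scrape = require('./scrape')

getDB().then( async ({DB}) => {
  //Give up after 5 already-stored reports instead of the default 10
  const stream = await scrape( DB.db, { alreadyScrapedLimit: 5 } )
  stream.on('data', (report) => console.log(report))
  stream.on('error', (err) => { console.error(err); DB.close() })
  stream.on('end', () => DB.close())
})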
