diff --git a/.npmignore b/.npmignore
new file mode 100644
index 0000000..b7c3916
--- /dev/null
+++ b/.npmignore
@@ -0,0 +1,3 @@
+tests
+contributing.md
+scratch.js
diff --git a/README.md b/README.md
index 4cf4976..47123d7 100644
--- a/README.md
+++ b/README.md
@@ -24,17 +24,19 @@

 💂 Yup 💂
 
 do it on your laptop.
+
-![image](https://user-images.githubusercontent.com/399657/39391259-b57ca9e0-4a6e-11e8-8b33-2064e5fc187e.png)
 
 `dumpster-dive` is a **nodejs** script that puts a **highly-queryable** wikipedia on your computer in a nice afternoon.
 It uses [worker-nodes](https://github.com/allegro/node-worker-nodes) to process pages in parallel, and [wtf_wikipedia](https://github.com/spencermountain/wtf_wikipedia) to turn ***wikiscript*** into whatever json.
- -- en-wikipedia takes about 7-hours, end-to-end --
+ -- en-wikipedia takes about 5-hours, end-to-end --
+![dumpster](https://user-images.githubusercontent.com/399657/40262198-a268b95a-5ad3-11e8-86ef-29c2347eec81.gif)
+
 ```bash
 npm install -g dumpster-dive
 ```
@@ -53,11 +55,11 @@ dumpster /path/to/my-wikipedia-article-dump.xml --citations=false --html=true
 ````bash
 $ mongo          #enter the mongo shell
 use enwiki       #grab the database
-db.wikipedia.find({title:"Toronto"})[0].categories
+db.pages.count()
+# 4,926,056...
+db.pages.find({title:"Toronto"})[0].categories
 #[ "Former colonial capitals in Canada",
 #  "Populated places established in 1793" ...]
-db.wikipedia.count()
-# 4,926,056...
 ````
 
 # Steps:
@@ -106,16 +108,21 @@ The en-wiki dump should take a few hours. Maybe 8. Should be done before dinner.
 The console will update you every couple seconds to let you know where it's at.
 
 ### 7️⃣ done!
+![image](https://user-images.githubusercontent.com/399657/40262181-7c1f17bc-5ad3-11e8-95ab-55f324022d43.png)
+
 go check-out the data!
 to view your data in the mongo console:
 ````javascript
 $ mongo
 use afwiki //your db name
 
 //show a random page
-db.wikipedia.find().skip(200).limit(2)
+db.pages.find().skip(200).limit(2)
 
 //find a specific page
-db.wikipedia.findOne({title:"Toronto"}).categories
+db.pages.findOne({title:"Toronto"}).categories
+
+//find the last page
+db.pages.find().sort({$natural:-1}).limit(1)
 ````
 alternatively, you can run `dumpster-report afwiki` to see a quick spot-check of the records it has created across the database.
@@ -154,6 +161,22 @@ you can tell wtf_wikipedia what you want it to parse, and which data you don't n
 ```bash
 dumpster ./my-wiki-dump.xml --infoboxes=false --citations=false --categories=false --links=false
 ```
+* **custom json formatting**
+you can grab whatever data you want, by passing-in a `custom` function. It takes a [wtf_wikipedia](https://github.com/spencermountain/wtf_wikipedia) `Doc` object, and you can return your cool data:
+```js
+let obj={
+  file: path,
+  db: dbName,
+  custom: function(doc) {
+    return {
+      _id: doc.title(), //for duplicate-detection
+      title: doc.title(), //for the logger..
+      categories: doc.categories() //whatever you want!
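+      //(anything else from the wtf_wikipedia Doc api, or doc.json(), could be returned here too)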
+    }
+  }
+}
+dumpster(obj, () => console.log('custom wikipedia!') )
+```
 
 ## how it works:
 this library uses:
diff --git a/bin/dumpster.js b/bin/dumpster.js
index a98e370..b110ae2 100755
--- a/bin/dumpster.js
+++ b/bin/dumpster.js
@@ -56,7 +56,7 @@ Object.keys(options).forEach((k) => {
 //grab the wiki file
 if (!file) {
-  console.log('please supply a filename to the wikipedia article dump')
+  console.log('❌ please supply a filename to the wikipedia article dump')
   process.exit(1)
 }
 //try to make-up the language name for the db
@@ -67,5 +67,4 @@ if (file.match(/-latest-pages-articles/)) {
 }
 options.file = file
 options.db = db
-// console.log(options)
 dumpster(options)
diff --git a/bin/report.js b/bin/report.js
index c6ca3a9..6633a08 100644
--- a/bin/report.js
+++ b/bin/report.js
@@ -1,68 +1,54 @@
-const config = require('../config');
 const chalk = require('chalk');
-const niceNumber = require('../lib/fns').niceNumber;
-const MongoClient = require('mongodb').MongoClient
+const openDb = require('../src/lib/open-db')
+const niceNumber = require('../src/lib/fns').niceNumber;
 
 const dbName = process.argv[2] || 'enwiki'
 
-const open = function(_dbName, callback) {
-  let url = 'mongodb://localhost:27017/' + _dbName
-  MongoClient.connect(url, function(err, db) {
-    if (err) {
-      console.log(err)
-      process.exit(1)
+const showPage = async function(col) {
+  // int = parseInt(int, 10)
+  let docs = await col.aggregate(
+    {
+      $sample: {
+        size: 1
+      }
     }
-    callback(db)
-  })
-}
+  )
+  console.log(docs)
+  // let docs = await col.find({}, {
+  //   skip: int,
+  //   limit: 1
+  // })
+  // console.log(docs.toArray())
+  // let doc = docs[0]
+  // console.log(chalk.blue('\npage #' + niceNumber(int) + `: -- ${chalk.green(chalk.underline(doc.title))} --`))
+  // let sections = doc.sections || []
+  // let str = '  ' + chalk.red(`${(doc.sections || []).length} sections`)
+  // str += ' - ' + chalk.red(`${(doc.infoboxes || []).length} infoboxes`)
+  // str += ' - ' + chalk.red(`${(doc.categories || []).length} categories`)
+  // str += ' - ' + chalk.red(`${(doc.citations || []).length} citations`)
+  // console.log(str, '\n')
+  // sections.forEach((sec) => {
+  //   let heading = '='.repeat(sec.depth + 2)
+  //   console.log(chalk.grey('  ' + heading + ' ' + (sec.title || '(intro)') + ' ' + heading))
+  //   //print first sentence
+  //   if (sec.sentences && sec.sentences[0]) {
+  //     let sen = sec.sentences[0].text || ''
+  //     console.log(chalk.yellow(`  "${sen.slice(0, 170)}..."`))
+  //   }
+  // })
   console.log('\n\n\n')
 
-const showPage = function(col, int) {
-  col.find({}, {
-    skip: int,
-    limit: 1
-  }).toArray(function(err, docs) {
-    let doc = docs[0]
-    console.log(chalk.blue('\npage #' + niceNumber(int) + `: -- ${chalk.green(chalk.underline(doc.title))} --`))
-    let sections = doc.sections || []
-    let str = '  ' + chalk.red(`${(doc.sections || []).length} sections`)
-    str += ' - ' + chalk.red(`${(doc.infoboxes || []).length} infoboxes`)
-    str += ' - ' + chalk.red(`${(doc.categories || []).length} categories`)
-    str += ' - ' + chalk.red(`${(doc.citations || []).length} citations`)
-    console.log(str, '\n')
-    sections.forEach((sec) => {
-      let heading = '='.repeat(sec.depth + 2)
-      console.log(chalk.grey('  ' + heading + ' ' + (sec.title || '(intro)') + ' ' + heading))
-      //print first sentence
-      if (sec.sentences && sec.sentences[0]) {
-        let sen = sec.sentences[0].text || ''
-        console.log(chalk.yellow(`  "${sen.slice(0, 170)}..."`))
-      }
-    })
-    console.log('\n\n\n')
-  })
 }
 
-open(dbName, (db) => {
-  let col = db.collection(config.collection)
-
-  col.count().then((count) => {
-    console.log(chalk.blue('\n\n  ----------- ' + niceNumber(count) + ' pages total -----------\n'))
-    let showPages = [1]
-    showPages.push(Math.floor(count / 6))
-    showPages.push(Math.floor(count / 5))
-    showPages.push(Math.floor(count / 4))
-    showPages.push(Math.floor(count / 3))
-    showPages.push(Math.floor(count / 2))
-    showPages.push(Math.floor(count / 1.5))
-    let i = 0
-    let repeat = setInterval(function() {
-      if (!showPages[i]) {
-        clearInterval(repeat)
-        db.close()
-        return
-      }
-      showPage(col, showPages[i])
-      i += 1
-    }, 2000)
-
+//cool moves,
+const main = async function() {
+  let obj = await openDb({
+    db: dbName
   })
-})
+  let count = await obj.col.count()
+  console.log(chalk.blue('\n\n  ----------- ' + niceNumber(count) + ' pages total -----------\n'))
+  await showPage(obj.col)
+  // await showPage(obj.col, count / 5)
+  await obj.client.close()
+}
+main()
diff --git a/changelog.md b/changelog.md
index 8a3370c..8089702 100644
--- a/changelog.md
+++ b/changelog.md
@@ -16,3 +16,7 @@
 * rename from `wikipedia-to-mongo` to `dumpster-dive`
 * use wtf_wikipedia v3 (a big re-factor too!)
 * use `line-by-line`, and `worker-nodes` to run parsing in parallel
+### v3.1.0
+* fix connection time-outs & improve logging output
+* change default collection name to `pages`
+* add `.custom()` function support
diff --git a/config.js b/config.js
index 224048f..f2e7a68 100644
--- a/config.js
+++ b/config.js
@@ -2,7 +2,7 @@ module.exports = {
   //number of pages to write at a time, to the queue
   "batch_size": 1000,
   //the default name of the collection to write to
-  "collection": "wikipedia",
+  "collection": "pages",
   //update interval
-  "logInterval": 4000,
+  "logInterval": 10000,
 }
diff --git a/package.json b/package.json
index 15f6e3c..7a5b128 100644
--- a/package.json
+++ b/package.json
@@ -2,37 +2,35 @@
   "author": "Spencer Kelly (http://spencermounta.in)",
   "name": "dumpster-dive",
   "description": "get a wikipedia dump parsed into mongodb",
-  "version": "3.0.4",
+  "version": "3.1.0",
   "repository": {
     "type": "git",
     "url": "git://github.com/spencermountain/wikipedia-to-mongodb.git"
   },
   "bin": {
-    "dumpster": "./bin/dumpster.js",
-    "dumpster-report": "./bin/report.js"
+    "dumpster": "./bin/dumpster.js"
   },
   "engines": {
     "node": ">=6.0.0"
   },
   "main": "./src/index.js",
   "scripts": {
-    "test": "\"node_modules/.bin/tape\" \"./tests/*.test.js\" | \"node_modules/.bin/tap-spec\" --color",
+    "test": "\"node_modules/.bin/tape\" \"./tests/*.test.js\" | \"node_modules/.bin/tap-dancer\" --color",
     "cleanup": "rm /tmp/worker.logs && touch /tmp/worker.logs",
     "watch": "node ./scratch.js"
   },
   "dependencies": {
     "chalk": "2.4.1",
-    "line-by-line": "0.1.6",
     "mongodb": "3.0.7",
-    "ora": "2.1.0",
     "prettysize": "1.1.0",
+    "sunday-driver": "1.0.1",
     "worker-nodes": "1.6.0",
     "wtf_wikipedia": "^3.1.1",
     "yargs": "11.0.0"
   },
   "devDependencies": {
-    "shelljs": "^0.8.1",
-    "tap-spec": "4.1.1",
+    "shelljs": "0.8.2",
+    "tap-dancer": "0.0.3",
     "tape": "4.9.0"
   },
   "license": "MIT"
diff --git a/scratch.js b/scratch.js
index 392b544..b8bad3e 100644
--- a/scratch.js
+++ b/scratch.js
@@ -1,24 +1,41 @@
 const dumpster = require('./src')
 const drop = require('./src/lib/drop-db')
-const path = '/Users/spencer/data/wikipedia/simplewiki-latest-pages-articles.xml'
-// const path = '/Users/spencer/data/wikipedia/eswiki-latest-pages-articles.xml'
-// const path = '/Users/spencer/data/wikipedia/enwiki-latest-pages-articles.xml'
-// const path = './tests/smallwiki-latest-pages-articles.xml'
-// const path = './tests/tinywiki-latest-pages-articles.xml'
+//144mb → 2.5 minutes = 57mb per worker per minute
+
+// const path = '/Users/spencer/data/wikipedia/afwiki-latest-pages-articles.xml' //4.3mins
+const path = '/Users/spencer/data/wikipedia/simplewiki-latest-pages-articles.xml' //5mins //144 MB each
+// const path = '/Users/spencer/data/wikipedia/eswiki-latest-pages-articles.xml' //2hrs - 12gb→5gb
+// const path = '/Users/spencer/data/wikipedia/enwiki-latest-pages-articles.xml' //6hrs
+// const path = './tests/smallwiki-latest-pages-articles.xml' //3s
+// const path = './tests/tinywiki-latest-pages-articles.xml' //2s
 
 const dbName = path.match(/\/([a-z-]+)-latest-pages/)[1]
 
 let options = {
   file: path,
   db: dbName,
-  plaintext: true,
-  html: true,
-  markdown: true,
+  custom: function(doc) {
+    return {
+      _id: doc.title(),
+      title: doc.title(),
+      categories: doc.categories(),
+    }
+  }
 }
+
 //delete all pages
 drop(options).then(() => {
   dumpster(options)
 })
+
+// const fs = require('fs');
+// let str = fs.readFileSync(path).toString()
+// let str = `
+// <text>
+//   this duplicate should stay
+//   from here too
+// </text>
+// `
+// console.log(str.match(/<text[^>]*>([\s\S]*?)<\/text>/))
 
 // half- 6021472
 // Euston Road - 5888070
diff --git a/src/01-prelim-stuff.js b/src/01-prepwork.js
similarity index 83%
rename from src/01-prelim-stuff.js
rename to src/01-prepwork.js
index 7c14c87..c5bb5b4 100644
--- a/src/01-prelim-stuff.js
+++ b/src/01-prepwork.js
@@ -3,7 +3,7 @@ const fs = require("fs")
 const config = require("../config")
 const cpuCount = require('os').cpus().length
 
-const guardFile = function(options) {
+const guardIO = function(options) {
   if (!options.file || !fs.existsSync(options.file)) {
     console.log(chalk.red('\n --can\'t find file: "' + chalk.blue(options.file) + '" ---'));
     console.log(chalk.grey(' please supply a filename for the wikipedia article dump in xml format'));
@@ -16,8 +16,8 @@
   }
 }
 
-//a little housework first,
-const prepare = function(options) {
+//a little housework first, for our config object
+const prepWork = function(options) {
   options = options || {}
   options = Object.assign({}, options);
 
@@ -25,9 +25,10 @@
   if (!options.db) {
     options.db = options.file.match(/\/([a-z-]+)-latest-pages/)[1] || 'wikipedia'
   }
-  guardFile(options)
+  //make sure the file looks good..
+  guardIO(options)
 
-  //few defaults
+  //set a few defaults
   options.dbName = options.db
   options.workers = options.workers || cpuCount
   options.batch_size = options.batch_size || config.batch_size
@@ -38,4 +39,4 @@
   });
   return options
 }
-module.exports = prepare
+module.exports = prepWork
diff --git a/src/02-Worker-pool.js b/src/02-Worker-pool.js
index 1e2e803..d9f3405 100644
--- a/src/02-Worker-pool.js
+++ b/src/02-Worker-pool.js
@@ -3,7 +3,12 @@ const WorkerNodes = require('worker-nodes');
 const fs = require("fs");
 const chalk = require('chalk')
 const EventEmitter = require('events');
-const margin = ' '
+const fns = require('./lib/fns')
+const right = fns.alignRight
+const niceTime = fns.niceTime
+const margin = ' '
+//estimate of duration:
+const mbPerMinute = 58
 
 class WorkerPool extends EventEmitter {
   constructor(options) {
@@ -21,19 +26,19 @@ class WorkerPool extends EventEmitter {
   }
 
   printHello() {
-    console.log('\n\n\n' + margin + ' ----------')
-    console.log(margin + ` oh hi 👋`)
-    console.log('\n')
-    console.log(margin + `total file size: ${chalk.green(pretty(this.fileSize))}`)
-    console.log(margin + 'creating ' + chalk.blue(this.workerCount + ' workers') + ``)
-    console.log(margin + chalk.grey('-') + ` each worker will be given: ${chalk.magenta(pretty(this.chunkSize))} ` + chalk.grey('-'));
-    console.log(margin + ' ----------')
+    let megaBytes = this.chunkSize / 1048576 //1,048,576
+    let duration = megaBytes / mbPerMinute
+    console.log('\n\n\n' + margin + '---------------------------')
+    console.log(margin + chalk.yellow(` oh hi `) + `👋`)
+    console.log(margin + chalk.green(`size:`) + ` ${chalk.green(right(pretty(this.fileSize)))}`)
+    console.log(margin + ` ${chalk.blue(right(this.workerCount + ' workers'))}`)
+    console.log(margin + ` ${chalk.magenta(right(pretty(this.chunkSize) + ' each'))}`);
+    console.log(margin + chalk.red(`estimate:`) + ` ${chalk.red(right(niceTime(duration)))}`);
+    console.log(margin + '---------------------------')
     console.log('\n')
   }
 
   isDone() {
-    console.log('\n')
-    console.log(' 💪 a worker has finished 💪 ')
     this.running -= 1
     console.log(chalk.grey(' - ' + this.running + ' workers still running -\n'))
     if (this.running === 0) {
@@ -61,7 +66,7 @@ class WorkerPool extends EventEmitter {
     this.printHello()
     //convoluted loop to wire-up each worker
     for(let i = 0; i < self.workerCount; i += 1) {
-      self.workerNodes.call.doSection(options, this.chunkSize, i).then(() => {
+      self.workerNodes.call.doSection(options, this.workerCount, i).then(() => {
         self.running += 1
         //once all workers have been started..
         if (self.running === self.workerCount) {
diff --git a/src/03-logger.js b/src/03-logger.js
index f5e1c19..8a2b695 100644
--- a/src/03-logger.js
+++ b/src/03-logger.js
@@ -3,57 +3,56 @@ const openDB = require('./lib/open-db')
 const fns = require("./lib/fns")
 const config = require("../config")
 
-//logger for current status of import
+//a periodic status-logger for the import
 class Logger {
   constructor(options) {
     this.options = options
     this.wait = config.logInterval
-    this.interval = null
+    this.please_stop = false
   }
-  start() {
-    this.interval = setInterval(() => {
+  open(cb) {
+    openDB(this.options.db, cb)
+  }
+  triggerNext() {
+    setTimeout(() => {
       this.stat()
     }, this.wait)
   }
-  stop() {
-    clearInterval(this.interval)
+  start() {
+    this.triggerNext()
   }
-  open(cb) {
-    openDB(this.options.db, cb)
+  stop() {
+    this.please_stop = true
   }
   //# of records entered in db
-  count(obj, cb) {
-    obj.col.count().then(cb)
+  count(obj) {
+    return obj.col.count()
   }
   //get the most recent article written
-  lastPage(obj, count, cb) {
-    obj.col.find({}, {
-      skip: count - 1,
-      limit: 1
-    }).toArray(function(err, docs) {
-      if (!docs || !docs[0]) {
-        cb(null)
-      } else {
-        cb(docs[0])
-      }
-    })
+  lastPage(obj) {
+    return obj.col.find({}).sort({
+      $natural: -1
+    }).limit(1).toArray()
   }
   //log some output
   async stat() {
+    // console.time('stat')
     let obj = await openDB(this.options)
-    this.count(obj, (count) => {
-      if (!count) {
-        obj.client.close()
-        return
-      }
-      this.lastPage(obj, count, (doc) => {
-        count = fns.niceNumber(count)
-        if (doc) {
-          console.log(chalk.grey(' last page: ') + chalk.green('#' + count) + chalk.blue(' - "' + doc.title + '" '))
-        }
-        obj.client.close()
-      })
-    })
+    let count = await this.count(obj)
+    let page = await this.lastPage(obj)
+    if (page && page[0]) {
+      page = page[0]
+      count = fns.niceNumber(count)
+      console.log('')
+      console.log(chalk.grey(' current: ') + chalk.green(count) + ' pages' + chalk.blue(' - "' + page.title + '" '))
+      console.log('')
+    }
+    await obj.client.close()
+    // console.timeEnd('stat')
+    //fire the next one!
+    if (!this.please_stop) {
+      this.triggerNext()
+    }
   }
 }
diff --git a/src/index.js b/src/index.js
index bb28613..32f174b 100755
--- a/src/index.js
+++ b/src/index.js
@@ -1,7 +1,7 @@
 //stream a big wikipedia xml.bz2 file into mongodb
 // because why not.
 const chalk = require('chalk')
-const prelim = require('./01-prelim-stuff')
+const prelim = require('./01-prepwork')
 const WorkerPool = require("./02-Worker-pool")
 const hound = require("./03-logger")
 const openDB = require('./lib/open-db')
@@ -13,15 +13,14 @@ const noop = function() {}
 const finish = async function(options) {
   let obj = await openDB(options)
   console.log('\n\n 👍 closing down.\n')
-  obj.col.count().then((count) => {
-    let duration = fns.timeSince(start)
-    console.log(' -- final count is ' + chalk.magenta(fns.niceNumber(count)) + ' pages --')
-    console.log(' ' + chalk.yellow(`took ${duration}`))
-    console.log(' 🎉')
-    console.log('\n\n')
-    obj.client.close()
-    process.exit()
-  })
+  let count = await obj.col.count()
+  let duration = fns.timeSince(start)
+  console.log(' -- final count is ' + chalk.magenta(fns.niceNumber(count)) + ' pages --')
+  console.log(' ' + chalk.yellow(`took ${duration}`))
+  console.log(' 🎉')
+  console.log('\n\n')
+  await obj.client.close()
+  process.exit()
 }
 //open up a mongo db, and start xml-streaming..
@@ -51,6 +50,7 @@
 
   //handle ctrl-c gracefully
   process.on('SIGINT', async function() {
+    logger.stop();
     workers.cleanup();
     oneSec(() => {
       process.exit();
diff --git a/src/lib/drop-db.js b/src/lib/drop-db.js
index 71375b3..2369df4 100644
--- a/src/lib/drop-db.js
+++ b/src/lib/drop-db.js
@@ -3,12 +3,9 @@ const openDb = require('./open-db')
 //
 const dropDb = async function(options) {
   let obj = await openDb(options)
-  obj.col.deleteMany({}).then(() => {
-    console.log('dropped')
-    obj.col.count().then((count) => {
-      console.log(' - now ' + count + ' records - ')
-    })
-  })
-  return
+  await obj.col.deleteMany({})
+  console.log('dropped')
+  let count = await obj.col.count()
+  console.log(' - now ' + count + ' records - ')
 }
 module.exports = dropDb
diff --git a/src/lib/fns.js b/src/lib/fns.js
index c12c0e9..fb06130 100644
--- a/src/lib/fns.js
+++ b/src/lib/fns.js
@@ -11,7 +11,14 @@ exports.niceNumber = (x) => {
 
 //logger of rough-time since an epoch
 exports.timeSince = function(start) {
-  let seconds = (Date.now() - start) / 1000
+  let ms = (Date.now() - start)
+  if (ms < 1000) {
+    return ms + 'ms'
+  }
+  let seconds = ms / 1000
+  if (seconds < 60) {
+    return parseInt(seconds, 10) + 's'
+  }
   let minutes = seconds / 60
   let duration = minutes.toFixed(1) + ' minutes'
   if (minutes > 120) {
@@ -20,3 +27,15 @@ exports.timeSince = function(start) {
   }
   return duration
 }
+
+exports.alignRight = function(str) {
+  return ("             " + str).slice(-13);
+}
+
+exports.niceTime = function(mins) {
+  if (mins <= 60) {
+    return mins.toFixed(1) + ' mins'
+  }
+  let hours = mins / 60
+  return hours.toFixed(1) + ' hrs'
+}
diff --git a/src/worker/01-parsePage.js b/src/worker/01-parsePage.js
new file mode 100644
index 0000000..e23d478
--- /dev/null
+++ b/src/worker/01-parsePage.js
@@ -0,0 +1,46 @@
+const isRedirect = /<redirect/
+
+//pull the title + wikiscript out of one <page> chunk of xml
+const parsePage = function(xml) {
+  let page = {
+    isRedirect: isRedirect.test(xml)
+  }
+  //grab the title
+  let m = xml.match(/<title>([\s\S]*?)<\/title>/)
+  if (m === null) {
+    return null
+  }
+  page.title = m[1]
+  //grab the wiki markup
+  m = xml.match(/<text[^>]*>([\s\S]*?)<\/text>/)
+  if (m !== null) {
+    page.wiki = m[1]
+  }
+  return page
+}
+module.exports = parsePage
diff --git a/src/worker/02-parseWiki.js b/src/worker/02-parseWiki.js
index af1871c..61c31b6 100644
--- a/src/worker/02-parseWiki.js
+++ b/src/worker/02-parseWiki.js
@@ -12,10 +12,10 @@ const escapeXML = function(str) {
 }
 
 //get parsed json from the wiki markup
-const parseData = function(page, options) {
+const parseWiki = function(page, options) {
   try {
-    page.script = escapeXML(page.script || '')
-    let doc = wtf(page.script);
+    page.wiki = escapeXML(page.wiki || '')
+    let doc = wtf(page.wiki);
     //dont insert this if it's a redirect
     if (options.skip_redirects === true && doc.isRedirect()) {
       return null
@@ -24,7 +24,12 @@
       return null
     }
     //turn the wtf_wikipedia document into storable json
-    let data = doc.json(options)
+    let data = {}
+    if (!options.custom) { //default format
+      data = doc.json(options)
+    } else { //DIY format
+      data = options.custom(doc)
+    }
     data.title = page.title || data.title
     data = encode.encodeData(data);
     //use the title/pageID from the xml
@@ -39,4 +44,4 @@
   }
 };
 
-module.exports = parseData
+module.exports = parseWiki
diff --git a/src/worker/03-write-db.js b/src/worker/03-write-db.js
index 0d79c59..e600eab 100644
--- a/src/worker/03-write-db.js
+++ b/src/worker/03-write-db.js
@@ -1,32 +1,45 @@
 const chalk = require('chalk')
 const openDB = require('../lib/open-db')
+const fns = require('../lib/fns')
 const mongoConfig = {
   ordered: false
 }
 
-const writeDb = (options, pages) => {
-  return new Promise(async (resolve) => {
+//report how many pages we wrote this time
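+//(it runs once per batch-insert, tagged with the calling worker's number)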
+const writeMsg = function(pages, count, start, workerNum) {
+  let msg = chalk.yellow(` #${workerNum} `)
+  count = fns.niceNumber(count)
+  msg += chalk.green(`+${count} `) + 'pages'
+  msg += chalk.grey(' - ' + fns.timeSince(start))
+  msg += chalk.blue(` - `)
+  msg += chalk.magenta(`"${pages[0].title}"`)
+  console.log(msg)
+}
 
-    let obj = await openDB(options)
+const writeDb = async (options, pages, workerNum) => {
+  const start = Date.now()
+  let obj = await openDB(options)
 
-    obj.col.insertMany(pages, mongoConfig, (err) => {
-      if (err) {
-        console.log(' ' + chalk.red(err.message))
-        if (err.writeErrors && err.writeErrors[0]) {
-          let e = err.writeErrors[0]
-          //suppress duplicate key errors
-          if (e && e.code === 11000) {
-            console.log(chalk.red(` - already have "${err.writeErrors[0]._id}"`))
-          } else {
-            console.log(' ' + chalk.red(e.errmsg))
-          }
-        }
-      }
-      obj.client.close()
-      //keep going .. 🙉
-      resolve()
-    })
+  let result = await obj.col.insertMany(pages, mongoConfig).catch(async (err) => {
+    if (err.code === 11000) {
+      let errCount = err.result.getWriteErrorCount()
+      errCount = fns.niceNumber(errCount)
+      console.log(chalk.red(`-- ${errCount} duplicate pages --`))
+    } else {
+      console.log(chalk.red(`====error!===`))
+      console.log(err)
+    }
+    err = err.result.toJSON()
+    const count = err.nInserted
+    writeMsg(pages, count, start, workerNum)
+    await obj.client.close()
   })
+  //no errors thrown, all good
+  if (result) {
+    const count = result.insertedCount
+    writeMsg(pages, count, start, workerNum)
+    await obj.client.close()
+  }
 }
 module.exports = writeDb
diff --git a/src/worker/_encode.js b/src/worker/_encode.js
index 6b4ef73..a321c87 100644
--- a/src/worker/_encode.js
+++ b/src/worker/_encode.js
@@ -2,6 +2,10 @@
 //https://stackoverflow.com/questions/12397118/mongodb-dot-in-key-name/30254815#30254815
 const encodeStr = function(str) {
+  if (typeof str !== 'string') {
+    console.log(str)
+    str = ''
+  }
   return str
     .replace(/\\/g, '\\\\')
     .replace(/^\$/, '\\u0024')
diff --git a/src/worker/index.js b/src/worker/index.js
index 091d8b7..e3bc0c4 100644
--- a/src/worker/index.js
+++ b/src/worker/index.js
@@ -1,83 +1,63 @@
 const chalk = require('chalk')
-const fns = require('../lib/fns')
-const LineByLineReader = require('line-by-line')
-const parseLine = require('./01-parseLine')
+const sundayDriver = require('sunday-driver')
+// const sundayDriver = require('/Users/spencer/mountain/sunday-driver/src/index.js')
+const parsePage = require('./01-parsePage')
 const parseWiki = require('./02-parseWiki');
 const writeDb = require('./03-write-db');
 
-const doSection = async (options, chunkSize, workerNum) => {
-  let startByte = 0
-  if (workerNum !== 0) {
-    startByte = (workerNum * chunkSize) //- 1000000 // start a megabyte earlier
+const doSection = async (options, workerCount, workerNum) => {
+  let pages = []
+  let percent = 100 / workerCount
+  let start = percent * workerNum
+  let end = start + percent
+  // console.log(`#${workerNum} - ${start}% → ${end}%`)
+  let driver = {
+    file: options.file,
+    start: `${start}%`,
+    end: `${end}%`,
+    splitter: "</page>",
+    each: (xml, resume) => {
+      //pull-out sections from this xml
+      let page = parsePage(xml)
+      if (page !== null) {
+        //parse the page into json
+        page = parseWiki(page, options)
+        if (page !== null) {
+          pages.push(page)
+        }
+      }
+      if (pages.length >= options.batch_size) {
+        writeDb(options, pages, workerNum).then(() => {
+          pages = []
+          resume()
+        })
+      } else {
+        resume()
+      }
+    },
+    atPoint: {
+      50: () => {
+        console.log('')
+        console.log(chalk.grey(` (worker #${workerNum} is 50% done)`))
+        console.log('')
+      }
+    }
 }
-  let endByte = startByte + chunkSize //+ 3000000 // end 2 megabytes later so we don't lose pages cut by chunks
-
-  // console.log('starting worker #' + workerNum + ' : ' + startByte + ' → ' + endByte)
-  let lr = new LineByLineReader(options.file, {
-    start: startByte,
-    end: endByte
-  });
-
-  let state = {};
-  let pageCount = 0;
-  let pages = [];
-  let workerBegin = Date.now()
-
-  const insertToDb = async function(isLast) {
-    lr.pause();
+  let p = sundayDriver(driver)
+  p.catch(console.log)
+  p.then(async () => { //on done
+    // insert the remaining pages
     if (pages.length > 0) {
-      await writeDb(options, pages)
-    }
-    pages = [];
-
-    //log some nice kinda output
-    let seconds = ((Date.now() - workerBegin) / 1000).toFixed(1)
-    let output = chalk.yellow(`worker #${workerNum} - `)
-    output += chalk.grey(` +${fns.niceNumber(options.batch_size)} pages - (${seconds}s) - `)
-    output += chalk.magenta(` at ${fns.niceNumber(pageCount)}`)
-    console.log(' ' + output);
-
-    workerBegin = Date.now()
-    lr.resume();
-    if (isLast === true) {
-      process.send({
-        type: "workerDone",
-        pid: process.pid
-      })
+      await writeDb(options, pages, workerNum)
     }
-  };
-
-  //reached the end of a page
-  const donePage = function(pageObj) {
-    pageCount += 1
-    pageObj = parseWiki(pageObj, options)
-    if (pageObj !== null) {
-      pages.push(pageObj);
-    } else {
-      console.log(chalk.green(' -skipping page: ""'))
-    }
-    // doArticleTimeCounter += Date.now() - doArticleTime
-    if (pageCount % options.batch_size === 0) {
-      insertToDb();
-    }
-  }
-
-  lr.on('error', (e) => {
-    // 'err' contains error object
-    console.error(chalk.red("linereader error"));
-    console.log(e)
-  });
-
-  lr.on('line', (line) => {
-    state = parseLine(line, state, donePage)
-  });
-
-  lr.on('end', function() {
-    // All lines are read, file is closed now.
-    // insert remaining pages.
-    insertToDb(true);
-  });
-  return (process.pid)
+    console.log('\n')
+    console.log(` 💪 worker #${workerNum} has finished 💪 `)
+    process.send({
+      type: "workerDone",
+      pid: process.pid
+    })
+  })
+  return process.pid
 };
 
 module.exports = {
diff --git a/tests/custom.test.js b/tests/custom.test.js
index 083808b..6f0b147 100644
--- a/tests/custom.test.js
+++ b/tests/custom.test.js
@@ -9,7 +9,7 @@ test('custom-made-tinywiki', function(t) {
     db: dbName,
     images: true
   }
-  db.drop(dbName, 'wikipedia', () => {
+  db.drop(dbName, 'pages', () => {
     dumpster(obj, () => {
       db.firstTen(dbName, docs => {
         t.equal(docs.length, 7, 'seven records')
diff --git a/tests/db.js b/tests/db.js
index fb91b4b..57c2667 100644
--- a/tests/db.js
+++ b/tests/db.js
@@ -15,7 +15,7 @@ const open = function(dbName, callback) {
 //count all pages
 const count = function(dbName, cb) {
   open(dbName, function(db, client) {
-    let col = db.collection('wikipedia')
+    let col = db.collection('pages')
     col.count().then(len => {
       client.close()
       cb(len)
@@ -26,7 +26,7 @@ const count = function(dbName, cb) {
 //grab a couple
 const firstTen = function(dbName, cb) {
   open(dbName, function(db, client) {
-    let col = db.collection('wikipedia')
+    let col = db.collection('pages')
     col.find({}).toArray(function(err, docs) {
       if (err) {
         console.log(err)
@@ -41,7 +41,7 @@ const firstTen = function(dbName, cb) {
 //delete all pages
 const drop = function(dbName, colName, cb) {
   open(dbName, function(db, client) {
-    db.collection('wikipedia')
+    db.collection('pages')
     let col = db.collection(colName)
     // console.log('dropping ' + colName)
     col.deleteMany({})
@@ -59,4 +59,4 @@ module.exports = {
 }
 // firstTwo('tempwiki', console.log)
 // open('tempwiki', console.log)
-// drop('smallwiki', 'wikipedia',console.log)
+// drop('smallwiki', 'pages',console.log)
diff --git a/tests/plain.test.js b/tests/plain.test.js
index 2cd04af..0780a8a 100644
--- a/tests/plain.test.js
+++ b/tests/plain.test.js
@@ -11,7 +11,7 @@ test('plaintext', function(t) {
     html: true,
     markdown: true,
   }
-  db.drop(dbName, 'wikipedia', () => {
+  db.drop(dbName, 'pages', () => {
     dumpster(obj, () => {
       db.firstTen(dbName, docs => {
         t.equal(docs.length, 7, '7 records')
diff --git a/tests/redirects.test.js b/tests/redirects.test.js
index d9450b0..efc3aac 100644
--- a/tests/redirects.test.js
+++ b/tests/redirects.test.js
@@ -10,7 +10,7 @@ test('no-redirects', function(t) {
     skip_redirects: true,
     skip_disambig: true,
   }
-  db.drop(dbName, 'wikipedia', () => {
+  db.drop(dbName, 'pages', () => {
     dumpster(obj, () => {
       db.firstTen(dbName, docs => {
         t.equal(docs.length, 5, 'five records')
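
---

To try the new pieces end-to-end, here is a minimal sketch (not part of the patch): it assumes a local mongod and an already-downloaded dump at an illustrative path, uses the `dumpster(options, callback)` + `custom` API from the README diff above, then reads back from the renamed `pages` collection with the same mongodb driver this repo depends on:

```js
const dumpster = require('dumpster-dive')
const MongoClient = require('mongodb').MongoClient

dumpster({
  file: './afwiki-latest-pages-articles.xml', //illustrative path
  db: 'afwiki',
  custom: function(doc) {
    //same shape as the README example
    return {
      _id: doc.title(),
      title: doc.title(),
      categories: doc.categories()
    }
  }
}, async () => {
  //import finished - read it back from the new default collection
  let client = await MongoClient.connect('mongodb://localhost:27017')
  let col = client.db('afwiki').collection('pages')
  console.log(await col.count(), 'pages')
  await client.close()
})
```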