From 196701f87e585b8b5304c4247fae1e84e031aea8 Mon Sep 17 00:00:00 2001 From: chenshijie Date: Mon, 27 Feb 2012 21:48:18 +0800 Subject: [PATCH] use async control work flow --- Readme | 57 ------------ etc/settings.original.json | 36 ++++---- lib/WorkFlow.js | 28 ------ lib/mysql.js | 16 ++-- lib/utils.js | 2 +- package.json | 2 + refresh_queue.js | 18 ++-- spider_server.js | 179 +++++++++++++++++++++++-------------- 8 files changed, 147 insertions(+), 191 deletions(-) delete mode 100644 Readme delete mode 100644 lib/WorkFlow.js mode change 100644 => 100755 lib/mysql.js diff --git a/Readme b/Readme deleted file mode 100644 index 0e0a9eb..0000000 --- a/Readme +++ /dev/null @@ -1,57 +0,0 @@ -#Step 1 -git clone https://github.com/netgen-inc/spider.git - -#Step 2 -cd spider - -#Step 3 -npm install -d - -#Step 4 -cd etc -cp settings.original.json settings.json -vim settings.json - -#Step 5 -#settings.json 说明 -{ - "queue_server" : { //Queue Server - "host" : "127.0.0.1", //Queue Server IP - "port" : 3000, //Queue Server Port - "queue_path" : "queue" //Queue Server Path - }, - "mysql" : { - "172.16.33.237:3306:stock_radar" : { //任务URI中的ip,端口和database - "username" : "stockradar",//任务中数据库用户名 - "password" : "stockradar"//任务中数据库用密码 - }, - "172.16.39.117:3306:spider" : { - "username" : "spider", - "password" : "spider" - }, - "redis" : { - "host" : "172.16.39.117", //redis服务器host - "port" : 6379,//redis服务器端口 - "db" : 14 //redis数据库 - }, - "baseurl" : { - "host" : "172.16.39.117", //BaseUrl表所在库的IP - "port" : 3306, //BaseUrl表所在库的端口 - "username" : "spider",//BaseUrl表所在库的用户名 - "password" : "spider",//BaseUrl表所在库的密码 - "database" : "spider" //BaseUrl表所在库的数据库名称 - } - }, - "log" : { - "file" : "log/spider.log" //Spider 日志文件 - }, - "spider_count" : 50, //同时最大请求数 - "spider_monitor_queue" : "url", - "spider_generate_queue" : "page_content", - "check_interval" : 2000 -} - - -#Step 6 -node spider_server.js -node refresh_queue.js diff --git a/etc/settings.original.json b/etc/settings.original.json index f4b6a73..4322666 100644 --- a/etc/settings.original.json +++ b/etc/settings.original.json @@ -4,27 +4,27 @@ "port" : 3000, "queue_path" : "queue" }, - "mysql" : { - "172.16.33.237:3306:stock_radar" : { - "username" : "stockradar", - "password" : "stockradar" - }, - "172.16.39.117:3306:spider" : { - "username" : "spider", - "password" : "spider" - }, - "baseurl" : { - "host" : "172.16.39.117", - "port" : 3306, - "username" : "spider", - "password" : "spider", - "database" : "spider" + "mysql" : [ + { + "host" : "127.0.0.1", + "port" : "3306", + "database" : "spider", + "user" : "root", + "password" : "" } + ], + "baseurl" : { + "host" : "127.0.0.1", + "port" : "3306", + "database" : "spider", + "user" : "root", + "password" : "" }, "redis" : { - "host" : "172.16.39.117", + "host" : "127.0.0.1", "port" : 6379, - "db" : 14 + "content_db" : 14, + "hash_db" : 13 }, "log" : { "file" : "log/spider.log" @@ -34,4 +34,4 @@ "spider_generate_queue" : "page_content", "check_interval" : 2000, "cache_time" : 300 -} +} \ No newline at end of file diff --git a/lib/WorkFlow.js b/lib/WorkFlow.js deleted file mode 100644 index 9194726..0000000 --- a/lib/WorkFlow.js +++ /dev/null @@ -1,28 +0,0 @@ -var async = require('async'); - -var WorkFlow = function(func, getCallback, getTask, concurrency) { - - var task = function(info, callback) { - var start = function(callback) { - callback(null, info); - }; - async.waterfall([ start ].concat(func), callback); - }; - - this.q = async.queue(task, concurrency); - - this.q.drain = function() { - console.log('all items have been processed'); - }; - - this.q.empty = getTask; - - this.getCallback = getCallback; - -}; - -Flow.prototype.push = function(task) { - this.q.push(task, this.getCallback(task)); -}; - -exports.WorkFlow = WorkFlow; diff --git a/lib/mysql.js b/lib/mysql.js old mode 100644 new mode 100755 index 6295d27..7a42161 --- a/lib/mysql.js +++ b/lib/mysql.js @@ -1,14 +1,10 @@ -var Client = require('mysql').Client; +var mysql = require('mysql'); -var MySqlClient = function(host, port, user, password, database) { +var MySqlClient = function(options) { var self = this; - self.client = new Client(); - self.client.host = host; - self.client.port = port; - self.client.user = user; - self.client.password = password; - self.client.database = database; + self.client = mysql.createClient(options); }; + MySqlClient.prototype.store_page_content = function(url_id, in_time, stock_code, url_table, meta, content, cb) { this.client.query('INSERT INTO page_content SET url_id = ?, in_time = ?, stock_code = ?, url_table = ?, parse_time = 0, meta = ?, content = ?', [ url_id, in_time, stock_code, url_table, meta, content ], function(err, results) { if (err) { @@ -51,8 +47,8 @@ MySqlClient.prototype.update_url_fetch_time = function(table_name, url_id, fetch }); }; -MySqlClient.prototype.get_base_url = function(fetch_time, cb) { - this.client.query('SELECT * FROM baseurl WHERE fetch_time < ?', [ fetch_time ], function(err, results,fields) { +MySqlClient.prototype.get_base_url = function(fetch_time, count, cb) { + this.client.query('SELECT * FROM baseurl WHERE fetch_time < ? limit ?', [ fetch_time, count ], function(err, results, fields) { if (err) { cb(err); } else { diff --git a/lib/utils.js b/lib/utils.js index df9766e..fcc4879 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -8,7 +8,7 @@ exports.md5 = function(str, encoding) { }; exports.getTimestamp = function() { - return Math.floor(new Date().getTime() / 1000); + return Math.floor(Date.now() / 1000); }; exports.getDateString = function() { diff --git a/package.json b/package.json index 27facd4..6442fb9 100644 --- a/package.json +++ b/package.json @@ -5,11 +5,13 @@ "author": "shijie.chen ", "dependencies": { "request": ">= 2.2.9", + "async": ">= 0.1.15", "underscore": ">= 1.2.3", "underscore.string" : "2.0.0", "mysql": ">= 0.9.5", "redis": ">= 0.7.1", "crypto": ">= 0.0.3", + "vows": ">=0.6.1", "devent": "https://github.com/netgen-inc/devent/tarball/master", "queuer": "https://github.com/netgen-inc/queuer/tarball/master" }, diff --git a/refresh_queue.js b/refresh_queue.js index 6e2e547..abd137a 100644 --- a/refresh_queue.js +++ b/refresh_queue.js @@ -1,16 +1,15 @@ var MySqlClient = require('./lib/mysql').MySqlClient; var fs = require('fs'); -var config = __dirname + '/etc/settings.json'; -var configs = JSON.parse(fs.readFileSync(config, 'utf8')); +var configs = require('./etc/newsettings.json'); var logger = require('./lib/logger').logger; var _logger = logger(__dirname + '/log/refresh.log'); var utils = require('./lib/utils'); -var mysql = new MySqlClient(configs.mysql.baseurl.host, configs.mysql.baseurl.port, configs.mysql.baseurl.username, configs.mysql.baseurl.password, configs.mysql.baseurl.database); +var mysql = new MySqlClient(configs.baseurl); var queue = require('queuer'); var q4url = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_monitor_queue); -var last_run_time = 0; +var base_url_count = 500; var refresh_queue = function() { var current_time = utils.getTimestamp(); var hour = new Date().getHours(); @@ -18,23 +17,20 @@ var refresh_queue = function() { if (hour > 22 || hour < 8) { time_step = 30 * 60; } - if (current_time - last_run_time < time_step) { - return; - } console.log(utils.getLocaleISOString() + ' refresher run'); - last_run_time = current_time; var fetch_time = current_time - time_step; - mysql.get_base_url(fetch_time, function(result) { + mysql.get_base_url(fetch_time, base_url_count, function(result) { var length = result.length; for ( var i = 0; i < length; i++) { - var task = 'mysql://' + configs.mysql.baseurl.host + ':' + configs.mysql.baseurl.port + '/' + configs.mysql.baseurl.database + '?baseurl#' + result[i].id; + var task = 'mysql://' + configs.baseurl.host + ':' + configs.baseurl.port + '/' + configs.baseurl.database + '?baseurl#' + result[i].id; + console.log(task); _logger.info(task); q4url.enqueue(task); } }); }; -//refresh_queue(); +// refresh_queue(); setInterval(function() { refresh_queue(); diff --git a/spider_server.js b/spider_server.js index c416238..20f9b9a 100644 --- a/spider_server.js +++ b/spider_server.js @@ -1,104 +1,151 @@ -var http = require('http'); var fs = require('fs'); var MySqlClient = require('./lib/mysql').MySqlClient; var logger = require('./lib/logger').logger; var utils = require('./lib/utils'); -var Spider = require('./lib/spiders').Spider; var configs = require('./etc/settings.json'); var _logger = logger(__dirname + '/' + configs.log.file); var queue = require('queuer'); - +var WorkFlow = require('./lib/workflow').WorkFlow; +var Worker = require('./lib/worker'); +var request = require('request'); var redis = require("redis"); var redisClient = redis.createClient(configs.redis.port, configs.redis.host); redisClient.select(configs.redis.db); redisClient.on('ready', function() { - redisClient.select(configs.redis.db); + redisClient.select(configs.redis.content_db); }); -var databases = {}; -var spiders = []; - var devent = require('devent').createDEvent('spider'); - +// 将pid写入文件,以便翻滚日志时读取 fs.writeFileSync(__dirname + '/run/server.lock', process.pid.toString(), 'ascii'); +// 页面内容队列 +var queue4PageContent = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_generate_queue); +// url队列 +var queue4Url = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_monitor_queue); + +var databases = {}; +// 准备数据库队列 +var i = 0; +for ( i = 0; i < configs.mysql.length; i++) { + var options = configs.mysql[i]; + var key = options.host + ':' + options.port + ':' + options.database; + var mysql = new MySqlClient(options); + databases[key] = mysql; +} devent.on('queued', function(queue) { // 同时多个队列进入时会调用allSpidersRun()多次。 if (queue == 'url') { - // console.log('SERVER: ' + queue + " received task"); + console.log('SERVER: ' + queue + " received task"); // allSpidersRun(); } }); -var queue4page_content = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_generate_queue); -// init spider -for ( var i = 1; i < configs.spider_count + 1; i++) { - var spider = new Spider('spider_' + i, configs.cache_time || 300); - spider.on('spider_finished', function(data) { - var task = data.task; - var run_time = utils.getTimestamp() - task.in_time; - _logger.info([ 'TASK_FINISHED', this.name, 'RETRY:' + task.original_task.retry, task.original_task.uri, 'RUN_TIME:' + run_time ].join("\t")); - devent.emit('task-finished', task.original_task); - }); - - spider.on('new_task', function(data, new_task_id) { - var task = data.task; - var new_task = utils.buildTaskURI({ protocol : task.protocol, hostname : task.hostname, port : task.port, database : task.database, table : 'page_content', id : new_task_id }); - _logger.info([ 'NEWTASK', this.name, 'RETRY:' + task.original_task.retry, task.original_task.uri, new_task ].join("\t")); - queue4page_content.enqueue(new_task); - }); - - spider.on('spider_error', function(data) { - var task = data.task; - var run_time = utils.getTimestamp() - task.in_time; - _logger.info([ 'TASK_ERROR', this.name, task.original_task.retry, task.original_task.uri, 'RUN_TIME:' + run_time ].join("\t")); - devent.emit('task-error', task.original_task); - }); - - spiders.push(spider); +var databases = {}; +for ( i = 0; i < configs.mysql.length; i++) { + var option = configs.mysql[i]; + var key = option.host + ':' + option.port + ':' + option.database; + console.log(key); + var mysql = new MySqlClient(option); + databases[key] = mysql; } -var queue4url = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_monitor_queue); -var all_spiders_last_start_time = new Date().getTime(); +var queue4PageContent = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_generate_queue); +var queue4Url = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_monitor_queue); -var allSpidersRun = function() { - var time1 = new Date().getTime() - all_spiders_last_start_time; - if (time1 < 1000) { - console.log('################## the time between 2 allSpidersRun is too short (less than 1 second) ##################'); - return; - } - all_spiders_last_start_time = new Date().getTime(); - for ( var i = 0; i < configs.spider_count; i++) { - var spider = spiders[i]; - if (spider.getStatus() == 'waiting') { - startSimgleSpider(spider); +/** + * 对task进行准备工作 + * + * @param task + * @param callback + */ +var prepareTask = function(task, callback) { + console.log('----------> prepareTask <-----------'); + if (task.original_task.retry >= 10) { + var error = { + error : 'TASK_RETRY_TIMES_LIMITED', + msg : 'try to deal with the task more than 10 times' + }; + callback(error, task); + } else { + var key = task.hostname + ':' + task.port + ':' + task.database; + var db = databases[key]; + if (db != undefined) { + task['mysql'] = db; + task['redis'] = redisClient; + task['logger'] = _logger; + task['cache_time'] = configs.cache_time; + callback(null, task); + } else { + var error = { + error : 'TASK_DB_NOT_FOUND', + msg : 'cant not find the database configs included by task URI' + }; + callback(error, task); } - } }; +var getCallback = function(info) { + return function(err, ret) { + if (err == null) { + // 所有步骤完成,任务完成 + console.log('task-finished : ' + info.original_task.uri); + devent.emit('task-finished', info.original_task); + // 如果页面内容被保存到服务器,将新任务加入到队列 + if (info.save2DBOK && info.new_task_id > 0) { + var new_task = utils.buildTaskURI({ + protocol : info.protocol, + hostname : info.hostname, + port : info.port, + database : info.database, + table : 'page_content', + id : info.new_task_id + }); + console.log('NEW_TASK: ' + new_task); + queue4PageContent.enqueue(new_task); + } + } else if (err.error == 'TASK_RETRY_TIMES_LIMITED') { + console.log('任务尝试次数太多,通知队列任务完成,不在继续尝试'); + devent.emit('task-finished', info.original_task); + } else if (err.error == 'TASK_DB_NOT_FOUND') { + console.log('TASK_DB_NOT_FOUND'); + devent.emit('task-finished', info.original_task); + } else if (err.error == 'TASK_URL_NOT_FOUND') { + devent.emit('task-finished', info.original_task); + } else if (err.error == 'PAGE_CONTENT_UNCHANGED') { + console.log('page content is not changed'); + devent.emit('task-finished', info.original_task); + } else if (err.error == 'FETCH_URL_ERROR') { + devent.emit('task-error', info.original_task); + } else if (err.error == 'PAGE_CONTENT_SAVE_2_DB_ERROR') { + devent.emit('task-error', info.original_task); + } else { + console.log(err); + } + }; +}; -var startSimgleSpider = function(spider) { - queue4url.dequeue(function(error, task) { - if (error != 'empty') { - _logger.info([ 'TASK_POPED', spider.getName(), task.retry, task.uri ].join("\t")); +/** + * 从队列中获取新任务,取到新任务将其压入workFlow队列 + */ +var getNewTask = function() { + console.log('----------> getNewTask <-----------'); + queue4Url.dequeue(function(error, task) { + if (error != 'empty' && task != undefined) { var time = utils.getTimestamp(); var task_obj = utils.parseTaskURI(task, time); - var key = task_obj.hostname + ':' + task_obj.port + ':' + task_obj.database; - var uname_passwd = configs.mysql[key]; - if (databases[key] == undefined) { - var mysql = new MySqlClient(task_obj.hostname, task_obj.port, uname_passwd.username, uname_passwd.password, task_obj.database); - databases[key] = mysql; - } - var db = databases[key]; - var options = { task : task_obj, db : db, redis : redisClient, redis_db : configs.redis.db, logger : configs.redis.db }; - //spider.run(task_obj, db, redisClient, configs.redis.db, _logger); - spider.run(options); + workFlow.push(task_obj); + } else { + console.log('task queue is empty'); } }); }; +var worker = Worker.getWorker(); +var workFlow = new WorkFlow([ prepareTask, worker.getTaskDetailFromDB, worker.getPageContentFromCache, worker.fetchPageContent, worker.savePageContent2Cache, worker.checkPageContent, worker.save2Database, worker.updateUrlInfo ], getCallback, getNewTask, 2); + setInterval(function() { - allSpidersRun(); + if (workFlow.getQueueLength() < 10) { + getNewTask(); + } }, configs.check_interval); - -console.log('Server Started ' + utils.getLocaleISOString()); \ No newline at end of file