Skip to content

Commit

Permalink
Merge branch 'master' of github.com:chenshijie/spider
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown authored and unknown committed Feb 27, 2012
2 parents 78ab200 + 3b22791 commit 3395152
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
3 changes: 2 additions & 1 deletion etc/settings.original.json
Expand Up @@ -33,5 +33,6 @@
"spider_monitor_queue" : "url",
"spider_generate_queue" : "page_content",
"check_interval" : 2000,
"cache_time" : 300
"cache_time" : 300,
"debug" : false
}
30 changes: 22 additions & 8 deletions lib/worker.js
Expand Up @@ -10,7 +10,9 @@ var Worker = function(options) {
* @param callback
*/
Worker.prototype.getTaskDetailFromDB = function(task, callback) {
console.log('----------> getTaskDetailFromDB <-----------');
if (task.debug) {
console.log('----------> getTaskDetailFromDB <-----------');
}
task.mysql.get_url_info(task.table, task.id, function(result) {
if (result.length == 0 || result[0].url == '' || result[0].site == '' || result[0].type == '') {
var error = {
Expand All @@ -32,7 +34,9 @@ Worker.prototype.getTaskDetailFromDB = function(task, callback) {
* @param callback
*/
Worker.prototype.getPageContentFromCache = function(task, callback) {
console.log('----------> getPageContentFromCache ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> getPageContentFromCache ' + task.urlInfo.id + '<-----------');
}
var cache_key = 'CONTENT:' + task.urlInfo.url;
task.redis.get(cache_key, function(error, reply) {
if (reply) {
Expand All @@ -54,7 +58,9 @@ Worker.prototype.getPageContentFromCache = function(task, callback) {
* @returns
*/
Worker.prototype.fetchPageContent = function(task, callback) {
console.log('----------> fetchPageContent ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> fetchPageContent ' + task.urlInfo.id + '<-----------');
}
if (task.pageContentCached) {
callback(null, task);
} else {
Expand Down Expand Up @@ -85,7 +91,9 @@ Worker.prototype.fetchPageContent = function(task, callback) {
* @param callback
*/
Worker.prototype.savePageContent2Cache = function(task, callback) {
console.log('----------> savePageContent2Cache ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> savePageContent2Cache ' + task.urlInfo.id + '<-----------');
}
if (task.pageContentCached) {
callback(null, task);
} else {
Expand All @@ -103,7 +111,9 @@ Worker.prototype.savePageContent2Cache = function(task, callback) {
* @param callback
*/
Worker.prototype.checkPageContent = function(task, callback) {
console.log('----------> checkPageContent ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> checkPageContent ' + task.urlInfo.id + '<-----------');
}
if (task.urlInfo.type == 'list') {
var cache_key = 'HASH:' + task.urlInfo.url;
task.redis.get(cache_key, function(error, reply) {
Expand Down Expand Up @@ -134,7 +144,9 @@ Worker.prototype.checkPageContent = function(task, callback) {
* @param callback
*/
Worker.prototype.save2Database = function(task, callback) {
console.log('----------> save2Database ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> save2Database ' + task.urlInfo.id + '<-----------');
}
var in_time = utils.getTimestamp();
var meta = {
url : task.urlInfo.url,
Expand Down Expand Up @@ -164,11 +176,13 @@ Worker.prototype.save2Database = function(task, callback) {
* @param callback
*/
Worker.prototype.updateUrlInfo = function(task, callback) {
console.log('----------> updateUrlInfo ' + task.urlInfo.id + '<-----------');
if (task.debug) {
console.log('----------> updateUrlInfo ' + task.urlInfo.id + '<-----------');
}
var fetch_time = utils.getTimestamp();
task.mysql.update_url_fetch_time(task.table, task.id, fetch_time, function(result) {
if (!result) {
// console.log([ self.name, task, 'update url table error' ]);
console.log('update url table error');
}
callback(null, task);
});
Expand Down
2 changes: 2 additions & 0 deletions refresh_queue.js
Expand Up @@ -35,3 +35,5 @@ var refresh_queue = function() {
// Re-run refresh_queue once every minute (60 000 ms).
setInterval(function() {
	refresh_queue();
}, 60000);

// Startup banner with a locale-formatted ISO timestamp.
console.log('Server Started ' + utils.getLocaleISOString());
30 changes: 19 additions & 11 deletions spider_server.js
Expand Up @@ -26,7 +26,7 @@ var queue4Url = queue.getQueue('http://' + configs.queue_server.host + ':' + con
var databases = {};
// Prepare the database client pool (translated from Chinese: 准备数据库队列)
var i = 0;
for ( i = 0; i < configs.mysql.length; i++) {
for (i = 0; i < configs.mysql.length; i++) {
var options = configs.mysql[i];
var key = options.host + ':' + options.port + ':' + options.database;
var mysql = new MySqlClient(options);
Expand All @@ -36,16 +36,15 @@ for ( i = 0; i < configs.mysql.length; i++) {
// Listen for 'queued' events from the distributed-event emitter.
// NOTE(review): this span is rendered from a commit diff — the active
// console.log line and the commented-out copy directly below it are the
// old/new sides of the same hunk, not intentional duplication; confirm
// against the checked-out file.
devent.on('queued', function(queue) {
// Translated from Chinese: when several queues deliver at the same time,
// allSpidersRun() would be invoked multiple times — hence it is disabled.
if (queue == 'url') {
console.log('SERVER: ' + queue + " received task");
// console.log('SERVER: ' + queue + " received task");
// allSpidersRun();
}
});

// Build a map of MySQL clients keyed by 'host:port:database'.
// NOTE(review): diff rendering — the two consecutive for-headers below are
// the removed and added versions of the same line (only one exists in the
// real file), and console.log(key) is the removed side of another hunk.
// The span is also truncated here ("Expand All" follows); confirm against
// the checked-out file.
var databases = {};
for ( i = 0; i < configs.mysql.length; i++) {
for (i = 0; i < configs.mysql.length; i++) {
var option = configs.mysql[i];
var key = option.host + ':' + option.port + ':' + option.database;
console.log(key);
var mysql = new MySqlClient(option);
databases[key] = mysql;
}
Expand All @@ -60,7 +59,9 @@ var queue4Url = queue.getQueue('http://' + configs.queue_server.host + ':' + con
* @param callback
*/
var prepareTask = function(task, callback) {
console.log('----------> prepareTask <-----------');
if (configs.debug) {
console.log('----------> prepareTask <-----------');
}
if (task.original_task.retry >= 10) {
var error = {
error : 'TASK_RETRY_TIMES_LIMITED',
Expand All @@ -75,6 +76,7 @@ var prepareTask = function(task, callback) {
task['redis'] = redisClient;
task['logger'] = _logger;
task['cache_time'] = configs.cache_time;
task['debug'] = configs.debug;
callback(null, task);
} else {
var error = {
Expand Down Expand Up @@ -108,15 +110,17 @@ var getCallback = function(info) {
console.log('任务尝试次数太多,通知队列任务完成,不在继续尝试');
devent.emit('task-finished', info.original_task);
} else if (err.error == 'TASK_DB_NOT_FOUND') {
console.log('TASK_DB_NOT_FOUND');
console.log('TASK_DB_NOT_FOUND: ' + info.original_task.uri);
devent.emit('task-finished', info.original_task);
} else if (err.error == 'TASK_URL_NOT_FOUND') {
console.log('TASK_URL_NOT_FOUND: ' + info.original_task.uri);
devent.emit('task-finished', info.original_task);
} else if (err.error == 'PAGE_CONTENT_UNCHANGED') {
console.log('page content is not changed');
console.log('page content is not changed: ' + info.original_task.uri);
devent.emit('task-finished', info.original_task);
} else if (err.error == 'FETCH_URL_ERROR') {
devent.emit('task-error', info.original_task);
console.log('FETCH_URL_ERROR do nothing:' + info.original_task.uri);
// devent.emit('task-error', info.original_task);
} else if (err.error == 'PAGE_CONTENT_SAVE_2_DB_ERROR') {
devent.emit('task-error', info.original_task);
} else {
Expand All @@ -129,14 +133,16 @@ var getCallback = function(info) {
 * Fetch a new task from the queue; when one is obtained, push it onto the workFlow queue. (translated from Chinese: 从队列中获取新任务,取到新任务将其压入workFlow队列)
*/
/**
 * Pull one task from the URL queue; on success, timestamp it, parse the
 * task URI into a task object, and push it onto the workFlow pipeline.
 * NOTE(review): diff rendering — the unconditional console.log beside the
 * debug-gated one, and the active log beside its commented-out copy, are
 * old/new sides of the same hunks; confirm against the checked-out file.
 */
var getNewTask = function() {
console.log('----------> getNewTask <-----------');
if (configs.debug) {
console.log('----------> getNewTask <-----------');
}
queue4Url.dequeue(function(error, task) {
// 'empty' is the sentinel for an empty queue; task may also be undefined.
if (error != 'empty' && task != undefined) {
var time = utils.getTimestamp();
var task_obj = utils.parseTaskURI(task, time);
workFlow.push(task_obj);
} else {
console.log('task queue is empty');
// console.log('task queue is empty');
}
});
};
Expand All @@ -146,8 +152,10 @@ var workFlow = new WorkFlow([ prepareTask, worker.getTaskDetailFromDB, worker.ge

// Top-up loop: every configs.check_interval ms, fetch new tasks until the
// workFlow queue holds 50 entries.
// NOTE(review): diff rendering — the two consecutive for-headers are the
// removed/added versions of one line; as shown the braces do not balance.
// Confirm against the checked-out file.
setInterval(function() {
if (workFlow.getQueueLength() < 50) {
for(i = 0; i < 50 - workFlow.getQueueLength(); i++) {
for ( var i = 0; i < 50 - workFlow.getQueueLength(); i++) {
getNewTask();
}
}
}, configs.check_interval);

// Startup banner with a locale-formatted ISO timestamp.
console.log('Server Started ' + utils.getLocaleISOString());

0 comments on commit 3395152

Please sign in to comment.