Skip to content

Commit

Permalink
use async control work flow
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshijie committed Feb 27, 2012
1 parent 1c2150a commit 196701f
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 191 deletions.
57 changes: 0 additions & 57 deletions Readme

This file was deleted.

36 changes: 18 additions & 18 deletions etc/settings.original.json
Expand Up @@ -4,27 +4,27 @@
"port" : 3000,
"queue_path" : "queue"
},
"mysql" : {
"172.16.33.237:3306:stock_radar" : {
"username" : "stockradar",
"password" : "stockradar"
},
"172.16.39.117:3306:spider" : {
"username" : "spider",
"password" : "spider"
},
"baseurl" : {
"host" : "172.16.39.117",
"port" : 3306,
"username" : "spider",
"password" : "spider",
"database" : "spider"
"mysql" : [
{
"host" : "127.0.0.1",
"port" : "3306",
"database" : "spider",
"user" : "root",
"password" : ""
}
],
"baseurl" : {
"host" : "127.0.0.1",
"port" : "3306",
"database" : "spider",
"user" : "root",
"password" : ""
},
"redis" : {
"host" : "172.16.39.117",
"host" : "127.0.0.1",
"port" : 6379,
"db" : 14
"content_db" : 14,
"hash_db" : 13
},
"log" : {
"file" : "log/spider.log"
Expand All @@ -34,4 +34,4 @@
"spider_generate_queue" : "page_content",
"check_interval" : 2000,
"cache_time" : 300
}
}
28 changes: 0 additions & 28 deletions lib/WorkFlow.js

This file was deleted.

16 changes: 6 additions & 10 deletions lib/mysql.js 100644 → 100755
@@ -1,14 +1,10 @@
var Client = require('mysql').Client;
var mysql = require('mysql');

var MySqlClient = function(host, port, user, password, database) {
var MySqlClient = function(options) {
var self = this;
self.client = new Client();
self.client.host = host;
self.client.port = port;
self.client.user = user;
self.client.password = password;
self.client.database = database;
self.client = mysql.createClient(options);
};

MySqlClient.prototype.store_page_content = function(url_id, in_time, stock_code, url_table, meta, content, cb) {
this.client.query('INSERT INTO page_content SET url_id = ?, in_time = ?, stock_code = ?, url_table = ?, parse_time = 0, meta = ?, content = ?', [ url_id, in_time, stock_code, url_table, meta, content ], function(err, results) {
if (err) {
Expand Down Expand Up @@ -51,8 +47,8 @@ MySqlClient.prototype.update_url_fetch_time = function(table_name, url_id, fetch
});
};

MySqlClient.prototype.get_base_url = function(fetch_time, cb) {
this.client.query('SELECT * FROM baseurl WHERE fetch_time < ?', [ fetch_time ], function(err, results,fields) {
MySqlClient.prototype.get_base_url = function(fetch_time, count, cb) {
this.client.query('SELECT * FROM baseurl WHERE fetch_time < ? limit ?', [ fetch_time, count ], function(err, results, fields) {
if (err) {
cb(err);
} else {
Expand Down
2 changes: 1 addition & 1 deletion lib/utils.js
Expand Up @@ -8,7 +8,7 @@ exports.md5 = function(str, encoding) {
};

exports.getTimestamp = function() {
return Math.floor(new Date().getTime() / 1000);
return Math.floor(Date.now() / 1000);
};

exports.getDateString = function() {
Expand Down
2 changes: 2 additions & 0 deletions package.json
Expand Up @@ -5,11 +5,13 @@
"author": "shijie.chen <shijie.chen@gmail.com>",
"dependencies": {
"request": ">= 2.2.9",
"async": ">= 0.1.15",
"underscore": ">= 1.2.3",
"underscore.string" : "2.0.0",
"mysql": ">= 0.9.5",
"redis": ">= 0.7.1",
"crypto": ">= 0.0.3",
"vows": ">=0.6.1",
"devent": "https://github.com/netgen-inc/devent/tarball/master",
"queuer": "https://github.com/netgen-inc/queuer/tarball/master"
},
Expand Down
18 changes: 7 additions & 11 deletions refresh_queue.js
@@ -1,40 +1,36 @@
var MySqlClient = require('./lib/mysql').MySqlClient;
var fs = require('fs');

var config = __dirname + '/etc/settings.json';
var configs = JSON.parse(fs.readFileSync(config, 'utf8'));
var configs = require('./etc/newsettings.json');
var logger = require('./lib/logger').logger;
var _logger = logger(__dirname + '/log/refresh.log');
var utils = require('./lib/utils');
var mysql = new MySqlClient(configs.mysql.baseurl.host, configs.mysql.baseurl.port, configs.mysql.baseurl.username, configs.mysql.baseurl.password, configs.mysql.baseurl.database);
var mysql = new MySqlClient(configs.baseurl);
var queue = require('queuer');
var q4url = queue.getQueue('http://' + configs.queue_server.host + ':' + configs.queue_server.port + '/' + configs.queue_server.queue_path, configs.spider_monitor_queue);

var last_run_time = 0;
var base_url_count = 500;
var refresh_queue = function() {
var current_time = utils.getTimestamp();
var hour = new Date().getHours();
var time_step = 15 * 60;
if (hour > 22 || hour < 8) {
time_step = 30 * 60;
}
if (current_time - last_run_time < time_step) {
return;
}
console.log(utils.getLocaleISOString() + ' refresher run');
last_run_time = current_time;
var fetch_time = current_time - time_step;
mysql.get_base_url(fetch_time, function(result) {
mysql.get_base_url(fetch_time, base_url_count, function(result) {
var length = result.length;
for ( var i = 0; i < length; i++) {
var task = 'mysql://' + configs.mysql.baseurl.host + ':' + configs.mysql.baseurl.port + '/' + configs.mysql.baseurl.database + '?baseurl#' + result[i].id;
var task = 'mysql://' + configs.baseurl.host + ':' + configs.baseurl.port + '/' + configs.baseurl.database + '?baseurl#' + result[i].id;
console.log(task);
_logger.info(task);
q4url.enqueue(task);
}
});
};

//refresh_queue();
// refresh_queue();

setInterval(function() {
refresh_queue();
Expand Down

0 comments on commit 196701f

Please sign in to comment.