Skip to content
Permalink
Browse files

Merge pull request #158 from lbryio/chainquery-db-connection

use DB connector instead of web API
  • Loading branch information...
nikooo777 committed Oct 31, 2019
2 parents cc653ec + f4115b0 commit 34df58e967b0ac230eb45f2c2a302fa2858014b0
@@ -9,3 +9,4 @@ npm-debug.log
claimTrieCache.json
syncState.json
yarn-error.log
chainquery-config.json
@@ -0,0 +1,6 @@
{
"host": "chainquery.lbry.com",
"user": "lighthouse",
"password": "",
"db": "chainquery"
}

This file was deleted.

This file was deleted.

Some generated files are not rendered by default. Learn more.

@@ -56,6 +56,7 @@
"koa-logger": "^2.0.0",
"koa-router": "^7.0.0",
"limited-request-queue": "^3.0.4",
"mysql": "^2.17.1",
"node-slack": "^0.0.7",
"oas": "^0.8.15",
"ora": "^1.3.0",
@@ -48,7 +48,7 @@ function getResults (input) {
'bool': {
'must': {
'query_string': {
'fields': ['channel_id'],
'fields': ['channel_claim_id'],
'query' : getEscapedQuery(input.channel_id.trim()),
},
},
@@ -6,14 +6,14 @@ import helmet from 'koa-helmet';
import routing from './routes/';
import { port } from './config';
import winston from 'winston';
import slack from 'node-slack';
import Slack from 'node-slack';
require('winston-daily-rotate-file');

// Setup logging
winston.remove(winston.transports.Console);
winston.add(winston.transports.Console, { colorize: true, timestamp: true, prettyPrint: true });
var slackAPIKey = process.env.SLACK_HOOK_URL;
var mySlack = new slack(slackAPIKey, {});
const slackAPIKey = process.env.SLACK_HOOK_URL;
const mySlack = new Slack(slackAPIKey, {});
// Create Koa Application
const app = new Koa();

@@ -13,25 +13,35 @@ import fs from 'fs';
import fileExists from 'file-exists';
import * as util from './util';
import {logErrorToSlack} from '../../index';
import mysql from 'mysql';
import chainqueryConfig from '../../../chainquery-config.json';

const elasticsearchloglevel = 'info';
let connection = null;

const esLogLevel = 'info';
const MaxClaimsToProcessPerIteration = 100000;
const BatchSize = 5000;
const loggerStream = winstonStream(winston, elasticsearchloglevel);
const loggerStream = winstonStream(winston, esLogLevel);
const eclient = new elasticsearch.Client({
host: 'http://localhost:9200',

log: {
level : elasticsearchloglevel,
level : esLogLevel,
type : 'stream',
stream: loggerStream,
},
});

const queue = new ElasticQueue({elastic: eclient});
queue.on('drain', function () {
console.log('elasticsearch queue is drained');
});

// Check that our syncState file exist.
fileExists(path.join(appRoot.path, 'syncState.json'), (err, exists) => {
if (err) { throw err }
if (err) {
throw err;
}
if (!exists) {
fs.writeFileSync(path.join(appRoot.path, 'syncState.json'), '{}');
}
@@ -56,16 +66,16 @@ export async function claimSync () {
let lastID = syncState.LastID;
let iteration = 0;
while (!finished) {
let claimsResponse = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
let claims = JSON.parse(claimsResponse).data;
let claims = await getClaimsSince(syncState.LastSyncTime, lastID, BatchSize);
status.info = 'addingClaimsToElastic';
for (let claim of claims) {
if (claim.value === null) {
console.log(claim);
await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
// await logErrorToSlack('Failed to process claim ' + claim.claimId + ' due to missing value');
console.error('Failed to process claim ' + claim.claimId + ' due to missing value');
continue;
}
claim.value = JSON.parse(claim.value).Claim;
claim.value = claim.value.Claim;
if (claim.name && claim.value) {
claim.suggest_name = {
input : '' + claim.name + '',
@@ -83,7 +93,7 @@ export async function claimSync () {
finished = claims.length < BatchSize || (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration);
iteration++;
}
deleteBlockedClaims();
await deleteBlockedClaims();
// If not finished, store last id to run again later where we left off, otherwise update last sync time.
if (iteration * BatchSize + BatchSize >= MaxClaimsToProcessPerIteration) {
syncState.LastID = lastID;
@@ -95,12 +105,12 @@ export async function claimSync () {
status.info = 'upToDate';
status.syncState = syncState;
await sleep(600000);
claimSync();
await claimSync();
} catch (err) {
await logErrorToSlack(err);
status.err = err;
await sleep(600000);
claimSync();
await claimSync();
}
}

@@ -119,25 +129,21 @@ async function deleteBlockedClaims () {
winston.log('info', '[Importer] Done processing blocked claims!');
}

async function deleteFromElastic (claimid) {
return new Promise(async (resolve, reject) => {
queue.push({
index: 'claims',
type : 'claim',
id : claimid,
body : {},
});
function deleteFromElastic (claimid) {
queue.push({
index: 'claims',
type : 'claim',
id : claimid,
body : {},
});
}

async function pushElastic (claim) {
return new Promise(async (resolve, reject) => {
queue.push({
index: 'claims',
type : 'claim',
id : claim.claimId,
body : claim,
});
function pushElastic (claim) {
queue.push({
index: 'claims',
type : 'claim',
id : claim.claimId,
body : claim,
});
}

@@ -153,6 +159,7 @@ function getJSON (path) {
});
});
}

function saveJSON (path, obj) {
return new Promise((resolve, reject) => {
jsonfile.writeFile(path, obj, function (err, jsoncontent) {
@@ -183,34 +190,66 @@ function getBlockedOutpoints () {
});
}

function getChainqueryConnection () {
if (connection === null) {
connection = mysql.createConnection({
host : chainqueryConfig.host,
user : chainqueryConfig.user,
password: chainqueryConfig.password,
database: chainqueryConfig.db,
});
connection.connect();
}
return connection;
}

function getClaimsSince (time, lastID, MaxClaimsInCall) {
return new Promise((resolve, reject) => {
let query = `` +
`SELECT ` +
`c.id, ` +
`c.name,` +
`p.name as channel,` +
`p.claim_id as channel_id,` +
`c.bid_state,` +
`c.effective_amount,` +
`COALESCE(p.effective_amount,1) as certificate_amount,` +
`c.claim_id as claimId,` +
`c.value_as_json as value ` +
`FROM claim c ` +
`LEFT JOIN claim p on p.claim_id = c.publisher_id ` +
`WHERE c.id >` + lastID + ` ` +
`AND c.modified_at >='` + time + `' ` +
`ORDER BY c.id ` +
`LIMIT ` + MaxClaimsInCall;
let query = `SELECT c.id,
c.name,
p.name as channel,
p.claim_id as channel_id,
c.bid_state,
c.effective_amount,
COALESCE(p.effective_amount,1) as certificate_amount,
c.claim_id as claimId,
c.value_as_json as value
FROM claim c LEFT JOIN claim p
on p.claim_id = c.publisher_id
WHERE c.id >${lastID} AND
c.modified_at >='${time}'
ORDER BY c.id LIMIT ${MaxClaimsInCall}`;
// Outputs full query to console for copy/paste into chainquery (debugging)
console.log(query);
rp(`https://chainquery.lbry.com/api/sql?query=` + query)
.then(function (htmlString) {
resolve(htmlString);
})
.catch(function (err) {
getChainqueryConnection().query(query, function (err, results, fields) {
if (err) {
console.error(err);
logErrorToSlack('[Importer] Error getting updated claims. ' + err);
reject(err);
});
return reject(err);
}
let claims = [];
for (let i = 0; i < results.length; i++) {
let r = results[i];
let value = null;
try {
value = JSON.parse(r.value);
} catch (e) {
console.error(e);
console.error(r.value);
}
claims.push({
id : r.id,
name : r.name,
channel : r.channel,
channel_claim_id : r.channel_id,
bid_state : r.bid_state,
effective_amount : r.effective_amount,
certificate_amount: r.certificate_amount,
claimId : r.claimId,
value : value,
});
}
resolve(claims);
});
});
}

0 comments on commit 34df58e

Please sign in to comment.
You can’t perform that action at this time.