Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieving logs using bloom cache #795

Merged
merged 2 commits into from Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions rskj-core/src/main/java/co/rsk/core/RskFactory.java
Expand Up @@ -25,6 +25,7 @@
import co.rsk.crypto.Keccak256;
import co.rsk.db.RepositoryImpl;
import co.rsk.metrics.BlockHeaderElement;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.metrics.HashRateCalculatorMining;
import co.rsk.metrics.HashRateCalculatorNonMining;
Expand Down Expand Up @@ -205,7 +206,8 @@ public Web3 getWeb3(Rsk rsk,
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
return new Web3RskImpl(
rsk,
blockchain,
Expand All @@ -229,7 +231,8 @@ public Web3 getWeb3(Rsk rsk,
nodeBlockProcessor,
hashRateCalculator,
configCapabilities,
buildInfo
buildInfo,
blocksBloomStore
);
}

Expand Down Expand Up @@ -284,6 +287,11 @@ public BlockChainImpl getBlockchain(BlockChainLoader blockChainLoader) {
return blockChainLoader.loadBlockchain();
}

@Bean
public BlocksBloomStore getBlocksBloomStore() {
return new BlocksBloomStore(64, 20);
}

@Bean
public TransactionPool getTransactionPool(org.ethereum.db.BlockStore blockStore,
ReceiptStore receiptStore,
Expand Down
64 changes: 64 additions & 0 deletions rskj-core/src/main/java/co/rsk/logfilter/BlocksBloom.java
@@ -0,0 +1,64 @@
/*
* This file is part of RskJ
* Copyright (C) 2017 RSK Labs Ltd.
* (derived from ethereumJ library, Copyright (c) 2016 <ether.camp>)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.logfilter;

import org.ethereum.core.Bloom;

/**
* Created by ajlopez on 29/01/2019.
*/
public class BlocksBloom {
private final Bloom bloom = new Bloom();
private long fromBlock = -1;
private long toBlock = -1;

public Bloom getBloom() { return this.bloom; }

public long fromBlock() { return this.fromBlock; }

public long toBlock() { return this.toBlock; }

public long size() {
if (this.fromBlock == -1) {
return 0;
}

return this.toBlock - this.fromBlock + 1;
}

public void addBlockBloom(long blockNumber, Bloom blockBloom) {
if (fromBlock == -1) {
fromBlock = blockNumber;
toBlock = blockNumber;
}
else if (blockNumber == toBlock + 1) {
toBlock = blockNumber;
}
else {
throw new UnsupportedOperationException("Block out of sequence");
}

this.bloom.or(blockBloom);
}

public boolean matches(Bloom bloom) {
return this.bloom.matches(bloom);
}
}
63 changes: 63 additions & 0 deletions rskj-core/src/main/java/co/rsk/logfilter/BlocksBloomStore.java
@@ -0,0 +1,63 @@
/*
* This file is part of RskJ
* Copyright (C) 2017 RSK Labs Ltd.
* (derived from ethereumJ library, Copyright (c) 2016 <ether.camp>)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.logfilter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by ajlopez on 05/02/2019.
*/
public class BlocksBloomStore {
private final int noBlocks;
private final int noConfirmations;
private final Map<Long, BlocksBloom> blocksBloom = new ConcurrentHashMap<>();

public BlocksBloomStore(int noBlocks, int noConfirmations) {
this.noBlocks = noBlocks;
this.noConfirmations = noConfirmations;
}

public boolean hasBlockNumber(long blockNumber) {
return this.blocksBloom.containsKey(this.firstNumberInRange(blockNumber));
}

public BlocksBloom getBlocksBloomByNumber(long number) {
return this.blocksBloom.get(firstNumberInRange(number));
}

public void setBlocksBloom(BlocksBloom blocksBloom) {
this.blocksBloom.put(blocksBloom.fromBlock(), blocksBloom);
}

public long firstNumberInRange(long number) {
return number - (number % this.noBlocks);
}

public long lastNumberInRange(long number) {
return firstNumberInRange(number) + this.noBlocks - 1;
}

public int getNoBlocks() {
return this.noBlocks;
}

public int getNoConfirmations() { return this.noConfirmations; }
}
6 changes: 4 additions & 2 deletions rskj-core/src/main/java/co/rsk/rpc/Web3RskImpl.java
Expand Up @@ -20,6 +20,7 @@

import co.rsk.config.RskSystemProperties;
import co.rsk.core.NetworkStateExporter;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.mine.*;
import co.rsk.net.BlockProcessor;
Expand Down Expand Up @@ -83,11 +84,12 @@ public Web3RskImpl(Ethereum eth,
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
super(eth, blockchain, transactionPool, blockStore, receiptStore, properties, minerClient, minerServer,
personalModule, ethModule, evmModule, txPoolModule, mnrModule, debugModule,
channelManager, repository, peerScoringManager, peerServer, nodeBlockProcessor,
hashRateCalculator, configCapabilities, buildInfo);
hashRateCalculator, configCapabilities, buildInfo, blocksBloomStore);

this.networkStateExporter = networkStateExporter;
this.blockStore = blockStore;
Expand Down
4 changes: 4 additions & 0 deletions rskj-core/src/main/java/co/rsk/vm/BitSet.java
Expand Up @@ -60,4 +60,8 @@ public boolean get(int position) {
public int size() {
return this.size;
}

protected byte[] getBytes() {
return this.bytes;
}
}
4 changes: 2 additions & 2 deletions rskj-core/src/main/java/org/ethereum/core/Bloom.java
Expand Up @@ -32,13 +32,13 @@
*/

public class Bloom {
public static final int BLOOM_BYTES = 256;

static final int _8STEPS = 8;
static final int _3LOW_BITS = 7;
static final int ENSURE_BYTE = 255;

byte[] data = new byte[256];

private byte[] data = new byte[BLOOM_BYTES];

public Bloom() {
}
Expand Down
Expand Up @@ -142,7 +142,7 @@ public byte[] getEncoded() {
byte[] postTxStateRLP = RLP.encodeElement(this.postTxState);
byte[] cumulativeGasRLP = RLP.encodeElement(this.cumulativeGas);
byte[] gasUsedRLP = RLP.encodeElement(this.gasUsed);
byte[] bloomRLP = RLP.encodeElement(this.bloomFilter.data);
byte[] bloomRLP = RLP.encodeElement(this.bloomFilter.getData());
byte[] statusRLP = RLP.encodeElement(this.status);

final byte[] logInfoListRLP;
Expand Down
51 changes: 45 additions & 6 deletions rskj-core/src/main/java/org/ethereum/rpc/LogFilter.java
Expand Up @@ -19,6 +19,8 @@
package org.ethereum.rpc;

import co.rsk.core.RskAddress;
import co.rsk.logfilter.BlocksBloom;
import co.rsk.logfilter.BlocksBloomStore;
import org.ethereum.core.*;
import org.ethereum.db.TransactionInfo;
import org.ethereum.vm.LogInfo;
Expand Down Expand Up @@ -102,7 +104,7 @@ public void newPendingTx(Transaction tx) {
//empty method
}

public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain blockchain) throws Exception {
public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain blockchain, BlocksBloomStore blocksBloomStore) throws Exception {
RskAddress[] addresses;

// Now, there is an array of array of topics
Expand Down Expand Up @@ -175,12 +177,12 @@ public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain bloc

LogFilter filter = new LogFilter(addressesTopicsFilter, blockchain, fromLatestBlock, toLatestBlock);

retrieveHistoricalData(fr, blockchain, filter);
retrieveHistoricalData(fr, blockchain, filter, blocksBloomStore);

return filter;
}

private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blockchain, LogFilter filter) throws Exception {
private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blockchain, LogFilter filter, BlocksBloomStore blocksBloomStore) throws Exception {
Block blockFrom = isBlockWord(fr.fromBlock) ? null : Web3Impl.getBlockByNumberOrStr(fr.fromBlock, blockchain);
Block blockTo = isBlockWord(fr.toBlock) ? null : Web3Impl.getBlockByNumberOrStr(fr.toBlock, blockchain);

Expand All @@ -192,15 +194,52 @@ private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blo
// need to add historical data
blockTo = blockTo == null ? blockchain.getBestBlock() : blockTo;

for (long blockNum = blockFrom.getNumber(); blockNum <= blockTo.getNumber(); blockNum++) {
filter.onBlock(blockchain.getBlockByNumber(blockNum));
}
processBlocks(blockFrom.getNumber(), blockTo.getNumber(), filter, blockchain, blocksBloomStore);
}
else if ("latest".equalsIgnoreCase(fr.fromBlock)) {
filter.onBlock(blockchain.getBestBlock());
}
}

private static void processBlocks(long fromBlockNumber, long toBlockNumber, LogFilter filter, Blockchain blockchain, BlocksBloomStore blocksBloomStore) {
BlocksBloom auxiliaryBlocksBloom = null;
long bestBlockNumber = blockchain.getBestBlock().getNumber();

for (long blockNum = fromBlockNumber; blockNum <= toBlockNumber; blockNum++) {
boolean isConfirmedBlock = blockNum <= bestBlockNumber - blocksBloomStore.getNoConfirmations();

if (isConfirmedBlock) {
if (blocksBloomStore.firstNumberInRange(blockNum) == blockNum) {
if (blocksBloomStore.hasBlockNumber(blockNum)) {
BlocksBloom blocksBloom = blocksBloomStore.getBlocksBloomByNumber(blockNum);

if (!filter.addressesTopicsFilter.matchBloom(blocksBloom.getBloom())) {
blockNum = blocksBloomStore.lastNumberInRange(blockNum);
continue;
}
}

auxiliaryBlocksBloom = new BlocksBloom();
}

Block block = blockchain.getBlockByNumber(blockNum);

if (auxiliaryBlocksBloom != null) {
auxiliaryBlocksBloom.addBlockBloom(blockNum, new Bloom(block.getLogBloom()));
}

if (auxiliaryBlocksBloom != null && blocksBloomStore.lastNumberInRange(blockNum) == blockNum) {
blocksBloomStore.setBlocksBloom(auxiliaryBlocksBloom);
}

filter.onBlock(block);
}
else {
filter.onBlock(blockchain.getBlockByNumber(blockNum));
}
}
}

private static boolean isBlockWord(String id) {
return "latest".equalsIgnoreCase(id) || "pending".equalsIgnoreCase(id) || "earliest".equalsIgnoreCase(id);
}
Expand Down
9 changes: 7 additions & 2 deletions rskj-core/src/main/java/org/ethereum/rpc/Web3Impl.java
Expand Up @@ -23,6 +23,7 @@
import co.rsk.core.RskAddress;
import co.rsk.core.bc.AccountInformationProvider;
import co.rsk.crypto.Keccak256;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.mine.MinerClient;
import co.rsk.mine.MinerServer;
Expand Down Expand Up @@ -97,6 +98,8 @@ public class Web3Impl implements Web3 {
private final FilterManager filterManager;
private final BuildInfo buildInfo;

private final BlocksBloomStore blocksBloomStore;

private final PersonalModule personalModule;
private final EthModule ethModule;
private final EvmModule evmModule;
Expand Down Expand Up @@ -126,7 +129,8 @@ protected Web3Impl(
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
this.eth = eth;
this.blockchain = blockchain;
this.blockStore = blockStore;
Expand All @@ -150,6 +154,7 @@ protected Web3Impl(
this.config = config;
filterManager = new FilterManager(eth);
this.buildInfo = buildInfo;
this.blocksBloomStore = blocksBloomStore;
initialBlockNumber = this.blockchain.getBestBlock().getNumber();

personalModule.init(this.config);
Expand Down Expand Up @@ -841,7 +846,7 @@ public String eth_newFilter(FilterRequest fr) throws Exception {
String str = null;

try {
Filter filter = LogFilter.fromFilterRequest(fr, blockchain);
Filter filter = LogFilter.fromFilterRequest(fr, blockchain, blocksBloomStore);
int id = filterManager.registerFilter(filter);

return str = toJsonHex(id);
Expand Down
2 changes: 1 addition & 1 deletion rskj-core/src/main/resources/reference.conf
Expand Up @@ -312,4 +312,4 @@ cache {
# precaution we'll set the limit to 100K entries.
max-elements: 100000
}
}
}