Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage abstract #1249

Merged
merged 16 commits into from
Nov 27, 2019
12 changes: 6 additions & 6 deletions src/neo/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal class ConsensusContext : IDisposable, ISerializable
/// <summary>
/// Key for saving consensus state.
/// </summary>
private static readonly byte[] ConsensusStateKey = { 0xf4 };
private const byte ConsensusStatePrefix = 0xf4;

public Block Block;
public byte ViewNumber;
Expand All @@ -42,11 +42,11 @@ internal class ConsensusContext : IDisposable, ISerializable
/// </summary>
public SendersFeeMonitor SendersFeeMonitor = new SendersFeeMonitor();

public Snapshot Snapshot { get; private set; }
public SnapshotView Snapshot { get; private set; }
private KeyPair keyPair;
private int _witnessSize;
private readonly Wallet wallet;
private readonly Store store;
private readonly IStore store;

public int F => (Validators.Length - 1) / 3;
public int M => Validators.Length - F;
Expand Down Expand Up @@ -74,7 +74,7 @@ internal class ConsensusContext : IDisposable, ISerializable

public int Size => throw new NotImplementedException();

public ConsensusContext(Wallet wallet, Store store)
public ConsensusContext(Wallet wallet, IStore store)
{
this.wallet = wallet;
this.store = store;
Expand Down Expand Up @@ -146,7 +146,7 @@ public uint GetPrimaryIndex(byte viewNumber)

public bool Load()
{
byte[] data = store.Get(ConsensusStateKey);
byte[] data = store.TryGet(ConsensusStatePrefix, null);
if (data is null || data.Length == 0) return false;
using (MemoryStream ms = new MemoryStream(data, false))
using (BinaryReader reader = new BinaryReader(ms))
Expand Down Expand Up @@ -409,7 +409,7 @@ public void Reset(byte viewNumber)

public void Save()
{
store.PutSync(ConsensusStateKey, this.ToArray());
store.PutSync(ConsensusStatePrefix, null, this.ToArray());
}

public void Serialize(BinaryWriter writer)
Expand Down
4 changes: 2 additions & 2 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal class Timer { public uint Height; public byte ViewNumber; }
/// </summary>
private bool isRecovering = false;

public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet)
public ConsensusService(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
: this(localNode, taskManager, new ConsensusContext(wallet, store))
{
}
Expand Down Expand Up @@ -601,7 +601,7 @@ protected override void PostStop()
base.PostStop();
}

public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet)
public static Props Props(IActorRef localNode, IActorRef taskManager, IStore store, Wallet wallet)
{
return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox");
}
Expand Down
8 changes: 4 additions & 4 deletions src/neo/IO/Caching/CloneCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ protected override void AddInternal(TKey key, TValue value)
innerCache.Add(key, value);
}

public override void DeleteInternal(TKey key)
protected override void DeleteInternal(TKey key)
{
innerCache.Delete(key);
}

protected override IEnumerable<KeyValuePair<TKey, TValue>> FindInternal(byte[] key_prefix)
protected override IEnumerable<(TKey, TValue)> FindInternal(byte[] key_prefix)
{
foreach (KeyValuePair<TKey, TValue> pair in innerCache.Find(key_prefix))
yield return new KeyValuePair<TKey, TValue>(pair.Key, pair.Value.Clone());
foreach (var (key, value) in innerCache.Find(key_prefix))
yield return (key, value.Clone());
}

protected override TValue GetInternal(TKey key)
Expand Down
10 changes: 5 additions & 5 deletions src/neo/IO/Caching/DataCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void Delete(TKey key)
}
}

public abstract void DeleteInternal(TKey key);
protected abstract void DeleteInternal(TKey key);

public void DeleteWhere(Func<TKey, TValue, bool> predicate)
{
Expand All @@ -123,7 +123,7 @@ public void DeleteWhere(Func<TKey, TValue, bool> predicate)
/// </summary>
/// <param name="key_prefix">Must maintain the deserialized format of TKey</param>
/// <returns>Entries found with the desired prefix</returns>
public IEnumerable<KeyValuePair<TKey, TValue>> Find(byte[] key_prefix = null)
public IEnumerable<(TKey Key, TValue Value)> Find(byte[] key_prefix = null)
{
IEnumerable<(byte[], TKey, TValue)> cached;
lock (dictionary)
Expand Down Expand Up @@ -159,21 +159,21 @@ public void DeleteWhere(Func<TKey, TValue, bool> predicate)
{
if (!c2 || (c1 && ByteArrayComparer.Default.Compare(i1.KeyBytes, i2.KeyBytes) < 0))
{
yield return new KeyValuePair<TKey, TValue>(i1.Key, i1.Item);
yield return (i1.Key, i1.Item);
c1 = e1.MoveNext();
i1 = c1 ? e1.Current : default;
}
else
{
yield return new KeyValuePair<TKey, TValue>(i2.Key, i2.Item);
yield return (i2.Key, i2.Item);
c2 = e2.MoveNext();
i2 = c2 ? e2.Current : default;
}
}
}
}

protected abstract IEnumerable<KeyValuePair<TKey, TValue>> FindInternal(byte[] key_prefix);
protected abstract IEnumerable<(TKey Key, TValue Value)> FindInternal(byte[] key_prefix);

public IEnumerable<Trackable> GetChangeSet()
{
Expand Down
9 changes: 6 additions & 3 deletions src/neo/IO/Data/LevelDB/SliceBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ public SliceBuilder Add(long value)

public SliceBuilder Add(IEnumerable<byte> value)
{
data.AddRange(value);
if (value != null)
data.AddRange(value);
return this;
}

public SliceBuilder Add(string value)
{
data.AddRange(Encoding.UTF8.GetBytes(value));
if (value != null)
data.AddRange(Encoding.UTF8.GetBytes(value));
return this;
}

public SliceBuilder Add(ISerializable value)
{
data.AddRange(value.ToArray());
if (value != null)
data.AddRange(value.ToArray());
return this;
}

Expand Down
40 changes: 21 additions & 19 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public class FillCompleted { }
private readonly Dictionary<UInt256, Block> block_cache = new Dictionary<UInt256, Block>();
private readonly Dictionary<uint, LinkedList<Block>> block_cache_unverified = new Dictionary<uint, LinkedList<Block>>();
internal readonly RelayCache ConsensusRelayCache = new RelayCache(100);
private Snapshot currentSnapshot;
private SnapshotView currentSnapshot;

public Store Store { get; }
public IStore Store { get; }
public RawView RawView { get; }
public MemoryPool MemPool { get; }
public uint Height => currentSnapshot.Height;
public uint HeaderHeight => currentSnapshot.HeaderHeight;
Expand Down Expand Up @@ -95,27 +96,28 @@ static Blockchain()
}
}

public Blockchain(NeoSystem system, Store store)
public Blockchain(NeoSystem system, IStore store)
{
this.system = system;
this.MemPool = new MemoryPool(system, ProtocolSettings.Default.MemoryPoolMaxTransactions);
this.Store = store;
this.RawView = new RawView(store);
lock (lockObj)
{
if (singleton != null)
throw new InvalidOperationException();
header_index.AddRange(store.GetHeaderHashList().Find().OrderBy(p => (uint)p.Key).SelectMany(p => p.Value.Hashes));
header_index.AddRange(RawView.HeaderHashList.Find().OrderBy(p => (uint)p.Key).SelectMany(p => p.Value.Hashes));
stored_header_count += (uint)header_index.Count;
if (stored_header_count == 0)
{
header_index.AddRange(store.GetBlocks().Find().OrderBy(p => p.Value.Index).Select(p => p.Key));
header_index.AddRange(RawView.Blocks.Find().OrderBy(p => p.Value.Index).Select(p => p.Key));
}
else
{
HashIndexState hashIndex = store.GetHeaderHashIndex().Get();
HashIndexState hashIndex = RawView.HeaderHashIndex.Get();
if (hashIndex.Index >= stored_header_count)
{
DataCache<UInt256, TrimmedBlock> cache = store.GetBlocks();
DataCache<UInt256, TrimmedBlock> cache = RawView.Blocks;
for (UInt256 hash = hashIndex.Hash; hash != header_index[(int)stored_header_count - 1];)
{
header_index.Insert((int)stored_header_count, hash);
Expand All @@ -139,13 +141,13 @@ public Blockchain(NeoSystem system, Store store)
public bool ContainsBlock(UInt256 hash)
{
if (block_cache.ContainsKey(hash)) return true;
return Store.ContainsBlock(hash);
return RawView.ContainsBlock(hash);
}

public bool ContainsTransaction(UInt256 hash)
{
if (MemPool.ContainsKey(hash)) return true;
return Store.ContainsTransaction(hash);
return RawView.ContainsTransaction(hash);
}

private static Transaction DeployNativeContracts()
Expand Down Expand Up @@ -179,7 +181,7 @@ public Block GetBlock(UInt256 hash)
{
if (block_cache.TryGetValue(hash, out Block block))
return block;
return Store.GetBlock(hash);
return RawView.GetBlock(hash);
}

public UInt256 GetBlockHash(uint index)
Expand All @@ -193,16 +195,16 @@ public static UInt160 GetConsensusAddress(ECPoint[] validators)
return Contract.CreateMultiSigRedeemScript(validators.Length - (validators.Length - 1) / 3, validators).ToScriptHash();
}

public Snapshot GetSnapshot()
public SnapshotView GetSnapshot()
{
return Store.GetSnapshot();
return new SnapshotView(Store);
}

public Transaction GetTransaction(UInt256 hash)
{
if (MemPool.TryGetValue(hash, out Transaction transaction))
return transaction;
return Store.GetTransaction(hash);
return RawView.GetTransaction(hash);
}

private void OnImport(IEnumerable<Block> blocks)
Expand Down Expand Up @@ -237,7 +239,7 @@ private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
// Add the transactions to the memory pool
foreach (var tx in transactions)
{
if (Store.ContainsTransaction(tx.Hash))
if (RawView.ContainsTransaction(tx.Hash))
continue;
if (!NativeContract.Policy.CheckPolicy(tx, currentSnapshot))
continue;
Expand Down Expand Up @@ -320,7 +322,7 @@ private RelayResultReason OnNewBlock(Block block)
if (block.Index == header_index.Count)
{
header_index.Add(block.Hash);
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
snapshot.Blocks.Add(block.Hash, block.Header.Trim());
snapshot.HeaderHashIndex.GetAndChange().Hash = block.Hash;
Expand All @@ -345,7 +347,7 @@ private RelayResultReason OnNewConsensus(ConsensusPayload payload)

private void OnNewHeaders(Header[] headers)
{
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
foreach (Header header in headers)
{
Expand Down Expand Up @@ -426,7 +428,7 @@ protected override void OnReceive(object message)

private void Persist(Block block)
{
using (Snapshot snapshot = GetSnapshot())
using (SnapshotView snapshot = GetSnapshot())
{
List<ApplicationExecuted> all_application_executed = new List<ApplicationExecuted>();
snapshot.PersistingBlock = block;
Expand Down Expand Up @@ -504,12 +506,12 @@ protected override void PostStop()
currentSnapshot?.Dispose();
}

public static Props Props(NeoSystem system, Store store)
public static Props Props(NeoSystem system, IStore store)
{
return Akka.Actor.Props.Create(() => new Blockchain(system, store)).WithMailbox("blockchain-mailbox");
}

private void SaveHeaderHashList(Snapshot snapshot = null)
private void SaveHeaderHashList(SnapshotView snapshot = null)
{
if ((header_index.Count - stored_header_count < 2000))
return;
Expand Down
8 changes: 4 additions & 4 deletions src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public MemoryPool(NeoSystem system, int capacity)
Capacity = capacity;
}

internal bool LoadPolicy(Snapshot snapshot)
internal bool LoadPolicy(StoreView snapshot)
{
_maxTxPerBlock = (int)NativeContract.Policy.GetMaxTransactionsPerBlock(snapshot);
long newFeePerByte = NativeContract.Policy.GetFeePerByte(snapshot);
Expand Down Expand Up @@ -348,7 +348,7 @@ internal void InvalidateVerifiedTransactions()
}

// Note: this must only be called from a single thread (the Blockchain actor)
internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot)
internal void UpdatePoolForBlockPersisted(Block block, StoreView snapshot)
{
bool policyChanged = LoadPolicy(snapshot);

Expand Down Expand Up @@ -407,7 +407,7 @@ internal void InvalidateAllTransactions()
}

private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double millisecondsTimeout, Snapshot snapshot)
SortedSet<PoolItem> unverifiedSortedTxPool, int count, double millisecondsTimeout, StoreView snapshot)
{
DateTime reverifyCutOffTimeStamp = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
List<PoolItem> reverifiedItems = new List<PoolItem>(count);
Expand Down Expand Up @@ -483,7 +483,7 @@ internal void InvalidateAllTransactions()
/// <param name="maxToVerify">Max transactions to reverify, the value passed can be >=1</param>
/// <param name="snapshot">The snapshot to use for verifying.</param>
/// <returns>true if more unsorted messages exist, otherwise false</returns>
internal bool ReVerifyTopUnverifiedTransactionsIfNeeded(int maxToVerify, Snapshot snapshot)
internal bool ReVerifyTopUnverifiedTransactionsIfNeeded(int maxToVerify, StoreView snapshot)
{
if (Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight)
return false;
Expand Down
6 changes: 3 additions & 3 deletions src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public class NeoSystem : IDisposable
public IActorRef Consensus { get; private set; }
public RpcServer RpcServer { get; private set; }

private readonly Store store;
private readonly IStore store;
private ChannelsConfig start_message = null;
private bool suspend = false;

public NeoSystem(Store store)
public NeoSystem(IStore store)
{
this.store = store;
Plugin.LoadPlugins(this);
Expand Down Expand Up @@ -69,7 +69,7 @@ internal void ResumeNodeStartup()
}
}

public void StartConsensus(Wallet wallet, Store consensus_store = null, bool ignoreRecoveryLogs = false)
public void StartConsensus(Wallet wallet, IStore consensus_store = null, bool ignoreRecoveryLogs = false)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus.Tell(new ConsensusService.Start { IgnoreRecoveryLogs = ignoreRecoveryLogs }, Blockchain);
Expand Down
4 changes: 2 additions & 2 deletions src/neo/Network/P2P/Payloads/BlockBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void IVerifiable.DeserializeUnsigned(BinaryReader reader)
NextConsensus = reader.ReadSerializable<UInt160>();
}

UInt160[] IVerifiable.GetScriptHashesForVerifying(Snapshot snapshot)
UInt160[] IVerifiable.GetScriptHashesForVerifying(StoreView snapshot)
{
if (PrevHash == UInt256.Zero) return new[] { Witness.ScriptHash };
Header prev_header = snapshot.GetHeader(PrevHash);
Expand Down Expand Up @@ -124,7 +124,7 @@ public void FromJson(JObject json)
Witness = ((JArray)json["witnesses"]).Select(p => Witness.FromJson(p)).FirstOrDefault();
}

public virtual bool Verify(Snapshot snapshot)
public virtual bool Verify(StoreView snapshot)
{
Header prev_header = snapshot.GetHeader(PrevHash);
if (prev_header == null) return false;
Expand Down
4 changes: 2 additions & 2 deletions src/neo/Network/P2P/Payloads/ConsensusPayload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void IVerifiable.DeserializeUnsigned(BinaryReader reader)
Data = reader.ReadVarBytes();
}

UInt160[] IVerifiable.GetScriptHashesForVerifying(Snapshot snapshot)
UInt160[] IVerifiable.GetScriptHashesForVerifying(StoreView snapshot)
{
ECPoint[] validators = NativeContract.NEO.GetNextBlockValidators(snapshot);
if (validators.Length <= ValidatorIndex)
Expand All @@ -118,7 +118,7 @@ void IVerifiable.SerializeUnsigned(BinaryWriter writer)
writer.WriteVarBytes(Data);
}

public bool Verify(Snapshot snapshot)
public bool Verify(StoreView snapshot)
{
if (BlockIndex <= snapshot.Height)
return false;
Expand Down
Loading