Skip to content

Commit

Permalink
Oracle: OracleService (#1555)
Browse files Browse the repository at this point in the history
* Start pool

* Prepare for neo-node

* Fix double call

* Prepare pool

* Fix

* Join oracle pool with oracle service

* dotnet-format

* Try to fix UT

* UT pass

* Fix UT

* Fix p2p message

* Unify collections

* Relay p2p message

* Clean code and fixes

* Send tx to OracleService

* RequestTx will wait for ResponseTx

* Rename

* Remove supervisor

* Allow to put request, and response in the same block

* Check the sender of OracleResponses

* Remove OracleResponse message in OracleService

* Organize code

* Fix typo

* Remove task count TODO

* Remove Thread-safe TODO

* Remove TODO

* ResponseItem changes

* Read the oracle contract on my response

Receive oraclePayload

* Clean code

* Add Stop message

* Save only one response for PublicKey/RequestTx

Improve sorting response pool

* Improve sort

* Group sort methods in the same region

* Ask for the request TX if I don't have it

* Reorder code

* Rename

* First oracle TX
  • Loading branch information
shargon committed Apr 16, 2020
1 parent 384a043 commit c32e6a4
Show file tree
Hide file tree
Showing 18 changed files with 1,097 additions and 167 deletions.
2 changes: 1 addition & 1 deletion src/neo/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public ConsensusPayload MakePrepareRequest()
Span<byte> buffer = stackalloc byte[sizeof(ulong)];
random.NextBytes(buffer);
Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer);
EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions());
EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions(Snapshot));
Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1);

return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest
Expand Down
2 changes: 1 addition & 1 deletion src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m
return;
}

Dictionary<UInt256, Transaction> mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash);
Dictionary<UInt256, Transaction> mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions(context.Snapshot).ToDictionary(p => p.Hash);
List<Transaction> unverified = new List<Transaction>();
foreach (UInt256 hash in context.TransactionHashes)
{
Expand Down
8 changes: 8 additions & 0 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,14 @@ private VerifyResult OnNewTransaction(Transaction transaction)
VerifyResult reason = transaction.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(transaction.Sender));
if (reason != VerifyResult.Succeed) return reason;
if (!MemPool.TryAdd(transaction.Hash, transaction)) return VerifyResult.OutOfMemory;

if (transaction.Version == TransactionVersion.OracleRequest)
{
// Oracle Service only need the OracleRequests

system.Oracle?.Tell(transaction);
}

return VerifyResult.Succeed;
}

Expand Down
19 changes: 14 additions & 5 deletions src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,16 @@ public IEnumerator<Transaction> GetEnumerator()

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

public IEnumerable<Transaction> GetVerifiedTransactions()
public IEnumerable<Transaction> GetVerifiedTransactions(StoreView snapshot)
{
_txRwLock.EnterReadLock();
try
{
return _unsortedTransactions.Select(p => p.Value.Tx).ToArray();
var oracle = new PoolItem.OracleState();
return _unsortedTransactions
.Where(u => u.Value.IsReady(snapshot, oracle))
.Select(p => p.Value.Tx)
.ToArray();
}
finally
{
Expand All @@ -196,12 +200,17 @@ public IEnumerable<Transaction> GetVerifiedTransactions()
}
}

public IEnumerable<Transaction> GetSortedVerifiedTransactions()
public IEnumerable<Transaction> GetSortedVerifiedTransactions(StoreView snapshot)
{
_txRwLock.EnterReadLock();
try
{
return _sortedTransactions.Reverse().Select(p => p.Tx).ToArray();
var oracle = new PoolItem.OracleState();
return _sortedTransactions
.Reverse()
.Where(u => u.IsReady(snapshot, oracle))
.Select(p => p.Tx)
.ToArray();
}
finally
{
Expand Down Expand Up @@ -238,7 +247,7 @@ private PoolItem GetLowestFeeTransaction(out Dictionary<UInt256, PoolItem> unsor
}
finally
{
unsortedTxPool = Object.ReferenceEquals(sortedPool, _unverifiedSortedTransactions)
unsortedTxPool = ReferenceEquals(sortedPool, _unverifiedSortedTransactions)
? _unverifiedTransactions : _unsortedTransactions;
}
}
Expand Down
47 changes: 47 additions & 0 deletions src/neo/Ledger/PoolItem.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using Neo.Network.P2P.Payloads;
using Neo.Persistence;
using Neo.SmartContract.Native;
using System;
using System.Collections.Generic;

namespace Neo.Ledger
{
Expand Down Expand Up @@ -33,6 +36,50 @@ internal PoolItem(Transaction tx)
LastBroadcastTimestamp = Timestamp;
}

internal class OracleState
{
public HashSet<UInt256> AllowedRequests = new HashSet<UInt256>();
}

public bool IsReady(StoreView snapshot, OracleState oracle)
{
switch (Tx.Version)
{
case TransactionVersion.OracleRequest:
{
if (oracle.AllowedRequests.Contains(Tx.Hash))
{
// The response was already fetched, we can put request and response in the same block

return true;
}
else
{
if (NativeContract.Oracle.ContainsResponse(snapshot, Tx.Hash))
{
// The response it's waiting to be consumed (block+n)

return true;
}
else
{
// If the response it's in the pool it's located after the request
// TODO: We can order the pool first for OracleResponses

return false;
}
}
}
case TransactionVersion.OracleResponse:
{
oracle.AllowedRequests.Add(Tx.OracleRequestTx);
break;
}
}

return true;
}

public int CompareTo(Transaction otherTx)
{
if (otherTx == null) return 1;
Expand Down
84 changes: 84 additions & 0 deletions src/neo/Ledger/SortedBlockingCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace Neo.Ledger
{
public class SortedBlockingCollection<TKey, TValue>
{
/// <summary>
/// _oracleTasks will consume from this pool
/// </summary>
private readonly BlockingCollection<TValue> _asyncPool = new BlockingCollection<TValue>();

/// <summary>
/// Queue
/// </summary>
private readonly SortedConcurrentDictionary<TKey, TValue> _queue;

/// <summary>
/// Constructor
/// </summary>
/// <param name="comparer">Comparer</param>
/// <param name="capacity">Capacity</param>
public SortedBlockingCollection(IComparer<KeyValuePair<TKey, TValue>> comparer, int capacity)
{
_queue = new SortedConcurrentDictionary<TKey, TValue>(comparer, capacity);
}

/// <summary>
/// Add entry
/// </summary>
/// <param name="key">Key</param>
/// <param name="value">Value</param>
public void Add(TKey key, TValue value)
{
if (_queue.TryAdd(key, value) && _asyncPool.Count <= 0)
{
Pop();
}
}

/// <summary>
/// Clear
/// </summary>
public void Clear()
{
_queue.Clear();

while (_asyncPool.Count > 0)
{
_asyncPool.TryTake(out _);
}
}

/// <summary>
/// Get consuming enumerable
/// </summary>
/// <param name="token">Token</param>
public IEnumerable<TValue> GetConsumingEnumerable(CancellationToken token)
{
foreach (var entry in _asyncPool.GetConsumingEnumerable(token))
{
// Prepare other item in _asyncPool

Pop();

// Iterate items

yield return entry;
}
}

/// <summary>
/// Move one item from the sorted queue to _asyncPool, this will ensure that the threads process the entries according to the priority
/// </summary>
private void Pop()
{
if (_queue.TryPop(out var entry))
{
_asyncPool.Add(entry);
}
}
}
}
Loading

0 comments on commit c32e6a4

Please sign in to comment.