Skip to content

Commit

Permalink
moved storage iterator to embedded struct
Browse files Browse the repository at this point in the history
  • Loading branch information
ypopovych committed Jun 16, 2023
1 parent 34defb3 commit b8393a7
Showing 1 changed file with 83 additions and 85 deletions.
168 changes: 83 additions & 85 deletions Sources/Substrate/Storage/StorageEntry.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,73 @@ public struct StorageEntry<S: SomeSubstrate, Key: StorageKey> {
}

public extension StorageEntry {

struct Iterator<Iter: StorageKeyIterator> where Iter.TKey == Key {
public let substrate: S
public let iterator: Iter

public init(substrate: S, iterator: Iter) {
self.substrate = substrate
self.iterator = iterator
}

public func keys(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<Iter.TKey, Error> {
var buffer: [Iter.TKey] = []
buffer.reserveCapacity(page)
var lastKey: Iter.TKey? = nil
var atHash: S.RC.TBlock.THeader.THasher.THash? = hash
return AsyncThrowingStream<Iter.TKey, Error> {
if atHash == nil {
atHash = try await substrate.client.block(hash: nil, runtime: substrate.runtime)!
}
if buffer.count > 0 { return buffer.removeFirst() }
let new = try await substrate.client.storage(keys: iterator,
count: page,
startKey: lastKey,
at: atHash,
runtime: substrate.runtime)
lastKey = new.last
guard new.count > 0 else { return nil }
buffer.append(contentsOf: new)
return buffer.removeFirst()
}
}

public func entries(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<(Iter.TKey, Iter.TKey.TValue), Error> {
var buffer: [(Iter.TKey, Iter.TKey.TValue)] = []
buffer.reserveCapacity(page)
var lastKey: Iter.TKey? = nil
var atHash: S.RC.TBlock.THeader.THasher.THash? = hash
return AsyncThrowingStream<(Iter.TKey, Iter.TKey.TValue), Error> {
if atHash == nil {
atHash = try await substrate.client.block(hash: nil, runtime: substrate.runtime)!
}
if buffer.count > 0 { return buffer.removeFirst() }
var finished: Bool = false
repeat {
let new = try await substrate.client.storage(keys: iterator,
count: page,
startKey: lastKey,
at: atHash,
runtime: substrate.runtime)
lastKey = new.last
guard new.count > 0 else { return nil }
let changes = try await substrate.client.storage(changes: new,
at: atHash,
runtime: substrate.runtime)
let filtered = changes.compactMap { $0.1 != nil ? ($0.0, $0.1!) : nil }
if filtered.count > 0 {
buffer.append(contentsOf: filtered)
finished = true
}
} while (!finished)
return buffer.removeFirst()
}
}
}
}

public extension StorageEntry where Key: IterableStorageKey {
Expand All @@ -84,22 +150,22 @@ public extension StorageEntry where Key: IterableStorageKey {
func entries(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<(Key, Key.TValue), Error> {
StorageEntryIterator(substrate: substrate, iterator: iterator).entries(page: page, at: hash)
Iterator(substrate: substrate, iterator: iterator).entries(page: page, at: hash)
}

func keys(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<Key, Error> {
StorageEntryIterator(substrate: substrate, iterator: iterator).keys(page: page, at: hash)
Iterator(substrate: substrate, iterator: iterator).keys(page: page, at: hash)
}
}

public extension StorageEntry where Key: IterableStorageKey, Key.TIterator: IterableStorageKeyIterator {
func filter(
_ param: Key.TIterator.TIterator.TParam
) throws -> StorageEntryIterator<S, Key.TIterator.TIterator> {
try StorageEntryIterator(substrate: substrate,
iterator: iterator.next(param: param, runtime: substrate.runtime))
) throws -> Iterator<Key.TIterator.TIterator> {
try Iterator(substrate: substrate,
iterator: iterator.next(param: param, runtime: substrate.runtime))
}
}

Expand All @@ -116,12 +182,12 @@ public extension StorageEntry where Key == AnyStorageKey {
try await valueOrDefault(key: Key(name: params.name, pallet: params.pallet, path: []), at: hash)
}

func filter(params: [Key.Iterator.TParam]) throws -> StorageEntryIterator<S, Key.Iterator> {
try StorageEntryIterator(substrate: substrate,
iterator: Key.Iterator(name: self.params.name,
pallet: self.params.pallet,
params: params,
runtime: substrate.runtime))
func filter(params: [Key.Iterator.TParam]) throws -> Iterator<Key.Iterator> {
try Iterator(substrate: substrate,
iterator: Key.Iterator(name: self.params.name,
pallet: self.params.pallet,
params: params,
runtime: substrate.runtime))
}
}

Expand All @@ -139,80 +205,12 @@ public extension StorageEntry where Key.TParams == Void {
}
}

public struct StorageEntryIterator<S: SomeSubstrate, Iter: StorageKeyIterator> {
public let substrate: S
public let iterator: Iter

public init(substrate: S, iterator: Iter) {
self.substrate = substrate
self.iterator = iterator
}

public func keys(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<Iter.TKey, Error> {
var buffer: [Iter.TKey] = []
buffer.reserveCapacity(page)
var lastKey: Iter.TKey? = nil
var atHash: S.RC.TBlock.THeader.THasher.THash? = hash
return AsyncThrowingStream<Iter.TKey, Error> {
if atHash == nil {
atHash = try await substrate.client.block(hash: nil, runtime: substrate.runtime)!
}
if buffer.count > 0 { return buffer.removeFirst() }
let new = try await substrate.client.storage(keys: iterator,
count: page,
startKey: lastKey,
at: atHash,
runtime: substrate.runtime)
lastKey = new.last
guard new.count > 0 else { return nil }
buffer.append(contentsOf: new)
return buffer.removeFirst()
}
}

public func entries(
page: Int = 20, at hash: S.RC.TBlock.THeader.THasher.THash? = nil
) -> AsyncThrowingStream<(Iter.TKey, Iter.TKey.TValue), Error> {
var buffer: [(Iter.TKey, Iter.TKey.TValue)] = []
buffer.reserveCapacity(page)
var lastKey: Iter.TKey? = nil
var atHash: S.RC.TBlock.THeader.THasher.THash? = hash
return AsyncThrowingStream<(Iter.TKey, Iter.TKey.TValue), Error> {
if atHash == nil {
atHash = try await substrate.client.block(hash: nil, runtime: substrate.runtime)!
}
if buffer.count > 0 { return buffer.removeFirst() }
var finished: Bool = false
repeat {
let new = try await substrate.client.storage(keys: iterator,
count: page,
startKey: lastKey,
at: atHash,
runtime: substrate.runtime)
lastKey = new.last
guard new.count > 0 else { return nil }
let changes = try await substrate.client.storage(changes: new,
at: atHash,
runtime: substrate.runtime)
let filtered = changes.compactMap { $0.1 != nil ? ($0.0, $0.1!) : nil }
if filtered.count > 0 {
buffer.append(contentsOf: filtered)
finished = true
}
} while (!finished)
return buffer.removeFirst()
}
}
}

public extension StorageEntryIterator where Iter: IterableStorageKeyIterator {
public extension StorageEntry.Iterator where Iter: IterableStorageKeyIterator {
func filter(
_ param: Iter.TIterator.TParam
) throws -> StorageEntryIterator<S, Iter.TIterator> {
try StorageEntryIterator<_, _>(substrate: substrate,
iterator: iterator.next(param: param,
runtime: substrate.runtime))
) throws -> StorageEntry.Iterator<Iter.TIterator> {
try StorageEntry.Iterator<_>(substrate: substrate,
iterator: iterator.next(param: param,
runtime: substrate.runtime))
}
}

0 comments on commit b8393a7

Please sign in to comment.