Skip to content

Commit

Permalink
feat: integrate Rust-IPFS into substrate
Browse files Browse the repository at this point in the history
  • Loading branch information
ljedrz committed Aug 4, 2020
1 parent de8a0d5 commit a2f17a6
Show file tree
Hide file tree
Showing 14 changed files with 1,083 additions and 103 deletions.
1 change: 1 addition & 0 deletions bin/node-template/node/Cargo.toml
Expand Up @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
name = "node-template"

[dependencies]
ipfs = { git = "https://github.com/rs-ipfs/rust-ipfs" }
structopt = "0.3.8"

sc-cli = { version = "0.8.0-rc5", path = "../../../client/cli", features = ["wasmtime"] }
Expand Down
14 changes: 10 additions & 4 deletions bin/node-template/pallets/template/Cargo.toml
Expand Up @@ -24,26 +24,32 @@ default-features = false
version = "2.0.0-rc5"
path = "../../../../frame/system"

[dev-dependencies.sp-core]
[dependencies.sp-core]
default-features = false
version = "2.0.0-rc5"
path = "../../../../primitives/core"

[dev-dependencies.sp-io]
[dependencies.sp-io]
default-features = false
version = "2.0.0-rc5"
path = "../../../../primitives/io"

[dev-dependencies.sp-runtime]
[dependencies.sp-runtime]
default-features = false
version = "2.0.0-rc5"
path = "../../../../primitives/runtime"

[dependencies.sp-std]
default-features = false
version = "2.0.0-rc2"
path = "../../../../primitives/std"

[features]
default = ['std']
std = [
'codec/std',
'frame-support/std',
'frame-system/std'
'frame-system/std',
'sp-io/std',
'sp-std/std',
]
230 changes: 145 additions & 85 deletions bin/node-template/pallets/template/src/lib.rs
@@ -1,103 +1,163 @@
#![cfg_attr(not(feature = "std"), no_std)]

/// Edit this file to define custom logic or remove it if it is not needed.
/// Learn more about FRAME and the core library of Substrate FRAME pallets:
/// https://substrate.dev/docs/en/knowledgebase/runtime/frame
use frame_support::{debug, decl_module, decl_storage, decl_event, decl_error, dispatch::Vec};
use frame_system::{self as system, ensure_signed};
use sp_core::offchain::{Duration, IpfsRequest, IpfsResponse, OpaqueMultiaddr, Timestamp};
use sp_io::offchain::timestamp;
use sp_runtime::offchain::ipfs;
use sp_std::str;

use frame_support::{decl_module, decl_storage, decl_event, decl_error, dispatch, traits::Get};
use frame_system::ensure_signed;
// #[cfg(test)]
// mod tests;

#[cfg(test)]
mod mock;
// an IPFS bootstrapper node that can be connected to
#[allow(unused)]
const BOOTSTRAPPER_ADDR: &str = "/ip4/104.131.131.82/tcp/4001";

#[cfg(test)]
mod tests;

/// Configure the pallet by specifying the parameters and types on which it depends.
pub trait Trait: frame_system::Trait {
/// Because this pallet emits events, it depends on the runtime's definition of an event.
type Event: From<Event<Self>> + Into<<Self as frame_system::Trait>::Event>;
/// The pallet's configuration trait.
pub trait Trait: system::Trait {
/// The overarching event type.
type Event: From<Event<Self>> + Into<<Self as system::Trait>::Event>;
}

// The pallet's runtime storage items.
// https://substrate.dev/docs/en/knowledgebase/runtime/storage
// This pallet's storage items.
decl_storage! {
// A unique name is used to ensure that the pallet's storage items are isolated.
// This name may be updated, but each pallet in the runtime must use a unique name.
// ---------------------------------vvvvvvvvvvvvvv
trait Store for Module<T: Trait> as TemplateModule {
// Learn more about declaring storage items:
// https://substrate.dev/docs/en/knowledgebase/runtime/storage#declaring-storage-items
Something get(fn something): Option<u32>;
}
trait Store for Module<T: Trait> as TemplateModule {
// A list of desired addresses to keep connections to
pub DesiredConnections: Vec<OpaqueMultiaddr>;
}
}

// Pallets use events to inform users when important changes are made.
// https://substrate.dev/docs/en/knowledgebase/runtime/events
// The pallet's events
decl_event!(
pub enum Event<T> where AccountId = <T as frame_system::Trait>::AccountId {
/// Event documentation should end with an array that provides descriptive names for event
/// parameters. [something, who]
SomethingStored(u32, AccountId),
}
pub enum Event<T> where AccountId = <T as system::Trait>::AccountId {
AddressAdded(AccountId),
AddressRemoved(AccountId),
}
);

// Errors inform users that something went wrong.
// The pallet's errors
decl_error! {
pub enum Error for Module<T: Trait> {
/// Error names should be descriptive.
NoneValue,
/// Errors should have helpful documentation associated with them.
StorageOverflow,
}
pub enum Error for Module<T: Trait> {
CantCreateRequest,
RequestTimeout,
RequestFailed,
}
}

// Dispatchable functions allows users to interact with the pallet and invoke state changes.
// These functions materialize as "extrinsics", which are often compared to transactions.
// Dispatchable functions must be annotated with a weight and must return a DispatchResult.
// The pallet's dispatchable functions.
decl_module! {
pub struct Module<T: Trait> for enum Call where origin: T::Origin {
// Errors must be initialized if they are used by the pallet.
type Error = Error<T>;

// Events must be initialized if they are used by the pallet.
fn deposit_event() = default;

/// An example dispatchable that takes a singles value as a parameter, writes the value to
/// storage and emits an event. This function must be dispatched by a signed extrinsic.
#[weight = 10_000 + T::DbWeight::get().writes(1)]
pub fn do_something(origin, something: u32) -> dispatch::DispatchResult {
// Check that the extrinsic was signed and get the signer.
// This function will return an error if the extrinsic is not signed.
// https://substrate.dev/docs/en/knowledgebase/runtime/origin
let who = ensure_signed(origin)?;

// Update storage.
Something::put(something);

// Emit an event.
Self::deposit_event(RawEvent::SomethingStored(something, who));
// Return a successful DispatchResult
Ok(())
}

/// An example dispatchable that may throw a custom error.
#[weight = 10_000 + T::DbWeight::get().reads_writes(1,1)]
pub fn cause_error(origin) -> dispatch::DispatchResult {
let _who = ensure_signed(origin)?;

// Read a value from storage.
match Something::get() {
// Return an error if the value has not been set.
None => Err(Error::<T>::NoneValue)?,
Some(old) => {
// Increment the value read from storage; will error in the event of overflow.
let new = old.checked_add(1).ok_or(Error::<T>::StorageOverflow)?;
// Update the value in storage with the incremented result.
Something::put(new);
Ok(())
},
}
}
}
/// The module declaration.
pub struct Module<T: Trait> for enum Call where origin: T::Origin {
// Initializing errors
type Error = Error<T>;

// Initializing events
fn deposit_event() = default;

/// Add an address to the list of desired connections. The connection will be established
/// during the next run of the off-chain `connection_housekeeping` process. If it cannot be
/// established, it will be re-attempted during the subsequest housekeeping runs until it
/// succeeds.
#[weight = 100_000]
pub fn add_address(origin, addr: Vec<u8>) {
let who = ensure_signed(origin)?;
let addr = OpaqueMultiaddr(addr);

DesiredConnections::mutate(|addrs| if !addrs.contains(&addr) { addrs.push(addr) });
Self::deposit_event(RawEvent::AddressAdded(who));
}

/// Remove an address from the list of desired connections. The connection will be severed
/// during the next run of the off-chain `connection_housekeeping` process.
#[weight = 500_000]
pub fn remove_address(origin, addr: Vec<u8>) {
let who = ensure_signed(origin)?;
let addr = OpaqueMultiaddr(addr);

DesiredConnections::mutate(|addrs| if let Some(idx) = addrs.iter().position(|e| *e == addr) { addrs.remove(idx); });
Self::deposit_event(RawEvent::AddressRemoved(who));
}

fn offchain_worker(block_number: T::BlockNumber) {
// run every other block
if block_number % 2.into() == 1.into() {
if let Err(e) = Self::connection_housekeeping() {
debug::error!("Encountered an error during IPFS connection housekeeping: {:?}", e);
}
}
}
}
}

impl<T: Trait> Module<T> {
// send a request to the local IPFS node; can only be called be an off-chain worker
fn ipfs_request(req: IpfsRequest, deadline: impl Into<Option<Timestamp>>) -> Result<IpfsResponse, Error<T>> {
let ipfs_request = ipfs::PendingRequest::new(req).map_err(|_| Error::<T>::CantCreateRequest)?;
ipfs_request.try_wait(deadline)
.map_err(|_| Error::<T>::RequestTimeout)?
.map(|r| r.response)
.map_err(|e| {
if let ipfs::Error::IoError(err) = e {
debug::error!("IPFS request failed: {}", str::from_utf8(&err).unwrap());
} else {
debug::error!("IPFS request failed: {:?}", e);
}
Error::<T>::RequestFailed
})
}

fn connection_housekeeping() -> Result<(), Error<T>> {
let mut deadline;

// obtain the node's current list of connected peers
deadline = Some(timestamp().add(Duration::from_millis(1_000)));
let current_ipfs_peers = if let IpfsResponse::Peers(peers) = Self::ipfs_request(IpfsRequest::Peers, deadline)? {
peers
} else {
unreachable!("can't get any other response from that request; qed");
};

// get the list of desired connections
let wanted_addresses = DesiredConnections::get();

// connect to the desired peers if not yet connected
for addr in &wanted_addresses {
deadline = Some(timestamp().add(Duration::from_millis(1_000)));

if !current_ipfs_peers.contains(addr) {
let _ = Self::ipfs_request(IpfsRequest::Connect(addr.clone()), deadline);
}
}

// disconnect from peers that are no longer desired
for addr in &current_ipfs_peers {
deadline = Some(timestamp().add(Duration::from_millis(1_000)));

if !wanted_addresses.contains(&addr) {
let _ = Self::ipfs_request(IpfsRequest::Disconnect(addr.clone()), deadline);
}
}

if wanted_addresses.len() == current_ipfs_peers.len() {
debug::info!(
"All desired IPFS connections are live; current peers: {:?}",
current_ipfs_peers.iter().filter_map(|addr| str::from_utf8(&addr.0).ok()).collect::<Vec<&str>>()
);
} else {
let missing_peers = wanted_addresses
.iter()
.filter(|addr| !current_ipfs_peers.contains(&addr))
.filter_map(|addr| str::from_utf8(&addr.0).ok())
.collect::<Vec<_>>();

debug::info!(
"Not all desired IPFS connections are live; current peers: {:?}, missing peers: {:?}",
current_ipfs_peers.iter().filter_map(|addr| str::from_utf8(&addr.0).ok()).collect::<Vec<&str>>(),
missing_peers
);
}

Ok(())
}
}
1 change: 1 addition & 0 deletions bin/node/cli/Cargo.toml
Expand Up @@ -38,6 +38,7 @@ codec = { package = "parity-scale-codec", version = "1.3.4" }
serde = { version = "1.0.102", features = ["derive"] }
futures = { version = "0.3.1", features = ["compat"] }
hex-literal = "0.2.1"
ipfs = { git = "https://github.com/rs-ipfs/rust-ipfs" }
jsonrpc-core = "14.2.0"
log = "0.4.8"
rand = "0.7.2"
Expand Down
2 changes: 2 additions & 0 deletions client/offchain/Cargo.toml
Expand Up @@ -30,6 +30,8 @@ sp-runtime = { version = "2.0.0-rc5", path = "../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc5", path = "../../primitives/utils" }
sc-network = { version = "0.8.0-rc5", path = "../network" }
sc-keystore = { version = "2.0.0-rc5", path = "../keystore" }
ipfs = { git = "https://github.com/rs-ipfs/rust-ipfs" }
async-std = { version = "1.6", features = ["attributes", "std"] }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
hyper = "0.13.2"
Expand Down

0 comments on commit a2f17a6

Please sign in to comment.