Skip to content

Commit

Permalink
Merge pull request holochain#1640 from holochain/feature-add-header-p…
Browse files Browse the repository at this point in the history
…ublishing

Header Publishing (The Return Of)
  • Loading branch information
willemolding committed Sep 26, 2019
2 parents a647a06 + 9c340b1 commit 608f1ee
Show file tree
Hide file tree
Showing 23 changed files with 566 additions and 94 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added

* Adds publishing of headers again after rollback. Header publishing is now its own action rather than part of the `Publish` action that plays nicely with the testing framework. It also adds header entries to the author list so they are gossiped properly. [#1640](https://github.com/holochain/holochain-rust/pull/1640).

### Changed

### Deprecated
Expand Down
14 changes: 7 additions & 7 deletions app_spec/test/files/links.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ module.exports = scenario => {

//bob expects zero links
t.ok(bob_agent_posts_expect_empty.Ok)
t.equal(bob_agent_posts_expect_empty.Ok.links.length, 0);
//alice expects zero alice
t.equal(bob_agent_posts_expect_empty.Ok.links.length, 0); // #!# fails with expected: 0 actual: 2
//alice expects zero links
t.ok(alice_agent_posts_expect_empty.Ok)
t.equal(alice_agent_posts_expect_empty.Ok.links.length, 0);


//different chain hash up to this point so we should be able to create a link with the same data
await alice.app.callSync("simple", "create_link",{ "base":alice.app.agentId, "target": "Posty" })

//get alice posts
const alice_posts_not_empty = await bob.app.call("simple", "get_my_links",{ "base": alice.app.agentId,"status_request" : "Live" })
//get posts as Alice and as Bob
const alice_posts_not_empty = await alice.app.call("simple", "get_my_links",{ "base": alice.app.agentId,"status_request" : "Live" })
const bob_posts_not_empty = await bob.app.call("simple", "get_my_links",{ "base": alice.app.agentId,"status_request" : "Live" })

//expect 1 post
t.ok(alice_posts_not_empty.Ok)
t.equal(alice_posts_not_empty.Ok.links.length, 1);


t.ok(bob_posts_not_empty.Ok)
t.equal(bob_posts_not_empty.Ok.links.length, 1); //#!# fails with expected: 1 actual: 2
})


Expand Down
4 changes: 4 additions & 0 deletions core/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ pub enum Action {
/// (only publish for AppEntryType, publish and publish_meta for links etc)
Publish(Address),

/// Publish to the network the header entry for the entry at the given address.
/// Note that the given address is that of the entry NOT the address of the header itself
PublishHeaderEntry(Address),

///Performs a Network Query Action based on the key and payload, used for links and Entries
Query((QueryKey, QueryPayload)),

Expand Down
2 changes: 1 addition & 1 deletion core/src/agent/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ pub mod tests {
let header = create_new_chain_header(
&test_entry(),
&agent_state,
&StateWrapper::from(state.clone()),
&StateWrapper::from(state),
&None,
&vec![],
)
Expand Down
2 changes: 2 additions & 0 deletions core/src/network/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ pub mod get_validation_package;
pub mod initialize_network;
pub mod publish;
pub mod shutdown;
pub mod publish_header_entry;

use holochain_core_types::error::HcResult;
use holochain_persistence_api::cas::content::Address;

#[derive(Clone, Debug)]
pub enum ActionResponse {
Publish(HcResult<Address>),
PublishHeaderEntry(HcResult<Address>),
Respond(HcResult<()>),
}
54 changes: 54 additions & 0 deletions core/src/network/actions/publish_header_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::{
action::{Action, ActionWrapper},
context::Context,
instance::dispatch_action,
network::actions::ActionResponse,
};
use futures::{future::Future, task::Poll};
use holochain_core_types::error::HcResult;
use holochain_persistence_api::cas::content::Address;
use std::{pin::Pin, sync::Arc};

/// Publish Header Entry Action Creator
/// Returns a future that resolves to an ActionResponse.
pub async fn publish_header_entry(address: Address, context: &Arc<Context>) -> HcResult<Address> {
let action_wrapper = ActionWrapper::new(Action::PublishHeaderEntry(address));
dispatch_action(context.action_channel(), action_wrapper.clone());
await!(PublishHeaderEntryFuture {
context: context.clone(),
action: action_wrapper,
})
}

/// PublishFuture resolves to ActionResponse
/// Tracks the state for a response to its ActionWrapper
pub struct PublishHeaderEntryFuture {
context: Arc<Context>,
action: ActionWrapper,
}

impl Future for PublishHeaderEntryFuture {
type Output = HcResult<Address>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
if let Some(err) = self.context.action_channel_error("PublishHeaderEntryFuture") {
return Poll::Ready(Err(err));
}
let state = self.context.state().unwrap().network();
if let Err(error) = state.initialized() {
return Poll::Ready(Err(error));
}
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
match state.actions().get(&self.action) {
Some(ActionResponse::PublishHeaderEntry(result)) => match result {
Ok(address) => Poll::Ready(Ok(address.to_owned())),
Err(error) => Poll::Ready(Err(error.clone())),
},
_ => Poll::Pending,
}
}
}
107 changes: 101 additions & 6 deletions core/src/network/handler/lists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ use crate::{
instance::dispatch_action,
network::handler::{get_content_aspect, get_meta_aspects},
};
use holochain_core_types::error::HcResult;
use holochain_core_types::{
error::HcResult,
entry::Entry,
};
use holochain_persistence_api::cas::content::{Address, AddressableContent};
use lib3h_protocol::data_types::{EntryListData, GetListData};
use snowflake::ProcessUniqueId;
use std::{collections::HashMap, sync::Arc, thread};
use crate::network::entry_aspect::EntryAspect;
use crate::agent::state::create_new_chain_header;

pub fn handle_get_authoring_list(get_list_data: GetListData, context: Arc<Context>) {
thread::Builder::new()
Expand All @@ -20,11 +25,32 @@ pub fn handle_get_authoring_list(get_list_data: GetListData, context: Arc<Contex
.spawn(move || {
let mut address_map = HashMap::new();
for entry in get_all_public_chain_entries(context.clone()) {
let content_aspect = get_content_aspect(&entry, context.clone())
.expect("Must be able to get content aspect of entry that is in our source chain");
address_map.insert(
entry.clone(),
get_all_aspect_addresses(&entry, context.clone())
.expect("Error getting entry aspects of authoring list"),
vec![content_aspect.address()]
);
}

// chain header entries also should be communicated on the authoring list
// In future make this depend if header publishing is enabled
let state = context.state()
.expect("There must be a state in context when we are responding to a HandleGetAuthoringEntryList");
for chain_header_entry in get_all_chain_header_entries(context.clone()) {
let address = chain_header_entry.address();
let header_entry_header = create_new_chain_header(
&chain_header_entry,
&state.agent(),
&*state,
&None,
&Vec::new(),
).expect("Must be able to create dummy header header when responding to HandleGetAuthoringEntryList");
let content_aspect = EntryAspect::Content(
chain_header_entry,
header_entry_header,
);
address_map.insert(address, vec![content_aspect.address()]);
}

let action = Action::RespondAuthoringList(EntryListData {
Expand All @@ -39,15 +65,20 @@ pub fn handle_get_authoring_list(get_list_data: GetListData, context: Arc<Contex
}

fn get_all_public_chain_entries(context: Arc<Context>) -> Vec<Address> {
let chain = context.state().unwrap().agent().chain_store();
let top_header = context.state().unwrap().agent().top_chain_header();
let chain = context.state().unwrap().agent().iter_chain();
chain
.iter(&top_header)
.filter(|ref chain_header| chain_header.entry_type().can_publish(&context))
.map(|chain_header| chain_header.entry_address().clone())
.collect()
}

fn get_all_chain_header_entries(context: Arc<Context>) -> Vec<Entry> {
let chain = context.state().unwrap().agent().iter_chain();
chain
.map(|chain_header| Entry::ChainHeader(chain_header))
.collect()
}

fn get_all_aspect_addresses(entry: &Address, context: Arc<Context>) -> HcResult<Vec<Address>> {
let mut address_list: Vec<Address> = get_meta_aspects(entry, context.clone())?
.iter()
Expand Down Expand Up @@ -90,3 +121,67 @@ pub fn handle_get_gossip_list(get_list_data: GetListData, context: Arc<Context>)
})
.expect("Could not spawn thread for creating of gossip list");
}

#[cfg(test)]
pub mod tests {
use super::*;
use crate::workflows::author_entry::author_entry;
use crate::nucleus::actions::tests::*;
use holochain_core_types::{
entry::{Entry, test_entry_with_value},
};
use holochain_persistence_api::cas::content::AddressableContent;
use std::{thread, time};

#[test]
fn test_can_get_chain_header_list() {
let mut dna = test_dna();
dna.uuid = "test_can_get_chain_header_list".to_string();
let (_instance, context) = instance_by_name("jill", dna, None);

context
.block_on(author_entry(
&test_entry_with_value("{\"stuff\":\"test entry value\"}"),
None,
&context,
&vec![],
))
.unwrap()
.address();

thread::sleep(time::Duration::from_millis(500));

let chain = context.state().unwrap().agent().iter_chain();
let header_entries: Vec<Entry> = chain.map(|header| Entry::ChainHeader(header)).collect();

assert_eq!(
get_all_chain_header_entries(context),
header_entries,
)

}

#[test]
fn test_can_get_all_aspect_addr_for_headers() {
let mut dna = test_dna();
dna.uuid = "test_can_get_chain_header_list".to_string();
let (_instance, context) = instance_by_name("jill", dna, None);

context
.block_on(author_entry(
&test_entry_with_value("{\"stuff\":\"test entry value\"}"),
None,
&context,
&vec![],
))
.unwrap()
.address();

thread::sleep(time::Duration::from_millis(500));

assert!(get_all_chain_header_entries(context.clone()).iter().all(|chain_header| {
get_all_aspect_addresses(&chain_header.address(), context.clone()).is_ok()
}));
}

}
79 changes: 56 additions & 23 deletions core/src/network/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::{
store::*,
},
},
nucleus,
workflows::get_entry_result::get_entry_with_meta_workflow,
};
use boolinator::*;
Expand All @@ -31,6 +30,8 @@ use lib3h_protocol::{
protocol_server::Lib3hServerProtocol,
};
use std::{convert::TryFrom, sync::Arc};
use crate::nucleus::actions::get_entry::get_entry_from_cas;
use crate::network::entry_with_header::EntryWithHeader;

// FIXME: Temporary hack to ignore messages incorrectly sent to us by the networking
// module that aren't really meant for us
Expand Down Expand Up @@ -252,35 +253,67 @@ fn get_content_aspect(
entry_address: &Address,
context: Arc<Context>,
) -> Result<EntryAspect, HolochainError> {
let entry_with_meta =
nucleus::actions::get_entry::get_entry_with_meta(&context, entry_address.clone())?
.ok_or(HolochainError::EntryNotFoundLocally)?;
let state = context.state()
.ok_or_else(|| {
HolochainError::InitializationFailed(
String::from("In get_content_aspect: no state found")
)
})?;

// Optimistically look for entry in chain...
let maybe_chain_header = state.agent()
.iter_chain()
.find(|ref chain_header| chain_header.entry_address() == entry_address);

// If we have found a header for the requested entry in the chain...
let maybe_entry_with_header = if let Some(header) = maybe_chain_header {
// ... we can just get the content from the chain CAS
Some(EntryWithHeader {
entry: get_entry_from_cas(&state.agent().chain_store().content_storage(), header.entry_address())?
.expect("Could not find entry in chain CAS, but header is chain"),
header
})
} else {
// ... but if we didn't author that entry, let's see if we have it in the DHT cas:
if let Some(entry) = get_entry_from_cas(&state.dht().content_storage(), entry_address)? {
// If we have it in the DHT cas that's good,
// but then we have to get the header like this:
let headers = context
.state()
.expect("Could not get state for handle_fetch_entry")
.get_headers(entry_address.clone())
.map_err(|error| {
let err_message = format!(
"net/fetch/get_content_aspect: Error trying to get headers {:?}",
error
);
log_error!(context, "{}", err_message.clone());
HolochainError::ErrorGeneric(err_message)
})?;
if headers.len() > 0 {
// TODO: this is just taking the first header..
// We should actually transform all headers into EntryAspect::Headers and just the first one
// into an EntryAspect content (What about ordering? Using the headers timestamp?)
Some(EntryWithHeader{entry, header: headers[0].clone()})
} else {
None
}
} else {
None
}
};

let entry_with_header = maybe_entry_with_header.ok_or(HolochainError::EntryNotFoundLocally)?;

let _ = entry_with_meta
let _ = entry_with_header
.entry
.entry_type()
.can_publish(&context)
.ok_or(HolochainError::EntryIsPrivate)?;

let headers = context
.state()
.expect("Could not get state for handle_fetch_entry")
.get_headers(entry_address.clone())
.map_err(|error| {
let err_message = format!(
"net/fetch/get_content_aspect: Error trying to get headers {:?}",
error
);
log_error!(context, "{}", err_message.clone());
HolochainError::ErrorGeneric(err_message)
})?;

// TODO: this is just taking the first header..
// We should actually transform all headers into EntryAspect::Headers and just the first one
// into an EntryAspect content (What about ordering? Using the headers timestamp?)
Ok(EntryAspect::Content(
entry_with_meta.entry,
headers[0].clone(),
entry_with_header.entry,
entry_with_header.header,
))
}

Expand Down
Loading

0 comments on commit 608f1ee

Please sign in to comment.