In [None]:
# pip installations
# !pip install -r requirements.txt

In [2]:
# Imports
from clean_text import load_texts_from_files
from explainable_rag import ExplainableRetriever

In [3]:
# Running explainable_rag

# File paths for your JSON and JSONL files
json_files = ["./data/bitcoin_client_data.jsonl", "./data/ed_discussion_fa2023.json", "./data/lecture_notes.json"]

# Load and extract texts from JSON files
texts = load_texts_from_files(json_files)

# Convert each extracted record to a string so the retriever can index it
texts = [str(text) for text in texts]

# Initialize the ExplainableRetriever with the extracted texts
explainable_retriever = ExplainableRetriever(texts)

Loading texts from ./data/bitcoin_client_data.jsonl
Loading texts from ./data/ed_discussion_fa2023.json
Loading texts from ./data/lecture_notes.json

Note, if the user gives code, and asks to resolve the code,
give correct code.

Analyze the following query in the context of the 
provided blockchain network implementation. Provide 
a clear explanation of how the context relates to the query. 
Focus on key concepts such as peer-to-peer communication, 
blockchain structures (e.g., Merkle trees, blocks, transactions),
and their interactions. Conclude the explanation with a summary 
of the main points.
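`load_texts_from_files` comes from the project's own `clean_text` module, which is not shown here. The sketch below is a hypothetical stand-in named `load_records`. It assumes that each `.jsonl` file holds one JSON record per line and that each `.json` file holds either a list of records or a single record. It prints the same "Loading texts from ..." progress line and applies the same `str()` conversion as the cell above.

```python
import json
from pathlib import Path


def load_records(paths):
    """Hypothetical stand-in for clean_text.load_texts_from_files.

    Assumes .jsonl files hold one JSON record per line and .json files
    hold a list of records (or a single record).
    """
    records = []
    for path in paths:
        print(f"Loading texts from {path}")
        p = Path(path)
        with p.open(encoding="utf-8") as f:
            if p.suffix == ".jsonl":
                # One record per non-blank line.
                records.extend(json.loads(line) for line in f if line.strip())
            else:
                data = json.load(f)
                # Accept a top-level list or a single object.
                records.extend(data if isinstance(data, list) else [data])
    # Same step as the notebook: stringify each record for the retriever.
    return [str(r) for r in records]
```

Whatever the real loader does internally, the retriever only ever sees a flat list of strings, so nested structure such as `prompt`/`completion` pairs survives only as the text of a Python `repr`.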



In [4]:
def run(query):
    """Retrieve context for `query` and print each chunk with its explanation."""
    results = explainable_retriever.retrieve_and_explain(query)

    # Print the results
    for result in results:
        print("Content:", result["content"])
        print("Explanation:", result["explanation"])
        print("-" * 80)
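`run` relies only on `retrieve_and_explain` returning a list of dicts with `content` and `explanation` keys. The real `ExplainableRetriever` is not shown. As a dependency-free illustration of that contract, here is a hypothetical `ToyRetriever` that swaps embedding search for bag-of-words cosine similarity and replaces the LLM-written explanation with a placeholder that lists the shared terms.

```python
import math
import re
from collections import Counter


def _bag_of_words(text):
    # Lowercased word counts: a crude stand-in for a learned embedding.
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def _cosine(a, b):
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyRetriever:
    """Hypothetical stand-in with the same result shape that run() expects."""

    def __init__(self, texts, k=2):
        self.k = k
        self.docs = [(t, _bag_of_words(t)) for t in texts]

    def retrieve_and_explain(self, query):
        q = _bag_of_words(query)
        # Highest similarity first; ties keep their original order.
        ranked = sorted(self.docs, key=lambda d: _cosine(q, d[1]), reverse=True)
        results = []
        for text, bow in ranked[: self.k]:
            shared = sorted(q.keys() & bow.keys())
            # Placeholder explanation where the real class would call an LLM.
            explanation = f"score={_cosine(q, bow):.2f}; shared terms: {', '.join(shared) or 'none'}"
            results.append({"content": text, "explanation": explanation})
        return results
```

To exercise `run` without OpenAI credentials, one could rebind `explainable_retriever = ToyRetriever(texts)` before calling it. Word-overlap ranking is deliberately crude and will miss paraphrases that embedding search would catch.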

In [5]:
# Use the retriever as usual

query = '''
You're a professional assistant helping me learn the course ECE/COS 470 Principles of Blockchains (https://web3.princeton.edu/principles-of-blockchains/).
All materials for this course are attached below, including the programming assignment GitHub repo (bitcoin_client_data.jsonl), the Ed discussion log from last year's course (ed_discussion_fa2023.json), and the TeX source of all lecture notes (lecture_notes.json).

I may ask you questions about the lecture notes, programming assignments, or homework. You are expected to resolve my confusion and provide step-by-step instructions and reasoning for your response. Please take your time, think step by step, and double-check your answer before finalizing it. When you are not confident about the answer, say "Sorry but I'm not sure" instead of producing a misleading response. Here is my question:

I'm getting a compilation error. Below are the assignment specification and the code I'm working with in src/network/worker.rs:

Programming
You will finish the behavior when a few message types are received.

Message types
You need to use these three message types. They have already been defined in src/network/message.rs.

NewBlockHashes(Vec<H256>)
GetBlocks(Vec<H256>)
Blocks(Vec<Block>)

Gossip protocol
You need to define the gossip protocol, i.e., the behavior when messages are received, in src/network/worker.rs.

First, you need to add a thread-safe wrapper of Blockchain into the Worker struct in src/network/worker.rs. It is similar to the previous part. Notice that the server we provide is a multithreaded one, so please be careful with thread safety.

Then, you can define the gossip protocol as follows.

For NewBlockHashes, if the hashes are not already in the blockchain, you need to ask for them by sending GetBlocks.
For GetBlocks, if the hashes are in the blockchain, you can get these blocks and reply with a Blocks message.
For Blocks, insert the blocks into the blockchain if they are not already in it. You must also broadcast a NewBlockHashes message if the blocks are new to this node. NewBlockHashes message should contain hashes of blocks newly received.
Optional. If a block's parent is missing, put this block into a buffer and send a GetBlocks message. The buffer stores the blocks whose parent has not been seen yet. When the parent is received, that block can be popped out from the buffer and inserted into the blockchain.
Hint: peer.write() and self.server.broadcast() may be useful to send a message. Also, make sure that the vectors of block hashes/blocks you send on the channels are non-empty: add a check that sends these messages only if the vector is non-empty.

Combine with miner
When a miner successfully generates a new block, broadcast the message NewBlockHashes. Hint: in src/miner/worker.rs, self.server.broadcast() may be useful. Also, in main.rs, make sure you give the same thread-safe blockchain instance to both miner and network worker.

Network for test
You need to write the function fn generate_test_worker_and_start() -> (TestMsgSender, ServerTestReceiver, Vec<H256>) in src/network/worker.rs, which creates structs for testing purposes. This function is called inside the auto-grader and should have no input parameters. We have provided a part of the function. You need to finish the part that adds Blockchain inside Worker, and return a vector of block hashes in the blockchain's longest chain (it could be just the genesis block hash).

Grading
After you finish the programming, you will have a program that can connect to other peers and have the gossip protocol. You can run cargo test reply_new_block_hashes / reply_get_blocks / reply_blocks to test whether the gossip protocol is working.

We will auto-grade the program using tests similar to the ones mentioned above.


use super::message::Message;
use super::peer;
use super::server::Handle as ServerHandle;
use crate::{types::{hash::H256, block::Block}, blockchain::Blockchain};
use crate::types::hash::Hashable;
use log::{debug, warn, error};

#[cfg(any(test, test_utilities))]
use super::peer::TestReceiver as PeerTestReceiver;
#[cfg(any(test, test_utilities))]
use super::server::TestReceiver as ServerTestReceiver;

#[derive(Clone)]
pub struct Worker {
    msg_chan: smol::channel::Receiver<(Vec<u8>, peer::Handle)>,
    num_worker: usize,
    server: ServerHandle,
    blockchain: Arc<Mutex<Blockchain>>,
    orphan_buffer: Arc<Mutex<HashMap<H256, Vec<Block>>>>,
}

impl Worker {
    // worker instance
    pub fn new(
        num_worker: usize,
        msg_src: smol::channel::Receiver<(Vec<u8>, peer::Handle)>,
        server: &ServerHandle,
        blockchain: &Arc<Mutex<Blockchain>>,
        orphan_buffer: &Arc<Mutex<HashMap<H256, Vec<Block>>>>,
    ) -> Self {
        Self {
            msg_chan: msg_src,
            num_worker,
            server: server.clone(),
            blockchain: Arc::clone(blockchain),
            orphan_buffer: Arc::clone(orphan_buffer),
        }
    }

    // start the worker threads
    pub fn start(self) {
        for i in 0..self.num_worker {
            let cloned = self.clone();
            thread::spawn(move || {
                cloned.worker_loop();
                warn!("Worker thread {} exited", i);
            });
        }
    }

    // main loop for each worker thread
    fn worker_loop(&self) {
        loop {
            match smol::block_on(self.msg_chan.recv()) {
                Ok((msg, mut peer)) => {
                    let msg: Message = bincode::deserialize(&msg).unwrap();
                    self.handle_message(msg, &mut peer);
                }
                Err(e) => {
                    error!("network worker terminated {}", e);
                    break;
                }
            }
        }
    }

    // handle incoming messages
    fn handle_message(&self, msg: Message, peer: &mut peer::Handle) {
        match msg {
            Message::Ping(nonce) => {
                debug!("Ping: {}", nonce);
                peer.write(Message::Pong(nonce.to_string()));
            }
            Message::Pong(nonce) => {
                debug!("Pong: {}", nonce);
            }
            Message::NewBlockHashes(hashes) => {
                self.handle_new_block_hashes(hashes, peer);
            }
            Message::GetBlocks(hashes) => {
                self.handle_get_blocks(hashes, peer);
            }
            Message::Blocks(blocks) => {
                self.handle_blocks(blocks, peer);
            }
            _ => unimplemented!(),
        }
    }

    fn handle_new_block_hashes(&self, hashes: Vec<H256>, peer: &mut peer::Handle) {
        // Lock the blockchain for reading
        let blockchain = self.blockchain.lock().unwrap();
        
        // Filter out the hashes that are not present in the blockchain
        let missing_hashes: Vec<H256> = hashes.into_iter()
            .filter(|h| !blockchain.contains_block(h))
            .collect();
        
        // Release the lock on the blockchain
        drop(blockchain);
        
        // If there are any missing hashes, request the blocks from the peer
        if !missing_hashes.is_empty() {
            peer.write(Message::GetBlocks(missing_hashes));
        }
    }

    fn handle_get_blocks(&self, hashes: Vec<H256>, peer: &mut peer::Handle) {
        // lock the blockchain for reading
        let blockchain = self.blockchain.lock().unwrap();
        
        // collect the blocks corresponding to the requested hashes
        let blocks: Vec<Block> = hashes.into_iter()
            .filter_map(|h| blockchain.get_block(&h).cloned())
            .collect();
        
        // release the lock on the blockchain
        drop(blockchain);
        
        // send them to the peer if any blocks
        if !blocks.is_empty() {
            peer.write(Message::Blocks(blocks));
        }
    }

    fn handle_blocks(&self, blocks: Vec<Block>, peer: &mut peer::Handle) {
        // lock the blockchain and orphan buffer for writing
        let mut blockchain = self.blockchain.lock().unwrap();
        let mut orphan_buffer = self.orphan_buffer.lock().unwrap();
        
        let mut parents_missing: Vec<H256> = vec![];
        let mut new_hashes: Vec<H256> = vec![];
        let difficulty = blockchain.get_block(&blockchain.tip()).unwrap().get_difficulty();

        for block in blocks {
            let block_hash = block.hash();
            if block_hash > difficulty {
                continue;
            }

            let parent_hash = block.get_parent();
            if blockchain.contains_block(&parent_hash) {
                if block.get_difficulty() == difficulty && !blockchain.contains_block(&block_hash) {
                    blockchain.insert(&block);
                    new_hashes.push(block_hash);
                }
            } else {
                parents_missing.push(parent_hash);
                orphan_buffer.entry(parent_hash).or_insert_with(Vec::new).push(block);
            }
        }

        if !parents_missing.is_empty() {
            peer.write(Message::GetBlocks(parents_missing));
        }

        self.update_orphan_buffer(&mut blockchain, &mut orphan_buffer, &mut new_hashes);

        if !new_hashes.is_empty() {
            self.server.broadcast(Message::NewBlockHashes(new_hashes));
        }
    }

    // update the orphan buffer with new blocks
    fn update_orphan_buffer(
        &self,
        blockchain: &mut Blockchain,
        orphan_buffer: &mut HashMap<H256, Vec<Block>>,
        new_hashes: &mut Vec<H256>,
    ) {
        let mut left = new_hashes.clone();
        while !left.is_empty() {
            let mut temp: Vec<H256> = vec![];
            for block_hash in left {
                if let Some(children) = orphan_buffer.remove(&block_hash) {
                    for child in children {
                        if child.get_difficulty() == blockchain.get_block(&blockchain.tip()).unwrap().get_difficulty() {
                            blockchain.insert(&child);
                            temp.push(child.hash());
                            new_hashes.push(child.hash());
                        }
                    }
                }
            }
            left = temp;
        }
    }
}

#[cfg(any(test, test_utilities))]
struct TestMsgSender {
    s: smol::channel::Sender<(Vec<u8>, peer::Handle)>
}

#[cfg(any(test, test_utilities))]
impl TestMsgSender {
    fn new() -> (TestMsgSender, smol::channel::Receiver<(Vec<u8>, peer::Handle)>) {
        let (s, r) = smol::channel::unbounded();
        (TestMsgSender { s }, r)
    }

    fn send(&self, msg: Message) -> PeerTestReceiver {
        let bytes = bincode::serialize(&msg).unwrap();
        let (handle, r) = peer::Handle::test_handle();
        smol::block_on(self.s.send((bytes, handle))).unwrap();
        r
    }
}

#[cfg(any(test, test_utilities))]
fn generate_test_worker_and_start() -> (TestMsgSender, ServerTestReceiver, Vec<H256>) {
    let (server, server_receiver) = ServerHandle::new_for_test();
    let (test_msg_sender, msg_chan) = TestMsgSender::new();
    let blockchain = Arc::new(Mutex::new(Blockchain::new()));
    let orphan_buffer: Arc<Mutex<HashMap<H256, Vec<Block>>>> = Arc::new(Mutex::new(HashMap::new()));
    let worker = Worker::new(1, msg_chan, &server, &blockchain, &orphan_buffer);
    worker.start();
    let genesis_hash = blockchain.lock().unwrap().tip();
    (test_msg_sender, server_receiver, vec![genesis_hash])
}
// DO NOT CHANGE THIS COMMENT, IT IS FOR AUTOGRADER. BEFORE TEST

#[cfg(test)]
mod test {
    use ntest::timeout;
    use rand::Rng;
    use crate::types::block::{generate_random_block, Block};
    use crate::types::hash::{Hashable, H256};
    use super::super::message::Message;
    use super::generate_test_worker_and_start;
    
    #[test]
    #[timeout(60000)]
    fn reply_new_block_hashes() {
        let (test_msg_sender, _server_receiver, v) = generate_test_worker_and_start();
        let random_block = generate_random_block(v.last().unwrap());
        let mut peer_receiver = test_msg_sender.send(Message::NewBlockHashes(vec![random_block.hash()]));
        let reply = peer_receiver.recv();
        if let Message::GetBlocks(v) = reply {
            assert_eq!(v, vec![random_block.hash()]);
        } else {
            panic!();
        }
    }

    #[test]
    #[timeout(60000)]
    fn reply_get_blocks() {
        let (test_msg_sender, _server_receiver, v) = generate_test_worker_and_start();
        let h = v.last().unwrap().clone();
        let mut peer_receiver = test_msg_sender.send(Message::GetBlocks(vec![h.clone()]));
        let reply = peer_receiver.recv();
        if let Message::Blocks(v) = reply {
            assert_eq!(1, v.len());
            assert_eq!(h, v[0].hash())
        } else {
            panic!();
        }
    }

    #[test]
    #[timeout(60000)]
    fn reply_blocks() {
        let (test_msg_sender, server_receiver, v) = generate_test_worker_and_start();
        let random_block = generate_random_block(v.last().unwrap());
        let mut _peer_receiver = test_msg_sender.send(Message::Blocks(vec![random_block.clone()]));
        let reply = server_receiver.recv().unwrap();
        if let Message::NewBlockHashes(v) = reply {
            assert_eq!(v, vec![random_block.hash()]);
        } else {
            panic!();
        }
    }
    #[test]
    #[timeout(60000)]
    fn handle_multiple_blocks() {
        let (test_msg_sender, server_receiver, v) = generate_test_worker_and_start();
        let random_block1 = generate_random_block(v.last().unwrap());
        let random_block2 = generate_random_block(&random_block1.hash());
        let mut _peer_receiver = test_msg_sender.send(Message::Blocks(vec![random_block1.clone(), random_block2.clone()]));
        let reply = server_receiver.recv().unwrap();
        if let Message::NewBlockHashes(v) = reply {
            assert_eq!(v, vec![random_block1.hash(), random_block2.hash()]);
        } else {
            panic!();
        }
    }

    #[test]
    #[timeout(60000)]
    fn handle_fork_blocks() {
        let (test_msg_sender, server_receiver, v) = generate_test_worker_and_start();
        let random_block1 = generate_random_block(v.last().unwrap());
        let random_block2 = generate_random_block(v.last().unwrap()); // Fork from genesis
        let mut _peer_receiver = test_msg_sender.send(Message::Blocks(vec![random_block1.clone(), random_block2.clone()]));
        let reply = server_receiver.recv().unwrap();
        if let Message::NewBlockHashes(v) = reply {
            assert_eq!(v, vec![random_block1.hash(), random_block2.hash()]);
        } else {
            panic!();
        }
    }
}

// DO NOT CHANGE THIS COMMENT, IT IS FOR AUTOGRADER. AFTER TEST

'''

run(query)

Content: {'prompt': 'Rust Code from COS-ECE470-fa2024/src/network/worker.rs:\n', 'completion': ' use super::message::Message;\nuse super::peer;\nuse super::server::Handle as ServerHandle;\nuse crate::types::hash::H256;\n\nuse log::{debug, warn, error};\n\nuse std::thread;\n\n#[cfg(any(test,test_utilities))]\nuse super::peer::TestReceiver as PeerTestReceiver;\n#[cfg(any(test,test_utilities))]\nuse super::server::TestReceiver as ServerTestReceiver;\n#[derive(Clone)]\npub struct Worker {\n    msg_chan: smol::channel::Receiver<(Vec<u8>, peer::Handle)>,\n    num_worker: usize,\n    server: ServerHandle,\n}\n\n\nimpl Worker {\n    pub fn new(\n        num_worker: usize,\n        msg_src: smol::channel::Receiver<(Vec<u8>, peer::Handle)>,\n        server: &ServerHandle,\n    ) -> Self {\n        Self {\n            msg_chan: msg_src,\n            num_worker,\n            server: server.clone(),\n        }\n    }\n\n    pub fn start(self) {\n        let num_worker = self.num_worker;\n        fo
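The gossip rules in the assignment specification quoted in the query can be modeled independently of Rust. Below is a minimal Python sketch under loudly stated assumptions: a simplified `Block` with only `hash` and `parent` fields, a dict in place of the course's `Blockchain`, and two lists that record what would be passed to `peer.write()` and `self.server.broadcast()`. It implements the three handlers, the non-empty checks from the hint, and the optional orphan buffer keyed by the missing parent's hash. It does not validate proof of work or difficulty, and none of these names are the course's API.

```python
from collections import defaultdict, namedtuple

# Hypothetical simplified block: only its own hash and its parent's hash.
Block = namedtuple("Block", ["hash", "parent"])


class GossipNode:
    """Toy model of the gossip rules; not the course's Rust API."""

    def __init__(self, genesis):
        self.chain = {genesis.hash: genesis}  # stand-in for Blockchain: hash -> Block
        self.orphans = defaultdict(list)      # missing parent hash -> buffered children
        self.sent = []                        # what would go to peer.write()
        self.broadcasts = []                  # what would go to server.broadcast()

    def on_new_block_hashes(self, hashes):
        # Ask only for hashes we do not already have.
        missing = [h for h in hashes if h not in self.chain]
        if missing:  # never send an empty vector
            self.sent.append(("GetBlocks", missing))

    def on_get_blocks(self, hashes):
        # Reply with the blocks we actually hold.
        found = [self.chain[h] for h in hashes if h in self.chain]
        if found:
            self.sent.append(("Blocks", found))

    def on_blocks(self, blocks):
        new_hashes, missing_parents = [], []
        for b in blocks:
            if b.hash in self.chain:
                continue  # already known: not new to this node
            if b.parent in self.chain:
                self.chain[b.hash] = b
                new_hashes.append(b.hash)
            else:
                self.orphans[b.parent].append(b)  # buffer until the parent arrives
                missing_parents.append(b.parent)
        # Each inserted block may unlock buffered children, which may unlock theirs.
        frontier = list(new_hashes)
        while frontier:
            for child in self.orphans.pop(frontier.pop(), []):
                if child.hash not in self.chain:
                    self.chain[child.hash] = child
                    new_hashes.append(child.hash)
                    frontier.append(child.hash)
        # Request parents that are still missing (deduplicated, order kept).
        still_missing = [h for h in dict.fromkeys(missing_parents) if h not in self.chain]
        if still_missing:
            self.sent.append(("GetBlocks", still_missing))
        if new_hashes:
            self.broadcasts.append(("NewBlockHashes", new_hashes))
```

Keying the buffer by the missing parent's hash means that each newly inserted block needs one dictionary lookup to find the children it unlocks. The frontier loop also handles chains of orphans that arrive out of order, including a child and its parent delivered in the same `Blocks` message.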