removes backport merge conflicts
behzadnouri committed Sep 27, 2021
1 parent b503e7d commit 4e95081
Showing 4 changed files with 9 additions and 182 deletions.
108 changes: 5 additions & 103 deletions core/src/retransmit_stage.rs
@@ -18,29 +18,19 @@ use {
solana_client::rpc_response::SlotUpdate,
solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
solana_ledger::{
<<<<<<< HEAD
shred::{get_shred_slot_index_type, ShredFetchStats},
shred::Shred,
{
blockstore::{Blockstore, CompletedSlotsReceiver},
leader_schedule_cache::LeaderScheduleCache,
},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, Packets},
solana_perf::packet::Packets,
solana_rpc::{
max_slots::MaxSlots, rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_subscriptions::RpcSubscriptions,
},
=======
shred::Shred,
{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::Packets,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
>>>>>>> 3efccbffa (sends shreds (instead of packets) to retransmit stage)
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::Slot,
@@ -80,12 +70,6 @@ struct RetransmitStats {
total_time: AtomicU64,
epoch_fetch: AtomicU64,
epoch_cache_update: AtomicU64,
<<<<<<< HEAD
repair_total: AtomicU64,
discard_total: AtomicU64,
duplicate_retransmit: AtomicU64,
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
retransmit_total: AtomicU64,
last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64,
@@ -98,12 +82,6 @@ fn update_retransmit_stats(
num_shreds: usize,
num_shreds_skipped: usize,
retransmit_total: u64,
<<<<<<< HEAD
discard_total: u64,
repair_total: u64,
duplicate_retransmit: u64,
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
compute_turbine_peers_total: u64,
peers_len: usize,
epoch_fetch: u64,
@@ -120,18 +98,6 @@ fn update_retransmit_stats(
.retransmit_total
.fetch_add(retransmit_total, Ordering::Relaxed);
stats
<<<<<<< HEAD
.repair_total
.fetch_add(repair_total, Ordering::Relaxed);
stats
.discard_total
.fetch_add(discard_total, Ordering::Relaxed);
stats
.duplicate_retransmit
.fetch_add(duplicate_retransmit, Ordering::Relaxed);
stats
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
@@ -183,24 +149,6 @@ fn update_retransmit_stats(
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
i64
),
<<<<<<< HEAD
(
"repair_total",
stats.repair_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"discard_total",
stats.discard_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"duplicate_retransmit",
stats.duplicate_retransmit.swap(0, Ordering::Relaxed) as i64,
i64
),
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
);
}
}
@@ -321,12 +269,6 @@ fn retransmit(
let num_shreds = shreds.len();
let my_id = cluster_info.id();
let socket_addr_space = cluster_info.socket_addr_space();
<<<<<<< HEAD
let mut discard_total = 0;
let mut repair_total = 0;
let mut duplicate_retransmit = 0;
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
let mut retransmit_total = 0;
let mut num_shreds_skipped = 0;
let mut compute_turbine_peers_total = 0;
@@ -336,22 +278,7 @@ fn retransmit(
num_shreds_skipped += 1;
continue;
}
<<<<<<< HEAD
if packet.meta.repair {
total_packets -= 1;
continue;
}
let shred_slot = match check_if_already_received(packet, shreds_received) {
Some(slot) => slot,
None => {
total_packets -= 1;
duplicate_retransmit += 1;
continue;
}
};
=======
let shred_slot = shred.slot();
>>>>>>> 3efccbffa (sends shreds (instead of packets) to retransmit stage)
max_slot = max_slot.max(shred_slot);

if let Some(rpc_subscriptions) = rpc_subscriptions {
@@ -417,12 +344,6 @@ fn retransmit(
num_shreds,
num_shreds_skipped,
retransmit_total,
<<<<<<< HEAD
discard_total,
repair_total,
duplicate_retransmit,
=======
>>>>>>> bf437b033 (removes packet-count metrics from retransmit stage)
compute_turbine_peers_total,
cluster_nodes.num_peers(),
epoch_fetch.as_us(),
@@ -528,12 +449,8 @@ impl RetransmitStage {
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
<<<<<<< HEAD
exit: &Arc<AtomicBool>,
rpc_completed_slots_receiver: CompletedSlotsReceiver,
=======
exit: Arc<AtomicBool>,
>>>>>>> 6e413331b (removes erroneous uses of Arc<...> from retransmit stage)
rpc_completed_slots_receiver: CompletedSlotsReceiver,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
@@ -558,13 +475,8 @@ impl RetransmitStage {
leader_schedule_cache.clone(),
cluster_info.clone(),
retransmit_receiver,
<<<<<<< HEAD
Arc::clone(max_slots),
rpc_subscriptions.clone(),
=======
max_slots,
rpc_subscriptions,
>>>>>>> 6e413331b (removes erroneous uses of Arc<...> from retransmit stage)
rpc_subscriptions.clone(),
);

let rpc_completed_slots_hdl =
@@ -584,15 +496,10 @@ impl RetransmitStage {
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
<<<<<<< HEAD
=======
cluster_info,
cluster_slots,
>>>>>>> 6e413331b (removes erroneous uses of Arc<...> from retransmit stage)
};
let window_service = WindowService::new(
blockstore,
cluster_info.clone(),
cluster_info,
verified_receiver,
retransmit_sender,
repair_socket,
@@ -704,12 +611,8 @@ mod tests {
let cluster_info = Arc::new(cluster_info);

let (retransmit_sender, retransmit_receiver) = channel();
<<<<<<< HEAD
let t_retransmit = retransmitter(
=======
let _retransmit_sender = retransmit_sender.clone();
let _t_retransmit = retransmitter(
>>>>>>> 7a8807b8b (retransmits shreds recovered from erasure codes)
retransmit_socket,
bank_forks,
leader_schedule_cache,
Expand All @@ -718,7 +621,6 @@ mod tests {
Arc::default(), // MaxSlots
None,
);
let _thread_hdls = vec![t_retransmit];

let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
// it should send this over the sockets.
6 changes: 1 addition & 5 deletions core/src/tvu.rs
@@ -178,12 +178,8 @@ impl Tvu {
Arc::new(retransmit_sockets),
repair_socket,
verified_receiver,
<<<<<<< HEAD
&exit,
completed_slots_receiver,
=======
exit.clone(),
>>>>>>> 6e413331b (removes erroneous uses of Arc<...> from retransmit stage)
completed_slots_receiver,
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
12 changes: 2 additions & 10 deletions core/src/window_service.rs
@@ -257,12 +257,8 @@ fn run_insert<F>(
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: &CompletedDataSetsSender,
<<<<<<< HEAD
outstanding_requests: &RwLock<OutstandingRepairs>,
=======
retransmit_sender: &Sender<Vec<Shred>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
>>>>>>> 7a8807b8b (retransmits shreds recovered from erasure codes)
outstanding_requests: &RwLock<OutstandingRepairs>,
) -> Result<()>
where
F: Fn(Shred),
@@ -532,12 +528,8 @@ impl WindowService {
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
check_duplicate_sender: CrossbeamSender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
<<<<<<< HEAD
outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
=======
retransmit_sender: Sender<Vec<Shred>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
>>>>>>> 7a8807b8b (retransmits shreds recovered from erasure codes)
outstanding_requests: Arc<RwLock<OutstandingRepairs>>,
) -> JoinHandle<()> {
let mut handle_timeout = || {};
let handle_error = || {
65 changes: 1 addition & 64 deletions ledger/src/blockstore.rs
@@ -1,62 +1,6 @@
//! The `blockstore` module provides functions for parallel verification of the
//! Proof of History ledger as well as iterative read, append write, and random
//! access read to a persistent file-based ledger.
<<<<<<< HEAD
use crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
erasure::ErasureConfig,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{Result as ShredResult, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK},
};
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use bincode::deserialize;
use log::*;
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
};
use rocksdb::DBRawIterator;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
use solana_sdk::{
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE},
hash::Hash,
pubkey::Pubkey,
sanitize::Sanitize,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
};
use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta};
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
TransactionStatusMeta, TransactionWithStatusMeta,
};
use std::{
borrow::Cow,
cell::RefCell,
cmp,
collections::{BTreeMap, HashMap, HashSet},
convert::TryInto,
fs,
io::{Error as IoError, ErrorKind},
path::{Path, PathBuf},
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock, RwLockWriteGuard,
=======
pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use {
crate::{
@@ -66,6 +10,7 @@ use {
IteratorMode, LedgerColumn, Result, WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
erasure::ErasureConfig,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
@@ -78,7 +23,6 @@ use {
ThreadPool,
},
rocksdb::DBRawIterator,
solana_entry::entry::{create_ticks, Entry},
solana_measure::measure::Measure,
solana_metrics::{datapoint_debug, datapoint_error},
solana_rayon_threadlimit::get_thread_count,
@@ -92,7 +36,6 @@ use {
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
>>>>>>> 3c71670bd (returns completed-data-set-info from insert_data_shred)
},
solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta},
solana_transaction_status::{
@@ -116,15 +59,9 @@ use {
},
time::Instant,
},
tempfile::TempDir,
thiserror::Error,
trees::{Tree, TreeWalk},
};
<<<<<<< HEAD
use thiserror::Error;
use trees::{Tree, TreeWalk};
=======
>>>>>>> 3c71670bd (returns completed-data-set-info from insert_data_shred)

pub mod blockstore_purge;
