-
Notifications
You must be signed in to change notification settings - Fork 1
/
shard.go
230 lines (195 loc) · 7.48 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package core
import (
"fmt"
"hash/fnv"
"math/rand"
"time"
"github.com/stathat/consistent"
thrylos "github.com/thrylos-labs/thrylos"
"github.com/thrylos-labs/thrylos/shared"
)
// rng is the package-level pseudo-random generator. It is created once, in
// init, from a source seeded with the current wall-clock time.
//
// NOTE(review): per the math/rand documentation a Rand built on
// rand.NewSource is not safe for concurrent use — confirm that callers
// serialise access to rng.
var rng *rand.Rand

// init builds rng from a time-seeded source.
func init() {
	rng = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// replicationFactor is the number of ring members requested for each UTXO
// when replicas are looked up with GetReplicas in this file.
const replicationFactor = 3 // Define how many nodes each UTXO should be replicated to
// ConsistentHashRing wraps a consistent-hash ring from the stathat/consistent
// package. The ring is embedded, so its methods are promoted and can be called
// directly on a ConsistentHashRing alongside the helpers declared in this file.
type ConsistentHashRing struct {
	*consistent.Consistent
}
// NewConsistentHashRing returns a ConsistentHashRing backed by a newly created
// consistent.Consistent value. No nodes are added here; use AddNode for that.
func NewConsistentHashRing() *ConsistentHashRing {
	return &ConsistentHashRing{Consistent: consistent.New()}
}
// AddNode registers node with the embedded ring by forwarding to its Add
// method.
func (c *ConsistentHashRing) AddNode(node string) {
	c.Add(node) // promoted from the embedded *consistent.Consistent
}
// GetNode returns the ring member that the embedded ring's Get selects for key.
// If Get reports an error, the error is discarded and the empty string is
// returned instead, so callers must treat "" as "no node available".
func (c *ConsistentHashRing) GetNode(key string) string {
	owner, lookupErr := c.Get(key)
	if lookupErr != nil {
		return ""
	}
	return owner
}
// GetReplicas returns the ring members that the embedded ring's GetN selects
// for key when asked for count of them. If GetN reports an error, the error is
// discarded and a non-nil, empty slice is returned.
func (c *ConsistentHashRing) GetReplicas(key string, count int) []string {
	replicas, lookupErr := c.GetN(key, count)
	if lookupErr != nil {
		// Callers range over the result, so an empty slice means "no replicas".
		return make([]string, 0)
	}
	return replicas
}
// ProxyGetHash returns the 32-bit FNV-1a hash of value. It does not read any
// state from the receiver.
//
// NOTE(review): whether FNV-1a matches the hash the embedded ring uses
// internally is not visible in this file — confirm before treating the result
// as a position on the ring.
func (c *ConsistentHashRing) ProxyGetHash(value string) uint32 {
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(value)) // hash.Hash documents that Write never returns an error
	return hasher.Sum32()
}
// Shard represents a subset of the blockchain network, used to scale the network by dividing
// transaction and block processing among multiple shards. Each shard holds its own sequence of
// blocks, its own set of unspent transaction outputs (UTXOs), and the nodes that participate in it.
type Shard struct {
	ID int // Unique identifier for the shard.
	Nodes []*Node // Nodes that are part of the shard.
	UTXOs map[string]shared.UTXO // Unspent transaction outputs managed by this shard; RedistributeData treats the key as a transaction ID.
	Blocks []*Block // Blocks that have been confirmed and added to the shard's blockchain.
	MaxNodes int // Maximum number of nodes allowed in the shard; checked by AssignNode but not by AddNode.
}
// InitializeOrUpdateShard reloads the shard's UTXO set and reassigns it to the
// shard's nodes.
//
// It replaces s.UTXOs with the map returned by shared.GetAllUTXOs, runs
// RedistributeData, then computes a replicated distribution with
// distributeUTXOs and installs it through applyDistributedUTXOs.
//
// NOTE(review): applyDistributedUTXOs resets every node's ResponsibleUTXOs
// before installing the distribution, so the assignments made by
// RedistributeData appear to be overwritten immediately — confirm whether that
// call is still needed.
func (s *Shard) InitializeOrUpdateShard() {
	// s.UTXOs and the map handed to distributeUTXOs are the same map value.
	s.UTXOs = shared.GetAllUTXOs()

	s.RedistributeData()

	s.applyDistributedUTXOs(s.distributeUTXOs(s.UTXOs))
}
// applyDistributedUTXOs installs distribution on the shard's nodes.
//
// distribution maps a node address to the UTXOs that node should hold. Every
// node in s.Nodes first has its ResponsibleUTXOs replaced with a fresh empty
// map, so nodes absent from distribution end up holding nothing. Addresses in
// distribution that match no node are ignored. If several nodes share an
// address, only the first one in s.Nodes receives the UTXOs.
//
// Each UTXO is stored as a shared.UTXO carrying only TransactionID, Index,
// OwnerAddress and Amount.
//
// NOTE(review): entries are keyed by TransactionId alone, so two outputs of the
// same transaction assigned to the same node overwrite each other — confirm
// this is intended.
func (s *Shard) applyDistributedUTXOs(distribution map[string][]*thrylos.UTXO) {
	// Reset each node and index it by address in the same pass, so the apply
	// step below is one map lookup per address instead of a scan of s.Nodes.
	nodeByAddr := make(map[string]*Node, len(s.Nodes))
	for _, node := range s.Nodes {
		node.ResponsibleUTXOs = make(map[string]shared.UTXO)
		if _, seen := nodeByAddr[node.Address]; !seen {
			nodeByAddr[node.Address] = node // first node wins for a duplicated address
		}
	}

	for nodeAddr, utxos := range distribution {
		node, ok := nodeByAddr[nodeAddr]
		if !ok {
			continue
		}
		for _, utxo := range utxos {
			txID := utxo.TransactionId
			node.ResponsibleUTXOs[txID] = shared.UTXO{
				TransactionID: txID,
				Index:         int(utxo.Index),
				OwnerAddress:  utxo.OwnerAddress,
				Amount:        int(utxo.Amount),
			}
		}
	}
}
// NewShard returns a Shard with the given identifier and node capacity. Its
// Nodes, UTXOs and Blocks collections start out non-nil and empty.
func NewShard(id int, maxNodes int) *Shard {
	shard := new(Shard)
	shard.ID = id
	shard.MaxNodes = maxNodes
	shard.Nodes = []*Node{}
	shard.UTXOs = map[string]shared.UTXO{}
	shard.Blocks = []*Block{}
	return shard
}
// AddNode appends node to the shard's list of participating nodes.
//
// The append is unconditional: unlike AssignNode, this method does not check
// whether node is already a member and does not enforce MaxNodes.
func (s *Shard) AddNode(node *Node) {
	s.Nodes = append(s.Nodes, node)
	// Additional logic to integrate the node into the shard can be added here.
}
// AssignNode adds node to the shard if it is not already a member and the
// shard still has room.
//
// Membership is decided by pointer identity. Assigning a node that is already
// a member is a no-op and returns nil. An error is returned when node is nil,
// or when the shard already holds MaxNodes nodes; in both cases s.Nodes is
// left unchanged.
func (s *Shard) AssignNode(node *Node) error {
	// A nil entry in s.Nodes would make every later loop that reads
	// node.Address panic, so reject it here.
	if node == nil {
		return fmt.Errorf("shard %d: cannot assign nil node", s.ID)
	}

	for _, member := range s.Nodes {
		if member == node {
			return nil // already a member, nothing to do
		}
	}

	if len(s.Nodes) >= s.MaxNodes {
		return fmt.Errorf("shard %d is at maximum capacity", s.ID)
	}

	s.Nodes = append(s.Nodes, node)
	return nil
}
// RedistributeData assigns every UTXO in s.UTXOs to one of the shard's nodes
// using a consistent-hash ring built from the nodes' addresses.
//
// For each UTXO the ring is asked for the node responsible for its key, and
// the UTXO is stored in that node's ResponsibleUTXOs under the same key. If
// several nodes share an address, the first one in s.Nodes receives the UTXO.
// Entries already present on a node are not removed, so a node keeps UTXOs it
// was responsible for before this call.
//
// A node whose ResponsibleUTXOs map has not been created yet gets one on first
// assignment rather than triggering a nil-map write panic. The method returns
// immediately when the shard has no nodes or no UTXOs.
func (s *Shard) RedistributeData() {
	if len(s.Nodes) == 0 || len(s.UTXOs) == 0 {
		return
	}

	// Build the ring and an address index together so that resolving the
	// responsible node is a map lookup rather than a scan of s.Nodes per UTXO.
	ring := NewConsistentHashRing()
	nodeByAddr := make(map[string]*Node, len(s.Nodes))
	for _, node := range s.Nodes {
		ring.AddNode(node.Address)
		if _, seen := nodeByAddr[node.Address]; !seen {
			nodeByAddr[node.Address] = node // first node wins for a duplicated address
		}
	}

	for txID, utxo := range s.UTXOs {
		owner, ok := nodeByAddr[ring.GetNode(txID)]
		if !ok {
			continue // the ring returned an address no node carries
		}
		if owner.ResponsibleUTXOs == nil {
			owner.ResponsibleUTXOs = make(map[string]shared.UTXO)
		}
		owner.ResponsibleUTXOs[txID] = utxo
	}
}
// replicateUTXO hands utxo, under txID, to the node the ring maps txID to and
// to the other ring members returned for it as replicas.
//
// The primary address is ring.GetNode(txID). Replica addresses come from
// ring.GetReplicas(txID, replicationFactor); any entry equal to the primary is
// skipped so that node is not assigned twice. For each address, only the first
// element of nodes with that Address receives the UTXO, and addresses with no
// matching node are ignored.
//
// NOTE(review): primaryAddr is accepted but never read; the primary is always
// recomputed from the ring — confirm whether callers expect it to be honoured.
func replicateUTXO(txID string, utxo shared.UTXO, primaryAddr string, ring *ConsistentHashRing, nodes []*Node) {
	// assignTo delivers the UTXO to the first node registered under addr.
	assignTo := func(addr string) {
		for _, candidate := range nodes {
			if candidate.Address == addr {
				candidate.AssignUTXO(txID, utxo)
				return
			}
		}
	}

	primary := ring.GetNode(txID)
	assignTo(primary)

	for _, replica := range ring.GetReplicas(txID, replicationFactor) {
		if replica != primary {
			assignTo(replica)
		}
	}
}
// AssignUTXO records utxo under txID in the node's ResponsibleUTXOs map,
// replacing any entry already stored under that key.
//
// The map is created on first use, so a Node whose ResponsibleUTXOs was never
// initialised can still accept assignments instead of panicking on a write to
// a nil map.
func (n *Node) AssignUTXO(txID string, utxo shared.UTXO) {
	if n.ResponsibleUTXOs == nil {
		n.ResponsibleUTXOs = make(map[string]shared.UTXO)
	}
	n.ResponsibleUTXOs[txID] = utxo
}
// distributeUTXOs computes which nodes of the shard should hold each UTXO in
// allUTXOs and returns the result as a map from node address to the UTXOs
// assigned to that address.
//
// A consistent-hash ring is built from the addresses of s.Nodes. For every
// UTXO, the addresses returned by ring.GetReplicas(key, replicationFactor)
// each receive the UTXO, converted with shared.ConvertSharedUTXOToProto once
// per replica. Every node address is present in the result, with a non-nil
// empty slice when it received nothing. The nodes themselves are not modified.
//
// As a side effect, one line per node address is printed to standard output
// reporting how many UTXOs it received.
func (s *Shard) distributeUTXOs(allUTXOs map[string]shared.UTXO) map[string][]*thrylos.UTXO {
	ring := NewConsistentHashRing()
	assignment := make(map[string][]*thrylos.UTXO)
	for _, member := range s.Nodes {
		ring.AddNode(member.Address)
		assignment[member.Address] = []*thrylos.UTXO{} // every node gets an entry, even if empty
	}

	for id, entry := range allUTXOs {
		for _, addr := range ring.GetReplicas(id, replicationFactor) {
			assignment[addr] = append(assignment[addr], shared.ConvertSharedUTXOToProto(entry))
		}
	}

	for addr, held := range assignment {
		fmt.Printf("Node %s received %d UTXOs\n", addr, len(held))
	}
	return assignment
}
// Shard initialization and node management: shards are subsets of the network that scale it by distributing the workload. Nodes are added to shards, and shard capacity is managed to prevent overloading.
// Data redistribution: this file implements the logic for distributing data (such as UTXOs) among the nodes of a shard, to balance load and keep data management efficient.