-
Notifications
You must be signed in to change notification settings - Fork 21
/
k8s.rs
141 lines (130 loc) · 4.65 KB
/
k8s.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#![allow(unreachable_code)]
use sentinel_core::k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use sentinel_core::kube::{
api::{Api, Patch, PatchParams, PostParams},
runtime::wait::{await_condition, conditions},
Client, CustomResourceExt,
};
use sentinel_core::{
base,
datasource::{
ds_k8s::{K8sDataSource, SENTINEL_RULE_GROUP},
new_flow_rule_handler, rule_json_array_parser,
},
flow, EntryBuilder, Result,
};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
// An example of a Kubernetes (k8s) dynamic data source.
// Install minikube first and run `minikube start`, then run this example.
// Running `kubectl get flowresources -A` will show the flow resources created by this example.
// Running `kubectl delete flowresources/flow-1` will delete the flow resource created by this example.
// You will find that the QPS is restricted to 10 at first, but soon after it is restricted to 1.
#[tokio::main]
async fn main() -> Result<()> {
let handlers = basic_flow_example().await;
// Create etcd client and put a key-value pair for new rule.
let client = Client::try_default().await?;
let property = "flow-k8s-example";
let namespace = "default";
let cr_name = "flowresources";
println!(
"FlowRule CRD is: \n{}",
serde_json::to_string_pretty(&flow::FlowResource::crd()).unwrap()
);
{
// Dynamically add a CRD sentinel rule.
// You can add rules by kubectl in command line.
dynamic_update(&client, property, namespace, cr_name).await?;
}
// Sleep 3 seconds and then read the change of CRD
sentinel_core::utils::sleep_for_ms(3000);
// Create a data source and change the rule.
let h = new_flow_rule_handler(rule_json_array_parser);
let mut ds = K8sDataSource::<flow::Rule, _, flow::FlowResource>::new(
client,
property.into(),
namespace.into(),
cr_name.into(),
vec![h],
);
ds.initialize().await?;
for h in handlers {
h.await.expect("Couldn't join on the associated thread");
}
Ok(())
}
async fn basic_flow_example() -> Vec<JoinHandle<()>> {
// Init sentienl configurations
sentinel_core::init_default().unwrap_or_else(|err| sentinel_core::logging::error!("{:?}", err));
let resource_name = String::from("task");
// Load sentinel rules
flow::load_rules(vec![Arc::new(flow::Rule {
resource: resource_name.clone(),
threshold: 10.0,
calculate_strategy: flow::CalculateStrategy::Direct,
control_strategy: flow::ControlStrategy::Reject,
..Default::default()
})]);
let mut handlers = Vec::new();
for _ in 0..20 {
let res_name = resource_name.clone();
handlers.push(tokio::spawn(async move {
loop {
let entry_builder = EntryBuilder::new(res_name.clone())
.with_traffic_type(base::TrafficType::Inbound);
if let Ok(entry) = entry_builder.build() {
// Passed, wrap the logic here.
task().await;
// Be sure the entry is exited finally.
entry.exit()
} else {
sleep(Duration::from_millis(100)).await;
}
}
}));
}
handlers
}
// TODO: `sentinel-macros` cannot be used here for now. It would append rules,
// which conflicts with the dynamic data source.
/// The guarded workload: prints the current time in milliseconds followed by
/// `: passed`, then sleeps asynchronously for 100 ms.
async fn task() {
    let now = sentinel_core::utils::curr_time_millis();
    println!("{}: passed", now);
    let pause = Duration::from_millis(100);
    sleep(pause).await;
}
/// Applies the `FlowResource` CRD to the cluster, waits until it is established,
/// and then creates a `flow-1` custom resource in `namespace` whose rule sets the
/// threshold of the `task` resource to 1.
///
/// * `client` - Kubernetes client; cloned for each API handle.
/// * `manager` - passed to `PatchParams::apply` for the CRD patch.
/// * `namespace` - namespace in which the custom resource is created.
/// * `cr_name` - plural resource name; joined with `SENTINEL_RULE_GROUP` to form
///   the CRD name.
///
/// # Errors
/// Propagates any error from the CRD patch, the wait for establishment, or the
/// resource creation.
async fn dynamic_update(
    client: &Client,
    manager: &str,
    namespace: &str,
    cr_name: &str,
) -> Result<()> {
    let crd_api = Api::<CustomResourceDefinition>::all(client.clone());
    println!("Before patch");

    // Full CRD name in `<plural>.<group>` form.
    let crd_name = format!("{}.{}", cr_name, SENTINEL_RULE_GROUP);

    // Apply the CRD so that FlowResource instances can be created in Kubernetes.
    let apply_params = PatchParams::apply(manager);
    let crd_patch = Patch::Apply(flow::FlowResource::crd());
    crd_api.patch(&crd_name, &apply_params, &crd_patch).await?;
    println!("After patch");

    // Wait for the CRD to be ready.
    let established = conditions::is_crd_established();
    await_condition(crd_api, &crd_name, established).await?;

    // Build and create the custom resource that carries the new rule.
    let new_rule = flow::Rule {
        resource: "task".into(),
        threshold: 1.0,
        ..Default::default()
    };
    let resource = flow::FlowResource::new("flow-1", new_rule);
    let resource_api = Api::<flow::FlowResource>::namespaced(client.clone(), namespace);
    resource_api.create(&PostParams::default(), &resource).await?;

    println!(
        "Dynamically change custom resource: {} to: {:?}",
        crd_name, resource
    );
    Ok(())
}