/
utils.py
149 lines (105 loc) · 3.26 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# Copyright 2019 Intel Corporation
import json
import os
import socket
import struct
import sys
import iptools
from jsoncomment import JsonComment
import psutil
from pyroute2 import IPDB
def exit(code, msg):
    """Print ``msg`` to stdout, then terminate with exit status ``code``.

    Note that this name shadows the interpreter's built-in ``exit`` helper
    inside this module.
    """
    print(msg)
    raise SystemExit(code)
def getpid(process_name):
    """Return the PID of the first process whose name equals ``process_name``.

    Returns None when no process reported by psutil matches.
    """
    candidates = psutil.process_iter(attrs=["pid", "name"])
    return next(
        (p.info["pid"] for p in candidates if process_name == p.info["name"]),
        None,
    )
def getpythonpid(process_name):
    """Return the PID of the first Python process running ``process_name``.

    A process matches when its command line has at least two entries, the
    first entry contains "python" and the second contains ``process_name``.

    Returns None when no process matches.
    """
    for proc in psutil.process_iter(attrs=["pid", "cmdline"]):
        # psutil substitutes None for attributes it could not read (e.g.
        # access denied); treat that like an empty command line instead of
        # crashing on len(None).
        cmdline = proc.info["cmdline"] or []
        if len(cmdline) < 2:
            continue
        if process_name in cmdline[1] and "python" in cmdline[0]:
            return proc.info["pid"]
    return None
def get_json_conf(path, dump):
    """Read a JSON-with-comments file and return the parsed configuration.

    Args:
        path: Location of the configuration file.
        dump: When truthy, pretty-print the parsed configuration to stdout.

    Returns:
        The parsed configuration, or None if any step failed (the error is
        printed rather than raised).
    """
    try:
        with open(path, "r") as handle:
            raw_text = handle.read()
        parser = JsonComment()
        parsed = parser.loads(raw_text)
        if dump:
            print(parser.dumps(parsed, indent=4, sort_keys=True))
        return parsed
    except Exception as err:
        print("An unexpected error occurred:", str(err))
        return None
def get_env(varname, default=None):
    """Look up environment variable ``varname``.

    Returns the variable's value when it is set. Otherwise returns
    ``default`` formatted as a string, or, when ``default`` is None, calls
    this module's ``exit`` with status 1.
    """
    value = os.environ.get(varname)
    if value is not None:
        return value

    if default is None:
        exit(1, "Empty env var {}".format(varname))
    else:
        return "{}".format(default)
def ips_by_interface(name):
    """Return the IPv4 addresses listed for interface ``name``.

    The result is a list holding the first element of every entry in the
    interface's ``ipaddr.ipv4`` collection.
    """
    # An IPDB instance holds a netlink socket and background threads until it
    # is released; the context manager releases it even if the lookup fails.
    with IPDB() as ipdb:
        return [ipobj[0] for ipobj in ipdb.interfaces[name]["ipaddr"].ipv4]
def atoh(ip):
    """Pack the IPv4 address string ``ip`` via ``socket.inet_aton``."""
    packed = socket.inet_aton(ip)
    return packed
def alias_by_interface(name):
    """Return the ``ifalias`` attribute of interface ``name``."""
    # Release the IPDB instance (netlink socket and background threads) once
    # the value has been read, instead of leaking one per call.
    with IPDB() as ipdb:
        return ipdb.interfaces[name]["ifalias"]
def mac_by_interface(name):
    """Return the ``address`` attribute (MAC address) of interface ``name``."""
    # Release the IPDB instance (netlink socket and background threads) once
    # the value has been read, instead of leaking one per call.
    with IPDB() as ipdb:
        return ipdb.interfaces[name]["address"]
def mac2hex(mac):
    """Convert a colon-separated MAC address string into an integer."""
    hex_digits = "".join(mac.split(":"))
    return int(hex_digits, 16)
def peer_by_interface(name):
    """Return the name of the interface linked to veth interface ``name``.

    Looks up the ``link`` index of ``name`` and returns the ``ifname`` of the
    interface with that index.

    Raises:
        Exception: if either lookup fails; the original error is chained as
            the cause.
    """
    # Release the IPDB instance (netlink socket and background threads) on
    # every exit path instead of leaking one per call.
    with IPDB() as ipdb:
        try:
            peer_idx = ipdb.interfaces[name]["link"]
            peer_name = ipdb.interfaces[peer_idx]["ifname"]
        except Exception as err:
            # Catch Exception rather than using a bare except, so that
            # KeyboardInterrupt/SystemExit are not rewritten as lookup errors.
            raise Exception(
                "veth interface {} does not exist".format(name)
            ) from err
        return peer_name
def aton(ip):
    """Pack the IPv4 address string ``ip`` via ``socket.inet_aton``."""
    packed = socket.inet_aton(ip)
    return packed
def validate_cidr(cidr):
    """Return the result of ``iptools.ipv4.validate_cidr`` for ``cidr``."""
    verdict = iptools.ipv4.validate_cidr(cidr)
    return verdict
def cidr2mask(cidr):
    """Return the netmask of ``cidr`` as an 8-digit lowercase hex string.

    Args:
        cidr: IPv4 network in ``address/prefix`` notation, e.g. "10.0.0.0/24".

    Returns:
        The 32-bit netmask formatted with "08x", e.g. "ffffff00" for /24.

    Raises:
        ValueError: if ``cidr`` does not contain exactly one "/" or the
            prefix is not an integer in the range 0..32.
    """
    _, prefix = cidr.split("/")
    prefix_len = int(prefix)
    if not 0 <= prefix_len <= 32:
        raise ValueError("invalid IPv4 prefix length: {}".format(prefix))
    # Python ints are unbounded, so the shifted value must be truncated back
    # to 32 bits; otherwise /24 would yield "ffffffff00" rather than "ffffff00".
    mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
    return format(mask, "08x")
def cidr2block(cidr):
    """Return the result of ``iptools.ipv4.cidr2block`` for ``cidr``."""
    block = iptools.ipv4.cidr2block(cidr)
    return block
def ip2hex(ip):
    """Return the result of ``iptools.ipv4.ip2hex`` for ``ip``."""
    hex_form = iptools.ipv4.ip2hex(ip)
    return hex_form
def cidr2netmask(cidr):
    """Split ``cidr`` into its network address and dotted-quad netmask.

    Args:
        cidr: IPv4 network in ``address/prefix`` notation.

    Returns:
        A ``(network, netmask)`` tuple of strings, e.g.
        ("192.168.1.0", "255.255.255.0") for "192.168.1.0/24".
    """
    network, prefix_len = cidr.split("/")
    # 2**32 minus 2**host_bits leaves ones in the top prefix_len bits only.
    mask_value = (1 << 32) - (1 << (32 - int(prefix_len)))
    packed_mask = struct.pack("!I", mask_value)
    return network, socket.inet_ntoa(packed_mask)
def ip2long(ip):
    """Return the result of ``iptools.ipv4.ip2long`` for ``ip``."""
    as_long = iptools.ipv4.ip2long(ip)
    return as_long
def get_process_affinity():
    """Return the CPU affinity psutil reports for the current process."""
    current = psutil.Process()
    return current.cpu_affinity()
def set_process_affinity(pid, cpus):
    """Set the CPU affinity of process (or thread) ``pid`` to ``cpus``.

    An "invalid argument" failure is reported on stdout and otherwise
    ignored; any other OSError is re-raised unchanged.

    Args:
        pid: Process or thread ID to pin.
        cpus: CPU list passed straight to psutil's ``cpu_affinity``.
    """
    import errno

    try:
        psutil.Process(pid).cpu_affinity(cpus)
    except OSError as e:
        # EINVAL (22): PID has PF_NO_SETAFFINITY set
        if e.errno != errno.EINVAL:
            raise
        # name is a method and must be called; the lookup may itself fail if
        # the process has gone away in the meantime.
        try:
            name = psutil.Process(pid).name()
        except psutil.Error:
            name = "<unknown>"
        print(f"Failed to set affinity on process {pid} {name}")
def set_process_affinity_all(cpus):
    """Apply CPU affinity ``cpus`` to every thread of every process.

    Walks all PIDs reported by psutil and calls ``set_process_affinity`` for
    each thread ID of each process.
    """
    for pid in psutil.pids():
        try:
            for thread in psutil.Process(pid).threads():
                set_process_affinity(thread.id, cpus)
        except psutil.NoSuchProcess:
            # The process (or one of its threads) exited after the PID list
            # was taken; skip it rather than aborting the whole sweep.
            continue