Skip to content
This repository has been archived by the owner on Apr 22, 2022. It is now read-only.

Commit

Permalink
del Mod
Browse files Browse the repository at this point in the history
  • Loading branch information
bazingaterry committed Jul 25, 2017
1 parent 015f53f commit 3b25065
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 155 deletions.
35 changes: 0 additions & 35 deletions Munager/MuAPI/__init__.py
Expand Up @@ -88,41 +88,6 @@ def get_users(self, key) -> dict:
ret[user.get(key)] = User(**user)
return ret

@gen.coroutine
def get_delay(self) -> list:
request = self._get_request(
path='/mu/nodes/{id}/delay'.format(id=self.node_id),
query=dict(
sample=self.delay_sample,
),
)
response = yield self.client.fetch(request)
content = response.body.decode('utf-8')
cont_json = json.loads(content, encoding='utf-8')
if cont_json.get('ret') != 1:
raise MuAPIError(cont_json)
return cont_json.get('data', [])

@gen.coroutine
def post_delay_info(self, formdata):
request = self._get_request(
path='/mu/nodes/{id}/delay_info'.format(id=self.node_id),
method='POST',
formdata=formdata,
)
result = yield self._make_fetch(request)
return result

@gen.coroutine
def post_load(self, formdata):
request = self._get_request(
path='/mu/nodes/{id}/info'.format(id=self.node_id),
method='POST',
formdata=formdata,
)
result = yield self._make_fetch(request)
return result

@gen.coroutine
def post_online_user(self, amount):
request = self._get_request(
Expand Down
25 changes: 18 additions & 7 deletions Munager/SSManager/__init__.py
Expand Up @@ -49,6 +49,12 @@ def _to_unicode(_d):
ret[k.decode('utf-8')] = v.decode('utf-8')
return ret

@staticmethod
def _fix_type(_d):
# convert type when get a unicode dict from redis
_d['cursor'] = int(_d.get('cursor', 0))
return _d

def _get_key(self, _keys):
keys = [self.config.get('redis_prefix', 'mu')]
keys.extend(_keys)
Expand All @@ -60,24 +66,27 @@ def state(self):
res = self.cli.recv(1506).decode('utf-8').replace('stat: ', '')
# change key from str to int
res_json = json.loads(res)
ret = dict()
ret_by_port, ret_by_uid = dict(), dict()
for port, throughput in res_json.items():
info = self.redis.hgetall(self._get_key(['user', str(port)]))
info = self._to_unicode(info)
info['cursor'] = int(info.get('cursor', 0))
info = self._fix_type(info)
info['throughput'] = throughput
ret[int(port)] = info
return ret
info['port'] = port
user_id = info.get('user_id')
ret_by_port[int(port)] = info
ret_by_uid[user_id] = info
return ret_by_port, ret_by_uid

def add(self, user_id, port, password, method):
def add(self, user_id, port, password, method, plugin, plugin_opts):
msg = dict(
server_port=port,
password=password,
method=method,
fast_open=self.config.get('fast_open'),
mode=self.config.get('mode'),
plugin=self.config.get('plugin'),
plugin_opts=self.config.get('plugin_opts'),
plugin=plugin,
plugin_opts=plugin_opts,
)
req = 'add: {msg}'.format(msg=json.dumps(msg))
# to bytes
Expand All @@ -88,6 +97,8 @@ def add(self, user_id, port, password, method):
pipeline.hset(self._get_key(['user', str(port)]), 'user_id', user_id)
pipeline.hset(self._get_key(['user', str(port)]), 'password', password)
pipeline.hset(self._get_key(['user', str(port)]), 'method', method)
pipeline.hset(self._get_key(['user', str(port)]), 'plugin', plugin)
pipeline.hset(self._get_key(['user', str(port)]), 'plugin_opts', plugin_opts)
pipeline.execute()
return self.cli.recv(1506) == b'ok'

Expand Down
150 changes: 40 additions & 110 deletions Munager/__init__.py
@@ -1,10 +1,8 @@
from time import time
import json

import numpy as np
import psutil
import yaml
from tornado import gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop, PeriodicCallback

from Munager.MuAPI import MuAPI
Expand All @@ -22,7 +20,7 @@ def __init__(self, config_path):
self.logger = get_logger('Munager', self.config)

self.logger.debug('load config from {}.'.format(config_path))
self.logger.debug('config: {}'.format(self.config))
self.logger.debug('config: \n{}'.format(json.dumps(self.config, indent=2)))

# mix
self.ioloop = IOLoop.current()
Expand All @@ -32,96 +30,11 @@ def __init__(self, config_path):

self.client = AsyncHTTPClient()

@property
@gen.coroutine
def sys_status(self):
wait_time = self.config.get('diff_time', 10)
test_time = self.config.get('test_time', 10)

sent = psutil.net_io_counters().bytes_sent
recv = psutil.net_io_counters().bytes_recv
yield gen.sleep(wait_time)
current_sent = psutil.net_io_counters().bytes_sent
current_recv = psutil.net_io_counters().bytes_recv
# change in to killo bytes
sent_speed = (current_sent - sent) / wait_time * 8 / 1024
recv_speed = (current_recv - recv) / wait_time * 8 / 1024
cpu = psutil.cpu_percent()
vir = psutil.virtual_memory().percent
swp = psutil.swap_memory().percent
upload = round(sent_speed, 2)
download = round(recv_speed, 2)
uptime = time() - psutil.boot_time()

url = self.config.get('test_url')

req_para = dict(
url=url,
method='HEAD',
use_gzip=True,
)
request = HTTPRequest(**req_para)

error_counter = 0
delay = 0
for _ in range(test_time):
start = time()
try:
yield self.client.fetch(request)
except Exception as e:
self.logger.exception(e)
error_counter += 1
else:
delay += (time() - start)
# s to ms
delay = delay / (test_time - error_counter) * 1000

return dict(
cpu=cpu,
vir=vir,
swp=swp,
upload=upload,
download=download,
uptime=uptime,
delay=delay,
)

def delay_info(self, delays):
delays.remove(max(delays))
delays.remove(min(delays))

delay_min = min(delays)
delay_max = max(delays)
mean = np.mean(delays)
standard_deviation = np.std(delays)
return dict(
min=delay_min,
max=delay_max,
mean=mean,
standard_deviation=standard_deviation,
)

@gen.coroutine
def post_load(self):
# cpu, vir, swp, upload, download, sent_speed, recv_speed = self.sys_status
data = yield self.sys_status
result = yield self.mu_api.post_load(data)
if result:
self.logger.info('post system load finished.')

@gen.coroutine
def post_delay_info(self):
delays = yield self.mu_api.get_delay()
data = self.delay_info(delays)
result = yield self.mu_api.post_delay_info(data)
if result:
self.logger.info('post delay info finished.')

@gen.coroutine
def update_ss_manager(self):
# get from MuAPI and ss-manager
users = yield self.mu_api.get_users('port')
state = self.ss_manager.state
state, _ = self.ss_manager.state
self.logger.info('get MuAPI and ss-manager succeed, now begin to check ports.')
self.logger.debug('get state from ss-manager: {}.'.format(state))

Expand All @@ -140,39 +53,64 @@ def update_ss_manager(self):
port=user.port,
password=user.passwd,
method=user.method,
plugin=user.plugin,
plugin_opts=user.plugin_opts,
):
self.logger.info('add user at port: {}.'.format(user.port))

if user.available and port in state:
if user.passwd != state.get(port).get('password') or user.method != state.get(port).get('method'):
if user.passwd != state.get(port).get('password') or \
user.method != state.get(port).get('method') or \
user.plugin != state.get(port).get('plugin') or \
user.plugin_opts != state.get(port).get('plugin_opts'):
if self.ss_manager.remove(user.port) and self.ss_manager.add(
user_id=user_id,
port=user.port,
password=user.passwd,
method=user.method,
plugin=user.plugin,
plugin_opts=user.plugin_opts,
):
self.logger.info('reset port {} due to method or password changed.'.format(user.port))
# check finish
self.logger.info('check ports finished.')

@gen.coroutine
def upload_throughput(self):
state = self.ss_manager.state
port_state, user_id_state = self.ss_manager.state
online_amount = 0
for port, info in state.items():
post_data = list()
for port, info in port_state.items():
user_id = info.get('user_id')
cursor = info.get('cursor')
throughput = info.get('throughput')
if throughput < cursor:
self.logger.warning('error throughput, try fix.')
self.ss_manager.set_cursor(port, throughput)
online_amount += 1
post_data.append(dict(
id=user_id,
u=0,
d=throughput,
))
elif throughput > cursor:
online_amount += 1
dif = throughput - cursor
user_id = info.get('user_id')
result = yield self.mu_api.upload_throughput(user_id, dif)
if result:
self.ss_manager.set_cursor(port, throughput)
self.logger.info('update traffic: {} for port: {}.'.format(dif, port))
post_data.append(dict(
id=user_id,
u=0,
d=dif,
))
# upload to MuAPI
users = yield self.mu_api.upload_throughput(post_data)
for user_id, msg in users.items():
if msg == 'ok':
# user_id type is str
user = user_id_state.get(user_id)
throughput = user['throughput']
self.ss_manager.set_cursor(user['port'], throughput)
self.logger.info('update traffic for user: {}.'.format(user_id))
else:
self.logger.warning('fail to update traffic for user: {}.'.format(user_id))

# update online users count
result = yield self.mu_api.post_online_user(online_amount)
Expand All @@ -186,11 +124,6 @@ def _to_msecond(period):

def run(self):
# period task
PeriodicCallback(
callback=self.post_load,
callback_time=self._to_msecond(self.config.get('post_load_period', 60)),
io_loop=self.ioloop,
).start()
PeriodicCallback(
callback=self.update_ss_manager,
callback_time=self._to_msecond(self.config.get('update_port_period', 60)),
Expand All @@ -201,12 +134,9 @@ def run(self):
callback_time=self._to_msecond(self.config.get('upload_throughput_period', 360)),
io_loop=self.ioloop,
).start()
PeriodicCallback(
callback=self.post_delay_info,
callback_time=self._to_msecond(self.config.get('post_delay_standard_period', 1296)),
io_loop=self.ioloop,
).start()
try:
# Init task
self.ioloop.run_sync(self.update_ss_manager)
self.ioloop.start()
except KeyboardInterrupt:
del self.mu_api
Expand Down
2 changes: 0 additions & 2 deletions requirements.txt
@@ -1,6 +1,4 @@
psutil==5.2.2
PyYAML==3.12
redis==2.10.5
tornado==4.5.1
click==6.7
numpy==1.12.1
1 change: 0 additions & 1 deletion run.py
Expand Up @@ -7,7 +7,6 @@
@click.option('--config-file', default='./config/config.yml', help='Configuration file path.')
def bootstrap(config_file):
app = Munager(config_file)
app.post_delay_info()
app.run()


Expand Down

0 comments on commit 3b25065

Please sign in to comment.