In [14]:
%%file worker_load_balancer.py
# -*- coding:utf-8 -*-
# Copyright 2017-2018, the original author or authors.

# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import random
import json
import yaml

from commons.src.config import config_loader
from commons.src.load_balancer.worker_info import WorkerInfo
from random import randint
from commons.src.utils.redis_persistent_dict import RedisDict

import requests
import redis
import ast
import datetime

DATETIME_FORMAT = '%y/%m/%d %H:%M'


def format_now():
    """Return the current local time as a string laid out by DATETIME_FORMAT."""
    # datetime.__format__ delegates to strftime for a non-empty format spec.
    return format(datetime.datetime.now(), DATETIME_FORMAT)

class AIinfoLoadError(Exception):
    """Custom error type for AI info loading failures (not raised anywhere in this file)."""

def worklist2worker(adict):
    """
    Build the ``{'AI': <json>}`` worker-info payload from a worker-id mapping.

    The value lists of ``adict`` are scanned for global worker ids of the form
    ``"<host>-<port>-<local_worker_id>"``. Each distinct id (in first-seen
    order) becomes an entry ``{'local_worker_id': int, 'host': str,
    'port': int, 'global_worker_id': str}`` keyed by that id.

    Args:
        adict: mapping of any key (e.g. a model id) to an iterable of global
            worker id strings, e.g. ``{'fib': ['localhost-9019-2']}``.

    Returns:
        dict: ``{'AI': json.dumps({global_worker_id: worker_info})}``.

    Raises:
        ValueError: if an id does not split into host, port and local id, or
            if the port / local id is not an integer.
    """
    worker_infos = {}
    # dict.fromkeys de-duplicates ids while keeping first-seen order.
    unique_ids = dict.fromkeys(
        worker_id for worker_ids in adict.values() for worker_id in worker_ids)
    for worker_id in unique_ids:
        global_id = str(worker_id)
        # Split from the right so hostnames that contain '-' stay intact
        # (a plain split('-') mis-parses e.g. 'my-host-9000-1').
        host, port, local_id = global_id.rsplit('-', 2)
        # Key order is kept stable so the emitted JSON text is unchanged.
        worker_infos[global_id] = {
            'local_worker_id': int(local_id),
            'host': host,
            'port': int(port),
            'global_worker_id': global_id,
        }
    return {'AI': json.dumps(worker_infos)}

class WorkerLoadBalancer:
    """
    A generic worker load balancer which helps in choosing a worker randomly.

    Routing state is held in ``RedisDict`` containers (each constructed with a
    default ``redis.StrictRedis()`` persistence client). Two methods
    additionally read raw Redis hashes through ``self.redis_client``.
    """

    #: Raw Redis hash read by isalive(): field = model type, value = JSON
    #: object mapping global worker id -> {'host': ..., 'port': ...}.
    _WORKER_INFO_HASH = 'model_type_to_worker_id_to_worker'
    #: Raw Redis hash checked by check_if_model_to_workers_map_is_empty().
    _WORKER_LIST_HASH = 'model_type_to_model_to_worker_list'

    def __init__(self):
        """
        Constructor

        Creates the RedisDict-backed routing tables, a raw Redis client and an
        HTTP session used for liveness probes.

        Returns: None
        """
        #: dict model type -> model id -> list of global worker ids.
        self.model_type_to_model_to_worker_list = RedisDict(persistence=redis.StrictRedis(),
                                                            key='model_type_to_model_to_worker_list3')
        #: dict model type -> global worker id -> list of WorkerInfo.
        self.model_type_to_worker_id_to_worker = RedisDict(persistence=redis.StrictRedis(),
                                                           key='model_type_to_worker_id_to_worker3')
        #: dict model type -> model -> workers map (not read or written by any
        #: method of this class).
        self.model_type_to_model_to_workers_map = RedisDict(persistence=redis.StrictRedis(),
                                                            key='model_type_to_model_to_workers_map3')
        #: dict model id -> worker id -> {'count': int, 'request_time': str}.
        self.model_id_request_count = RedisDict(persistence=redis.StrictRedis(),
                                                key='model_id_request_count3')
        #: per-worker request counters (not updated by any method of this class).
        self.worker_request_count = RedisDict(persistence=redis.StrictRedis(),
                                              key='worker_request_count3')
        # Raw client for the plain Redis hashes named by the class constants;
        # isalive() and check_if_model_to_workers_map_is_empty() need it.
        self.redis_client = redis.StrictRedis()
        # HTTP session used by isalive() to probe workers.
        self.session = requests.Session()
        # Logger instance
        self.logger = logging.getLogger(__name__)

    def update_workers_list(self, model_type_to_model_to_workers_map):
        """
        Register workers for models, grouped by model type.

        Args:
            model_type_to_model_to_workers_map: nested mapping
                model type -> model id -> list of worker dicts, each with
                'host', 'port' and 'local_worker_id' keys, e.g.
                ``{'Recom': {'CTR': [{'host': '127.0.0.1', 'port': 90, 'local_worker_id': 2},
                {'host': 'localhost', 'port': 9019, 'local_worker_id': 2}]}}``.
                A falsy value is ignored.

        Returns: None
        """
        if not model_type_to_model_to_workers_map:
            return
        # Model types group deployments (e.g. recommendation, search).
        for model_type in model_type_to_model_to_workers_map.keys():
            # Make sure both per-type tables have an entry for this model type.
            if model_type not in self.model_type_to_worker_id_to_worker:
                self.model_type_to_worker_id_to_worker[model_type] = {}
            if model_type not in self.model_type_to_model_to_worker_list:
                self.model_type_to_model_to_worker_list[model_type] = {}

            # model_id identifies a specific model cluster within the type.
            for model_id in model_type_to_model_to_workers_map[model_type].keys():
                if model_id not in self.model_type_to_model_to_worker_list[model_type]:
                    self.model_type_to_model_to_worker_list[model_type][model_id] = []

                for worker in model_type_to_model_to_workers_map[model_type][model_id]:
                    worker_info = WorkerInfo(worker["host"], worker["port"], worker["local_worker_id"])
                    worker_id = worker_info.global_worker_id

                    if worker_id not in self.model_type_to_worker_id_to_worker[model_type]:
                        self.model_type_to_worker_id_to_worker[model_type][worker_id] = []
                    # NOTE(review): appended unconditionally, so registering the
                    # same worker again grows this list with duplicates — confirm.
                    self.model_type_to_worker_id_to_worker[model_type][worker_id].append(worker_info)
                    # Record the global id once per model.
                    if worker_id not in self.model_type_to_model_to_worker_list[model_type][model_id]:
                        self.model_type_to_model_to_worker_list[model_type][model_id].append(worker_id)

    def remove_workers(self, model_type, model_id, workers):
        """
        Remove workers from a model.

        Each worker's info entry is dropped from the per-type worker table and
        its global id is removed from the model's worker list; the model entry
        itself is dropped once its worker list is empty.

        Args:
            model_type: type of the model (must already be registered).
            model_id: id of the model whose workers are removed.
            workers: objects exposing ``global_worker_id``.

        Returns: None

        Raises:
            KeyError: if ``model_type`` is not registered.
        """
        # NOTE(review): the info entry is removed for the whole model type even
        # if the worker is also listed under another model id — confirm intent.
        worker_id_to_worker = self.model_type_to_worker_id_to_worker[model_type]
        for worker in workers:
            worker_id_to_worker.pop(worker.global_worker_id, None)

        model_to_worker_list = self.model_type_to_model_to_worker_list[model_type]
        if model_id in model_to_worker_list:
            worker_ids = model_to_worker_list[model_id]
            for worker in workers:
                if worker.global_worker_id in worker_ids:
                    worker_ids.remove(worker.global_worker_id)
            # Drop the model only after all removals; popping it inside the loop
            # made the next iteration's lookup raise KeyError.
            if not worker_ids:
                model_to_worker_list.pop(model_id, None)

    def isalive(self, model_type, worker_id_list):
        """
        Return the id of a randomly chosen worker that answers an HTTP GET.

        Worker addresses are read from the Redis hash
        'model_type_to_worker_id_to_worker', whose field ``model_type`` holds
        a JSON object mapping worker id -> {'host': ..., 'port': ...}.
        Candidates are probed in random order, each at most once; any HTTP
        response (whatever its status) counts as alive.

        Args:
            model_type: hash field to read worker addresses from.
            worker_id_list: candidate global worker ids.

        Returns:
            The first worker id whose probe did not raise a requests error.

        Raises:
            KeyError: if nothing is stored for ``model_type`` or a candidate id
                is missing from the stored object.
            Exception: if no candidate worker responds.
        """
        raw_workers = self.redis_client.hget(self._WORKER_INFO_HASH, model_type)
        if raw_workers is None:
            raise KeyError(model_type)
        workers = json.loads(raw_workers)

        candidates = list(worker_id_list)
        # Random order, no repeats: a fully-down cluster ends instead of spinning.
        random.shuffle(candidates)
        for worker_id in candidates:
            worker = workers[worker_id]
            host_port = 'http://%s:%d' % (worker['host'], worker['port'])
            try:
                self.session.request('get', url=host_port, params=None, json=None, timeout=3)
            except requests.exceptions.RequestException:
                self.logger.warning('Worker %s at %s is unreachable', worker_id, host_port)
                continue
            return worker_id
        raise Exception("No live worker available for the given model type! ")

    def choose_worker(self, model_type, model_id):
        """
        Pick a worker at random for a model and record the request.

        Args:
            model_type: type of the model.
            model_id: id of the model.

        Returns:
            The entry of ``model_type_to_worker_id_to_worker`` for the chosen
            global worker id (the list of WorkerInfo built by
            update_workers_list).

        Raises:
            Exception: if the model type or the model has no workers.
        """
        request_time = format_now()
        model_to_worker_list = self.model_type_to_model_to_worker_list.get(model_type)
        if not model_to_worker_list:
            raise Exception("No worker available for the given model type! ")
        worker_id_list = model_to_worker_list.get(model_id)
        # Also covers an empty list, which random.choice would reject.
        if not worker_id_list:
            raise Exception("No worker available for the given model! ")
        worker_id = random.choice(worker_id_list)

        # Per-model, per-worker request statistics.
        if model_id not in self.model_id_request_count:
            self.model_id_request_count[model_id] = {}
        if worker_id in self.model_id_request_count[model_id]:
            self.model_id_request_count[model_id][worker_id]['count'] += 1
            self.model_id_request_count[model_id][worker_id]['request_time'] = request_time
        else:
            self.model_id_request_count[model_id][worker_id] = {}
            self.model_id_request_count[model_id][worker_id]['count'] = 1
            self.model_id_request_count[model_id][worker_id]['request_time'] = request_time

        return self.model_type_to_worker_id_to_worker[model_type][worker_id]

    def get_all_workers(self, model_type):
        """
        Returns all the workers available for the given model type
        Args:
            model_type: Type of the model

        Returns: the worker-id -> workers mapping for the model type (None if
            unknown), or an error dict if reading the table fails.
        """
        try:
            return self.model_type_to_worker_id_to_worker.get(model_type)
        except Exception:
            # Best-effort: report the failure to the caller instead of raising.
            self.logger.exception('Failed to read workers for model type %s', model_type)
            return {'success': False, 'message': 'The specified model/version doesn\'t exist!'}

    def get_model_to_workers_list(self, model_type):
        """
        Returns the model id -> worker id list mapping for a model type
        Args:
            model_type: Type of the model
        Returns: model id -> list of global worker ids, or None if unknown
        """
        return self.model_type_to_model_to_worker_list.get(model_type)

    def get_worker_info(self, worker_id, model_type):
        """
        Returns worker information of a given worker
        Args:
            worker_id: Global worker id
            model_type: Type of the model

        Returns: Worker information of the given worker, or None if the model
            type or worker is unknown
        """
        worker_id_to_worker = self.model_type_to_worker_id_to_worker.get(model_type)
        if worker_id_to_worker:
            return worker_id_to_worker.get(worker_id)
        return None

    def check_if_model_to_workers_map_is_empty(self):
        """
        Checks if the raw Redis hash 'model_type_to_model_to_worker_list' is empty

        NOTE(review): this reads the raw hash, not the RedisDict-backed
        ``model_type_to_model_to_worker_list`` (key suffixed '3') — confirm.

        Returns: True if the hash is empty or missing, else False
        """
        return self.redis_client.hlen(self._WORKER_LIST_HASH) == 0
    





Overwriting worker_load_balancer.py


In [7]:
import datetime

DATETIME_FORMAT = '%y/%m/%d %H:%M'


def format_now():
    """Return the current local time as a string laid out by DATETIME_FORMAT."""
    # datetime.__format__ delegates to strftime for a non-empty format spec.
    return format(datetime.datetime.now(), DATETIME_FORMAT)


format_now()

'18/05/30 14:47'