In [10]:
# -*- coding:utf-8 -*-
"""
This module schedules all the tasks according to config.rules.
"""
import time
#from multiprocessing import Pool
from multiprocessing.pool import ThreadPool as Pool
import schedule

from redis_util import (
    get_redis_conn, acquire_lock,
    release_lock)

class BaseScheduler:
    def __init__(self, name, tasks, task_queues=None):
        """
        init function for schedulers.
        :param name: scheduler name, generally the value is used by the scheduler
        :param tasks: tasks in config.rules
        :param task_queues: for all jobs, the value is task_queue, while for update usable service list, it's task name
        """
        self.name = name
        self.tasks = tasks
        self.task_queues = list() if not task_queues else task_queues

    def schedule_with_delay(self):
        for task in self.tasks:
            interval = task.get('interval')
            job=task.get('job')
            schedule.every(interval).minutes.do(self.schedule_task_with_lock, task)
        while True:
            schedule.run_pending()
            time.sleep(1)

    def schedule_all_right_now(self):
        pool = Pool()
        #self.schedule_task_with_lock:需要执行的job，self.tasks：参数
        pool.map(self.schedule_task_with_lock, self.tasks)
        pool.close() # close后进程池不能在apply任务 
        pool.join()

    def get_lock(self, conn, task):
        if not task.get('enable'):
            return None
        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None

        task_name = task.get('name')
        lock_indentifier = acquire_lock(conn, task_name)
        return lock_indentifier

    def schedule_task_with_lock(self, task):
        raise NotImplementedError

In [12]:
class UpdateAliveScheduler(BaseScheduler):
    def schedule_task_with_lock(self, task):
        """Update Alive scheduler filters tasks according to task name
        since its task name stands for task type"""
        if not task.get('enable'):
            return None
        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None

        conn = get_redis_conn()
        interval = task.get('interval')
        task_name = task.get('name')
        resource_queue = task.get('resource')
        lock_indentifier = acquire_lock(conn, task_name)
        if not lock_indentifier:
            return False
        pipe = conn.pipeline(True)
        try:
            now = int(time.time())
            pipe.hget(TIMER_RECORDER, task_name)
            pipe.zrevrangebyscore(resource_queue, '+inf', '-inf')
            r, proxies = pipe.execute()
            if not r or (now - int(r.decode('utf-8'))) >= interval * 60:
                if not proxies:
                    # scheduler_logger.warning('fetched no proxies from task {}'.format(task_name))
                    print('fetched no proxies from task {}'.format(task_name))
                    return None

                pipe.sadd(task_queue, *proxies)
                pipe.hset(TIMER_RECORDER, task_name, now)
                pipe.execute()
                # scheduler_logger.info('validator task {} has been stored into redis successfully'.format(task_name))
                return True
            else:
                return None
        finally:
            release_lock(conn, task_name, lock_indentifier)

            
TEMP_HTTP_QUEUE = 'UpdateAliveList:http:temp'
VALIDATED_HTTP_QUEUE = 'UpdateAliveList:validated:http'
UPDATE_TASKS = [
    {
        'name': 'UpdateAliveList',
        'task_queue': TEMP_HTTP_QUEUE,
        'resource': VALIDATED_HTTP_QUEUE,
        'interval': 0.01,  # 20 minutes
        'enable': 1,
    },
]

SchedulerCls = UpdateAliveScheduler
default_tasks=UPDATE_TASKS
scheduler = SchedulerCls('UpdateAliveList',default_tasks)


scheduler.schedule_all_right_now()
scheduler.schedule_with_delay()

UpdateAliveList:http:temp
UpdateAliveList:http:temp
UpdateAliveList:http:temp
UpdateAliveList:http:temp
UpdateAliveList:http:temp
UpdateAliveList:http:temp
UpdateAliveList:http:temp


KeyboardInterrupt: 

In [5]:
from settings import (
    REDIS_HOST, REDIS_PORT, DEFAULT_REDIS_DB,
    REDIS_PASSWORD, LOCKER_PREFIX,TIMER_RECORDER)
conn = get_redis_conn()
import datetime
DATETIME_FORMAT = '%m/%d %H:%M'


def format_now():
    return datetime.datetime.now().strftime(DATETIME_FORMAT)

pipe = conn.pipeline(True)
task_name='task_name'
 
pipe.hset(TIMER_RECORDER, task_name, format_now())

StrictPipeline<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>

In [7]:
print pipe.hget(TIMER_RECORDER,task_name)

StrictPipeline<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>


In [8]:
r, proxies = pipe.execute()

ValueError: need more than 1 value to unpack

In [3]:
TEMP_HTTP_QUEUE = 'UpdateAliveList:http:temp'
VALIDATED_HTTP_QUEUE = 'UpdateAliveList:validated:http'
UPDATE_TASKS = [
    {
        'name': 'UpdateAliveList',
        'model_type': 'AI',
        'model_info_key': 'model_type_to_model_to_worker_list',
        'interval': 0.01,  # 20 minutes
        'enable': 1,
    },
]
from redis_util import (
    get_redis_conn, acquire_lock,
    release_lock)
conn = get_redis_conn()
task=UPDATE_TASKS[0]
interval = task.get('interval')
task_name = task.get('name')
resource_queue = task.get('resource')
lock_indentifier = acquire_lock(conn, task_name)

In [4]:
lock_indentifier

'c81f05b2-76a6-4dd4-8af8-363522f60dd7'

In [1]:
# -*- coding:utf-8 -*-
"""
This module schedules all the tasks according to main func.
"""
import time
#from multiprocessing import Pool
from multiprocessing.pool import ThreadPool as Pool
import schedule
from Heartbeat import Heartbeat4LB

from redis_utils import (
    get_redis_conn,dict_to_redis_hset)


class BaseScheduler:
    def __init__(self, name, tasks, task_queues=None):
        """
        init function for schedulers.
        :param name: scheduler name, generally the value is used by the scheduler
        :param tasks: tasks in config.rules
        :param task_queues: for all jobs, the value is task_queue, while for update usable service list, it's task name
        """
        self.name = name
        self.tasks = tasks
        self.task_queues = list() if not task_queues else task_queues

    def schedule_with_delay(self):
        for task in self.tasks:
            interval = task.get('interval')
            job=task.get('job')
            schedule.every(interval).minutes.do(self.main_func, task)
        while True:
            schedule.run_pending()
            time.sleep(1)

    def schedule_all_right_now(self):
        pool = Pool()
        #self.schedule_task_with_lock:需要执行的job，self.tasks：参数
        pool.map(self.main_func, self.tasks)
        pool.close() # close后进程池不能在apply任务 
        pool.join()

    def get_lock(self, conn, task):
        if not task.get('enable'):
            return None
        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None

        task_name = task.get('name')
        lock_indentifier = acquire_lock(conn, task_name)
        return lock_indentifier

    def main_func(self, task):
        raise NotImplementedError
class UpdateAliveScheduler(BaseScheduler):
    def main_func(self, task):
        """Update Alive scheduler filters tasks according to task name
        since its task name stands for task type"""
        if not task.get('enable'):
            return None

        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None
        model_type = task.get('model_type')
        key=task.get('model_info_key')
        interval = task.get('interval')
        
        if task_queue not in self.task_queues:
            return None
        conn = get_redis_conn()
        
        worklist_db=redis_hget(conn,key,model_type)
        heartbeat = Heartbeat4LB()
        new_worker_list_dict={}
        new_list_dict={}
        for model_id,worklist in worklist_db.items():
            new_list=[]
        
            for worker_id in worklist:
                per_worker=worklist_db[worker_id]
                worker_status=heartbeat.run({'host':per_worker['host'],'port':per_worker['port'],'worker_id':worker_id,'model_id':model_id,'test_type':'tcp'})
                if worker_status=='pass':
                    new_list.append(worker_id)
            print model_id,new_list
            new_list_dict[model_id]=new_list
        print new_list_dict
        if 'AI' not in new_worker_list_dict:
            new_worker_list_dict['AI']={}
            new_worker_list_dict['AI']=json.dumps(new_list_dict)
        dict_to_redis_hset(conn, key, new_worker_list_dict)    
UPDATE_TASKS = [
    {
        'name': 'UpdateAliveList',
        'model_type': 'AI',
        'model_info_key': 'model_type_to_model_to_worker_list',
        'interval': 0.01,  # 20 minutes
        'enable': 1,
        'task_queue':None,
    },
]
SchedulerCls = UpdateAliveScheduler
default_tasks=UPDATE_TASKS
scheduler = SchedulerCls('UpdateAliveList',default_tasks)


#scheduler.schedule_all_right_now()
scheduler.schedule_with_delay()

KeyboardInterrupt: 

In [17]:
default_tasks.get('name')

'UpdateAliveList'

True

In [6]:
from Heartbeat import Heartbeat4LB,dict_to_redis_hset
import json
import redis
r = redis.StrictRedis(host='localhost')
conn = get_redis_conn()
def worker_list_isAlive():
    worklist_db=json.loads(r.hgetall('model_type_to_model_to_worker_list')['AI'])
    heartbeat = Heartbeat4LB()
    new_worker_list_dict={}
    new_list_dict={}
    for model_id,worklist in worklist_db.items():
        new_list=[]
        
        for worker_id in worklist:
            per_worker=json.loads(r.hgetall('model_type_to_worker_id_to_worker')['AI'])[worker_id]
            worker_status=heartbeat.run({'host':per_worker['host'],'port':per_worker['port'],'worker_id':worker_id,'model_id':model_id,'test_type':'tcp'})
            if worker_status=='pass':
                new_list.append(worker_id)
        print model_id,new_list
        new_list_dict[model_id]=new_list
    print new_list_dict
    if 'AI' not in new_worker_list_dict:
        new_worker_list_dict['AI']={}
        new_worker_list_dict['AI']=json.dumps(new_list_dict)
    dict_to_redis_hset(r, 'model_type_to_model_to_worker_list', new_worker_list_dict)         
worker_list_isAlive()

localhost:8002 OK
localhost:8002 OK
localhost:8003 OK
fib [u'localhost-8002-3', u'localhost-8003-1']
localhost:8002 OK
localhost:8003 OK
localhost:8001 OK
fib2 [u'localhost-8001-2']
{u'fib': [u'localhost-8002-3', u'localhost-8003-1'], u'fib2': [u'localhost-8001-2']}


In [1]:
# -*- coding:utf-8 -*-
"""
This module schedules all the tasks according to main func.
"""
import time
#from multiprocessing import Pool
from multiprocessing.pool import ThreadPool as Pool
import schedule
from Heartbeat import Heartbeat4LB

from redis_utils import (
    get_redis_conn,dict_to_redis_hset,redis_hget)


class BaseScheduler:
    def __init__(self, name, tasks, task_queues=None):
        """
        init function for schedulers.
        :param name: scheduler name, generally the value is used by the scheduler
        :param tasks: tasks in config.rules
        :param task_queues: for all jobs, the value is task_queue, while for update usable service list, it's task name
        """
        self.name = name
        self.tasks = tasks
        self.task_queues = list() if not task_queues else task_queues

    def schedule_with_delay(self):
        for task in self.tasks:
            interval = task.get('interval')
            job=task.get('job')
            #print task
            schedule.every(interval).minutes.do(self.main_func,task)
        while True:
            schedule.run_pending()
            time.sleep(1)

    def schedule_all_right_now(self):
        pool = Pool()
        #self.schedule_task_with_lock:需要执行的job，self.tasks：参数
        pool.map(self.main_func, self.tasks)
        pool.close() # close后进程池不能在apply任务 
        pool.join()

    def get_lock(self, conn, task):
        if not task.get('enable'):
            return None
        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None

        task_name = task.get('name')
        lock_indentifier = acquire_lock(conn, task_name)
        return lock_indentifier

    def main_func(self, task):
        raise NotImplementedError
class UpdateAliveScheduler(BaseScheduler):
    def main_func(self, task):
        """Update Alive scheduler filters tasks according to task name
        since its task name stands for task type"""
        print('task',task)
        
        task_queue = task.get('task_queue')
        print('task.get',task.get('enable'))
        #if task_queue not in self.task_queues:
        #    return None
        model_type = task.get('model_type')
        key=task.get('model_info_key')
        interval = task.get('interval')
        worker_info=task.get('worker_info')
        
        #if task_queue not in self.task_queues:
        #    return None
        conn = get_redis_conn()
        
        worklist_db=redis_hget(conn,key,model_type)
        print('worklist_db',worklist_db,key)
        heartbeat = Heartbeat4LB()
        new_worker_list_dict={}
        new_list_dict={}
        for model_id,worklist in worklist_db.items():
            new_list=[]
        
            for worker_id in worklist:
                per_worker=redis_hget(conn,worker_info,model_type)[worker_id]
                worker_status=heartbeat.run({'host':per_worker['host'],'port':per_worker['port'],'worker_id':worker_id,'model_id':model_id,'test_type':'tcp'})
                if worker_status=='pass':
                    new_list.append(worker_id)
            print model_id,new_list
            new_list_dict[model_id]=new_list
        print new_list_dict
        if 'AI' not in new_worker_list_dict:
            new_worker_list_dict['AI']={}
            new_worker_list_dict['AI']=json.dumps(new_list_dict)
        dict_to_redis_hset(conn, key, new_worker_list_dict)    
UPDATE_TASKS = [
    {
        'name': 'UpdateAliveList',
        'model_type': 'AI',
        'model_info_key': 'model_type_to_model_to_worker_list',
        'worker_info':'model_type_to_worker_id_to_worker',
        'interval': 20,  # 20 minutes
        'enable': 1,
        'task_queue':None,
    },
]
SchedulerCls = UpdateAliveScheduler
default_tasks=UPDATE_TASKS
scheduler = SchedulerCls('UpdateAliveList',default_tasks)


#scheduler.schedule_all_right_now()
scheduler.schedule_with_delay()

KeyboardInterrupt: 

In [4]:
%%file schedule_HeartBeat.py
# -*- coding:utf-8 -*-
"""
This module schedules all the tasks according to main func.
"""
import time
#from multiprocessing import Pool
from multiprocessing.pool import ThreadPool as Pool
import schedule
from Heartbeat import Heartbeat4LB
import json
from redis_utils import (
    get_redis_conn,dict_to_redis_hset,redis_hget)


class BaseScheduler:
    def __init__(self, name, tasks, task_queues=None):
        """
        init function for schedulers.
        :param name: scheduler name, generally the value is used by the scheduler
        :param tasks: tasks in config.rules
        :param task_queues: for all jobs, the value is task_queue, while for update usable service list, it's task name
        """
        self.name = name
        self.tasks = tasks
        self.task_queues = list() if not task_queues else task_queues

    def schedule_with_delay(self):
        for task in self.tasks:
            interval = task.get('interval')
            job=task.get('job')
            #print task
            schedule.every(interval).minutes.do(self.main_func,task)
        while True:
            schedule.run_pending()
            time.sleep(1)

    def schedule_all_right_now(self):
        pool = Pool()
        #self.schedule_task_with_lock:需要执行的job，self.tasks：参数
        pool.map(self.main_func, self.tasks)
        pool.close() # close后进程池不能在apply任务 
        pool.join()

    def get_lock(self, conn, task):
        if not task.get('enable'):
            return None
        task_queue = task.get('task_queue')
        if task_queue not in self.task_queues:
            return None

        task_name = task.get('name')
        lock_indentifier = acquire_lock(conn, task_name)
        return lock_indentifier

    def main_func(self, task):
        """Update Alive scheduler filters tasks according to task name
        since its task name stands for task type"""
        print('task',task)
        
        task_queue = task.get('task_queue')
        print('task.get',task.get('enable'))
        #if task_queue not in self.task_queues:
        #    return None
        model_type = task.get('model_type')
        key=task.get('model_info_key')
        interval = task.get('interval')
        worker_info=task.get('worker_info')
        
        #if task_queue not in self.task_queues:
        #    return None
        conn = get_redis_conn()
        
        worklist_db=redis_hget(conn,key,model_type)
        print('worklist_db',worklist_db,key)
        heartbeat = Heartbeat4LB()
        new_worker_list_dict={}
        new_list_dict={}
        for model_id,worklist in worklist_db.items():
            new_list=[]
        
            for worker_id in worklist:
                per_worker=redis_hget(conn,worker_info,model_type)[worker_id]
                worker_status=heartbeat.run({'host':per_worker['host'],'port':per_worker['port'],'worker_id':worker_id,'model_id':model_id,'test_type':'tcp'})
                if worker_status=='pass':
                    new_list.append(worker_id)
            new_list_dict[model_id]=new_list
        if 'AI' not in new_worker_list_dict:
            new_worker_list_dict['AI']={}
            new_worker_list_dict['AI']=json.dumps(new_list_dict)
        dict_to_redis_hset(conn, key, new_worker_list_dict)    

UPDATE_TASKS = [
    {
        'name': 'UpdateAliveList',
        'model_type': 'AI',
        'model_info_key': 'model_type_to_model_to_worker_list',
        'worker_info':'model_type_to_worker_id_to_worker',
        'interval': 0.01,  # 20 minutes
        'enable': 1,
        'task_queue':None,
    },
]
SchedulerCls = BaseScheduler
default_tasks=UPDATE_TASKS
scheduler = SchedulerCls('UpdateAliveList',default_tasks)


#scheduler.schedule_all_right_now()
scheduler.schedule_with_delay()

('task', {'enable': 1, 'name': 'UpdateAliveList', 'worker_info': 'model_type_to_worker_id_to_worker', 'interval': 0.01, 'model_info_key': 'model_type_to_model_to_worker_list', 'model_type': 'AI', 'task_queue': None})
('task.get', 1)
('worklist_db', {u'fib': [u'localhost-8002-3', u'localhost-8003-1'], u'fib2': [u'localhost-8001-2']}, 'model_type_to_model_to_worker_list')
localhost:8002 OK
localhost:8002 OK
localhost:8003 OK
fib [u'localhost-8002-3', u'localhost-8003-1']
localhost:8002 OK
localhost:8003 OK
localhost:8001 OK
fib2 [u'localhost-8001-2']
{u'fib': [u'localhost-8002-3', u'localhost-8003-1'], u'fib2': [u'localhost-8001-2']}
('task', {'enable': 1, 'name': 'UpdateAliveList', 'worker_info': 'model_type_to_worker_id_to_worker', 'interval': 0.01, 'model_info_key': 'model_type_to_model_to_worker_list', 'model_type': 'AI', 'task_queue': None})
('task.get', 1)
('worklist_db', {u'fib': [u'localhost-8002-3', u'localhost-8003-1'], u'fib2': [u'localhost-8001-2']}, 'model_type_to_model_to_w

KeyboardInterrupt: 