In [17]:
import platform
import requests
import subprocess
import os
import time
import json
import psutil
import platform
import shutil
import pynvml
import multiprocessing
import yaml
import csv


In [18]:
def get_os_info() ->dict:
    """Collect a one-off snapshot of the host platform, CPU, memory and GPU.

    Returns:
        dict: platform strings from the ``platform`` module, logical and
        physical CPU counts, memory figures and the stringified result of
        ``get_gpu_list()``. Memory figures are byte counts divided by
        100000, exactly as earlier versions recorded them.
        ``active_memory`` is ``None`` when the platform does not report
        that field.

    Raises:
        RuntimeError: if any probe fails; the original error is chained.
    """
    # NOTE(review): 100000 is neither MB (1e6) nor MiB (2**20); it is kept
    # unchanged so that new records stay comparable with existing ones.
    divisor = 100000

    # Queried before the try block, as before: get_gpu_list handles its own
    # failures and prints its own status line first.
    device = get_gpu_list()
    try:
        print("采集系统信息")
        # Take a single snapshot so the memory figures are consistent.
        virtual_mem = psutil.virtual_memory()
        # psutil only exposes `active` on some platforms; a missing field
        # must not abort the whole collection.
        active_bytes = getattr(virtual_mem, "active", None)
        info = {
                "platform":platform.platform(),
                "system":platform.system(),
                "python_version":platform.python_version(),
                "architecture":platform.architecture()[0],
                "processor":platform.processor(),
                "uname":str(platform.uname()),
                "cpu_logical_count":psutil.cpu_count(),
                "cpu_count": psutil.cpu_count(logical=False),
                "total_memory": virtual_mem.total / divisor,
                "active_memory": None if active_bytes is None else active_bytes / divisor,
                "available_memory": virtual_mem.available / divisor,
                "total_swap_memory":psutil.swap_memory().total / divisor,
                "nvidia_gpu_info":str(device)
        }
    except Exception as exc:
        # Chain the cause so the failing probe is visible in the traceback.
        raise RuntimeError("系统信息采集失败") from exc

    return info

def watch_cpu(main_pid:int,path:str)->None:
    """Periodically append CPU and memory usage of a process to log files.

    One record is written every 5 seconds to ``{path}/cpu-{n}.log``; a new
    file is started after 1800 seconds' worth of samples. The function
    returns once the watched process is gone.

    Args:
        main_pid: PID of the process to watch.
        path: directory for the log files (created if missing).

    Raises:
        RuntimeError: on any failure other than the watched process exiting.
    """
    sleep_time = 5
    samples_per_file = 1800 // sleep_time
    try:
        os.makedirs(path,mode=0o777,exist_ok=True)
        # Reuse one Process object: cpu_percent() measures usage since the
        # previous call on the *same* object, so a fresh object per sample
        # would always report the meaningless first-call value 0.0.
        proc = psutil.Process(pid=main_pid)
        proc.cpu_percent()  # prime the baseline; this first value is discarded
        count = 0
        while proc.is_running():
            with open(f"{path}/cpu-{count}.log","a") as f:
                for _ in range(samples_per_file):
                    if not proc.is_running():
                        break
                    record = {
                        "time":time.strftime('%Y-%m-%d %X', time.localtime()),
                        "cpu_percent":proc.cpu_percent(),
                        "memory":proc.memory_info().rss,
                    }
                    f.write(str(record)+"\n")
                    f.flush()
                    time.sleep(sleep_time)
            count +=1
    except psutil.NoSuchProcess:
        # The watched process ended (or never existed): normal termination.
        return
    except Exception as exc:
        raise RuntimeError("cpu状态监控进程启动失败") from exc
    return
    
def save_dict_to_json(dict_value:dict , save_path:str) ->None:
    """Write ``dict_value`` to ``save_path`` as JSON indented by two spaces.

    The target file is truncated if it already exists.

    Args:
        dict_value: JSON-serialisable object to store.
        save_path: destination file path.
    """
    with open(save_path, mode='w') as handle:
        payload = json.dumps(dict_value, indent=2)
        handle.write(payload)
        handle.flush()
    return None

def save_dict_to_yaml(dict_value: dict, save_path: str):
    """Write ``dict_value`` to ``save_path`` as YAML.

    Non-ASCII characters are emitted as-is (``allow_unicode=True``). The
    target file is truncated if it already exists.

    Args:
        dict_value: object to serialise with ``yaml.dump``.
        save_path: destination file path.
    """
    with open(save_path, mode='w') as handle:
        document = yaml.dump(dict_value, allow_unicode=True)
        handle.write(document)
        handle.flush()
    return None

def read_yaml_to_dict(yaml_path: str):
    """Load the YAML document stored at ``yaml_path``.

    Args:
        yaml_path: path of the YAML file to read.

    Returns:
        Whatever ``yaml.load`` produces for the file content.
    """
    # NOTE(review): the file is parsed with yaml.FullLoader; if yaml_path can
    # point to untrusted content, consider yaml.safe_load instead.
    with open(yaml_path) as stream:
        raw_text = stream.read()
        return yaml.load(raw_text, Loader=yaml.FullLoader)
    
def save_list_to_csv(data_list:list, output_file:str)->None:
    """Write a list of dicts to a CSV file with a header row.

    The header is the union of all keys, in first-seen order, so the column
    order is stable between runs. (Collecting the keys in a ``set`` made the
    order depend on string hashing.) A record that lacks a key gets an empty
    cell in that column.

    Args:
        data_list: records to export; each item must provide ``keys()`` and
            ``get()`` (e.g. a dict).
        output_file: destination CSV path; truncated if it already exists.
    """
    # dict.fromkeys de-duplicates while preserving insertion order.
    headers = list(dict.fromkeys(key for item in data_list for key in item.keys()))

    with open(output_file, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(headers)
        writer.writerows(
            [item.get(key, '') for key in headers] for item in data_list
        )
    return

def watch_gpu(main_pid:int,path:str)->None:
    """Periodically log utilisation and used memory of every NVIDIA GPU.

    While ``is_process_running(main_pid)`` is truthy, one line holding a
    list of per-device status dicts is written every 5 seconds to
    ``{path}/gpu-{n}.log``; a new file is started after 1800 seconds' worth
    of samples.

    Args:
        main_pid: PID of the process whose lifetime bounds the monitoring.
        path: directory for the log files (created if missing).

    Raises:
        RuntimeError: if NVML initialisation or sampling fails.
    """
    sleep_time =5
    samples_per_file = 1800 // sleep_time
    try:
        os.makedirs(path,mode=0o777,exist_ok=True)
        pynvml.nvmlInit()
        try:
            device_count = pynvml.nvmlDeviceGetCount()
            count = 0
            while is_process_running(main_pid=main_pid):
                with open(f"{path}/gpu-{count}.log","w") as f:
                    # The sample counter and the device index are separate
                    # variables; sharing one name reset the counter on every
                    # sample, so the log file was never rotated.
                    for _ in range(samples_per_file):
                        if not is_process_running(main_pid=main_pid):
                            break
                        device_status =[]
                        for device_index in range(device_count):
                            handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
                            gpu_percent = pynvml.nvmlDeviceGetUtilizationRates(handle)
                            gpu_memory = pynvml.nvmlDeviceGetMemoryInfo(handle)
                            status = {"time":time.strftime('%Y-%m-%d %X', time.localtime()),"gpu_percent":gpu_percent.gpu,"gpu_memory":gpu_memory.used}
                            device_status.append(status)
                        f.write(str(device_status)+"\n")
                        f.flush()
                        time.sleep(sleep_time)
                count+=1
        finally:
            # Release NVML however the sampling loop ended.
            pynvml.nvmlShutdown()
    except Exception as exc:
        raise RuntimeError("gpu监控进程启动失败") from exc

def is_process_running(main_pid:int) ->bool:
    """Report whether the process with PID ``main_pid`` is currently running.

    Args:
        main_pid: PID to look up.

    Returns:
        bool: result of ``psutil.Process(main_pid).is_running()``; ``False``
        if the process cannot be looked up at all.
    """
    try:
        ps = psutil.Process(pid=main_pid)
        # is_running is a method. Returning it uncalled handed callers a
        # bound method, which is always truthy.
        return bool(ps.is_running())
    except Exception:
        # Best effort: any lookup failure is treated as "not running".
        return False
    
def save_conda_info(path:str) ->bool:
    """Append the output of ``conda list`` to ``{path}/conda.info``.

    Args:
        path: existing directory that receives the ``conda.info`` file.

    Returns:
        bool: ``True`` if ``conda list`` exited with status 0 and its output
        was written; ``False`` if conda is unavailable, exits non-zero, or
        the file cannot be written.
    """
    try:
        result = subprocess.run(['conda', 'list'], capture_output=True, text=True)
        # A non-zero exit status means the listing is not usable; report the
        # failure instead of recording empty output as a success.
        if result.returncode != 0:
            return False
        with open(f"{path}/conda.info","a") as file:
            file.write(result.stdout)
            file.flush()
        return True
    except Exception:
        # Best effort, e.g. conda not installed or path not writable.
        return False

def get_gpu_list() ->list:
    """Return the names of the NVIDIA GPUs visible through NVML.

    Returns:
        list: one name string per device found. The list is empty, or
        holds only the devices read so far, if NVML cannot be queried.
    """
    device_list =[]
    try:
        print("获取Nvidia显卡信息 \n")
        pynvml.nvmlInit()
        device_count=pynvml.nvmlDeviceGetCount()

        for i in range(device_count):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            name = pynvml.nvmlDeviceGetName(handle)
            # Decode a bytes name so the list holds plain text rather than
            # the "b'...'" repr.
            if isinstance(name, bytes):
                name = name.decode("utf-8", errors="replace")
            device_list.append(str(name))

    except Exception:
        # Best effort: a machine without an NVIDIA driver is not an error.
        print("未获取到Nvidia显卡信息 \n")

    # The collected list must be returned; without this the function
    # always yielded None.
    return device_list

def has_multiple_keys(dictionary:dict, *keys):
    """Return True when every name in ``keys`` is a key of ``dictionary``.

    With no ``keys`` given the result is True.
    """
    present = dictionary.keys()
    wanted = set(keys)
    return all(name in present for name in wanted)



def get_init_trainning_status()->dict:
    """Build the initial training-status record: an empty ``"epoch"`` list.

    A new dict with a new list is created on every call.
    """
    initial_status = dict(epoch=list())
    return initial_status

def get_process_pid():
    """Return the PID of the current process, as given by ``os.getpid()``."""
    current_pid = os.getpid()
    return current_pid


In [19]:
class Logger():
    """Record one training run on local disk.

    Files are written below ``.{access_token}/{project}/{experiment}``
    (relative to the working directory); each run gets its own ``run-{n}``
    sub-directory. Constructing a Logger also starts two daemon processes
    that log CPU and GPU usage of the current process.

    NOTE(review): the access token becomes part of a directory name and is
    written into config.json / config.yaml; confirm this is acceptable.
    """

    def __init__(self,config:dict,host="127.0.0.1") -> None:
        """Validate ``config``, prepare the run directory and start the watchers.

        Args:
            config: must contain the keys ``access_token``, ``project``,
                ``experiment`` and ``discription``.
            host: host name used to build the local client API URLs.

        Raises:
            ValueError: if a required key is missing from ``config``.
            RuntimeError: if the configuration cannot be saved.
        """
        if not has_multiple_keys(config, 'access_token', 'project',"experiment","discription"):
            raise ValueError("缺失启动信息")

        self.__config = config
        print("验证代理客户端状态 \n")
        self.__verify_my_client()
        self.__save_config()

        self.__pid = get_process_pid()
        # Number of epochs logged so far through EpochLog().
        self.__epoch = 0

        # Client endpoints; not called anywhere in this class yet.
        self.__api_load_save_path = f"http://{host}:5560/ml_client/client/loadSavePath"
        self.__api_notice_experiment = f"http://{host}:5560/ml_client/client/noticeExperiment"
        self.__api_notice_run = f"http://{host}:5560/ml_client/client/noticeRun"

        self.__trainning_status  = get_init_trainning_status()
        self.__watcher()

    def __verify_my_client(self) ->None:
        """Check the proxy client (currently skipped) and create the run directory.

        Picks the first ``run-{n}`` name that does not exist yet below the
        experiment directory and creates it.

        Raises:
            RuntimeError: if the verification step fails.
        """
        try:
            # TODO: query the local client here once its endpoint is available.
            print("开发阶段跳过校验 \n")
        except Exception as exc:
            raise RuntimeError("用户校验失败，请检查客户端是否启动\n") from exc

        print("代理客户端验证通过 \n")
        self.__login = True
        self.__location = f".{self.__config['access_token']}/{self.__config['project']}/{self.__config['experiment']}"
        self.__savedir = "code"
        self.__codedir = "src"
        self.__srcignore = "datasets"

        # Find the first unused run index.
        i = 0
        while os.path.exists(f"{self.__location}/run-{i}"):
            i+=1
        self.__uid = f"run-{i}"

        os.makedirs(f"{self.__location}/{self.__uid}",mode=0o777,exist_ok=True)
        return

    def __watcher(self) ->None:
        """Start the CPU and GPU monitoring daemon processes for this run."""
        cpu_dir =f"{self.__location}/{self.__uid}/watcher/cpu"
        self.__watcher_cpu = multiprocessing.Process(target=watch_cpu ,daemon=True, args=(self.__pid,cpu_dir))
        self.__watcher_cpu.start()

        gpu_dir =f"{self.__location}/{self.__uid}/watcher/gpu"
        self.__watcher_gpu = multiprocessing.Process(target=watch_gpu,daemon=True, args=(self.__pid,gpu_dir))
        self.__watcher_gpu.start()

        return

    def Start(self,info:dict) ->None:
        """Record the start time, environment information and hyper-parameters.

        Args:
            info: hyper-parameters of the run; saved as ``super_arg.json``
                and ``super_arg.yaml`` in the run directory.

        Raises:
            RuntimeError: if the environment or hyper-parameter files cannot
                be written.
        """
        if not save_conda_info(self.__location):
            print("未采集到conda信息")

        try:
            now = time.strftime("%Y-%m-%d %X", time.localtime())
            print(f"运行开始时间：{now} \n")
            self.__trainning_status["start_at"] = now

            self.__osinfo = get_os_info()

            os_info_json_path =f"{self.__location}/os_info.json"
            os_info_yaml_path =f"{self.__location}/os_info.yaml"

            save_dict_to_json(self.__osinfo,os_info_json_path)
            save_dict_to_yaml(self.__osinfo,os_info_yaml_path)

            super_arg_json_path = self.__location+"/"+self.__uid+"/super_arg.json"
            super_arg_yaml_path = self.__location+"/"+self.__uid+"/super_arg.yaml"

            save_dict_to_json(info,super_arg_json_path)
            save_dict_to_yaml(info,super_arg_yaml_path)

        except Exception as exc:
            raise RuntimeError("日志实例启动失败\n") from exc
        return

    def SaveFile(self,path_list:list[str]) ->None:
        """Placeholder: currently only prints each path in ``path_list`` that exists."""
        for path in path_list:
            if os.path.exists(path):
                print(path)
        return

    def EpochInit(self) ->None:
        """Placeholder for notifying the client that an epoch has started."""
        return

    def EpochLog(self,info:dict) ->None:
        """Append one epoch's metrics and rewrite ``results.json``.

        Args:
            info: metrics of the finished epoch; the dict is stored as-is,
                not copied.

        Raises:
            RuntimeError: if the results file cannot be written.
        """
        try:
            self.__trainning_status["epoch"].append(info)

            result_path =f"{self.__location}/{self.__uid}/results.json"
            save_dict_to_json(self.__trainning_status["epoch"],result_path)
            # TODO: notify the client that the epoch has finished.
        except Exception as exc:
            raise RuntimeError("Epoch日志采集失败") from exc

        # Count the epoch only once it is recorded. The earlier
        # "increment unless zero" check could never leave 0.
        self.__epoch += 1
        return

    def End(self) ->None:
        """Finish the run: write ``finish.tag``, back up code, stop watchers, export CSV.

        Raises:
            RuntimeError: if the source code backup fails.
        """
        now = time.strftime("%Y-%m-%d %X", time.localtime())
        print(f"运行结束时间：{now}\n")
        self.__trainning_status["end_at"] = now
        try:
            with open(f"{self.__location}/{self.__uid}"+"/finish.tag",mode="a") as f:
                f.write(f"{now} | {self.__uid} \n")
            self.__save_code()
        finally:
            # Stop the watchers even if the code backup failed, so they do
            # not keep logging until the interpreter exits.
            self.__watcher_cpu.kill()
            self.__watcher_gpu.kill()

        result_csv_path = f"{self.__location}/{self.__uid}/result.csv"
        save_list_to_csv(self.__trainning_status["epoch"],result_csv_path)
        return

    def __save_config(self) ->None:
        """Save the constructor config as ``config.json`` and ``config.yaml``.

        Raises:
            RuntimeError: if either file cannot be written.
        """
        try:
            config_path_json = self.__location +"/"+"config.json"
            save_dict_to_json(self.__config,config_path_json)
            config_path_yaml = self.__location +"/"+"config.yaml"
            save_dict_to_yaml(self.__config,config_path_yaml)
        except Exception as exc:
            raise RuntimeError("保存配置信息失败 \n") from exc
        return

    def __save_code(self,path="datasets")->None:
        """Copy the ``src`` directory to ``code``, skipping entries matching ``path``.

        Both directories are relative to the current working directory.

        Args:
            path: glob pattern of entries to leave out of the copy.

        Raises:
            RuntimeError: if the copy fails (e.g. ``src`` does not exist).
        """
        try:
            shutil.copytree(src=self.__codedir,dst=self.__savedir,dirs_exist_ok=True,ignore=shutil.ignore_patterns(path))
        except Exception as exc:
            raise RuntimeError("备份代码失败 \n") from exc
        return

    # Deprecated: pass hyper-parameters to Start() instead.
    def SuperArg(self,info:dict)->None:
        """Deprecated: write ``info`` to ``super_arg.json`` in the run directory."""
        path =self.__location+"/"+self.__uid+"/super_arg.json"
        with open(path,mode="w") as f:
            f.write(f"{json.dumps(info)}")
        return

    # ShowStatus: for use during development only.
    def ShowStatus(self) -> str:
        """Return the logged epoch records as a JSON string indented by two spaces."""
        return json.dumps(self.__trainning_status["epoch"],indent=2)

In [20]:


# Create the logger instance.
# 1. On creation the instance will query the local client (pending client work).
# 2. Once the client is confirmed to be up (three attempts), continue (pending client work);
#    otherwise raise an error and stop the program (pending client work).
run_config = {
    'access_token':"access_token",
    'project':"project",
    "experiment":"experiment",
    "discription":"discription"
}
log = Logger(config=run_config)

# Start the task.
# 1. Record hyper-parameters (constants) (ok)
# 2. Record hardware information of the runtime environment, once (ok)
# 3. Notify the client that the experiment has started (pending client work)
# 4. Start the CPU / memory recording process (ok)
# 5. Start the GPU core / GPU memory recording process (ok)
# 6. Record conda information (ok)
# 7. Take the user token, project id and description, generate the experiment
#    name and write config.json accordingly
# 8. Record requirement.txt information
hyper_params = {"learnning_rate":0.002,"epoch":10}
log.Start(info=hyper_params)

# Simulated epoch loop.
for i in range(10):

    # 1. Notify the client that a batch has started (pending client work)
    # 2. Record the batch start time
    log.EpochInit()

    # Simulated task code.
    time.sleep(1)

    # End of one iteration.
    # 1. Add finish.tag to the run-id directory (ok)
    # 2. Notify the client that a batch has finished (pending client work)
    # 3. Record the metrics passed in the callback (ok)
    # 4. Convert json to csv (to be written)
    epoch_metrics = {"acc":0.83,"loss":0.02}
    log.EpochLog(epoch_metrics)

    # Save files to a target directory (to be written).
    log.SaveFile(["./files"])

# End the task.
# 1. Back up the running code (ok), ignoring the "datasets" directory
# 2. Notify the client that the experiment has ended (pending client work)
# 3. finish.tag (ok)
log.End()

# Utility: show the logger's recorded status.
log.ShowStatus()

验证代理客户端状态 

开发阶段跳过校验 

代理客户端验证通过 

运行开始时间：2023-10-20 18:03:58 

获取Nvidia显卡信息 

采集系统信息
运行结束时间：2023-10-20 18:04:08



'[\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  },\n  {\n    "acc": 0.83,\n    "loss": 0.02\n  }\n]'