# 工具函数库

## 包验证

In [None]:
import re
import pkg_resources

def check_package(hard_dependencies, check=True):
    if check:
        missing_dependencies = []
        # 正则表达式匹配包名和版本号
        pattern = r'([a-zA-Z0-9_]+)([<>=!]+)([\d\.]+)'

        for dependency in hard_dependencies:
            match = re.match(pattern, dependency)
            if match:
                module_name, version_operator, module_version = match.groups()
                try:
                    # 获取已安装包的版本
                    installed_version = pkg_resources.get_distribution(module_name).version
                    # 比较版本号
                    if not pkg_resources.working_set.by_key[module_name].parsed_version in pkg_resources.Requirement.parse(f"{module_name}{version_operator}{module_version}"):
                        missing_dependencies.append(f"{module_name}: the package need version {version_operator}{module_version}, now version is {installed_version}")
                except pkg_resources.DistributionNotFound:
                    missing_dependencies.append(f"{module_name}: package not found")
                except pkg_resources.VersionConflict as e:
                    missing_dependencies.append(f"{module_name}: {e}")

        if missing_dependencies:
            raise ImportError(
                "Unable to import required dependencies:\n" + "\n".join(missing_dependencies)
            )
        del hard_dependencies, dependency, missing_dependencies

def check_all(mobile):
    for i in mobile.__all__:
        try:
            assert i not in __all__
            __all__.append(i)
        except:
            raise Exception(f'{i}命名重复')

## colab工具

In [None]:
#@title 1. Keep this tab alive to prevent Colab from disconnecting you { display-mode: "form" }

#@markdown Press play on the music player that will appear below:

%%html

<audio src="https://oobabooga.github.io/silence.m4a" controls> 用来防止发生断裂

## python 模式


In [11]:

# 限制添加的属性：

# 如果你希望类或对象只有特定属性，可以使用 __slots__ 来限制属性的添加：

class Student:
    __slots__ = ('name', 'age', 'score')
    def __init__(self, name, age):
        self.name = name
        self.age = age

st = Student('phyger', 16)

print(st.name)# 输出：'phyger'

print(st.age)# 输出：16

st.score = 100# 无法添加 'score' 属性

  

phyger
16


In [None]:
import re
class Infos:
    def __init__(self, key,value):
        self.key = key
        self.value = value
    
    def transform_key(self,key):
        # 使用正则表达式匹配并替换
        new_key = re.sub(r'(\w+)\[\d+\]', r'\1_list', key)
        return new_key

    def __eq__(self, other):
        # 自定义相等
        if isinstance(other, MyClass):
            key = self.transform_key(self.key)
            other_key = self.transform_key(other.key)

            return (key == other_key) and (type(self.value)==type(other.value))
        return False
    
    def __hash__(self):
        # 修改 set 中的值 {}
        return hash(self.transform_key(self.key) + str(type(self.value)))



In [None]:
## 类属性

In [87]:
class A():
    cc = 123

In [88]:
class B(A):
    def trans(self,x):
        A.cc = x

In [89]:
d = B()

In [90]:
e = B()

In [91]:
d.cc

123

In [92]:
e.cc

123

In [93]:
d.trans(23455)

In [94]:
d.cc

23455

In [95]:
e.cc

23455

## 修改线程名

In [None]:
def setproctit(name="my_process"):
    import setproctitle
    # 设置进程名称为 my_process
    setproctitle.setproctitle(name)
    # 以下是你的其他代码

## cp 但是不包含隐藏文件

In [31]:
import shutil
from pathlib import Path

def ignore_hidden_files(directory, files):
    return [f for f in files if f.startswith('.')]

def copy_tree(src_dir, dest_dir):

    src_dir_name = [i for i in src_dir.rsplit('/') if i !=''][-1]
    dest_dir = os.path.join(dest_dir,src_dir_name)
    print(dest_dir)
    src_path = Path(src_dir)
    dest_path = Path(dest_dir)
    
    # 确保目标目录存在
    if not dest_path.exists():
        dest_path.mkdir(parents=True, exist_ok=True)
    
    # 使用 copytree 复制目录，忽略隐藏文件
    shutil.copytree(src_path, dest_path, ignore=ignore_hidden_files, dirs_exist_ok=True)
    print(f'Successfully copied {src_dir} to {dest_dir}')


#copy_tree('/Users/zhaoxuefeng/GitHub/zxftools/','.')

In [None]:
## 需要改进 可能覆盖文件
import os

def rename_images_to_numbers(folder_path):
    """
    将指定文件夹中的所有图片文件按数字顺序重命名。

    参数:
    folder_path (str): 图片文件夹的路径
    """
    # 获取文件夹中的所有文件路径
    files = os.listdir(folder_path)
    
    # 过滤出图片文件（这里假设图片文件的扩展名为 jpg, jpeg, png, gif 等）
    image_extensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff']
    image_files = [f for f in files if f.split('.')[-1].lower() in image_extensions]

    # 排序图片文件
    image_files.sort()

    # 重命名文件
    for i, filename in enumerate(image_files):
        # 获取文件扩展名
        ext = filename.split('.')[-1]
        # 生成新的文件名
        new_filename = f"{i+1}.{ext}"
        # 获取文件的完整路径
        src = os.path.join(folder_path, filename)
        dst = os.path.join(folder_path, new_filename)
        # 重命名文件
        os.rename(src, dst)
        print(f"Renamed {src} to {dst}")

# # 示例用法
# folder_path = 'work记录/'
# rename_images_to_numbers(folder_path)


In [None]:
## 反射 自动构建函数实例化


In [None]:

让我们详细解释一下：
1类和实例方法: 在一个类中，self 参数是实例自身的引用，用于在类的实例方法中访问实例的属性和其他方法。
2 call 方法: 如果一个类定义了 call 方法，那么这个类的实例就可以像函数一样被调用。当你调用 self(a, b) 时，实际上是调用了这个实例的 call 方法，并将参数 a 和 b 传递给它。


In [None]:
sudo apt-get install -y portaudio19-dev


In [None]:

class MonitorError(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(self.message)

# Demo

## 如何使用fire 工具

In [None]:
import fire
from .prompts import fix
import os
from zxftools.llms import get_llm

def ai_fix(file_path=''):
    assert os.path.isfile(file_path)
    llm = get_llm(model=os.environ.get('aigen_llm') or 'gpt-3.5-turbo-0613')

    with open(file_path,'r') as f:
        code = f.read()
    result = llm.complete(fix.format(code=code))

    with open(file_path,'w') as f:
        f.write(result.text)

if __name__ == '__main__':
    fire.Fire(ai_fix)



## 编写一个装饰器

In [None]:
import functools

def decorator(a = None):
    """
    一个装饰器的编写demo
    """
    def outer_packing(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            print(a,'a')
            print(func.__name__)  # 函数名
            print(args)  # (1, 2)
            print(kwargs)  # {'c': 3}
            return result
        return wrapper
    return outer_packing

hard_dependencies = ('time',)
check_package(hard_dependencies=hard_dependencies,check=False)



## 如何将服务冒充为openai的服务

In [None]:
from openai import OpenAI
import os
client = OpenAI(base_url="https://api.bianxieai.com/v1",
                api_key=os.environ.get('bianxieai_API_KEY'))

completion = client.chat.completions.create(
  model="claude-3-5-sonnet-20240620",
  messages=[
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hello!"}
  ]
)
if __name__ == '__main__':
    print(completion.choices[0].message)

In [None]:
import os
from typing import Optional, List, Mapping, Any

from llama_index.core.llms.callbacks import llm_completion_callback
from llama_index.core.llms import (
    CustomLLM,
    CompletionResponse,
    CompletionResponseGen,
    LLMMetadata,
)

from openai import OpenAI

class BianXieLLM(CustomLLM):
    context_window: int = 3900
    num_output: int = 256
    model: str = ''
    temperature: int = 0.2
    top_p: float = 0.9
    top_k: int = 50
    penalty_score: float = 1.0
    stop = []

    def __init__(self, temperature=0.2, model='claude-3-5-sonnet-20240620',api_key=None):
        super().__init__()
        object.__setattr__(self, 'custom_openai_client',OpenAI(base_url="https://api.bianxieai.com/v1",
                api_key=api_key or os.environ.get('bianxieai_API_KEY'))
 )
        self.model = model
        self.temperature = temperature

    @property
    def metadata(self) -> LLMMetadata:
        return LLMMetadata(
            context_window=self.context_window,
            num_output=self.num_output,
            model_name=self.model,
        )

    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        completion = self.custom_openai_client.chat.completions.create(
              model=self.model,
              temperature = self.temperature,
              messages=[{"role": "user", "content": prompt}],
              **kwargs
            )
        return CompletionResponse(text=completion.choices[0].message.content)

    @llm_completion_callback()
    def stream_complete(self, prompt: str, **kwargs: Any) -> CompletionResponseGen:
        completion = self.custom_openai_client.chat.completions.create(
              model=self.model,
              temperature = self.temperature,
              messages=[{"role": "user", "content": prompt}],
              **kwargs
            )
        #return CompletionResponse(text=completion.choices[0].message.content)
        response_generate = [1,2,3]
        response = ""
        for token in response_generate:
            response += token
            yield CompletionResponse(text=response, delta=token)


In [None]:
    @staticmethod
    def thread_list(f, params: types.GeneratorType or list, processes=4, chunksize: int = 1):
        """
        1 必须到 if __name__ == '__main__': 下执行
        """
        pool = Pool(processes)
        if isinstance(params, types.GeneratorType):
            results = pool.imap(f, params, chunksize=chunksize)
        else:
            results = pool.map(f, params, chunksize=chunksize)
        pool.close()
        return results