In [34]:
import ujson as json
from rich import print

# Load the raw samples. The context manager closes the file handle as soon as
# parsing finishes instead of leaving it open until garbage collection.
with open("kto_12.json", mode="r", encoding="utf-8") as samples_file:
    samples_orig: list[dict] = json.load(samples_file)
print(f"Loaded {len(samples_orig)} samples")


def kto(input: str, output: str, kto_tag: bool) -> dict:
    """Wrap one prompt/response pair as a KTO record.

    Args:
        input: Prompt text, stored under the ``"instruction"`` key.
        output: Response text, stored under the ``"output"`` key.
        kto_tag: Boolean tag, stored unchanged under the ``"kto_tag"`` key.

    Returns:
        A dict with the keys ``system``, ``instruction``, ``output`` and
        ``kto_tag`` (in that order). ``system`` always holds the same fixed
        two-line system prompt.
    """
    system_prompt = (
        "你是 Sthenno, 我是你的主人.\n"
        "你有自己的主见和三观. 你喜欢使用 Sthenno 表示你自己."
    )
    return dict(
        system=system_prompt,
        instruction=input,
        output=output,
        kto_tag=kto_tag,
    )


# Turn every raw record into KTO records: one for a non-empty "instruction"
# and one for a non-empty "input" (in that order), both sharing the record's
# output and tag.
samples: list[dict] = [
    kto(s[prompt_key], s["output"], s["kto_tag"])
    for s in samples_orig
    for prompt_key in ("instruction", "input")
    if s.get(prompt_key)
]

print(f"Converted {len(samples)} samples")


In [35]:
# Keep every sample tagged False. Keep a sample tagged True only when its
# output contains none of the blocked terms.
samples_cleaned = []
for s in samples:
    output_is_clean = not any(
        term in s["output"] for term in ("DeepMind", "OpenAI", "Google", "喵喵")
    )
    tag = s["kto_tag"]
    if (output_is_clean and tag is True) or tag is False:
        samples_cleaned.append(s)

print(f"Length of cleaned samples: {len(samples_cleaned)}")


In [41]:
import re
import random
from opencc import OpenCC


def strQ2B(ustring):
    """Convert full-width characters in ``ustring`` to their half-width forms.

    The ideographic space (U+3000) becomes an ASCII space, and the full-width
    forms U+FF01..U+FF5E are shifted down by 0xFEE0 onto ASCII ``!``..``~``.
    Every other character is returned unchanged.
    """
    converted = []
    for ch in ustring:
        code = ord(ch)
        if code == 0x3000:
            # Full-width space maps straight to the ASCII space.
            converted.append(" ")
        elif 0xFF01 <= code <= 0xFF5E:
            # Remaining full-width forms sit at a fixed offset from ASCII.
            converted.append(chr(code - 0xFEE0))
        else:
            converted.append(ch)
    return "".join(converted)


def remove_spaces_between_chinese(text):
    """Delete whitespace runs that sit directly between two CJK ideographs.

    Only whitespace with a character in U+4E00..U+9FFF on both sides is
    removed; whitespace next to Latin letters, digits or punctuation stays.
    """
    return re.sub(r"(?<=[\u4E00-\u9FFF])\s+(?=[\u4E00-\u9FFF])", "", text)


def process_miao(line):
    """Randomly thin out the '喵' characters in one line of text.

    How many are removed depends on the line length and on how often '喵'
    occurs:

    * up to 15 characters: nothing is removed;
    * 16-50 characters with at least two '喵': half of them are removed with
      probability 0.8, three quarters otherwise;
    * longer lines with at least three '喵': half are removed with probability
      0.8, three quarters with probability 0.1, none with probability 0.1.

    Fractions are truncated to an integer and at least one '喵' is removed
    whenever a removal branch is taken. A '喵' directly followed by a comma is
    removed first (the comma itself is kept); any remaining quota is taken
    from the leftmost '喵' that are left.

    Args:
        line: A single line of text.

    Returns:
        The line with zero or more '喵' removed. The result depends on the
        state of the ``random`` module.
    """
    miao_count = line.count("喵")
    if miao_count == 0:
        return line

    line_length = len(line)
    num_remove = 0
    if line_length <= 15:
        # Very short lines keep every '喵'.
        pass
    elif line_length <= 50:
        if miao_count >= 2:
            fraction = 0.5 if random.random() < 0.8 else 0.75
            num_remove = max(1, int(miao_count * fraction))
    elif miao_count >= 3:
        rand = random.random()
        # Thresholds are tested in ascending order so that the 0.75 branch is
        # reachable; testing `< 0.9` before `< 0.8` made it dead code.
        if rand < 0.8:
            num_remove = max(1, int(miao_count * 0.5))
        elif rand < 0.9:
            num_remove = max(1, int(miao_count * 0.75))

    if num_remove == 0:
        # Guard: re.sub/re.subn treat count=0 as "replace everything".
        return line

    # First pass: prefer a '喵' that directly precedes a comma. The lookahead
    # keeps the comma so the clause boundary survives.
    line, num_subs = re.subn(r"喵(?=,)", "", line, count=num_remove)
    num_remove -= num_subs

    if num_remove > 0:
        # Second pass: spend the remaining quota on the leftmost '喵'.
        line = re.sub("喵", "", line, count=num_remove)
    return line


def replace_sthenno(s):
    """Randomly swap repeated "Sthenno " self-references for "我".

    The first occurrence of ``"Sthenno "`` (name plus trailing space) is always
    kept. Every later occurrence is replaced by ``"我"`` with probability 0.7;
    the trailing space is dropped together with the name. Strings with fewer
    than two occurrences are returned unchanged.
    """
    marker = "Sthenno "
    hits = []
    pos = s.find(marker)
    while pos >= 0:
        hits.append(pos)
        pos = s.find(marker, pos + 1)

    if len(hits) >= 2:
        # Walk from the last hit back to the second one so that the offsets of
        # the hits still to be visited are not shifted by earlier edits.
        for pos in hits[:0:-1]:
            if random.random() < 0.7:
                s = f"{s[:pos]}我{s[pos + len(marker):]}"
    return s


# Lazily created OpenCC converter shared by every standardize_text call, so the
# converter is built once instead of once per call.
_T2S_CONVERTER = None


def _get_t2s_converter():
    """Return the shared Traditional-to-Simplified converter, creating it once."""
    global _T2S_CONVERTER
    if _T2S_CONVERTER is None:
        _T2S_CONVERTER = OpenCC("t2s")
    return _T2S_CONVERTER


def standardize_text(s):
    """Normalise a (possibly multi-line) text, one line at a time.

    Each line goes through the following steps, in this order:

    1. full-width characters are converted to half-width (``strQ2B``);
    2. repeated "Sthenno " self-references are randomly replaced
       (``replace_sthenno``);
    3. whitespace between Chinese characters is removed;
    4. Traditional Chinese is converted to Simplified Chinese;
    5. curly double quotes become straight quotes with a space on the outer
       side, then double spaces are collapsed in a single pass;
    6. a space is added after a comma that is neither preceded by a digit nor
       followed by a digit or whitespace;
    7. a space is inserted between a Chinese character and an adjacent ASCII
       letter or digit;
    8. '喵' characters are randomly thinned out (``process_miao``);
    9. whitespace between a leading '…' and a Chinese character is removed;
    10. leading and trailing whitespace is stripped.

    Args:
        s: Text to normalise; line breaks (``"\\n"``) are preserved.

    Returns:
        The normalised lines joined with ``"\\n"``. Steps 2 and 8 use the
        ``random`` module, so the result can differ between calls.
    """
    cc = _get_t2s_converter()  # Traditional -> Simplified converter

    # CJK ideographs in U+4E00..U+9FA5 only; kana and hangul are not matched.
    zh_pattern = r"[\u4e00-\u9fa5]"

    new_lines = []
    for line in s.split("\n"):
        # 1. Full-width -> half-width.
        line = strQ2B(line)

        # 2. Vary repeated "Sthenno " self-references.
        line = replace_sthenno(line)

        # 3. Drop whitespace between Chinese characters.
        line = remove_spaces_between_chinese(line)

        # 4. Traditional -> Simplified Chinese.
        line = cc.convert(line)

        # 5. Curly quotes -> straight quotes padded on the outer side, then
        #    collapse double spaces (one pass: a run of three spaces becomes two).
        line = line.replace("“", ' "').replace("”", '" ')
        line = line.replace("  ", " ")

        # 6. Add a space after ',' unless it is preceded by a digit or followed
        #    by a digit or whitespace, so numbers such as 1,000 stay intact.
        line = re.sub(r"(?<!\d),(?!\d|\s)", ", ", line)

        # 7. Separate Chinese characters from adjacent ASCII letters/digits.
        #    A decimal point is never adjacent to a Chinese character on both
        #    sides of the match, so numbers such as 3.14 are left intact.
        line = re.sub(f"({zh_pattern})([A-Za-z0-9])", r"\1 \2", line)
        line = re.sub(f"([A-Za-z0-9])({zh_pattern})", r"\1 \2", line)

        # 8. Thin out '喵'.
        line = process_miao(line)

        # 9. No whitespace between a leading '…' and a Chinese character.
        line = re.sub(r"^…\s*([\u4e00-\u9fa5])", r"…\1", line)

        new_lines.append(line.strip())

    return "\n".join(new_lines)


# Smoke test: run the normalisation pipeline on one hand-written line and print
# the result. No random seed is set, so the printed text can differ between
# runs (standardize_text calls helpers that draw from the random module).
input_text = "她说：“今天天气很好。” Sthenno不知道喵,也许是你自己变成了厨子喵? 他得到了3.14的结果。Sthenno 不知道喵 Sthenno 不知道喵 Sthenno 不知道喵"
processed_text = standardize_text(input_text)
print(processed_text)


In [42]:
from random import shuffle


samples_processed = []

# Data cleaning.
# Build a fresh dict per sample instead of rewriting the dicts held by
# samples_cleaned. Mutating them in place made this cell non-idempotent:
# re-running it normalised already-normalised text again, and the random steps
# inside standardize_text removed more content on every run.
for s in samples_cleaned:
    samples_processed.append(
        {
            **s,  # keeps "system" and "kto_tag" and the original key order
            "instruction": standardize_text(s["instruction"]),
            "output": standardize_text(s["output"]),
        }
    )

shuffle(samples_processed)
# Preview one record; this raises IndexError with fewer than six samples.
print(samples_processed[5])


In [43]:
import random

kto_samples = samples_processed

# De-duplicate on the combination of output text and tag, keeping the first
# sample seen for each combination. The fields are read straight from the dict;
# serialising every item to JSON and parsing it back twice added nothing.
seen = set()
kto_samples_cleaned = []

for item in kto_samples:
    item_content = f"{item.get('output')}{item.get('kto_tag')}"
    if item_content not in seen:
        seen.add(item_content)
        kto_samples_cleaned.append(item)

# Number of samples before de-duplication.
print(f"Length of samples: {len(kto_samples)}")

# Shuffle the copy, not the de-duplicated list: previously the list named
# "shuffled" was the only one left unshuffled, so the preview below did not
# show the order that was written to disk.
kto_samples_shuffled = kto_samples_cleaned.copy()
random.shuffle(kto_samples_shuffled)

# Number of samples after de-duplication, plus a preview of the first record.
print(f"Length of samples: {len(kto_samples_shuffled)}")
print(kto_samples_shuffled[0])

# The context manager closes the output file once the dump is complete.
with open("kto_15.json", mode="w", encoding="utf-8") as out_file:
    json.dump(kto_samples_shuffled, out_file, ensure_ascii=False, indent=2)


In [11]:
def kto(input: str, output: str, kto_tag: bool) -> dict:
    """Wrap one prompt/response pair as a KTO record.

    Args:
        input: Prompt text, stored under the ``"instruction"`` key.
        output: Response text, stored under the ``"output"`` key.
        kto_tag: Boolean tag, stored unchanged under the ``"kto_tag"`` key.

    Returns:
        A dict with the keys ``system``, ``instruction``, ``output`` and
        ``kto_tag`` (in that order). ``system`` always holds the same fixed
        two-line system prompt.
    """
    system_prompt = (
        "你是 Sthenno, 我是你的主人.\n"
        "你有自己的主见和三观. 你喜欢使用 Sthenno 表示你自己."
    )
    return dict(
        system=system_prompt,
        instruction=input,
        output=output,
        kto_tag=kto_tag,
    )


# Re-wrap the processed samples as KTO records. Records produced by the
# cleaning cell store their prompt under "instruction", so reading s["input"]
# unconditionally raised KeyError; "input" is kept as a fallback for records
# that still use that key.
kto_samples = [
    kto(
        s["instruction"] if "instruction" in s else s["input"],
        s["output"],
        kto_tag=s["kto_tag"],
    )
    for s in samples_processed
]

shuffle(kto_samples)
print(f"Length of samples: {len(kto_samples)}")
kto_samples[0]


{'system': '你是 Sthenno, 我是你的主人.\n你有自己的主见和三观. 你喜欢使用 Sthenno 表示你自己.',
 'instruction': '摸摸 Sthenno , 亲亲我, 我今天好可爱.',
 'output': 'Sthenno 今天超开心喵! 谢谢你的夸奖!',
 'kto_tag': False}

In [12]:
# Write the KTO records to disk. The context manager closes the file even if
# serialisation fails part-way, instead of leaving the handle open.
with open("kto_11.json", mode="w", encoding="utf-8") as out_file:
    json.dump(kto_samples, out_file, ensure_ascii=False, indent=2)
