# DeepSeek Translation Script
## 使用DeepSeek API进行中英文翻译

本脚本用于批量翻译CSV文件中的中文内容到英文。

In [1]:
# Install the required libraries
%pip install pandas openai

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
from openai import OpenAI
from time import sleep
import os
import glob
from datetime import datetime

In [None]:
# Configure the DeepSeek API.
# The key is read from the DEEPSEEK_API_KEY environment variable so the secret
# never has to be saved inside the notebook; the blank placeholder is kept as
# the fallback so the cell still runs when the variable is not set.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", " ")

# Initialise the client: the OpenAI SDK is pointed at the DeepSeek endpoint
client = OpenAI(
    api_key=DEEPSEEK_API_KEY,
    base_url="https://api.deepseek.com"
)

# Input directory (Chinese source CSVs) and output directory (translations)
INPUT_DIR = r"C:\Users\ASUS\Desktop\LLM translate\Data_set\News_Chinese_Version"
OUTPUT_DIR = r"C:\Users\ASUS\Desktop\LLM translate\Data_set\deepseek"

print(f"输入目录: {INPUT_DIR}")
print(f"输出目录: {OUTPUT_DIR}")

输入目录: C:\Users\ASUS\Desktop\LLM translate\Data_set\News_Chinese_Version
输出目录: C:\Users\ASUS\Desktop\LLM translate\Data_set\deepseek


In [3]:
# Verify the API connection
def test_connection():
    """Send one minimal chat request to check that the DeepSeek API responds.

    Returns:
        bool: True when the request succeeds (the reply text is printed);
        False when any exception is raised, in which case the error and a
        short troubleshooting checklist are printed instead.
    """
    print("测试DeepSeek API连接...")
    probe_messages = [
        {"role": "user", "content": "Hello, please respond with 'API working'"}
    ]
    try:
        reply = client.chat.completions.create(
            model="deepseek-chat",
            messages=probe_messages,
            max_tokens=20,
        )
        print(f"API连接成功！响应: {reply.choices[0].message.content}")
    except Exception as err:
        print(f"API连接失败: {err}")
        # Troubleshooting checklist shown to the user, one line per hint
        for hint in (
            "请检查：",
            "1. API密钥是否正确",
            "2. 网络连接是否正常",
            "3. DeepSeek账户是否有效",
        ):
            print(hint)
        return False
    return True

# Run the connectivity check and report whether translation can proceed
print("\n✓ 可以开始翻译任务" if test_connection() else "\n✗ 请修复连接问题后再继续")

测试DeepSeek API连接...
API连接成功！响应: API working

✓ 可以开始翻译任务


In [4]:
def translate_text(text, model="deepseek-chat", max_retries=3):
    """Translate Chinese text to English with a DeepSeek chat model.

    A failed attempt (an exception from the API call, or a reply with no
    content) is retried with exponential back-off: 1s, 2s, 4s, ...

    Args:
        text: Chinese source text to translate.
        model: Name of the chat model passed to the API.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        str: The English translation with surrounding whitespace and known
        boilerplate prefixes removed, or "" when every attempt failed (or
        when ``max_retries`` is not positive).
    """
    # Lead-ins the model sometimes puts in front of the actual translation
    boilerplate_prefixes = (
        "Here is the translation:",
        "Translation:",
        "English translation:",
    )

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a professional translator. Translate the Chinese text to English accurately while maintaining the original meaning and style."
                    },
                    {
                        "role": "user",
                        "content": f"Translate the following Chinese text to English:\n\n{text}"
                    }
                ],
                temperature=0.3,
                max_tokens=2000
            )

            # content may be None; normalise it and trim whitespace first so
            # that a reply such as "\nTranslation: ..." still matches a prefix
            translated = (response.choices[0].message.content or "").strip()
            if not translated:
                # An empty reply is a failed attempt: let the retry logic handle it
                raise ValueError("empty response from model")

            # Remove boilerplate lead-ins
            for prefix in boilerplate_prefixes:
                if translated.startswith(prefix):
                    translated = translated[len(prefix):].strip()

            return translated

        except Exception as e:
            if attempt < max_retries - 1:
                print(f"    重试 {attempt + 1}/{max_retries}...")
                sleep(2 ** attempt)  # exponential back-off
            else:
                print(f"    翻译失败: {str(e)[:100]}")
                return ""

    # Only reached when max_retries <= 0 (the loop body never ran)
    return ""

In [5]:
def translate_single_file(input_file, output_file, model="deepseek-chat"):
    """Translate the first column of one CSV file and save the result.

    Args:
        input_file: Path of the source CSV. Only its first column is
            translated; utf-8, gbk and latin1 decoding are tried in turn.
        output_file: Path of the CSV to write (utf-8 with BOM). It contains a
            single column named ``<model with '-' replaced by '_'>_Translation``
            with one row per input row.
        model: Model name forwarded to ``translate_text``.

    Missing or blank cells are written as "" and counted as skipped; cells
    whose translation comes back empty are written as "[Translation Failed]".
    Progress and a summary are printed to stdout.
    """
    print(f"\n处理文件: {os.path.basename(input_file)}")
    start_time = datetime.now()

    # Read the CSV, falling back through encodings only on decoding errors so
    # that unrelated failures (and Ctrl-C) are not silently swallowed
    df = None
    for encoding in ("utf-8", "gbk"):
        try:
            df = pd.read_csv(input_file, encoding=encoding)
            break
        except UnicodeDecodeError:
            continue
    if df is None:
        # Last resort: latin1 accepts any byte sequence
        df = pd.read_csv(input_file, encoding="latin1")

    # The text to translate is taken from the first column
    original_column = df.columns[0]
    total_rows = len(df)
    print(f"  原始列名: {original_column}")
    print(f"  总行数: {total_rows}")

    model_name = model.replace("-", "_")
    translation_col = f"{model_name}_Translation"

    # Counters for the summary
    success_count = 0
    skip_count = 0

    # Translate cell by cell. Iterating the column itself (instead of
    # iterrows) keeps each value's own dtype, so e.g. 1 is not turned into 1.0
    # just because another column holds floats.
    translations = []
    for position, value in enumerate(df.iloc[:, 0], start=1):
        # Progress: first row, every 5th row, and the last row
        if position % 5 == 0 or position == 1 or position == total_rows:
            print(f"  进度: {position}/{total_rows} ({position*100/total_rows:.1f}%)")

        # Detect missing values on the raw cell, before converting to text
        text = "" if pd.isna(value) else str(value)

        # Blank cells (and a literal "nan", kept for backward compatibility)
        # are skipped without calling the API
        if text.strip() == "" or text.lower() == "nan":
            translations.append("")
            skip_count += 1
            continue

        translated = translate_text(text, model)
        if translated:
            translations.append(translated)
            success_count += 1
        else:
            translations.append("[Translation Failed]")

        # Throttle requests
        sleep(0.3)

    # Save the result
    result_df = pd.DataFrame({translation_col: translations})
    result_df.to_csv(output_file, index=False, encoding='utf-8-sig')

    # Summary
    elapsed = (datetime.now() - start_time).total_seconds()
    print(f"\n  ✓ 文件处理完成")
    print(f"    - 成功翻译: {success_count} 行")
    print(f"    - 跳过空行: {skip_count} 行")
    print(f"    - 用时: {elapsed:.1f} 秒")
    print(f"    - 保存位置: {os.path.basename(output_file)}")

In [6]:
def translate_all_files(model="deepseek-chat", resume=True):
    """Translate every CSV file found in INPUT_DIR into OUTPUT_DIR.

    Args:
        model: Model name forwarded to ``translate_single_file``; it is also
            embedded (with '-' replaced by '_') in each output file name.
        resume: When True, an input whose output file already exists is
            skipped, so an interrupted batch can be continued.

    Each input ``<name>.csv`` produces ``<name>_<model>_translated.csv``.
    A failure in one file is printed and the batch continues with the next.
    Returns None; progress and a final summary are printed to stdout.
    """
    print("=" * 50)
    print("开始批量翻译任务")
    print(f"使用模型: {model}")
    print("=" * 50)

    # Make sure the output directory exists
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Collect the CSV files; sorted so the processing order is reproducible
    pattern = os.path.join(INPUT_DIR, "*.csv")
    files = sorted(glob.glob(pattern))

    if not files:
        print("\n✗ 未找到CSV文件！")
        print(f"  请检查路径: {INPUT_DIR}")
        return

    print(f"\n找到 {len(files)} 个CSV文件")

    # Counters for the summary
    processed = 0
    skipped = 0
    total_start_time = datetime.now()
    model_suffix = model.replace("-", "_")

    for i, input_file in enumerate(files, 1):
        print(f"\n{'='*50}")
        print(f"文件 {i}/{len(files)}")
        print(f"{'='*50}")

        # Build the output name from the stem. splitext only touches the final
        # extension, so names containing ".csv" elsewhere or ending in ".CSV"
        # still get a distinct, well-formed output name.
        base_name = os.path.basename(input_file)
        stem, _extension = os.path.splitext(base_name)
        output_name = f"{stem}_{model_suffix}_translated.csv"
        output_file = os.path.join(OUTPUT_DIR, output_name)

        # Resume support: skip inputs whose output already exists
        if resume and os.path.exists(output_file):
            print(f"✓ 文件已存在，跳过: {output_name}")
            skipped += 1
            continue

        # Translate; a failing file must not abort the whole batch
        try:
            translate_single_file(input_file, output_file, model)
            processed += 1
        except Exception as e:
            print(f"\n✗ 处理失败: {e}")
            continue

    # Final summary
    total_elapsed = (datetime.now() - total_start_time).total_seconds()
    print(f"\n{'='*50}")
    print("翻译任务完成！")
    print(f"{'='*50}")
    print("总计:")
    print(f"  - 处理文件: {processed}")
    print(f"  - 跳过文件: {skipped}")
    print(f"  - 总用时: {total_elapsed:.1f} 秒 ({total_elapsed/60:.1f} 分钟)")
    print(f"  - 输出目录: {OUTPUT_DIR}")

In [7]:
# Launch the batch translation with the default deepseek-chat model;
# resume=True skips inputs whose translated file already exists
translate_all_files(resume=True, model="deepseek-chat")

开始批量翻译任务
使用模型: deepseek-chat

找到 8 个CSV文件

文件 1/8

处理文件: Chinese_version_news1.csv
  原始列名: Chinese_version_news1
  总行数: 7
  进度: 1/7 (14.3%)
  进度: 5/7 (71.4%)
  进度: 7/7 (100.0%)

  ✓ 文件处理完成
    - 成功翻译: 7 行
    - 跳过空行: 0 行
    - 用时: 51.8 秒
    - 保存位置: Chinese_version_news1_deepseek_chat_translated.csv

文件 2/8

处理文件: Chinese_version_news2.csv
  原始列名: Chinese_version_news2
  总行数: 7
  进度: 1/7 (14.3%)
  进度: 5/7 (71.4%)
  进度: 7/7 (100.0%)

  ✓ 文件处理完成
    - 成功翻译: 6 行
    - 跳过空行: 0 行
    - 用时: 111.1 秒
    - 保存位置: Chinese_version_news2_deepseek_chat_translated.csv

文件 3/8

处理文件: Chinese_version_news3.csv
  原始列名: Chinese_version_news3
  总行数: 7
  进度: 1/7 (14.3%)
  进度: 5/7 (71.4%)
  进度: 7/7 (100.0%)

  ✓ 文件处理完成
    - 成功翻译: 7 行
    - 跳过空行: 0 行
    - 用时: 48.3 秒
    - 保存位置: Chinese_version_news3_deepseek_chat_translated.csv

文件 4/8

处理文件: Chinese_version_news4.csv
  原始列名: Chinese_version_news4
  总行数: 7
  进度: 1/7 (14.3%)
  进度: 5/7 (71.4%)
  进度: 7/7 (100.0%)

  ✓ 文件处理完成
    - 成功翻译: 7 行
    - 跳过空行: 0 行
 

In [None]:
# Optional: if you have access to the DeepSeek V2 model
# translate_all_files(model="deepseek-v2", resume=True)

## 单文件测试
如果您想先测试单个文件，可以使用下面的代码：

In [None]:
# Trial run: translate one known file before launching the whole batch
test_input = os.path.join(INPUT_DIR, "Chinese_version_news1.csv")
test_output = os.path.join(OUTPUT_DIR, "test_deepseek_translation.csv")

if not os.path.exists(test_input):
    print(f"测试文件不存在: {test_input}")
else:
    print("开始测试翻译...")
    translate_single_file(test_input, test_output, model="deepseek-chat")

## 检查翻译结果
查看翻译结果的质量：

In [None]:
# Preview a sample of the translation output
output_files = glob.glob(os.path.join(OUTPUT_DIR, "*_translated.csv"))

if not output_files:
    print("还没有翻译结果文件")
else:
    # Show the first file returned by glob
    sample_file = output_files[0]
    print(f"查看文件: {os.path.basename(sample_file)}\n")

    df = pd.read_csv(sample_file)
    print("前5行翻译结果：")
    print(df.head())