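### 1. Environment setup
**Description**: Install the required Python packages: the Google Cloud AI Platform SDK, the Google Gen AI SDK, and the Hugging Face `datasets` library for dataset handling. These are the base dependencies for the whole project.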

In [None]:
%pip install --upgrade --quiet google-cloud-aiplatform google-genai datasets

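### 2. Restart the kernel
**Description**: Restart the Jupyter kernel so that the newly installed packages load correctly. This is the standard step after installing new packages.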

In [None]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

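### 3. Google authentication
**Description**: Check whether the notebook is running in Google Colab and, if so, authenticate the user. This ensures we have permission to access Google Cloud services.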

In [None]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

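### 4. Project initialization
**Description**: Set the Google Cloud project parameters (project ID and location) and initialize the Gen AI client. These settings are required to use Google Cloud services.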

In [None]:
import os

from google import genai
from google.genai import types

PROJECT_ID = "baidao-test-666808"  # @param {type:"string", isTemplate: true}
if PROJECT_ID == "[your-project-id]":
    PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION)

### 5. Import dependencies

In [None]:
from collections import Counter
import json
import random

# Vertex AI SDK
from google.cloud import aiplatform
from google.cloud.aiplatform.metadata import context
from google.cloud.aiplatform.metadata import utils as metadata_utils
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import vertexai
from datasets import load_dataset

vertexai.init(project=PROJECT_ID, location=LOCATION)

### 6. Storage configuration
**Description**: Configure the Google Cloud Storage bucket that stores the training data.

In [None]:
# Provide a bucket name
BUCKET_NAME = "sql-create-context"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"
print(BUCKET_URI)

### 7. Dataset processing
**Description**: Load the SQL generation dataset and split it into training, validation, and test sets.

In [None]:
# Load the dataset
ds = load_dataset("b-mc2/sql-create-context")

# Get the data from the train split
data = ds['train']

# First split the data into train and remaining (8:2, since test_size=0.2)
train_test = data.train_test_split(test_size=0.2, seed=42)
train_data = train_test['train']

# Then split the remaining data into valid and test (1:1)
remaining = train_test['test'].train_test_split(test_size=0.5, seed=42)
valid_data = remaining['train']
test_data = remaining['test']

# Convert the datasets to DataFrames
train_df = pd.DataFrame(train_data)
valid_df = pd.DataFrame(valid_data)
test_df = pd.DataFrame(test_data)
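
With `test_size=0.2` followed by `test_size=0.5`, the two-stage split above works out to roughly 80% train, 10% validation, and 10% test. The following is a minimal pure-Python sketch of that arithmetic. The helper `split_80_10_10` is hypothetical and does not reproduce the shuffling that `datasets` performs internally. It only illustrates the resulting proportions.

```python
import random


def split_80_10_10(rows, seed=42):
    """Shuffle rows, hold out 20%, then halve the holdout into valid/test."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)

    n_holdout = round(len(rows) * 0.2)   # mirrors test_size=0.2
    holdout, train = rows[:n_holdout], rows[n_holdout:]

    n_test = round(len(holdout) * 0.5)   # mirrors test_size=0.5
    test, valid = holdout[:n_test], holdout[n_test:]
    return train, valid, test


train, valid, test = split_80_10_10(range(1000))
print(len(train), len(valid), len(test))  # 800 100 100
```

The three parts are disjoint by construction, which is the property that matters for a fair evaluation later on.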

### 8. Data sampling
**Description**: Randomly sample 10% of each split to reduce compute requirements and speed up training.

In [None]:
train_df = train_df.sample(frac=0.1, random_state=42)
valid_df = valid_df.sample(frac=0.1, random_state=42)
test_df = test_df.sample(frac=0.1, random_state=42)

# Print the size of each sampled DataFrame
print(f"Sampled train DataFrame size: {len(train_df)}")
print(f"Sampled validation DataFrame size: {len(valid_df)}")
print(f"Sampled test DataFrame size: {len(test_df)}")

In [None]:
test_df.head(2)

### 9. System prompt setup
**Description**: Create the system prompt text and combine it with few-shot examples.

In [None]:
row_dataset = random.randint(0, 100)

In [None]:
few_shot_examples = test_df.sample(3)
dropped_indices = few_shot_examples.index
test_df = test_df.drop(dropped_indices)

few_shot_prompt = ""
for _, row in few_shot_examples.iterrows():
    few_shot_prompt += (
        f"Context: {row.context}\nQuestion: {row.question}\nAnswer: {row.answer}\n\n"
    )

print(few_shot_prompt)

In [None]:
systemInstruct = f"""You are a Database Query Assistant. Your role is to generate SQL queries based on user questions and the provided context. The context will be provided in the form of CREATE TABLE statements that define the database schema.\n\n
Here are some examples: \n\n
{few_shot_prompt}"""

In [None]:
test_df["systemInstruct"] = systemInstruct

test_df["input_question"] = (
    "\n\n**Below the question with context that you need to answer**"
    + "\nContext: " + test_df["context"]
    + "\nQuestion: " + test_df["question"]
)

test_systemInstruct = test_df["systemInstruct"].iloc[row_dataset]
print(test_systemInstruct)
test_question = test_df["input_question"].iloc[row_dataset]
print(test_question)

In [None]:
base_model = "gemini-2.0-flash-lite-001"

In [None]:
def get_predictions(question: str, model_version: str) -> str:

    prompt = question
    base_model = model_version

    response = client.models.generate_content(
        model=base_model,
        contents=prompt,
        config={
            "system_instruction": systemInstruct,
            "temperature": 0.3,
        },
    )

    return response.text

In [None]:
test_answer = test_df["answer"].iloc[row_dataset]
response = get_predictions(test_question, base_model)

print(f"Gemini response: {response}")
print(f"Actual answer: {test_answer}")

### 10. Test the base model

In [None]:
from tqdm import tqdm
tqdm.pandas()

test_df["predicted_answer"] = test_df["input_question"].progress_apply(lambda x: get_predictions(x, base_model))
test_df.head(2)

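### 11. Evaluation metric functions
**Description**: Implement the functions that compute the evaluation metrics: the F1 score and the exact match (EM) score. These metrics measure the quality of the SQL queries the model generates.

Exact match score (EM score):
The score is 1 if the predicted query is identical to the reference query, and 0 otherwise. The final EM score is the mean over all test samples, so it directly reflects the proportion of queries the model generates exactly right.

F1 score:
Both queries are split into tokens on whitespace. The number of tokens shared by the predicted and reference queries gives the precision (the share of predicted tokens that are correct) and the recall (the share of reference tokens that were predicted). F1 is the harmonic mean of the two, so a query that does not match exactly still receives a score between 0 and 1.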

In [None]:
def f1_score_squad(prediction, ground_truth):
    prediction_tokens = prediction.split()
    ground_truth_tokens = ground_truth.split()
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1


def exact_match_score(prediction, ground_truth):
    return prediction == ground_truth


def calculate_em_and_f1(y_true, y_pred):
    """Calculates EM and F1 scores for DataFrame columns."""

    # Ensure inputs are Series
    if not isinstance(y_true, pd.Series):
        y_true = pd.Series(y_true)
    if not isinstance(y_pred, pd.Series):
        y_pred = pd.Series(y_pred)

    em = np.mean(y_true.combine(y_pred, exact_match_score))
    f1 = np.mean(y_true.combine(y_pred, f1_score_squad))

    return em, f1
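
To make the two metrics concrete, here is a small worked example on hand-written query pairs. It is a self-contained sketch: `token_f1` is a hypothetical stand-alone copy of the token-overlap logic, and the SQL strings are invented for illustration, not taken from the dataset.

```python
from collections import Counter


def token_f1(prediction: str, ground_truth: str) -> float:
    """Token-level F1 over whitespace-separated tokens."""
    pred_tokens = prediction.split()
    true_tokens = ground_truth.split()
    # Multiset intersection counts each shared token at most min(count) times.
    overlap = sum((Counter(pred_tokens) & Counter(true_tokens)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred_tokens)
    recall = overlap / len(true_tokens)
    return 2 * precision * recall / (precision + recall)


truth = "SELECT name FROM users WHERE age > 30"
near_miss = "SELECT name FROM users WHERE age >= 30"

# One of eight tokens differs (">" vs ">="), so EM fails but F1 stays high.
print(near_miss == truth)            # False
print(token_f1(near_miss, truth))    # 0.875

# Both metrics are case-sensitive: only "name" and "users" match here.
print(token_f1("select name from users", "SELECT name FROM users"))  # 0.5
```

The second case shows why a model that answers correctly but in a different letter case or layout can still score poorly on both metrics.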

### 12. Base model scores

In [None]:
em, f1 = calculate_em_and_f1(test_df["answer"], test_df["predicted_answer"])
print(f"EM score: {em}")
print(f"F1 score: {f1}")

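### 13. Prepare the training data
**Description**: Convert the training data to JSONL files and upload them to the GCS bucket for the fine-tuning job. JSONL is the data format used for fine-tuning here.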

In [None]:
train_df["input_question"] = (
    "\n\n**Below the question with context that you need to answer**"
    + "\nContext: "
    + train_df["context"]
    + "\nQuestion: "
    + train_df["question"]
)
valid_df["input_question"] = (
    "\n\n**Below the question with context that you need to answer**"
    + "\nContext: "
    + valid_df["context"]
    + "\nQuestion: "
    + valid_df["question"]
)

In [None]:
train_df.head(2)

In [None]:
def df_to_jsonl(df, output_file):
    """Converts a Pandas DataFrame to JSONL format and saves it to a file.

    Args:
      df: The DataFrame to convert.
      output_file: The name of the output file.
    """

    with open(output_file, "w") as f:
        for row in df.itertuples(index=False):
            jsonl_obj = {
                "systemInstruction": {"parts": [{"text": f"{systemInstruct}"}]},
                "contents": [
                    {
                        "role": "user",
                        "parts": [{"text": f"{row.input_question}"}],
                    },
                    {"role": "model", "parts": [{"text": row.answer}]},
                ],
            }
            f.write(json.dumps(jsonl_obj) + "\n")


# Process the DataFrames
df_to_jsonl(train_df, "train.jsonl")
df_to_jsonl(valid_df, "valid.jsonl")

print(f"JSONL data written to train.jsonl")
print(f"JSONL data written to valid.jsonl")
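
Before uploading, it can help to check that each line of the file parses and has the expected shape. The sketch below builds one record with the same keys as `df_to_jsonl` and validates it after a JSON round trip. The helpers `make_record` and `is_valid_line` are hypothetical, and the checks reflect only the structure used in this notebook, not the full schema accepted by the tuning service.

```python
import json


def make_record(system_text: str, user_text: str, model_text: str) -> dict:
    """Build one tuning example with the same keys as df_to_jsonl."""
    return {
        "systemInstruction": {"parts": [{"text": system_text}]},
        "contents": [
            {"role": "user", "parts": [{"text": user_text}]},
            {"role": "model", "parts": [{"text": model_text}]},
        ],
    }


def is_valid_line(line: str) -> bool:
    """Return True if the line is JSON with a user turn followed by a model turn."""
    try:
        record = json.loads(line)
        turns = record["contents"]
        roles = [turn["role"] for turn in turns]
        texts = [turn["parts"][0]["text"] for turn in turns]
    except (json.JSONDecodeError, KeyError, IndexError, TypeError):
        return False
    return roles == ["user", "model"] and all(isinstance(t, str) and t for t in texts)


line = json.dumps(
    make_record(
        "You are a Database Query Assistant.",
        "Context: CREATE TABLE t (a VARCHAR)\nQuestion: How many rows?",
        "SELECT COUNT(*) FROM t",
    )
)
print(is_valid_line(line))          # True
print(is_valid_line("not json"))    # False
```

Running `is_valid_line` over every line of `train.jsonl` and `valid.jsonl` is a cheap way to catch empty answers or malformed rows before a tuning job is started.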

Copy the data to the bucket

In [None]:
!gsutil cp ./train.jsonl {BUCKET_URI}
!gsutil cp ./valid.jsonl {BUCKET_URI}

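### 14. Start the fine-tuning job

- `base_model`: the model to fine-tune.
- `training_dataset`: path to the training dataset.

*Optional parameters*
- `epoch_count`: number of training epochs.
- `learning_rate_multiplier`: multiplier applied to the learning rate.
- `adapter_size`: rank of the adapter.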

In [None]:
train_dataset = f"""{BUCKET_URI}/train.jsonl"""
validation_dataset = f"""{BUCKET_URI}/valid.jsonl"""

training_dataset = {
    "gcs_uri": train_dataset,
}

validation_dataset = types.TuningValidationDataset(gcs_uri=validation_dataset)

In [None]:
sft_tuning_job = client.tunings.tune(
    base_model=base_model,
    training_dataset=training_dataset,
    config=types.CreateTuningJobConfig(
        adapter_size="ADAPTER_SIZE_EIGHT",
        epoch_count=1,  # set to one to keep time and cost low
        tuned_model_display_name="sql-Tuned",
        validation_dataset=validation_dataset,
    ),
)
sft_tuning_job

In [None]:
sft_tuning_job.state

In [None]:
tuning_job = client.tunings.get(name=sft_tuning_job.name)
tuning_job
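
A tuning job takes a while to finish, and the later cells need a completed job. The following is a minimal polling sketch under stated assumptions: the helper `wait_for_job` is hypothetical, and the state names `JOB_STATE_PENDING` and `JOB_STATE_RUNNING` are assumed to be the non-terminal states, so check them against your SDK version. The demo uses a fake state source so that it runs offline.

```python
import time
from typing import Callable, Optional

# Assumed names of the non-terminal states; verify against your SDK version.
RUNNING_STATES = {"JOB_STATE_PENDING", "JOB_STATE_RUNNING"}


def wait_for_job(
    fetch_state: Callable[[], str],
    poll_seconds: float = 60.0,
    max_polls: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_state() until it returns a state outside RUNNING_STATES."""
    polls = 0
    state = fetch_state()
    while state in RUNNING_STATES:
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"job still in {state} after {polls} polls")
        sleep(poll_seconds)
        state = fetch_state()
        polls += 1
    return state


# Offline demo: a fake state source stands in for the tuning service.
fake_states = iter(["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_SUCCEEDED"])
final_state = wait_for_job(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)  # JOB_STATE_SUCCEEDED

# In this notebook the state source could be (assumption, not run here):
# wait_for_job(lambda: client.tunings.get(name=sft_tuning_job.name).state.name)
```

Passing the state source as a callable keeps the loop independent of the client, and `max_polls` gives a way to stop waiting instead of looping indefinitely.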

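### 15. Model tuning metrics

- `/train_total_loss`: loss on the tuning dataset at a training step.
- `/train_fraction_of_correct_next_step_preds`: token accuracy at a training step. A single prediction consists of a sequence of tokens. This metric measures the accuracy of the predicted tokens against the ground truth in the tuning dataset.
- `/train_num_predictions`: number of tokens predicted at a training step.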

In [None]:
experiment_name = tuning_job.experiment
experiment_name

In [None]:
# Locate Vertex AI Experiment and Vertex AI Experiment Run
experiment = aiplatform.Experiment(experiment_name=experiment_name)
filter_str = metadata_utils._make_filter_string(
    schema_title="system.ExperimentRun",
    parent_contexts=[experiment.resource_name],
)
experiment_run = context.Context.list(filter_str)[0]

In [None]:
# Read data from Tensorboard
tensorboard_run_name = f"{experiment.get_backing_tensorboard_resource().resource_name}/experiments/{experiment.name}/runs/{experiment_run.name.replace(experiment.name, '')[1:]}"
tensorboard_run = aiplatform.TensorboardRun(tensorboard_run_name)
metrics = tensorboard_run.read_time_series_data()

In [None]:
def get_metrics(metric: str = "/train_total_loss"):
    """
    Get metrics from Tensorboard.

    Args:
      metric: metric name, eg. /train_total_loss or /eval_total_loss.
    Returns:
      steps: list of steps.
      steps_loss: list of loss values.
    """
    loss_values = metrics[metric].values
    steps_loss = []
    steps = []
    for loss in loss_values:
        steps_loss.append(loss.scalar.value)
        steps.append(loss.step)
    return steps, steps_loss

In [None]:
# Get Train and Eval Loss
train_loss = get_metrics(metric="/train_total_loss")
eval_loss = get_metrics(metric="/eval_total_loss")

In [None]:
# Plot the train and eval loss metrics using Plotly python library
fig = make_subplots(
    rows=1, cols=2, shared_xaxes=True, subplot_titles=("Train Loss", "Eval Loss")
)

# Add traces
fig.add_trace(
    go.Scatter(x=train_loss[0], y=train_loss[1], name="Train Loss", mode="lines"),
    row=1,
    col=1,
)
fig.add_trace(
    go.Scatter(x=eval_loss[0], y=eval_loss[1], name="Eval Loss", mode="lines"),
    row=1,
    col=2,
)

# Add figure title
fig.update_layout(title="Train and Eval Loss", xaxis_title="Steps", yaxis_title="Loss")

# Set x-axis title
fig.update_xaxes(title_text="Steps")

# Set y-axes titles
fig.update_yaxes(title_text="Loss")

# Show plot
fig.show()

### 16. Evaluate with the fine-tuned model

In [None]:
prompt = """
Answer the question based on the context

question: What is the branding for callsign dypv?,
context: CREATE TABLE table_27588823_2 (branding VARCHAR, callsign VARCHAR)
"""

In [None]:
tuned_model = tuning_job.tuned_model.endpoint
tuned_model

In [None]:
get_predictions(prompt, tuned_model)

In [None]:
# Apply get_predictions() to the "input_question" column, using the tuned model
test_df["predicted_answer"] = test_df["input_question"].progress_apply(lambda x: get_predictions(x, tuned_model))
test_df.head(2)

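After running the evaluation, you can see that the fine-tuned model performs better overall for our use case. Actual performance will of course vary with factors such as the use case and data quality.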

In [None]:
em, f1 = calculate_em_and_f1(test_df["answer"], test_df["predicted_answer"])
print(f"EM score: {em}")
print(f"F1 score: {f1}")