# Drive Test Tag Generation
Generate tags for the written portion of the Chinese driving exam using a process similar to BERTopic.

## 1. Load Data
Loading data from a local database into a question bank class.

In [1]:
from src.qb.question_bank import QuestionBank
from data_storage.database.json_database import LocalJsonDB

# Open the local JSON-backed store. The two arguments look like the question data file
# and the image directory (inferred from the paths — confirm against LocalJsonDB).
db = LocalJsonDB("data_storage/database/json_db/data.json",
                 "data_storage/database/json_db/images")
# Load the stored questions into a QuestionBank and print the count as a sanity check.
qb : QuestionBank = db.load()
print(qb.question_count())

2836


## 2. Format Data
Although the Siglip2 model can handle images of different sizes, we still resize every image to the same square size to avoid unnecessary complications.

In [2]:
from data_cleaning.img_reshaper import ImgSquarer

# Output directories for the resized copies of the question images.
IMG_DIR_256 = "data_cleaning/resized_imgs/img256"
IMG_DIR_512 = "data_cleaning/resized_imgs/img512"

# Only the 256 variant is instantiated; the 512 variant is kept disabled below.
squarer_256 = ImgSquarer(256)
# squarer_512 = ImgSquarer(512)

In [3]:
def resize_images(qb: QuestionBank, squarer: ImgSquarer, new_dir: str) -> None:
    """
    Reshape the image of every question that has one and repoint the question at the result.

    Walks every chapter of ``qb``. For each question whose ``get_img_path()`` is not
    ``None``, calls ``squarer.reshape(qid, qb.get_img_dir(), new_dir)`` and stores the
    value it returns as the question's image path. Questions without an image are
    left untouched.

    Args:
        qb: Question bank whose questions are updated in place.
        squarer: Object providing ``reshape(qid, source_dir, target_dir)``.
        new_dir: Directory passed to ``reshape`` as the target location.
    """
    for chapter in qb.get_all_chapter_num():
        for qid in qb.get_qids_by_chapter(chapter):
            item = qb.get_question(qid)
            if item.get_img_path() is None:
                # Text-only question: nothing to resize.
                continue
            resized_path = squarer.reshape(qid, qb.get_img_dir(), new_dir)
            item.set_img_path(resized_path)

In [4]:
import os
# Resize only when the target directory is missing or empty; otherwise reuse what is on disk.
# os.listdir raises FileNotFoundError for a missing directory, so test for existence first.
if not os.path.isdir(IMG_DIR_256) or not os.listdir(IMG_DIR_256):
    os.makedirs(IMG_DIR_256, exist_ok=True)
    print("Resizing images to 256x256...")
    resize_images(qb, squarer_256, IMG_DIR_256)
else:
    # NOTE(review): resize_images is also what calls set_img_path on each question, so on
    # this branch the questions in `qb` keep their original image paths — confirm that the
    # embedder is meant to read from there when the resized copies already exist.
    print("Images already resized to 256x256, skipping...")

Images already resized to 256x256, skipping...


## 3. Create Multimodal Embeddings
Create multimodal embeddings for the questions using a Siglip2 model.

In [5]:
# Library Imports
from transformers import AutoModel, AutoProcessor

# Local Imports
from embedder.siglip2_qb_embedder import Siglip2QBEmbedder

### a) Load/Download the Siglip2 Model
We will be using "google/siglip2-base-patch16-256" for this task.

In [6]:
import torch
# Hugging Face checkpoint id used for both the model and its processor.
MODEL_NAME = "google/siglip2-base-patch16-256"

# Load the weights as float32 and request the SDPA attention implementation.
model = AutoModel.from_pretrained(MODEL_NAME, torch_dtype=torch.float32, attn_implementation="sdpa")
# NOTE(review): nothing in this cell moves the model to a device; presumably the
# embedder handles placement — confirm.
processor = AutoProcessor.from_pretrained(MODEL_NAME, use_fast=True)  # request the fast processor implementation

### b) Create embeddings

#### i) Define a logger

In [7]:
import logging
from logging import Logger
from datetime import datetime
import os

LOGGING_PATH = "logs"

def get_logger(name: str, log_dir=None) -> Logger:
    """
    Return an INFO-level logger that writes to a timestamped file.

    The file is named ``<name>_<YYYYmmdd_HHMM>.log`` and is created inside
    ``log_dir``. The directory is created if it does not exist.

    ``logging.getLogger`` returns the same object for the same name, so calling
    this function again (for example by re-running the notebook cell) would
    otherwise attach a second file handler and write every record twice. When the
    logger already has a file handler it is returned as is.

    Args:
        name: Logger name; also used as the log file prefix.
        log_dir: Directory for the log file. Defaults to ``LOGGING_PATH``,
            read at call time.

    Returns:
        The configured logger.
    """
    if log_dir is None:
        log_dir = LOGGING_PATH

    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Already configured by an earlier call: do not stack another handler.
    if any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
        return logger

    # FileHandler opens the file immediately and fails if the directory is missing.
    os.makedirs(log_dir, exist_ok=True)

    # Timestamp has minute resolution, so each new kernel session gets its own file.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    file_handler = logging.FileHandler(
        os.path.join(log_dir, f"{name}_{timestamp}.log")
    )
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)

    return logger

# Logger passed to the embedder; its records go to a timestamped file under LOGGING_PATH.
embedder_logger = get_logger("embedder")

#### ii) Create the embedder

In [8]:
# Bundle the loaded model, its processor, and the file logger into the project's embedder.
custom_embedder = Siglip2QBEmbedder(model, processor, embedder_logger)

#### iii) Generate embeddings

In [9]:
# Where the per-question embeddings are cached between runs.
EMBEDDINGS_DIR = "data_storage/embedding_dir"
EMBEDDING_FILE_NAME = "siglip2_embeddings.npz"

# Create the cache directory up front so later existence checks and saves cannot fail on it.
os.makedirs(EMBEDDINGS_DIR, exist_ok=True)
embedding_file = os.path.join(EMBEDDINGS_DIR, EMBEDDING_FILE_NAME)
print(embedding_file)

data_storage/embedding_dir/siglip2_embeddings.npz


In [10]:
# Skip the expensive encoding pass when a cached file is already present.
# The check is os.path.exists(embedding_file), the same one the save cell uses,
# so the two cells can never disagree about whether `embeddings` should exist.
if os.path.exists(embedding_file):
    print(f"Embeddings already exist at {embedding_file}, skipping generation.")
else:
    print("Generating embeddings...")
    # Generate embeddings for the question bank
    embeddings = custom_embedder.encode_qb(qb)

Embeddings already exist at data_storage/embedding_dir/siglip2_embeddings.npz, skipping generation.


#### iv) Save embeddings

In [11]:
import numpy as np

def save_embeddings(embeddings, file_path):
    """
    Write a ``{qid: vector}`` mapping to an ``.npz`` archive.

    Each entry is stored under ``str(qid)``, so ids come back as strings when the
    archive is loaded.
    """
    arrays_by_key = {}
    for qid in embeddings:
        arrays_by_key[str(qid)] = embeddings[qid]
    np.savez(file_path, **arrays_by_key)

# Persist only when the cache file is absent. `embeddings` is assigned by the
# generation cell only on a cache miss, so this guard also keeps cached runs
# from touching an undefined name.
if not os.path.exists(embedding_file):
    print(f"Saving embeddings to {embedding_file}...")
    save_embeddings(embeddings, embedding_file)
else:
    print(f"Embeddings file {embedding_file} already exists, skipping save.")

Embeddings file data_storage/embedding_dir/siglip2_embeddings.npz already exists, skipping save.


## 4. Dimension Reduction

### a) Load Embeddings

In [12]:
def load_embeddings(file_path):
    """
    Read an ``.npz`` archive into a plain ``{key: array}`` dict.

    ``np.load`` on an ``.npz`` file returns a lazily-read archive that holds the
    file open. Every array is read inside the ``with`` block, so the handle is
    released before returning.

    Args:
        file_path: Path of the ``.npz`` archive.

    Returns:
        Dict mapping each stored key (a string) to its array.
    """
    with np.load(file_path) as archive:
        return {key: archive[key] for key in archive.files}
# Read the cached archive back as a dict keyed by the stored (string) question ids.
id_to_embedding = load_embeddings(embedding_file)

In [13]:
from typing import List, Tuple

from embedder.siglip2_qb_embedder import format_question

def format_for_clustering(id_to_embedding: dict, qb: QuestionBank) -> Tuple[List[str], List[str], np.ndarray]:
    """
    Build three index-aligned collections for clustering.

    Questions are visited chapter by chapter, in the order ``qb`` yields them.
    Position ``i`` of every returned collection refers to the same question.

    Args:
        id_to_embedding: Mapping from question id to its embedding vector.
            Every id in ``qb`` must be present, otherwise ``KeyError`` is raised.
        qb: Question bank supplying the ids, questions and chapter descriptions.

    Returns:
        ``(qid_lst, documents, embedding_array)``: the question ids, the text
        produced by ``format_question`` for each question, and the embeddings
        stacked with ``np.array``.
    """
    qid_lst: List[str] = []
    documents: List[str] = []
    embedding_lst: List[np.ndarray] = []

    for chapter_id in qb.get_all_chapter_num():
        for qid in qb.get_qids_by_chapter(chapter_id):

            doc = format_question(qb.get_question(qid), qb.describe_chapter(chapter_id))

            qid_lst.append(qid)
            documents.append(doc)
            embedding_lst.append(id_to_embedding[qid])

    embedding_array = np.array(embedding_lst)
    return qid_lst, documents, embedding_array

In [14]:
# Build the aligned id / document / embedding collections and report their sizes.
qid_lst, documents, embeds = format_for_clustering(id_to_embedding, qb)
print(f"Number of questions: {len(qid_lst)} "
      f"Embedding shape: {embeds.shape}")

Number of questions: 2836 Embedding shape: (2836, 768)


In [15]:
# Peek at the first five formatted documents to check the question/answer text layout.
documents[:5]

['题目:图中标志提示前方道路的最高车速限制在50公里以下。答案:错',
 '题目:准驾车型为C1驾照的，可以驾驶以下哪种车辆？答案:低速载货汽车',
 '题目:潮汐车道是可变车道，根据早晚交通流量情况，调整车道的行驶方向。答案:对',
 '题目:如图所示，驾驶这辆小型客车能否进入高速公路行驶？答案:由持该车型驾驶证3年以上驾驶人陪同允许进入',
 '题目:对有伪造或变造号牌、行驶证嫌疑的车辆，交通警察可依法予以扣留。答案:对']

### b) Set up dimension reduction model
We will be using UMAP for dimension reduction.

In [16]:
from umap import UMAP
def dim_reduction(dimensions: int, n_neighbors: int, embeddings: np.ndarray, random_state=None) -> np.ndarray:
    """
    Project ``embeddings`` to ``dimensions`` components with UMAP.

    UMAP is configured with cosine distance and ``min_dist=0.0``.

    Args:
        dimensions: Number of output components.
        n_neighbors: UMAP neighbourhood size.
        embeddings: Input matrix, one row per sample.
        random_state: Optional seed forwarded to UMAP. The default ``None``
            keeps the previous unseeded behaviour; pass an integer to make the
            projection repeatable between runs.

    Returns:
        The array returned by ``UMAP.fit_transform``.
    """
    umap_model = UMAP(
        n_components=dimensions,
        metric='cosine',
        n_neighbors=n_neighbors,
        min_dist=0.0,
        random_state=random_state,
    )
    return umap_model.fit_transform(embeddings)

In [17]:
def save_rdc_embeddings(embeddings, file_path):
    """Store the reduced embedding matrix in an ``.npz`` archive under the key ``embeddings``."""
    payload = {"embeddings": embeddings}
    np.savez(file_path, **payload)

In [18]:
def load_rdc_embeddings(file_path):
    """
    Load the reduced embedding matrix stored under the key ``embeddings``.

    The archive is opened in a ``with`` block so its file handle is closed once
    the array has been read.

    Args:
        file_path: Path of the ``.npz`` archive.

    Returns:
        The array stored under ``embeddings``.
    """
    with np.load(file_path) as archive:
        return archive['embeddings']

In [19]:
def make_reduced_embeddings(dimension, n_neighbors, embeddings):
    """
    Return reduced embeddings for the given hyperparameters, using an on-disk cache.

    The cache file is keyed only by ``n_neighbors`` and ``dimension``. If it
    exists it is loaded and ``embeddings`` is not used; otherwise the reduction
    is computed, saved to that file, and returned.
    """
    cache_file = f"data_storage/embedding_dir/rdc_embeds_n{n_neighbors}d{dimension}.npz"

    if os.path.exists(cache_file):
        print(f"Embeddings for dimension {dimension} with n_neighbors={n_neighbors} already exist. Loading...\n")
        return load_rdc_embeddings(cache_file)

    print(f"Generating embeddings for dimension {dimension} with n_neighbors={n_neighbors}...\n")
    reduced = dim_reduction(dimension, n_neighbors, embeddings)
    save_rdc_embeddings(reduced, cache_file)
    return reduced

## 5. Clustering


Generate a small representative sample of the question bank by clustering the questions and selecting representative questions from each cluster.

### a) Set up clustering model

In [25]:
from sklearn.cluster import HDBSCAN
def cluster_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """
    Assign a cluster label to every row of ``embeddings`` with HDBSCAN.

    The clusterer uses cosine distance, the smallest cluster size (2) with
    ``min_samples=1``, and "leaf" cluster selection.

    Args:
        embeddings: Array of shape (n_samples, n_features).

    Returns:
        One label per row, as returned by ``fit_predict``.
    """
    hdbscan_settings = {
        "min_cluster_size": 2,
        "metric": 'cosine',
        "min_samples": 1,
        "cluster_selection_method": "leaf",
        "allow_single_cluster": True,
    }
    labels = HDBSCAN(**hdbscan_settings).fit_predict(embeddings)
    return labels

### b) Tune Hyperparameters
Find the optimal hyperparameters.
#### i) Set up experiment trying a range of hyperparameters

In [26]:
from pandas import DataFrame


def make_topics(dimension: int, n_neighbors: int, qid_lst: List[str], docs: List[str], embeddings: np.ndarray) -> DataFrame:
    """
    Run dimension reduction followed by clustering for one hyperparameter pair.

    Returns a frame with one row per question and the columns ``topic``
    (cluster label), ``qid`` and ``question``.
    """
    reduced = make_reduced_embeddings(dimension, n_neighbors, embeddings)
    labels = cluster_embeddings(reduced)
    columns = {"topic": labels, "qid": qid_lst, "question": docs}
    return DataFrame(columns)

In [28]:
def dimred_experiment(qid_lst: List[str], questions: List[str], embeddings: np.ndarray,
                      n_neighbors_values=range(5, 100, 10),
                      dimensions=range(2, 767, 50)) -> DataFrame:
    """
    Grid experiment over the dimension-reduction hyperparameters.

    With the clustering method held fixed, every combination of ``n_neighbors``
    and target dimension is reduced and clustered so the resulting labels (and
    therefore the outlier counts) can be compared.

    Args:
        qid_lst: Question ids, aligned with the rows of ``embeddings``.
        questions: Question texts, aligned with ``qid_lst``.
        embeddings: Full-size embedding matrix.
        n_neighbors_values: ``n_neighbors`` values to try. Defaults to the grid
            previously hard-coded here: 5, 15, ..., 95.
        dimensions: Target dimensions to try. Defaults to the grid previously
            hard-coded here: 2, 52, ..., 752.

    Returns:
        A frame with one label column per run, named ``n<n_neighbors>d<dimension>``,
        followed by the columns ``id`` and ``question``.
    """
    # Materialise so a one-shot iterable is still usable on every outer iteration.
    dimensions = list(dimensions)

    columns = {}
    for n_neighbors in n_neighbors_values:
        for dimension in dimensions:
            print(f"Running for n_neighbors={n_neighbors}, dimension={dimension}...")

            reduced = make_reduced_embeddings(dimension, n_neighbors, embeddings)
            columns[f"n{n_neighbors}d{dimension}"] = cluster_embeddings(reduced)

    # Identification columns go last, matching the layout of the saved CSV.
    columns["id"] = qid_lst
    columns["question"] = questions
    return DataFrame(columns)

In [29]:
def save_dimred_results(df: DataFrame, file_path: str):
    """
    Save the dimension reduction results to a CSV file.

    The parent directory is created when it does not exist, so the first run on
    a fresh checkout does not fail after the experiment has already finished.
    The frame's index is not written.

    Args:
        df: Results frame to write.
        file_path: Destination CSV path.
    """
    parent_dir = os.path.dirname(file_path)
    if parent_dir:  # empty for a bare file name, which needs no directory
        os.makedirs(parent_dir, exist_ok=True)
    df.to_csv(file_path, index=False)
    print(f"Results saved to {file_path}")

def load_dimred_results(file_path: str) -> DataFrame:
    """
    Load the dimension reduction results from a CSV file.

    ``read_csv`` is a module-level pandas function, not a ``DataFrame`` method.
    The ``id`` and ``question`` columns are read as strings so that ids which
    look numeric (leading zeros, or hex-like ids such as ``12e34``) come back
    unchanged.

    Args:
        file_path: Path of the CSV written by ``save_dimred_results``.

    Returns:
        The stored results frame.
    """
    # Function-scope import: the notebook only imports DataFrame from pandas.
    from pandas import read_csv

    return read_csv(file_path, dtype={"id": str, "question": str})

In [30]:
# CSV cache for the full hyperparameter grid.
DIMRED_EXPR_PATH = "data_storage/experiments/dim_reduct_experiment.csv"
# Run and save the experiment only when no cached CSV exists; otherwise load the CSV.
# Either branch leaves the results in `dimred_exp_results`.
if not os.path.exists(DIMRED_EXPR_PATH):
    print("Running experiment...")
    dimred_exp_results = dimred_experiment(qid_lst=qid_lst, questions=documents, embeddings=embeds)
    print("Saving dimension reduction experiment results...")
    save_dimred_results(dimred_exp_results, DIMRED_EXPR_PATH)
else:
    print(f"Dimension reduction experiment results already exist at {DIMRED_EXPR_PATH}, skipping save.")
    dimred_exp_results = load_dimred_results(DIMRED_EXPR_PATH)

Running experiment...
Running for n_neighbors=5, dimension=2...
Embeddings for dimension 2 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=52...
Embeddings for dimension 52 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=102...
Embeddings for dimension 102 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=152...
Embeddings for dimension 152 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=202...
Embeddings for dimension 202 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=252...
Embeddings for dimension 252 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=302...
Embeddings for dimension 302 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimension=352...
Embeddings for dimension 352 with n_neighbors=5 already exist. Loading...

Running for n_neighbors=5, dimen

In [32]:
# Show the first three rows: one label column per (n_neighbors, dimension) run, then "id" and "question".
dimred_exp_results.head(3)

Unnamed: 0,n5d2,n5d52,n5d102,n5d152,n5d202,n5d252,n5d302,n5d352,n5d402,n5d452,...,n95d402,n95d452,n95d502,n95d552,n95d602,n95d652,n95d702,n95d752,id,question
0,266,148,184,224,165,136,233,89,112,162,...,191,100,265,213,156,202,166,167,cea30,题目:图中标志提示前方道路的最高车速限制在50公里以下。答案:错
1,509,504,572,-1,593,582,573,571,630,546,...,507,549,-1,-1,453,573,375,543,af3a1,题目:准驾车型为C1驾照的，可以驾驶以下哪种车辆？答案:低速载货汽车
2,131,402,232,287,405,469,262,426,414,-1,...,486,422,-1,533,-1,491,470,515,ae845,题目:潮汐车道是可变车道，根据早晚交通流量情况，调整车道的行驶方向。答案:对


#### ii) Analyze Results

Extract key metrics from the data into a dataframe.

In [None]:
def analyze_dr_data(data: DataFrame) -> DataFrame:
    """
    Summarise every run of the dimension-reduction experiment.

    Each column of ``data`` named ``n<n_neighbors>d<dimension>`` is treated as
    the cluster labels of one run. Any other column (such as ``id`` or
    ``question``) is ignored.

    Args:
        data: Experiment results, one label column per run.

    Returns:
        A frame with one row per run, in the column order of ``data``, with the
        columns:
            - ``n_neighbours``: parsed from the column name
            - ``dimensions``: parsed from the column name
            - ``n_clusters``: number of distinct labels other than -1
            - ``n_outliers``: number of rows labelled -1
        An input without run columns yields an empty frame with these columns.
    """
    import re

    run_column = re.compile(r"n(\d+)d(\d+)")
    summary_columns = ["n_neighbours", "dimensions", "n_clusters", "n_outliers"]

    rows = []
    for column in data.columns:
        parsed = run_column.fullmatch(str(column))
        if parsed is None:
            continue  # not a run column

        labels = data[column]
        # -1 is treated as the outlier label and is not counted as a cluster.
        outlier_mask = labels == -1
        rows.append({
            "n_neighbours": int(parsed.group(1)),
            "dimensions": int(parsed.group(2)),
            "n_clusters": int(labels[~outlier_mask].nunique()),
            "n_outliers": int(outlier_mask.sum()),
        })

    return DataFrame(rows, columns=summary_columns)