In [1]:
from pathlib import Path
import xml.etree.ElementTree as ET

In [2]:
ROOT = Path("/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A")
ANN_DIR    = ROOT / "Annotations" 
IMG_DIR    = ROOT / "JPEGImages" 
MAX_LEN  = 5       # '00013' 형태

In [3]:
pairs = []
for xml_path in ANN_DIR.glob("*.xml"):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    
    raw = root.findtext("filename") or ""
    stem = Path(raw).stem # '9.jpg' → '9',  '1'→'1'
    stem = stem.zfill(MAX_LEN)  # '9' → '00009', '1' → '00001'
    img_path = IMG_DIR / f'PartA_{stem}.jpg'
    
    if not img_path.exists():
        print(f"Image {img_path} does not exist, skipping.")
        continue
    
    person_cnt = sum(1 for obj in root.findall("object")
                       if obj.findtext("name") == "person")

    pairs.append((str(img_path), person_cnt))
    

In [4]:
pairs[:3]

[('/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg',
  81),
 ('/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00001.jpg',
  5),
 ('/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00002.jpg',
  29)]

In [5]:
SYSTEM = (
    "You are CrowdCountGPT. "
    "When counting people, reply with ONLY the number (no words, no punctuation)."
)
PROMPT = ("Count all the people in the image and answer with *only* the number.")

In [6]:
def make_conv(path, cnt):
    return {
        "messages": [
            {
                "role": "system", 
                "content": [{"type": "text", "text": SYSTEM}]
            },
            {
                "role": "user", 
                "content": [
                {"type": "text", "text": "Count all the people in the image."},
                {"type": "image", "image": path}
                ]
            },
            {
                "role": "assistant", 
                "content": [{"type": "text", "text": str(cnt)}]
            }
        ]
    }

In [7]:
from datasets import Dataset
dataset = Dataset.from_list([make_conv(p, c) for p, c in pairs])

In [62]:
type(dataset)

datasets.arrow_dataset.Dataset

### collate 사전 확인

In [13]:
dataset.take(2)

Dataset({
    features: ['messages'],
    num_rows: 2
})

In [30]:
dataset[:2].keys()

dict_keys(['messages'])

In [21]:
dataset.take(2)[:2]['messages'][0]

[{'content': [{'image': None,
    'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).',
    'type': 'text'}],
  'role': 'system'},
 {'content': [{'image': None,
    'text': 'Count all the people in the image.',
    'type': 'text'},
   {'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg',
    'text': None,
    'type': 'image'}],
  'role': 'user'},
 {'content': [{'image': None, 'text': '81', 'type': 'text'}],
  'role': 'assistant'}]

In [31]:
raw_batch_samples = dataset.select(range(2)).to_list()

In [36]:
raw_batch_samples[0]

{'messages': [{'content': [{'image': None,
     'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).',
     'type': 'text'}],
   'role': 'system'},
  {'content': [{'image': None,
     'text': 'Count all the people in the image.',
     'type': 'text'},
    {'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg',
     'text': None,
     'type': 'image'}],
   'role': 'user'},
  {'content': [{'image': None, 'text': '81', 'type': 'text'}],
   'role': 'assistant'}]}

In [38]:
texts = []
images = []
examples = dataset.select(range(2)).to_list()

In [None]:
def process_vision_info(messages):
    image_inputs = []

    for msg in messages:
        for element in msg.get("content", []):
            if not isinstance(element, dict):
                continue

            if element.get("type") == "image":
                img_obj = element.get("image", element)   

                # 문자열이면 파일 경로 → 열기
                if isinstance(img_obj, (str, Path)):
                    img_obj = Image.open(img_obj)

                # 최종 RGB 변환
                image_inputs.append(img_obj.convert("RGB"))

    return image_inputs

In [None]:
from PIL import Image as PILImage
import io, os

def hf_img_to_pil(img_dict):
    """
    HF Image feature -> PIL.Image
    """
    # 1) bytes가 있으면 바로
    if img_dict.get("bytes") is not None:
        return PILImage.open(io.BytesIO(img_dict["bytes"]))
    # 2) path가 있으면 경로에서 로드
    if img_dict.get("path") is not None and os.path.exists(img_dict["path"]):
        return PILImage.open(img_dict["path"])
    return None  # 둘 다 없으면 실패

def process_vision_info(messages):
    # 하나의 conversation 데이터에 있는 list of dict를 처리
    imgs = []
    for msg in messages:                       # message = {'role':..., 'content': [...]}
        for elem in msg.get("content", []):    # elem = dict
            # ① 이미지 타입인지 필터
            if elem.get("type") != "image":
                continue
            img_obj = elem.get("image")
            # ② 실제 이미지 객체가 있는지 확인
            if img_obj is None:
                continue
            # ③ HF Image dict → PIL 변환
            if isinstance(img_obj, dict):
                img_obj = hf_img_to_pil(img_obj)
            if isinstance(img_obj, PILImage.Image):
                imgs.append(img_obj.convert("RGB"))
    return imgs

In [40]:
for example in examples:
    image_inputs = process_vision_info(example["messages"])
    

[{'content': [{'image': None, 'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).', 'type': 'text'}], 'role': 'system'}, {'content': [{'image': None, 'text': 'Count all the people in the image.', 'type': 'text'}, {'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg', 'text': None, 'type': 'image'}], 'role': 'user'}, {'content': [{'image': None, 'text': '81', 'type': 'text'}], 'role': 'assistant'}]
[{'content': [{'image': None, 'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).', 'type': 'text'}], 'role': 'system'}, {'content': [{'image': None, 'text': 'Count all the people in the image.', 'type': 'text'}, {'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00001.jpg', 'text': None, 'type': 'image'}], 'role': 'user'},

In [42]:
examples[0]["messages"]

[{'content': [{'image': None,
    'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).',
    'type': 'text'}],
  'role': 'system'},
 {'content': [{'image': None,
    'text': 'Count all the people in the image.',
    'type': 'text'},
   {'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg',
    'text': None,
    'type': 'image'}],
  'role': 'user'},
 {'content': [{'image': None, 'text': '81', 'type': 'text'}],
  'role': 'assistant'}]

In [47]:
from PIL import Image as PILImage
import io, os

In [61]:
image_inputs = []
for msg in examples[0]["messages"]:
    for element in msg.get("content", []):
        print(element)
        if element.get("type") != "image":
            continue
        img_obj = element.get("image")

        if img_obj is None:
            continue
        
        print(type(img_obj))
        if isinstance(img_obj, str):
            print('dsad')
            img_obj = PILImage.open(img_obj)

        print(img_obj)
        image_inputs.append(img_obj.convert("RGB"))

{'image': None, 'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).', 'type': 'text'}
{'image': None, 'text': 'Count all the people in the image.', 'type': 'text'}
{'image': '/purestorage/AILAB/AI_1/tyk/3_CUProjects/language_model/VLM/gemma3/finetune_crowd/SCUT_HEAD_Part_A/JPEGImages/PartA_00000.jpg', 'text': None, 'type': 'image'}
<class 'str'>
dsad
<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=1070x594 at 0x7FD2CF544F10>
{'image': None, 'text': '81', 'type': 'text'}


In [63]:
dataset

Dataset({
    features: ['messages'],
    num_rows: 2000
})

### Test dataset 만들기

In [65]:
import os
import scipy.io as sio
from PIL import Image # 이미지 파일 존재 여부 및 유효성 검사를 위해 import

def create_image_count_list(base_path):
    """
    주어진 기본 경로에서 이미지와 Ground Truth 데이터를 찾아 리스트를 생성합니다.

    Args:
        base_path (str): 'images'와 'ground_truth' 폴더를 포함하는 상위 경로.
                         예: '/purestorage/AILAB/AI_4/byko/VLM/dataset/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/'

    Returns:
        list: 각 딕셔너리가 'path' (이미지 경로)와 'count' (사람 수)를 포함하는 리스트.
              오류가 발생한 파일은 포함되지 않습니다.
    """
    image_folder = os.path.join(base_path, 'images')
    gt_folder = os.path.join(base_path, 'ground_truth')

    data_list = []

    # images 폴더의 모든 .jpg 파일 목록을 가져옵니다.
    image_files = sorted([f for f in os.listdir(image_folder) if f.endswith('.jpg')])

    for img_file in image_files:
        img_path = os.path.join(image_folder, img_file)
        
        # 이미지 파일이 실제로 존재하는지 확인
        if not os.path.exists(img_path):
            print(f"경고: 이미지를 찾을 수 없습니다: {img_path}")
            continue
        
        try:
            # PIL을 사용하여 이미지가 유효한지 간단히 확인
            Image.open(img_path).close()
        except Exception as e:
            print(f"경고: 유효하지 않은 이미지 파일입니다 ({img_path}): {e}")
            continue

        # 해당 이미지에 대한 .mat 파일 이름 추론 (예: IMG_1.jpg -> GT_IMG_1.mat)
        # 파일명에서 'IMG_' 부분을 유지하고, 확장자를 '.mat'으로 변경합니다.
        # ShanghaiTech Part_B의 경우 IMG_X.jpg -> GT_IMG_X.mat 패턴을 따릅니다.
        # 확장자를 제거하고 'GT_'를 붙인 후 '.mat'을 붙입니다.
        base_name = os.path.splitext(img_file)[0] # IMG_1
        mat_file_name = f"GT_{base_name}.mat" # GT_IMG_1.mat
        mat_path = os.path.join(gt_folder, mat_file_name)

        # .mat 파일이 존재하는지 확인
        if not os.path.exists(mat_path):
            print(f"경고: 해당 Ground Truth .mat 파일을 찾을 수 없습니다: {mat_path}")
            continue

        try:
            # .mat 파일 로드
            mat = sio.loadmat(mat_path)
            
            # 사람 수 추출 로직 (ShanghaiTech Part_B 데이터셋 구조에 따름)
            # 'image_info' -> [0][0][0][0][0] -> points (N, 2) 배열
            # 이 경로는 데이터셋마다 다를 수 있으므로, 에러 발생 시 mat.keys() 등으로 구조를 확인해야 합니다.
            image_info = mat["image_info"]
            points = image_info[0][0][0][0][0]
            
            person_count = len(points)

            data_list.append({
                'path': img_path,
                'count': person_count
            })

        except Exception as e:
            print(f"경고: .mat 파일 처리 중 오류 발생 ({mat_path}): {e}")
            continue
            
    return data_list

# 사용할 테스트 데이터 기본 경로
test_data_path = '/purestorage/AILAB/AI_4/byko/VLM/dataset/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/'

# 함수 호출하여 리스트 생성
test_dataset_list = create_image_count_list(test_data_path)

In [70]:
test_dataset_list[0]

{'path': '/purestorage/AILAB/AI_4/byko/VLM/dataset/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/images/IMG_1.jpg',
 'count': 23}

In [71]:
from datasets import Dataset
test_dataset = Dataset.from_list([make_conv(example['path'], example['count']) for example in test_dataset_list])

In [73]:
test_dataset[0]

{'messages': [{'content': [{'image': None,
     'text': 'You are CrowdCountGPT. When counting people, reply with ONLY the number (no words, no punctuation).',
     'type': 'text'}],
   'role': 'system'},
  {'content': [{'image': None,
     'text': 'Count all the people in the image.',
     'type': 'text'},
    {'image': '/purestorage/AILAB/AI_4/byko/VLM/dataset/ShanghaiTech_Crowd_Counting_Dataset/part_B_final/test_data/images/IMG_1.jpg',
     'text': None,
     'type': 'image'}],
   'role': 'user'},
  {'content': [{'image': None, 'text': '23', 'type': 'text'}],
   'role': 'assistant'}]}

### 합치고 푸시

In [75]:
from datasets import DatasetDict
my_dataset_dict = DatasetDict({
    'train': dataset,
    'test': test_dataset
})

In [76]:
my_dataset_dict

DatasetDict({
    train: Dataset({
        features: ['messages'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['messages'],
        num_rows: 316
    })
})

In [79]:
my_dataset_dict['train']

Dataset({
    features: ['messages'],
    num_rows: 2000
})

In [None]:
from huggingface_hub import login, create_repo


hf_token = ""
login(token=hf_token)

create_repo("ty-kim/crowd_count", repo_type="dataset", exist_ok=True)

# push to hub
my_dataset_dict.push_to_hub("ty-kim/crowd_count") # max_shard_size="500MB"

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/2 [00:00<?, ?ba/s]

Uploading the dataset shards:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/1 [00:00<?, ?ba/s]

README.md:   0%|          | 0.00/459 [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/datasets/ty-kim/crowd_count/commit/84194efed201ea7a94238e67b4dbbdaee3776f86', commit_message='Upload dataset', commit_description='', oid='84194efed201ea7a94238e67b4dbbdaee3776f86', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/ty-kim/crowd_count', endpoint='https://huggingface.co', repo_type='dataset', repo_id='ty-kim/crowd_count'), pr_revision=None, pr_num=None)

In [None]:

# 결과 출력 (예시)
print(f"\n총 생성된 데이터 항목 수: {len(test_dataset_list)}")
if test_dataset_list:
    print("첫 5개 데이터 항목:")
    for i, item in enumerate(test_dataset_list[:5]):
        print(f"{i+1}: {item}")
else:
    print("생성된 데이터 항목이 없습니다. 경로와 파일 구조를 확인해주세요.")

# 데이터셋의 일부 경로를 확인하여 파일이 실제로 존재하는지 최종 검증
if test_dataset_list:
    example_path = test_dataset_list[0]['path']
    if os.path.exists(example_path):
        print(f"\n예시 이미지 파일이 존재합니다: {example_path}")
    else:
        print(f"\n경고: 예시 이미지 파일이 존재하지 않습니다. 경로를 다시 확인해주세요: {example_path}")

# 데이터셋 리스트는 이제 make_conv 함수에 전달될 수 있는 형식입니다.
# make_conv(item['path'], item['count']) for item in test_dataset_list