In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import tensorflow as tf

device_name = tf.test.gpu_device_name()

if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

Found GPU at: /device:GPU:0


In [3]:
import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

In [4]:
!pip install transformers



In [25]:
import json

# JSON 파일 읽기 및 데이터 추출
def extract_captions_labels(json_file_path, num_samples = None):
    with open(json_file_path, "r") as json_file:
        data = json.load(json_file)

    captions_labels = []
    annotations = data["annotations"]

    if num_samples is not None:
        annotations = annotations[:num_samples]

    for annotation in annotations:
        caption = annotation["caption"]
        label = annotation["danger_score"]
        captions_labels.append((caption, label))

    return captions_labels

#데이터 추출
file1_captions_labels = extract_captions_labels("/content/drive/MyDrive/train_abnormal_dataset.json")

captions_labels = file1_captions_labels

print(len(captions_labels))

captions_labels = list(set(captions_labels))

print(len(captions_labels))

2091
1900


In [26]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# 데이터 분리
sentences, labels = zip(*captions_labels)

In [7]:
from transformers import BertTokenizer

print('Loading BERT tokenizer...')
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

Loading BERT tokenizer...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [8]:
# 0번 문장으로 테스트
print(' Original: ', sentences[0])

print('Tokenized: ', tokenizer.tokenize(sentences[0]))

print('Token IDs: ', tokenizer.convert_tokens_to_ids(tokenizer.tokenize(sentences[0])))

 Original:  A man wearing jeans is kicking a man wearing black pants in the warehouse.
Tokenized:  ['a', 'man', 'wearing', 'jeans', 'is', 'kicking', 'a', 'man', 'wearing', 'black', 'pants', 'in', 'the', 'warehouse', '.']
Token IDs:  [1037, 2158, 4147, 6312, 2003, 10209, 1037, 2158, 4147, 2304, 6471, 1999, 1996, 9746, 1012]


In [9]:
max_len = 0

for sent in sentences:

    input_ids = tokenizer.encode(sent, add_special_tokens=True)

    max_len = max(max_len, len(input_ids))

print('Max sentence length: ', max_len)

Max sentence length:  35


In [10]:
input_ids = []
attention_masks = []

for sent in sentences:

    encoded_dict = tokenizer.encode_plus(
                        sent,
                        add_special_tokens = True,
                        max_length = 64,
                        pad_to_max_length = True,
                        return_attention_mask = True,
                        return_tensors = 'pt',
                   )

    input_ids.append(encoded_dict['input_ids'])
    attention_masks.append(encoded_dict['attention_mask'])


input_ids = torch.cat(input_ids, dim=0)
attention_masks = torch.cat(attention_masks, dim=0)
labels = torch.tensor(labels)

print('Original: ', sentences[0])
print('Token IDs:', input_ids[0])

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


Original:  A man wearing jeans is kicking a man wearing black pants in the warehouse.
Token IDs: tensor([  101,  1037,  2158,  4147,  6312,  2003, 10209,  1037,  2158,  4147,
         2304,  6471,  1999,  1996,  9746,  1012,   102,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0])


In [11]:
from torch.utils.data import TensorDataset, random_split

dataset = TensorDataset(input_ids, attention_masks, labels)

train_size = int(0.95 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

print('{:>5,} training samples'.format(train_size))
print('{:>5,} validation samples'.format(val_size))

1,805 training samples
   95 validation samples


In [12]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

batch_size = 32

train_dataloader = DataLoader(
            train_dataset,
            sampler = RandomSampler(train_dataset),
            batch_size = batch_size
        )

validation_dataloader = DataLoader(
            val_dataset,
            sampler = SequentialSampler(val_dataset),
            batch_size = batch_size
        )

In [13]:
from transformers import BertForSequenceClassification, AdamW, BertConfig

model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased", # Use the 12-layer BERT model, with an uncased vocab.
    num_labels = 8,
    output_attentions = False,
    output_hidden_states = False,
)

model.cuda()

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e

In [14]:
optimizer = AdamW(model.parameters(),
                  lr = 2e-5,
                  eps = 1e-8
                )




In [15]:
from transformers import get_linear_schedule_with_warmup

epochs = 5

total_steps = len(train_dataloader) * epochs

scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

In [16]:
import numpy as np

def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)

In [17]:
import time
import datetime

def format_time(elapsed):
    '''
    Takes a time in seconds and returns a string hh:mm:ss
    '''
    elapsed_rounded = int(round((elapsed)))

    return str(datetime.timedelta(seconds=elapsed_rounded))


In [18]:
import random
import numpy as np
import time
import torch

# Seed 설정
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

training_stats = []  # 학습 통계 저장
total_t0 = time.time()  # 전체 학습 시간 측정 시작

# Epoch 반복
for epoch_i in range(epochs):
    print(f"\n======== Epoch {epoch_i + 1} / {epochs} ========")
    print("Training...")

    t0 = time.time()  # Epoch 시작 시간
    total_train_loss = 0

    model.train()  # 모델을 학습 모드로 전환

    # Batch 반복
    for step, batch in enumerate(train_dataloader):
        # 진행 상황 출력
        if step % 40 == 0 and step > 0:
            elapsed = format_time(time.time() - t0)
            print(f"  Batch {step:>5} of {len(train_dataloader)}. Elapsed: {elapsed}.")

        # Batch 데이터 준비
        b_input_ids = batch[0].to(device)
        b_input_mask = batch[1].to(device)
        b_labels = batch[2].to(device)

        # 그래디언트 초기화
        model.zero_grad()

        # Forward pass
        outputs = model(
            b_input_ids,
            attention_mask=b_input_mask,
            labels=b_labels,
            return_dict=True
        )
        loss = outputs.loss
        logits = outputs.logits

        # Loss 축적
        total_train_loss += loss.item()

        # Backward pass
        loss.backward()

        # 그래디언트 클리핑
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # 옵티마이저 업데이트
        optimizer.step()
        scheduler.step()

    # Epoch 종료 후 평균 손실 계산
    avg_train_loss = total_train_loss / len(train_dataloader)
    training_time = format_time(time.time() - t0)

    print(f"\n  Average training loss: {avg_train_loss:.2f}")
    print(f"  Training epoch took: {training_time}")



Training...
  Batch    40 of 57. Elapsed: 0:00:14.

  Average training loss: 1.48
  Training epoch took: 0:00:19

Training...
  Batch    40 of 57. Elapsed: 0:00:12.

  Average training loss: 0.92
  Training epoch took: 0:00:18

Training...
  Batch    40 of 57. Elapsed: 0:00:13.

  Average training loss: 0.63
  Training epoch took: 0:00:19

Training...
  Batch    40 of 57. Elapsed: 0:00:13.

  Average training loss: 0.48
  Training epoch took: 0:00:19

Training...
  Batch    40 of 57. Elapsed: 0:00:13.

  Average training loss: 0.41
  Training epoch took: 0:00:19


In [19]:
model.save_pretrained('/content/drive/MyDrive/bert')
tokenizer.save_pretrained('/content/drive/MyDrive/bert')

('/content/drive/MyDrive/bert/tokenizer_config.json',
 '/content/drive/MyDrive/bert/special_tokens_map.json',
 '/content/drive/MyDrive/bert/vocab.txt',
 '/content/drive/MyDrive/bert/added_tokens.json')

## 데이터셋 클래스 별 F1 Score 계산

In [37]:
import json
from collections import defaultdict
import torch
from torch.nn import functional as F
from transformers import BertForSequenceClassification, BertTokenizer
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# 모델 및 토크나이저 로드
model = BertForSequenceClassification.from_pretrained('/content/drive/MyDrive/bert')
tokenizer = BertTokenizer.from_pretrained('/content/drive/MyDrive/bert')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# JSON 파일 읽기 및 데이터 추출 함수
def extract_captions_labels(json_file_path):
    with open(json_file_path, "r") as json_file:
        data = json.load(json_file)

    captions_labels = []
    annotations = data["annotations"]
    images = {image["id"]: image["file_name"] for image in data["images"]}

    for annotation in annotations:
        image_id = annotation["image_id"]
        caption = annotation["caption"]
        label = annotation["danger_score"]
        file_name = images[image_id]
        captions_labels.append((file_name, caption, label))

    return captions_labels

# 데이터 추출
file1_captions_labels = extract_captions_labels("/content/drive/MyDrive/train_abnormal_dataset.json")

# 클래스별 데이터 분리
class_to_data = defaultdict(list)

for file_name, caption, label in file1_captions_labels:
    class_name = file_name.split("000")[0]  # "fire00000001.png" -> "fire"
    class_to_data[class_name].append((caption, label))

# 라벨 맵 정의
label_map = {0: 'Label_0(정상)', 1: 'Label_1(위험)', 2: 'Label_2(위험)',
             3: 'Label_3(위험)', 4: 'Label_4(위험)', 5: 'Label_5(위험)',
             6: 'Label_6(위험)', 7: 'Label_7(위험)'}

# 클래스별 성능 평가 및 예측
metrics_per_class = {}
mismatched_samples_per_class = defaultdict(list)  # 클래스별로 정답과 예측이 다른 샘플 저장

for class_name, data in class_to_data.items():
    sentences, true_labels = zip(*data)  # 캡션과 라벨 분리

    # Tokenizer로 입력 데이터 준비
    inputs = tokenizer(list(sentences), padding=True, truncation=True, return_tensors="pt")
    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)

    # 모델 예측
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        predicted_labels = torch.argmax(logits, dim=1).cpu().numpy()

    # 정답과 예측 비교하여 오차 샘플 추출
    for text, true_label, pred_label in zip(sentences, true_labels, predicted_labels):
        if pred_label != true_label:
            mismatched_samples_per_class[class_name].append({
                "Input": text,
                "True Label": label_map[true_label],
                "Predicted Label": label_map[pred_label]
            })

    # 클래스별 성능 평가
    precision = precision_score(true_labels, predicted_labels, average='weighted', zero_division=0)
    recall = recall_score(true_labels, predicted_labels, average='weighted', zero_division=0)
    f1 = f1_score(true_labels, predicted_labels, average='weighted', zero_division=0)
    accuracy = accuracy_score(true_labels, predicted_labels)

    metrics_per_class[class_name] = {
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "accuracy": accuracy
    }

# 클래스별 성능 출력
print("\nClassification Metrics per Class:")
for class_name, metrics in metrics_per_class.items():
    print(f"Class: {class_name}")
    print(f"  Accuracy: {metrics['accuracy']:.4f}")
    print(f"  Precision: {metrics['precision']:.4f}")
    print(f"  Recall: {metrics['recall']:.4f}")
    print(f"  F1 Score: {metrics['f1_score']:.4f}")


Classification Metrics per Class:
Class: fire
  Accuracy: 0.8703
  Precision: 0.8705
  Recall: 0.8703
  F1 Score: 0.8675
Class: traffic_accident
  Accuracy: 0.9623
  Precision: 0.9646
  Recall: 0.9623
  F1 Score: 0.9581
Class: fight
  Accuracy: 0.9567
  Precision: 0.9586
  Recall: 0.9567
  F1 Score: 0.9536
Class: caution
  Accuracy: 0.9541
  Precision: 0.9531
  Recall: 0.9541
  F1 Score: 0.9531
Class: fall
  Accuracy: 0.8328
  Precision: 0.8300
  Recall: 0.8328
  F1 Score: 0.8229


## 정답과 예측 값 비교 및 정답과 예측이 다른 캡션 추출

In [47]:
# 정답과 예측이 다른 샘플 통합
mismatched_samples = []  # 정답과 예측이 다른 샘플 저장

for class_name, mismatched_samples_in_class in mismatched_samples_per_class.items():
    for sample in mismatched_samples_in_class:
        mismatched_samples.append({
            "Class": class_name,
            "Input": sample["Input"],
            "True Label": sample["True Label"],
            "Predicted Label": sample["Predicted Label"]
        })

# 정답과 예측이 다른 샘플 출력
print("\nMismatched Samples:")
for idx, sample in enumerate(mismatched_samples, 1):
    print(f"[{idx}] Input: {sample['Input']}")
    print(f"  Class: {sample['Class']}")
    print(f"  True Label: {sample['True Label']} | Predicted Label: {sample['Predicted Label']}")
    print("=" * 50)

# 정답과 예측이 다른 샘플 총 개수 출력
print(f"\nTotal Mismatched Samples: {len(mismatched_samples)}")



Mismatched Samples:
[1] Input: A fire broke out on the left side of the highway where cars were passing by.
  Class: fire
  True Label: Label_7(위험) | Predicted Label: Label_6(위험)
[2] Input: The building behind the black car on the road was engulfed in fire.
  Class: fire
  True Label: Label_7(위험) | Predicted Label: Label_6(위험)
[3] Input: There is a fire in the house and there are two people on the other side of the street.
  Class: fire
  True Label: Label_6(위험) | Predicted Label: Label_7(위험)
[4] Input: A fire is breaking out on a road with running cars.
  Class: fire
  True Label: Label_7(위험) | Predicted Label: Label_6(위험)
[5] Input: A big fire is burning behind the tree, and people are taking refuge in front of the white car and next to the tree.
  Class: fire
  True Label: Label_7(위험) | Predicted Label: Label_6(위험)
[6] Input: A woman in a white coat stands outside a building on fire.
  Class: fire
  True Label: Label_7(위험) | Predicted Label: Label_6(위험)
[7] Input: There's a fire bu

## 전체 데이터셋 F1 Score 계산

In [44]:
import numpy as np

# 전체 데이터 결합
all_sentences = []
all_true_labels = []

for class_name, data in class_to_data.items():
    sentences, true_labels = zip(*data)
    all_sentences.extend(sentences)
    all_true_labels.extend(true_labels)

# Tokenizer로 전체 데이터 변환
inputs = tokenizer(list(all_sentences), padding=True, truncation=True, return_tensors="pt")
input_ids = inputs["input_ids"].to(device)
attention_mask = inputs["attention_mask"].to(device)

# 모델 예측
model.eval()
with torch.no_grad():
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    logits = outputs.logits
    all_predicted_labels = torch.argmax(logits, dim=1).cpu().numpy()

# 전체 성능 평가 (weighted average)
precision = precision_score(all_true_labels, all_predicted_labels, average='weighted')
recall = recall_score(all_true_labels, all_predicted_labels, average='weighted')
f1 = f1_score(all_true_labels, all_predicted_labels, average='weighted')
accuracy = accuracy_score(all_true_labels, all_predicted_labels)

# 결과 출력
print("\nOverall Classification Metrics:")
print(f"  Accuracy: {accuracy:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")
print(f"  F1 Score: {f1:.4f}")



Overall Classification Metrics:
  Accuracy: 0.9149
  Precision: 0.9026
  Recall: 0.9149
  F1 Score: 0.9072


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
