<a href="https://colab.research.google.com/github/wavymejti/KursAI1/blob/main/konkurs_przemyt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# %%capture
# W przypadku błędu: 'No module named tensorflow', odkomentuj i uruchom:
# !pip install tensorflow opencv-python numpy matplotlib scikit-learn

import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import cv2 # OpenCV do ładowania obrazów
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from google.colab import drive # Do montowania Dysku Google

In [None]:
# --- USTAWIA GŁÓWNY KATALOG Z DANYMI ---
# 1. Montowanie Dysku Google
# Po uruchomieniu tego, kliknij w link i zezwól Colab na dostęp do Twojego Dysku.
drive.mount('/content/drive')

# 2. Ścieżka do folderu z "czystymi pojazdami"
# Zmień tę ścieżkę na właściwą dla Twojego katalogu na Dysku Google.
DATA_DIR = '/content/drive/MyDrive/Twoj_Katalog_Z_Czystymi_Pojazdami/'

# Parametry obrazów RTG
IMG_HEIGHT = 512  # Ustawienie rozsądnej, mniejszej wysokości dla przyspieszenia
IMG_WIDTH = 512   # Ustawienie rozsądnej, mniejszej szerokości
# Pamiętaj: Skalowanie do mniejszego rozmiaru może powodować utratę bardzo małych anomalii.
# Dla 50MB plików, musisz zredukować rozmiar lub użyć zaawansowanego sprzętu (GPU High-RAM).
CHANNELS = 1      # 1 kanał, bo obrazy RTG są zazwyczaj monochromatyczne (grayscale)

def load_and_preprocess_data(data_dir, target_size, channels=1):
    """Ładuje i przetwarza obrazy z folderu."""
    image_paths = [os.path.join(data_dir, f) for f in os.listdir(data_dir)
                   if f.endswith(('.bmp', '.png', '.jpg'))]

    data = []
    print(f"Znaleziono {len(image_paths)} plików.")

    for path in image_paths:
        # Wczytanie obrazu w skali szarości (cv2.IMREAD_GRAYSCALE)
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE if channels == 1 else cv2.IMREAD_COLOR)

        if img is not None:
            # Skalowanie obrazu do docelowego rozmiaru
            img = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
            # Normalizacja do zakresu [0, 1]
            img = img.astype('float32') / 255.0
            data.append(img)
        else:
            print(f"Ostrzeżenie: Nie udało się załadować pliku {path}")

    # Konwersja na tablicę NumPy i dodanie wymiaru kanału/partii (batch)
    data = np.array(data)
    if channels == 1:
        data = np.expand_dims(data, axis=-1)

    return data

# Ładowanie danych
print("--- Ładowanie i przetwarzanie danych ---")
X_clean = load_and_preprocess_data(DATA_DIR, (IMG_WIDTH, IMG_HEIGHT), CHANNELS)

if len(X_clean) == 0:
    print("BŁĄD: Nie znaleziono żadnych plików w katalogu. Sprawdź ścieżkę i rozszerzenia.")
    # Zakończenie skryptu, jeśli brak danych
    # exit()

# Podział na zbiór treningowy i walidacyjny (np. 80% / 20%)
X_train, X_val = train_test_split(X_clean, test_size=0.2, random_state=42)

print(f"Zbiór treningowy: {X_train.shape[0]} obrazów")
print(f"Zbiór walidacyjny: {X_val.shape[0]} obrazów")


# --- BUDOWA MODELU KONWOLUCYJNEGO AUTOENKODERA (CAE) - Architektura U-Net podobna ---

def build_cae(input_shape):
    """Definiuje model Konwolucyjnego Autoenkodera z architekturą U-Net."""

    input_img = tf.keras.Input(shape=input_shape)

    # === ENKODER (Encoder) ===
    # Kompresja danych
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
    x = layers.MaxPooling2D((2, 2), padding='same')(x) # 256x256

    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D((2, 2), padding='same')(x) # 128x128

    # Opcjonalne: Użycie dropoutu do zapobiegania nadmiernemu dopasowaniu
    # x = layers.Dropout(0.2)(encoded)

    # === DEKODER (Decoder) ===
    # Rekonstrukcja danych
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(encoded)
    x = layers.UpSampling2D((2, 2))(x) # 256x256

    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x) # 512x512

    # Ostatnia warstwa - musi mieć taką samą liczbę kanałów jak wejście (np. 1 dla grayscale)
    decoded = layers.Conv2D(CHANNELS, (3, 3), activation='sigmoid', padding='same')(x)

    # Utworzenie modelu
    autoencoder = models.Model(input_img, decoded)

    # Kompilacja modelu: Mean Squared Error (MSE) jest standardową funkcją straty dla autoenkoderów
    # MSE jest bezpośrednim pomiarem błędu rekonstrukcji
    autoencoder.compile(optimizer='adam', loss='mse')

    return autoencoder

# Budowa modelu
input_shape = (IMG_HEIGHT, IMG_WIDTH, CHANNELS)
cae_model = build_cae(input_shape)

print("\n--- Struktura Modelu Autoenkodera ---")
cae_model.summary()

In [None]:
# --- TRENOWANIE MODELU ---
# Trening: Na wejście i na wyjście modelu podajemy te same "czyste" obrazy
# Model uczy się wiernie rekonstruować tylko obrazy "normalne".

BATCH_SIZE = 32
EPOCHS = 50 # Zwiększ liczbę epok (np. do 100-200) w przypadku małego zbioru danych

print("\n--- Rozpoczynanie Treningu ---")
history = cae_model.fit(X_train, X_train,
                        epochs=EPOCHS,
                        batch_size=BATCH_SIZE,
                        shuffle=True, # Mieszanie danych pomaga w generalizacji
                        validation_data=(X_val, X_val))

# Wizualizacja historii straty (Loss)
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Loss Treningowy (MSE)')
plt.plot(history.history['val_loss'], label='Loss Walidacyjny (MSE)')
plt.title('Historia Straty Modelu')
plt.xlabel('Epoka')
plt.ylabel('Loss (MSE)')
plt.legend()
plt.show()

In [None]:
# --- 4. USTALENIE PROGU I WYKRYWANIE ANOMALII ---

# 1. Obliczenie błędu rekonstrukcji dla zbioru walidacyjnego (czyste obrazy)
reconstructions = cae_model.predict(X_val)
# Flattening the images to calculate MSE for each image
mse = np.mean(np.square(X_val - reconstructions), axis=(1, 2, 3))

# 2. Ustalenie progu (Threshold)
# Standardowo: średni błąd + N razy odchylenie standardowe
# Wartość N (np. 2 lub 3) trzeba dopasować eksperymentalnie.
MEAN_MSE = np.mean(mse)
STD_MSE = np.std(mse)
# Ustalenie progu anomalii - błąd wyższy niż ten jest anomalią
ANOMALY_THRESHOLD = MEAN_MSE + 3 * STD_MSE

print(f"\nŚredni Błąd Rekonstrukcji (MSE) w zbiorze walidacyjnym: {MEAN_MSE:.6f}")
print(f"Odchylenie Standardowe Błędu (STD): {STD_MSE:.6f}")
print(f"Ustalony Próg Anomalii (Threshold): {ANOMALY_THRESHOLD:.6f}")


# --- 5. FUNKCJA WIZUALIZACJI I TESTOWANIA ---

def visualize_anomaly(image, reconstruction, threshold):
    """Wizualizuje obraz wejściowy, rekonstrukcję i mapę błędu (anomalie)."""

    # Obliczenie mapy błędu bezwzględnego (im jaśniej, tym większy błąd/anomalia)
    error_map = np.mean(np.abs(image - reconstruction), axis=-1)

    # Normalizacja mapy błędu do zakresu [0, 1] dla wizualizacji
    normalized_error = error_map / np.max(error_map)

    # Prosty alarm na podstawie maksymalnego błędu w całym obrazie
    overall_mse = mean_squared_error(image.flatten(), reconstruction.flatten())
    is_anomaly = overall_mse > threshold

    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # Obraz wejściowy
    axes[0].imshow(image.squeeze(), cmap='gray')
    axes[0].set_title(f"1. Obraz Wejściowy (Czyszczony)")

    # Obraz zrekonstruowany
    axes[1].imshow(reconstruction.squeeze(), cmap='gray')
    axes[1].set_title("2. Rekonstrukcja Modelu")

    # Mapa Błędu (Anomalii)
    im = axes[2].imshow(error_map, cmap='hot', vmin=0, vmax=np.max(error_map))
    axes[2].set_title(f"3. Mapa Błędu (Alarm: {is_anomaly})")
    fig.colorbar(im, ax=axes[2])

    # Dodanie obramowania w zależności od alarmu
    fig.suptitle(f"WYNIK ANALIZY: {overall_mse:.6f} (Próg: {threshold:.6f})",
                 color='red' if is_anomaly else 'green', fontsize=14, fontweight='bold')

    plt.show()
    return is_anomaly

# --- TESTOWANIE NA PRZYKŁADACH ---

# TEST 1: Czysty obraz (powinien być False)
print("\n--- Test na Czystym Obrazie (z Walidacyjnego) ---")
idx_clean = np.random.randint(0, len(X_val))
clean_image = X_val[idx_clean]
clean_reconstruction = cae_model.predict(np.expand_dims(clean_image, axis=0))[0]
visualize_anomaly(clean_image, clean_reconstruction, ANOMALY_THRESHOLD)


# TEST 2: Obraz z dodatkiem (Anomalią)
# Musisz wczytać obraz, na którym celowo dodałeś 'niebezpieczny przedmiot'
# ZMIEN ŚCIEŻKĘ PONIŻEJ DO SWOJEGO OBRAZU Z ANOMALIĄ
TEST_ANOMALY_PATH = '/content/drive/MyDrive/Twoj_Katalog_Testowy_Z_Anomalia/anomalia.bmp'

# Ta sama funkcja ładowania, ale dla pojedynczego pliku
def load_single_test_image(path, target_size, channels):
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE if channels == 1 else cv2.IMREAD_COLOR)
    if img is None:
        return None
    img = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
    img = img.astype('float32') / 255.0
    if channels == 1:
        img = np.expand_dims(img, axis=-1)
    return np.expand_dims(img, axis=0)

test_anomaly_data = load_single_test_image(TEST_ANOMALY_PATH, (IMG_WIDTH, IMG_HEIGHT), CHANNELS)

if test_anomaly_data is not None:
    print("\n--- Test na Obrazie Z Anomalią ---")
    anomaly_image = test_anomaly_data[0]
    anomaly_reconstruction = cae_model.predict(test_anomaly_data)[0]
    visualize_anomaly(anomaly_image, anomaly_reconstruction, ANOMALY_THRESHOLD)
else:
    print(f"\nBŁĄD: Nie udało się wczytać testowego obrazu z anomalią z ścieżki: {TEST_ANOMALY_PATH}")