In [1]:
import os
import sys
import cv2
import shutil
import math
from os import path
from collections import defaultdict, Counter
from glob import glob
from PIL import Image
from tqdm import tqdm

import matplotlib.pyplot as plt
import matplotlib as mpl

import numpy as np

%load_ext autoreload
%autoreload 2

from visualization.vis_utils import *


# Visualize burst shots for each location in a single 

In [None]:
def visualize_bursts(burst_dir, num_rows=10, num_cols=20, figwidth=20):
    
    bursts = sorted(os.listdir(burst_dir), key=lambda x: int(x[5:]))
    
    num_rows = min(num_rows, len(bursts))
    
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(figwidth, figwidth*num_rows/num_cols))
    if num_rows==1:
        axes = axes.reshape(1, -1)
    
    for i in range(num_rows):
        
        shots = sorted(glob(f"{burst_dir}/{bursts[i]}/*.tiff"))
        
        # Calculate the step size
        col_step = max(1, len(shots)//num_cols)
        
        col_files = []
        for j,file in enumerate(shots[::col_step][:num_cols]): # Slice the shots to fit the number of columns
            img = plt.imread(file)
            axes[i][j].imshow(img[100:400, 200:500])
            col_files.append(os.path.basename(file))

        # set y-label to each row
        axes[i][0].set_ylabel(f"Burst{i+1}", fontsize=8)

    # set x-label to each col
    for j, cfile in enumerate(col_files):
        axes[-1][j].set_xlabel(cfile.split(".")[0], fontsize=8)

    # Hide X and Y axes tick marks
    for ax in axes.flatten():
        ax.set_xticks([])
        ax.set_yticks([])

    fig.suptitle(f"{'_'.join(burst_dir.split('/')[3:])}")
    plt.tight_layout()
    
    return fig, axes
    # plt.show()

    
root_dir = "../data/FS_LIDT_SEPT22/Sample_NF"

for dir_name in sorted(os.listdir(root_dir)):
    
    print(f"Plotting {dir_name=}")
    
    data_dir = f"{root_dir}/{dir_name}"
    
    plot_dir = f"../data/visualization/{dir_name}"
    os.makedirs(plot_dir, exist_ok=True)
    
    
    for row_id in tqdm(sorted(os.listdir(data_dir))):
        
        if not os.path.isdir(f"{data_dir}/{row_id}"):
            continue
            
        for col_id in sorted(os.listdir(f"{data_dir}/{row_id}")):

            fig, axes = visualize_bursts(f"{data_dir}/{row_id}/{col_id}")

            fig.savefig(f"{plot_dir}/{row_id}_{col_id}.png", dpi=500)
            plt.close('all')

# Preprocess raw images - Normalize image in range 0-255 and convert to png

In [None]:
src_data_dir = "../data/FS_LIDT_SEPT22/Sample_NF"
dest_data_dir = "../data/near_field/images"

crop_area = (6, 96, 486, 576) # (top, left, bottom, right)

img_means, img_stds = [], []
for dir_name in sorted(os.listdir(src_data_dir)):
    
    print("Converting to PNG images", dir_name)
    shutil.rmtree(f"{dest_data_dir}/{dir_name}", ignore_errors=True)
    
    for row in sorted(os.listdir(f"{root_dir}/{dir_name}")):
        for col in sorted(os.listdir(f"{root_dir}/{dir_name}/{row}")):
            
            dest_img_dir = f"{dest_data_dir}/{dir_name}/{int(row[3]):02d}_{int(col[3]):02d}"
            os.makedirs(dest_img_dir, exist_ok=True)
            
            for burst in sorted(os.listdir(f"{root_dir}/{dir_name}/{row}/{col}")):
                for shot in sorted(os.listdir(f"{root_dir}/{dir_name}/{row}/{col}/{burst}")):
                    
                    img = cv2.imread(f"{root_dir}/{dir_name}/{row}/{col}/{burst}/{shot}", cv2.IMREAD_UNCHANGED)
                    img = (img-img.min())/(img.max()-img.min()) * 255
    
                    if crop_area is not None:
                        crop_t, crop_l, crop_h, crop_w = crop_area
                        img = img[crop_t:crop_h, crop_l:crop_w]
                    
                    cv2.imwrite(f"{dest_img_dir}/burst{int(burst[5:]):02d}_shot{int(shot.split('.')[0][4:]):03d}.png", img)
                    
                    img_means.append(np.mean(img))
                    img_stds.append(np.std(img))

overall_mean = np.mean(img_means)/255
overall_std =  np.sqrt(np.mean([s**2 for s in img_stds]))/255

print("Overall Mean:", overall_mean)
print("Overall Std:", overall_std)

# Create video from images

In [None]:
src_data_dir = "../data/near_field/images"
dest_data_dir = "../data/visualization/videos"

shutil.rmtree(f"{dest_data_dir}", ignore_errors=True)
os.makedirs(dest_data_dir, exist_ok=True)

for dir_name in sorted(os.listdir(src_data_dir)):
    print(f"Creating videos of {dir_name}")
    
    shutil.rmtree(f"{dest_data_dir}/{dir_name}", ignore_errors=True)
    
    for loc in sorted(os.listdir(f"{src_data_dir}/{dir_name}")):
        
        if len(loc.split('.'))>0:
            continue
            
        create_video_from_images(f"{src_data_dir}/{dir_name}/{loc}", 
                                 f"{dest_data_dir}/{dir_name}_{loc}.avi", 
                                 fps=10, 
                                 display_text=True)
        

# Create GIF from images

In [10]:
data_dir = "../data/near_field/images"
save_dir = "../data/visualization/animations"

for dset in sorted(os.listdir(data_dir)):
    print(f"Creating GIF: {dset}")
    
    shutil.rmtree(f"{save_dir}/{dset}", ignore_errors=True)
    os.makedirs(f"{save_dir}/{dset}", exist_ok=True)

    images = []
    for loc in sorted(os.listdir(f"{data_dir}/{dset}")):
        for img_file in sorted(os.listdir(f"{data_dir}/{dset}/{loc}")):
            if img_file.lower().endswith(('.png')):
                img = Image.open(f"{data_dir}/{dset}/{loc}/{img_file}")
                images.append(img.copy())
                img.close()
                
    # Save the first image as GIF and append the rest
    output_file = f"{save_dir}/{dset}.gif"
    if images:
        images[0].save(output_file, save_all=True, append_images=images[1:], duration=500, loop=0) # duration for each frame in ms, loop=0 mean infifinte loop
        print(f"GIF created successfully: {output_file}")
    else:
        print("No valid images found in the specified folder.")
        
                

Creating GIF: Grating_A6
GIF created successfully: ../data/visualization/animations/Grating_A6.gif
Creating GIF: Grating_SON1
GIF created successfully: ../data/visualization/animations/Grating_SON1.gif
Creating GIF: LASEROPTIK
GIF created successfully: ../data/visualization/animations/LASEROPTIK.gif
Creating GIF: LASEROPTIK_10000
GIF created successfully: ../data/visualization/animations/LASEROPTIK_10000.gif
Creating GIF: LASEROPTIK_SON1
GIF created successfully: ../data/visualization/animations/LASEROPTIK_SON1.gif
Creating GIF: THALES_SESO_10000
GIF created successfully: ../data/visualization/animations/THALES_SESO_10000.gif
Creating GIF: THALES_SESO_A
GIF created successfully: ../data/visualization/animations/THALES_SESO_A.gif


# Plot image/patch intensity

In [None]:

def agg_func(x, mode):
    if mode=='mean':
        return x.mean() if len(x)>0 else 0
    elif mode=='sum':
        return x.sum()
    else:
        raise Exception (f"Mode to compute intensity not valid!")
    
def img_intensity(img, mode='sum'):
    # img = Image.open(file)
    # img = np.asarray(img, dtype="int32")/255
    return agg_func(img, mode)

def img_patch_intensity(img, patch_size=16, mode='sum'):
    # img = Image.open(file)
    # img = np.array(img, dtype='int32')/255
    
    img_height, img_width = img.shape
    num_rows, num_cols = math.ceil(img_height/patch_size), math.ceil(img_width/patch_size)
    
    patch_ints = []
    for y in range(0, img_height, patch_size):
        for x in range(0, img_width, patch_size):
            img_patch = img[y:y+patch_size, x:x+patch_size]
            patch_ints.append(agg_func(img_patch, mode))

    return np.array(patch_ints).reshape(num_rows, num_cols) 

def img_gradient(x):

    left = x
    right = np.pad(x, ((0, 0), (0, 1)))[..., :, 1:]
    top = x
    bottom = np.pad(x, ((0, 1), (0, 0)))[..., 1:, :]

    dx, dy = right - left, bottom - top 
    # dx will always have zeros in the last column, right-left
    # dy will always have zeros in the last row,    bottom-top
    dx[..., :, -1] = 0
    dy[..., -1, :] = 0

    return np.abs(dx) + np.abs(dy)

## Visualize a patch 

In [None]:
%load_ext autoreload
%autoreload 2
    
frame_num = 611
patch_size = 25
px, py = (8, 12) # patch index/location

img_dir = "../data/near_field/images/Grating_A6/01_02"

img_files = sorted(glob(f"{img_dir}/*"))
                   
img = plt.imread(img_files[frame_num])

fig, ax = plt.subplots(1, 1)
ax.imshow(img)
fig.suptitle(f"frame:{frame_num}")

# add grid
ax = add_grid_to_axes(ax, grid_size=patch_size, labelsize=4)

# highlight the patch
rect = mpl.patches.Rectangle((px*patch_size, py*patch_size), 
                             patch_size, patch_size, linewidth=1, 
                             edgecolor='r', facecolor='none')

ax.add_patch(rect)


# Visualize specific patch over time (moving from left to right and top to bottom)

In [None]:
x, y = int(px*patch_size), int(py*patch_size)

img_patches = []
for file in img_files:
    img = plt.imread(file)
    img_patches.append(img[y:y+patch_size, x:x+patch_size])

# visualzie sampled patches across frames (to save time)
sample_step = 2
sampled_img_patches= img_patches[::sample_step]

nrows = int(np.sqrt(len(sampled_img_patches)))
ncols = math.ceil(len(sampled_img_patches)/nrows)

fig, axes = create_figure(nrows, ncols, figwidth=12, axis_off=True)
axes = np.ravel(axes)

for i, patch in enumerate(sampled_img_patches):
    axes[i].imshow(patch, vmin=0, vmax=1) 
    axes[i].set_title(str(i*sample_step), color='r', fontsize=5, va='top')
    

# Plot time-series patch intensity

In [None]:
%load_ext autoreload
%autoreload 2

from visualization.vis_utils import *

colors = np.random.rand(1000, 3)
patch_ints = [patch.sum() for patch in img_patches]

fig, ax = create_figure(figsize=(8,5))

ax.plot(patch_ints, color=colors[0], label='Intensity')
# ax.plot(patch_ints[1:]-patch_ints[:-1], color=colors[0], label='IntensityDiff')

patch_gradients = [img_gradient(patch).sum() for patch in img_patches]
ax.plot(patch_gradients, color=colors[0], label='Gradients')
ax = add_grid_to_axes(ax, grid_size=(5, 20), labelsize=4, minor_ticks=True)


In [None]:
fig, ax = create_figure(figsize=(8,5))
img_gradients = [img_gradient(plt.imread(file)) for file in img_files]
ax.plot(img_gradients)

In [None]:
from visualization.vis_utils import create_figure
%load_ext autoreload
%autoreload 2

import torch

n_images = 10
images = np.random.rand(n_images, 492, 656)
B, H, W = images.shape

num_rows = 1
num_cols = min(n_images, B)
subplot_width = 2
fig, axes = create_figure(num_rows, num_cols, subplot_size=(subplot_width, W/H*subplot_width), axis_off=True)
for i in range(num_cols):
    axes[0, i].imshow(images[i])
    axes[0, i].set_title('class 1', fontsize=10)
    for spine in axes[0, i].spines.values():
        spine.set_edgecolor('red')
        spine.set_linewidth(2) 

## Visualize the generated labels

In [None]:
%load_ext autoreload
%autoreload 2
from datasets.preprocess import create_labels, generate_html
from utils.misc import load_file, save_file

data_dir = '../data/near_field'
dataset = 'Grating_SON1'
create_labels(data_dir, dataset)

dset_labels = []
for f in os.listdir(f'{data_dir}/labels/{dataset}/'):
    if not f.endswith('.txt'):
        continue
    labels = load_file(f'{data_dir}/labels/{dataset}/{f}')

    for img_label in labels:
        img, label = img_label.split(',')
        dset_labels.append(dict(img=img, label=label))

## HTML Visualization

In [None]:
html_data = generate_html(dset_labels, "/home/ubuntu/Projects/clf-laser-damage-prediction/data/near_field/images")
save_file('../data/data_labels.html', html_data)

## 3D Visualization / Animation

In [None]:
import matplotlib
%matplotlib widget

import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from tqdm import tqdm

from visualization.vis_utils import create_figure
from scipy.ndimage import gaussian_filter

def create_3Daxes(figsize=(10, 8), elev=20, azim=45, remove_background=False, remove_axis=False, remove_labels=False, remove_ticks=False):
    
    # Create the 3D plot
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize, subplot_kw={'projection': None})

    # Make second axes 3d
    ax2 = fig.add_subplot(122, projection='3d')
    
    # Adjust the viewing angle
    ax2.view_init(elev=elev, azim=azim)
    
    # Remove axis labels and ticks
    if remove_labels:
        ax1.set_xlabel('')
        ax1.set_ylabel('')
        
        ax2.set_xlabel('')
        ax2.set_ylabel('')
        ax2.set_zlabel('')

    if remove_ticks:
        ax1.set_xticks([])
        ax1.set_yticks([])
        
        ax2.set_xticks([])
        ax2.set_yticks([])
        ax2.set_zticks([])
        
    # Remove background panes
    if remove_background:
        ax2.xaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
        ax2.yaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
        ax2.zaxis.set_pane_color((1.0, 1.0, 1.0, 0.0))
    
    # Remove axis lines
    if remove_axis:
        ax2.xaxis.line.set_color((1.0, 1.0, 1.0, 0.0))
        ax2.yaxis.line.set_color((1.0, 1.0, 1.0, 0.0))
        ax2.zaxis.line.set_color((1.0, 1.0, 1.0, 0.0))
        
    return fig, (ax1, ax2)
    
# Animation function
def animate(frame):
    ax.view_init(elev=20, azim=frame)
    
def xy_grid(width, height, step=1):
    # Create a grid of x and y values
    x = np.arange(0, width, step)
    y = np.arange(0, height, step)
    X, Y = np.meshgrid(x, y)
    return X, Y

data_dir = '../data/near_field'
dset = 'Grating_A6'
loc = '02_01'
img_dir = f"{data_dir}/images/{dset}/{loc}"
plot_dir = f"../data/visualization/3Dplots/{dset}/{loc}"
os.makedirs(plot_dir, exist_ok=True)

for img_file in tqdm(sorted(os.listdir(img_dir))):

    if not img_file.endswith('png'):
        continue

    if img_file!='burst06_shot012.png':
        continue

    img_path = f"{img_dir}/{img_file}"
    image = plt.imread(img_path)[100:400, 100:400]
    # image = gaussian_filter(image, sigma=5)
    
    H, W = image.shape
    X, Y = xy_grid(W, H)
    Z = np.reshape(image, (-1))

    plt.close('all')
    fig, (ax1, ax2) = create_3Daxes(figsize=(8,6), elev=60, remove_background=True, remove_axis=True, remove_labels=True, remove_ticks=True)

    # plot image
    ax1.imshow(image, cmap='viridis')
    
    # Plot the surface
    surface = ax2.plot_surface(X, Y, image, cmap='coolwarm', rstride=5, cstride=5, linewidth=0, antialiased=False)
    
    # Add a color bar
    # fig.colorbar(surface)

    # Add title
    # plt.title(img_file.split('.')[0])

    # Create the animation
    # anim = animation.FuncAnimation(fig, animate, frames=np.linspace(0, 360, 1000), interval=200, blit=True)
    
    # Adjust layout to reduce gaps
    plt.subplots_adjust(left=0, right=1, top=0.9, bottom=0)

    # Show the plot
    # plt.show()
    plt.savefig(f"{plot_dir}/{img_file}")
    
    break

### Visualize multiple images in video

In [None]:
import cv2
import numpy as np
import os


def add_border(img, border_width, border_color):
    return cv2.copyMakeBorder(img, border_width, border_width, border_width, border_width, cv2.BORDER_CONSTANT, value=border_color)

def create_sliding_video(input_folder, output_file, fps, slide_duration, num_images_visible):
    # Get list of PNG files in the input folder
    image_files = [f for f in os.listdir(input_folder) if f.lower().endswith('.png')]
    image_files.sort()  # Ensure files are in order

    # Read the first image to get dimensions
    # first_image = cv2.imread(os.path.join(input_folder, image_files[0]), cv2.IMREAD_GRAYSCALE)
    
    # Define border size (top, bottom, left, right)
    border_width = 5
    border_color = (0, 255, 0)
    height, width = 224, 224

    # Calculate total width for the video frame
    total_width = width * num_images_visible

    # Create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_file, fourcc, fps, (total_width, height))

    # Calculate number of frames for each slide
    frames_per_slide = int(fps * slide_duration)

    for i in range(len(image_files)):
        # Load current image and next num_images_visible - 1 images
        current_images = []
        for j in range(num_images_visible):
            idx = (i + j) % len(image_files)
            img = cv2.imread(os.path.join(input_folder, image_files[idx]), cv2.IMREAD_GRAYSCALE)
            
            # Add border to the image
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
            img = add_border(img, border_width, border_color)
            img = cv2.resize(img, (width, height))
            current_images.append(img)

        # Create sliding effect
        for frame in range(frames_per_slide):
            # Calculate offset for this frame
            offset = int((frame / frames_per_slide) * width)

            # Create a blank canvas
            canvas = np.zeros((height, total_width, 3), dtype=np.uint8)

            # Place images on the canvas with offset
            for j, img in enumerate(current_images):                
                start_x = j * width - offset
                end_x = start_x + width
                if start_x < total_width and end_x > 0:
                    canvas_start = max(0, start_x)
                    img_start = max(0, -start_x)
                    canvas_end = min(total_width, end_x)
                    img_end = img_start + (canvas_end - canvas_start)
                    canvas[:, canvas_start:canvas_end] = img[:, img_start:img_end]

            # Convert grayscale to BGR for video writing
            # frame_bgr = cv2.cvtColor(canvas, cv2.COLOR_GRAY2BGR)
            out.write(canvas)

    # Release the VideoWriter
    out.release()

    print(f"Video saved as {output_file}")

# Example usage
# input_folder = "path/to/your/png/images"
# output_file = "output_video.mp4"
fps = 10
slide_duration = 0.5  # seconds
num_images_visible = 10
input_folder = '../data/near_field/images/Grating_A6/01_01'
output_file = '../data/visualization/sliding_video.avi'
create_sliding_video(input_folder, output_file, fps, slide_duration, num_images_visible)

In [None]:
anim.pause()