In [None]:
!pip install taichi

Collecting taichi
  Downloading taichi-1.7.3-cp311-cp311-manylinux_2_27_x86_64.whl.metadata (12 kB)
Collecting colorama (from taichi)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading taichi-1.7.3-cp311-cp311-manylinux_2_27_x86_64.whl (55.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.0/55.0 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Installing collected packages: colorama, taichi
Successfully installed colorama-0.4.6 taichi-1.7.3


In [None]:
import taichi as ti
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import os

# Инициализация Taichi
ti.init(arch=ti.gpu) # Используем GPU, если доступно, иначе CPU

# Параметры сетки и физические константы
domain_nx = 90  # Размер области по X
domain_ny = 60  # Размер области по Y
h = 0.1         # Пространственный шаг решетки (размер ячейки)

# Соответственно, количество ячеек сетки
nx = domain_nx
ny = domain_ny

c_val = 1.0       # Скорость распространения волн в вакууме/воздухе
dt = h / (c_val * 2.0) # Временной шаг

# Коэффициент kappa для вакуума/воздуха
kappa_vacuum = c_val * dt / h

# Показатели преломления (n = kappa_vacuum / kappa_material)
n_r, n_g, n_b = 1.30, 1.35, 1.40
kappa_r = kappa_vacuum / n_r
kappa_g = kappa_vacuum / n_g
kappa_b = kappa_vacuum / n_b

# Параметры источника
source_pos_x, source_pos_y = 20, 10   # Центр источника (в ячейках)
# imp_freq_spatial = 0.3             # Больше не используется напрямую для этого типа источника
imp_trans_width = 1                   # Может использоваться для определения "толщины" дуги источника
                                      # или радиуса области инжекции

# Угловой диапазон источника (в градусах, отсчет от +X против часовой стрелки)
source_angle_start_deg = 85.0
source_angle_end_deg = 95.0

# Параметры для временной огибающей импульса источника
# Используем временную частоту, а не пространственную
temporal_frequency = 0.5 # Частота волны во времени (можно подобрать)
source_angular_freq = 2.0 * np.pi * temporal_frequency # omega = 2*pi*f

source_time_center = 30.0 * dt # Центральное время импульса
source_time_sigma = 10.0 * dt  # "Ширина" Гауссовой огибающей во времени

source_duration_steps = int(source_time_center + 3 * source_time_sigma / dt)

# Радиус небольшой области вокруг источника, где будем инжектировать
source_injection_radius = 2.0

# Геометрия объектов
mirror_p1_x, mirror_p1_y = 10, 30
mirror_p2_x, mirror_p2_y = 30, 50

# Линза
lens_c1_x, lens_c1_y, lens_c1_r = 70, 30, 15
lens_c2_x, lens_c2_y, lens_c2_r = 90, 30, 25
# Экран
screen_x = 80
screen_y_start, screen_y_end = 5, 55

num_colors = 3

# Taichi поля
u = ti.field(dtype=ti.f32, shape=(num_colors, 3, ny, nx))
kappa_map = ti.field(dtype=ti.f32, shape=(num_colors, ny, nx))
lens_mask = ti.field(dtype=ti.i32, shape=(ny, nx))
mirror_mask = ti.field(dtype=ti.i32, shape=(ny, nx))
accumulated_intensity = ti.field(dtype=ti.f32, shape=(num_colors, ny, nx))
screen_data = ti.field(dtype=ti.f32, shape=(num_colors, ny))

# Параметры симуляции
total_time_steps = 400
accumulation_factor = 0.05

# Директории для сохранения анимаций
output_dir = "wave_simulation_output_v2" # Новая директория для измененной симуляции
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "rgb_wave_frames"), exist_ok=True)
os.makedirs(os.path.join(output_dir, "accumulated_frames"), exist_ok=True)

[Taichi] Starting on arch=cuda


In [None]:
@ti.func
def laser_wave_impulse_func(coord_prop: ti.f32, coord_trans: ti.f32,
                            freq_spatial: ti.f32, sigma_prop: ti.f32,
                            trans_width: ti.f32) -> ti.f32:
    d_prop = coord_prop / sigma_prop
    envelope_prop = ti.exp(-0.5 * (d_prop**2))
    wave = ti.cos(freq_spatial * coord_prop)
    return envelope_prop * wave

@ti.func
def source_temporal_profile(current_time_step: ti.i32, # текущий временной шаг симуляции
                             dt_val: ti.f32,           # значение dt
                             center_time: ti.f32,      # центр импульса во времени
                             sigma_time: ti.f32,       # ширина импульса во времени
                             angular_freq: ti.f32      # угловая частота omega
                             ) -> ti.f32:
    """
    Временной профиль источника: Гауссова огибающая, модулирующая синусоиду.
    """
    t = ti.cast(current_time_step, ti.f32) * dt_val
    # Гауссова огибающая во времени
    time_envelope = ti.exp(-0.5 * ((t - center_time) / sigma_time)**2)
    # Синусоидальная волна (можно использовать cos или sin)
    # Чтобы волна "выходила" из источника, фаза должна зависеть от времени
    wave_part = ti.sin(angular_freq * t) # или ti.cos(angular_freq * (t - center_time))
    return time_envelope * wave_part

@ti.kernel
def init_fields():
    """Инициализация полей u, kappa_map, масок и аккумулятора."""
    for y, x in ti.ndrange(ny, nx):
        # --- Маска зеркала ---
        is_mirror = 0

        dx_m = mirror_p2_x - mirror_p1_x
        dy_m = mirror_p2_y - mirror_p1_y

        if dx_m == 0 and dy_m == 0:
            if x == mirror_p1_x and y == mirror_p1_y:
                is_mirror = 1
        elif ti.abs(dx_m) >= ti.abs(dy_m):
            # Итерируем по x
            if (mirror_p1_x <= x <= mirror_p2_x) or (mirror_p2_x <= x <= mirror_p1_x):
                if dx_m != 0:
                    y_on_line_float = mirror_p1_y + dy_m * (ti.cast(x, ti.f32) - mirror_p1_x) / dx_m
                    if ti.round(y_on_line_float) == y: # Если ячейка (x,y) точно на линии
                         is_mirror = 1

        else:
             # Итерируем по y
            if (mirror_p1_y <= y <= mirror_p2_y) or (mirror_p2_y <= y <= mirror_p1_y):
                if dy_m != 0: # Не горизонтальная линия
                    x_on_line_float = mirror_p1_x + dx_m * (ti.cast(y, ti.f32) - mirror_p1_y) / dy_m
                    if ti.round(x_on_line_float) == x:
                        is_mirror = 1

        # Дополнительная проверка, что точка лежит в прямоугольнике, описывающем отрезок
        min_x_m = ti.min(mirror_p1_x, mirror_p2_x)
        max_x_m = ti.max(mirror_p1_x, mirror_p2_x)
        min_y_m = ti.min(mirror_p1_y, mirror_p2_y)
        max_y_m = ti.max(mirror_p1_y, mirror_p2_y)
        if not (min_x_m <= x <= max_x_m and min_y_m <= y <= max_y_m):
            is_mirror = 0 # Если точка не на линии или вне описывающего прямоугольника

        mirror_mask[y, x] = is_mirror

        # --- Маска линзы (полумесяц) ---
        is_lens = 0
        # Расстояния до центров окружностей
        dist_c1_sq = (ti.cast(x, ti.f32) - lens_c1_x)**2 + (ti.cast(y, ti.f32) - lens_c1_y)**2
        dist_c2_sq = (ti.cast(x, ti.f32) - lens_c2_x)**2 + (ti.cast(y, ti.f32) - lens_c2_y)**2

        # Условие: точка внутри меньшего круга C1 И снаружи большего круга C2
        if (dist_c1_sq < lens_c1_r**2) and (dist_c2_sq >= lens_c2_r**2):
            is_lens = 1
        # else: is_lens остается 0


        lens_mask[y, x] = is_lens

        for c in ti.static(range(num_colors)):
            u[c, 0, y, x] = 0.0
            u[c, 1, y, x] = 0.0
            u[c, 2, y, x] = 0.0
            accumulated_intensity[c, y, x] = 0.0

            current_kappa = kappa_vacuum
            if is_lens == 1:
                if c == 0: current_kappa = kappa_r
                elif c == 1: current_kappa = kappa_g
                else: current_kappa = kappa_b
            kappa_map[c, y, x] = current_kappa

    for c, y_s in ti.ndrange(num_colors, ny):
        screen_data[c, y_s] = 0.0

@ti.kernel
def inject_continuous_source(t_current_step: ti.i32):
    """
    Непрерывная инжекция источника на его границе (y = source_pos_y).
    Вызывается на каждом временном шаге, пока t_current_step < source_duration_steps.
    """
    if t_current_step < source_duration_steps:
        source_val = source_temporal_profile(
            t_current_step,
            dt, # глобальная переменная dt
            source_time_center,
            source_time_sigma,
            source_angular_freq
        )

        start_x_beam = source_pos_x - ti.floor((imp_trans_width - 1) / 2.0)
        # end_x_beam = source_pos_x + ti.floor(imp_trans_width / 2.0) # не используется в цикле ниже

        for x_offset_idx in range(imp_trans_width):
            current_x_beam = ti.cast(start_x_beam + x_offset_idx, ti.i32)

            if 0 <= current_x_beam < nx:

                if mirror_mask[source_pos_y, current_x_beam] == 0: # Проверка, что не на зеркале
                    for c_idx in ti.static(range(num_colors)):
                        u[c_idx, 0, source_pos_y, current_x_beam] = source_val


@ti.kernel
def inject_initial_pulse():
    start_x_beam = source_pos_x - ti.floor((imp_trans_width - 1) / 2.0)


    for y_coord_idx, x_offset_idx in ti.ndrange(ny, imp_trans_width):
        current_x_beam = ti.cast(start_x_beam + x_offset_idx, ti.i32)

        if 0 <= current_x_beam < nx:
            if y_coord_idx >= source_pos_y:
                # Значение для u_current (t=0) и u_future (t=dt)
                # coord_prop_current отсчитывается от y_coord_idx
                val_current = laser_wave_impulse_func(
                    ti.cast(y_coord_idx - source_pos_y, ti.f32),
                    0.0,
                    imp_freq_spatial,
                    imp_sigma_prop,
                    ti.cast(imp_trans_width, ti.f32)
                )

                y_rel_current = ti.cast(y_coord_idx - source_pos_y, ti.f32)
                val_u1 = laser_wave_impulse_func(
                    y_rel_current, 0.0, imp_freq_spatial, imp_sigma_prop, ti.cast(imp_trans_width, ti.f32)
                )

                spatial_shift_for_past = kappa_vacuum * c_val

                y_rel_past = y_rel_current + h/2.0
                val_u2 = laser_wave_impulse_func(
                    y_rel_past, 0.0, imp_freq_spatial, imp_sigma_prop, ti.cast(imp_trans_width, ti.f32)
                )

                for c_idx in ti.static(range(num_colors)):

                    u[c_idx, 1, y_coord_idx, current_x_beam] = val_u1  # u_current (t=0)
                    u[c_idx, 2, y_coord_idx, current_x_beam] = val_u2  # u_past (t=-dt)


@ti.kernel
def propagate_step():
    """Один шаг распространения волны."""
    # Сдвиг во времени: u_past = u_current, u_current = u_future
    for c, y, x in ti.ndrange(num_colors, ny, nx):
        u[c, 2, y, x] = u[c, 1, y, x]
        u[c, 1, y, x] = u[c, 0, y, x]


    for c, y, x in ti.ndrange(num_colors, ny, nx):
        if 0 < y < ny - 1 and 0 < x < nx - 1: # Внутренние точки
            laplacian = ( u[c, 1, y+1, x] + u[c, 1, y-1, x] +
                          u[c, 1, y, x+1] + u[c, 1, y, x-1] -
                          4 * u[c, 1, y, x] )
            k = kappa_map[c, y, x]
            if mirror_mask[y,x] == 0: # Не зеркало
                 u[c, 0, y, x] = k**2 * laplacian + 2 * u[c, 1, y, x] - u[c, 2, y, x]


@ti.kernel
def apply_boundaries():
    """Применение граничных условий."""
    for c_idx in ti.static(range(num_colors)):
        # Сначала открытые границы (Mur's first order)
        # Границы по Y (верхняя и нижняя)
        for x_coord in range(nx):
            # Проверяем, не является ли граничная точка частью зеркала
            if mirror_mask[0, x_coord] == 0: # Нижняя граница y=0
                k_bottom = kappa_map[c_idx, 1, x_coord]
                u[c_idx, 0, 0, x_coord] = u[c_idx, 1, 1, x_coord] + \
                                (k_bottom - 1) / (k_bottom + 1) * \
                                (u[c_idx, 0, 1, x_coord] - u[c_idx, 1, 0, x_coord])
            if mirror_mask[ny - 1, x_coord] == 0: # Верхняя граница y=ny-1
                k_top = kappa_map[c_idx, ny - 2, x_coord]
                u[c_idx, 0, ny - 1, x_coord] = u[c_idx, 1, ny - 2, x_coord] + \
                                       (k_top - 1) / (k_top + 1) * \
                                       (u[c_idx, 0, ny - 2, x_coord] - u[c_idx, 1, ny - 1, x_coord])

        # Границы по X (левая и правая)
        for y_coord in range(ny):
            if mirror_mask[y_coord, 0] == 0: # Левая граница x=0
                k_left = kappa_map[c_idx, y_coord, 1]
                u[c_idx, 0, y_coord, 0] = u[c_idx, 1, y_coord, 1] + \
                                (k_left - 1) / (k_left + 1) * \
                                (u[c_idx, 0, y_coord, 1] - u[c_idx, 1, y_coord, 0])
            if mirror_mask[y_coord, nx - 1] == 0: # Правая граница x=nx-1
                k_right = kappa_map[c_idx, y_coord, nx - 2]
                u[c_idx, 0, y_coord, nx - 1] = u[c_idx, 1, y_coord, nx - 2] + \
                                       (k_right - 1) / (k_right + 1) * \
                                       (u[c_idx, 0, y_coord, nx - 2] - u[c_idx, 1, y_coord, nx - 1])

        # Затем применяем условия Дирихле для зеркала (перезаписывая значения на зеркале)
        for y_coord, x_coord in ti.ndrange(ny, nx):
            if mirror_mask[y_coord, x_coord] == 1:
                u[c_idx, 0, y_coord, x_coord] = 0.0
                u[c_idx, 1, y_coord, x_coord] = 0.0 # Для немедленной остановки
                u[c_idx, 2, y_coord, x_coord] = 0.0


@ti.kernel
def accumulate_intensity_kernel():
    """Накопление интенсивности для визуализации."""
    for c, y, x in ti.ndrange(num_colors, ny, nx):
        val_to_add = u[c, 0, y, x]**2
        accumulated_intensity[c, y, x] = (1.0 - accumulation_factor) * accumulated_intensity[c, y, x] + \
                                         accumulation_factor * val_to_add

@ti.kernel
def update_screen_data_intensity():
    """Обновление данных на экране (накопленная интенсивность)."""
    for c_idx in ti.static(range(num_colors)):
        for y_s in range(ny):
            if screen_y_start <= y_s <= screen_y_end and 0 <= screen_x < nx :
                screen_data[c_idx, y_s] += u[c_idx, 0, y_s, screen_x]**2


# Основной цикл симуляции и визуализация
def run_simulation_final():
    print("Initializing fields...")
    init_fields()

    print("Initialization complete.")
    print(f"Source active for {source_duration_steps} steps.")

    for i in range(total_time_steps):
        inject_continuous_source(i) # Передаем текущий шаг времени
        # 2. Распространение
        propagate_step()
        apply_boundaries()
        accumulate_intensity_kernel()
        update_screen_data_intensity()

        if i % 5 == 0:
            print(f"Step {i}/{total_time_steps}")

            u_np = u.to_numpy()
            frame_data_rgb = np.zeros((ny, nx, 3), dtype=np.float32)

            max_intensity_vis = 0.02 # Может понадобиться подстроить
            r_channel = np.clip(u_np[0, 0, :, :]**2 / max_intensity_vis, 0, 1)
            g_channel = np.clip(u_np[1, 0, :, :]**2 / max_intensity_vis, 0, 1)
            b_channel = np.clip(u_np[2, 0, :, :]**2 / max_intensity_vis, 0, 1)

            frame_data_rgb[:, :, 0] = r_channel
            frame_data_rgb[:, :, 1] = g_channel
            frame_data_rgb[:, :, 2] = b_channel

            lens_mask_np = lens_mask.to_numpy()
            mirror_mask_np = mirror_mask.to_numpy()
            overlay_alpha = 0.3

            original_pixels_lens = frame_data_rgb[lens_mask_np == 1].copy()
            original_pixels_mirror = frame_data_rgb[mirror_mask_np == 1].copy()

            frame_data_rgb[lens_mask_np == 1] = original_pixels_lens * (1-overlay_alpha) + np.array([0.7,0.7,0.9]) * overlay_alpha
            frame_data_rgb[mirror_mask_np == 1] = original_pixels_mirror * (1-overlay_alpha) + np.array([0.5,0.5,0.5]) * overlay_alpha

            if 0 <= screen_x < nx:
                frame_data_rgb[screen_y_start:screen_y_end+1, screen_x, :] = [1.0, 1.0, 0.0]

            # Отрисовка маркера источника (линия на y=source_pos_y)
            start_x_beam_vis = int(source_pos_x - ti.floor((imp_trans_width - 1) / 2.0))
            end_x_beam_vis = int(source_pos_x + ti.floor(imp_trans_width / 2.0))

            for x_beam_idx_vis in range(max(0,start_x_beam_vis), min(nx, end_x_beam_vis + 1)):
                 frame_data_rgb[source_pos_y, x_beam_idx_vis, :] = [1.0, 0.0, 1.0] # Малиновый маркер источника


            plt.imsave(os.path.join(output_dir, "rgb_wave_frames", f"wave_frame_{i:04d}.png"), np.flipud(frame_data_rgb).clip(0,1))

            accum_np = accumulated_intensity.to_numpy()
            accum_frame_rgb = np.zeros((ny, nx, 3), dtype=np.float32)

            max_r_accum_val = np.max(accum_np[0]) if np.max(accum_np[0]) > 1e-9 else 1.0
            max_g_accum_val = np.max(accum_np[1]) if np.max(accum_np[1]) > 1e-9 else 1.0
            max_b_accum_val = np.max(accum_np[2]) if np.max(accum_np[2]) > 1e-9 else 1.0

            accum_frame_rgb[:, :, 0] = np.clip(accum_np[0] / max_r_accum_val, 0, 1)
            accum_frame_rgb[:, :, 1] = np.clip(accum_np[1] / max_g_accum_val, 0, 1)
            accum_frame_rgb[:, :, 2] = np.clip(accum_np[2] / max_b_accum_val, 0, 1)

            accum_frame_rgb[lens_mask_np == 1] *= 0.7
            accum_frame_rgb[mirror_mask_np == 1] *= 0.5

            if 0 <= screen_x < nx:
                 accum_frame_rgb[screen_y_start:screen_y_end+1, screen_x, :] = [0.5, 0.5, 0.0]

            plt.imsave(os.path.join(output_dir, "accumulated_frames", f"accum_frame_{i:04d}.png"), np.flipud(accum_frame_rgb).clip(0,1))

    print("Simulation finished.")
    plot_screen_data()
    create_animations()

def plot_screen_data():
    screen_data_np = screen_data.to_numpy()
    plt.figure(figsize=(12, 7))
    y_coords = np.arange(ny)
    colors = ['red', 'green', 'blue']
    labels = ['Red channel', 'Green channel', 'Blue channel']
    for c_idx in range(num_colors):
        signal_on_screen_full = screen_data_np[c_idx, :]
        valid_screen_indices = (y_coords >= screen_y_start) & (y_coords <= screen_y_end)
        if np.any(valid_screen_indices):
            y_on_segment = y_coords[valid_screen_indices]
            intensity_on_segment = signal_on_screen_full[valid_screen_indices]
            plt.plot(y_on_segment, intensity_on_segment, color=colors[c_idx], label=labels[c_idx])
    plt.title("Dispersion Pattern on Screen (Accumulated Intensity)")
    plt.xlabel("Y-coordinate on Screen")
    plt.ylabel("Accumulated Intensity (arbitrary units)")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(output_dir, "dispersion_on_screen.png"))
    print(f"Dispersion plot saved to {os.path.join(output_dir, 'dispersion_on_screen.png')}")
    plt.close()

def create_animations():
    print("Creating animations...")
    for anim_type, frames_subdir in [("wave_propagation", "rgb_wave_frames"),
                                     ("accumulated_intensity", "accumulated_frames")]:
        image_files = sorted([os.path.join(output_dir, frames_subdir, f)
                               for f in os.listdir(os.path.join(output_dir, frames_subdir)) if f.endswith(".png")])
        if image_files:
            fig, ax = plt.subplots()
            ax.set_axis_off()
            ims = []
            for filename in image_files:
                img = plt.imread(filename)
                im = ax.imshow(img, animated=True)
                ims.append([im])

            ani = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=1000)
            try:
                ani.save(os.path.join(output_dir, f"{anim_type}.mp4"), writer='ffmpeg', fps=15)
                print(f"{anim_type} animation saved as .mp4.")
            except Exception as e:
                print(f"Could not save {anim_type} as .mp4 (ffmpeg might not be installed or in PATH): {e}")
                try:
                    ani.save(os.path.join(output_dir, f"{anim_type}.gif"), writer='imagemagick', fps=15)
                    print(f"{anim_type} animation saved as .gif.")
                except Exception as e_gif:
                     print(f"Could not save {anim_type} as .gif (imagemagick might not be installed or in PATH): {e_gif}")
            plt.close(fig)
        else:
            print(f"No frames found for {anim_type} animation.")


if __name__ == "__main__":

    run_simulation_final()

Initializing fields...
Initialization complete.
Source active for 31 steps.
Step 0/400
Step 5/400
Step 10/400
Step 15/400
Step 20/400
Step 25/400
Step 30/400
Step 35/400
Step 40/400
Step 45/400
Step 50/400
Step 55/400
Step 60/400
Step 65/400
Step 70/400
Step 75/400
Step 80/400
Step 85/400
Step 90/400
Step 95/400
Step 100/400
Step 105/400
Step 110/400
Step 115/400
Step 120/400
Step 125/400
Step 130/400
Step 135/400
Step 140/400
Step 145/400
Step 150/400
Step 155/400
Step 160/400
Step 165/400
Step 170/400
Step 175/400
Step 180/400
Step 185/400
Step 190/400
Step 195/400
Step 200/400
Step 205/400
Step 210/400
Step 215/400
Step 220/400
Step 225/400
Step 230/400
Step 235/400
Step 240/400
Step 245/400
Step 250/400
Step 255/400
Step 260/400
Step 265/400
Step 270/400
Step 275/400
Step 280/400
Step 285/400
Step 290/400
Step 295/400
Step 300/400
Step 305/400
Step 310/400
Step 315/400
Step 320/400
Step 325/400
Step 330/400
Step 335/400
Step 340/400
Step 345/400
Step 350/400
Step 355/400
Step 360/4