# WGPU example

An example using low-level wgpu code. The first cell is simply a copy of wgpu-py's triangle example. May need an update if the API of wgpu changes.

In [None]:
"""
Example use of webgpu API to draw a triangle. See the triangle_glfw.py
script (and related scripts) for actually running this.

Similar example in other languages / API's:

* Rust wgpu:
  https://github.com/gfx-rs/wgpu-rs/blob/master/examples/hello-triangle/main.rs
* C wgpu:
  https://github.com/gfx-rs/wgpu/blob/master/examples/triangle/main.c
* Python Vulkan:
  https://github.com/realitix/vulkan/blob/master/example/contribs/example_glfw.py

"""

import wgpu


# %% Shaders


shader_source = """
struct VertexInput {
    [[builtin(vertex_index)]] vertex_index : u32;
};
struct VertexOutput {
    [[location(0)]] color : vec4<f32>;
    [[builtin(position)]] pos: vec4<f32>;
};

[[stage(vertex)]]
fn vs_main(in: VertexInput) -> VertexOutput {
    let positions = array<vec2<f32>, 3>(vec2<f32>(0.0, -0.5), vec2<f32>(0.5, 0.5), vec2<f32>(-0.5, 0.7));
    let p: vec2<f32> = positions[in.vertex_index];

    var out: VertexOutput;
    out.pos = vec4<f32>(p, 0.0, 1.0);
    out.color = vec4<f32>(p, 0.5, 1.0);
    return out;
}

[[stage(fragment)]]
fn fs_main(in: VertexOutput) -> [[location(0)]] vec4<f32> {
    return in.color;
}
"""


# %% The wgpu calls


def main(canvas):
    """Regular function to setup a viz on the given canvas."""
    adapter = wgpu.request_adapter(canvas=canvas, power_preference="high-performance")
    device = adapter.request_device()
    return _main(canvas, device)


async def main_async(canvas):
    """Async function to setup a viz on the given canvas."""
    adapter = await wgpu.request_adapter_async(
        canvas=canvas, power_preference="high-performance"
    )
    device = await adapter.request_device_async(extensions=[], limits={})
    return _main(canvas, device)


def _main(canvas, device):

    shader = device.create_shader_module(code=shader_source)

    # No bind group and layout, we should not create empty ones.
    pipeline_layout = device.create_pipeline_layout(bind_group_layouts=[])

    present_context = canvas.get_context()
    render_texture_format = present_context.get_preferred_format(device.adapter)
    present_context.configure(device=device, format=render_texture_format)

    render_pipeline = device.create_render_pipeline(
        layout=pipeline_layout,
        vertex={
            "module": shader,
            "entry_point": "vs_main",
            "buffers": [],
        },
        primitive={
            "topology": wgpu.PrimitiveTopology.triangle_list,
            "front_face": wgpu.FrontFace.ccw,
            "cull_mode": wgpu.CullMode.none,
        },
        depth_stencil=None,
        multisample={
            "count": 1,
            "mask": 0xFFFFFFFF,
            "alpha_to_coverage_enabled": False,
        },
        fragment={
            "module": shader,
            "entry_point": "fs_main",
            "targets": [
                {
                    "format": render_texture_format,
                    "blend": {
                        "color": (
                            wgpu.BlendFactor.one,
                            wgpu.BlendFactor.zero,
                            wgpu.BlendOperation.add,
                        ),
                        "alpha": (
                            wgpu.BlendFactor.one,
                            wgpu.BlendFactor.zero,
                            wgpu.BlendOperation.add,
                        ),
                    },
                },
            ],
        },
    )

    def draw_frame():
        current_texture_view = present_context.get_current_texture()
        command_encoder = device.create_command_encoder()

        render_pass = command_encoder.begin_render_pass(
            color_attachments=[
                {
                    "view": current_texture_view,
                    "resolve_target": None,
                    "load_value": (0, 0, 0, 1),  # LoadOp.load or color
                    "store_op": wgpu.StoreOp.store,
                }
            ],
        )

        render_pass.set_pipeline(render_pipeline)
        # render_pass.set_bind_group(0, no_bind_group, [], 0, 1)
        render_pass.draw(3, 1, 0, 0)
        render_pass.end_pass()
        device.queue.submit([command_encoder.finish()])

    canvas.request_draw(draw_frame)


Here we define a canvas. This should later be included in wgpu-py.

In [None]:
import numpy as np
import wgpu.backends.rs  # noqa: F401, Select Rust backend
from wgpu.gui import WgpuOffscreenCanvas
import tornado
from jupyter_rfb import RemoteFrameBuffer

class JupyterWgpuCanvas(WgpuOffscreenCanvas, RemoteFrameBuffer):
    def __init__(self):
        super().__init__()
        self._pixel_ratio = 1
        self._logical_size = 0, 0
   
    def _receive_events(self, widget, event, buffers):
        event_type = event.get("event_type", None)
        if event_type == "resize":
            self._pixel_ratio = event["pixel_ratio"]
            self._logical_size = event["width"], event["height"]
            # todo: request draw?
            self._request_draw()
    
    def get_pixel_ratio(self):
        return self._pixel_ratio
    
    def get_logical_size(self):
        return self._logical_size
    
    def get_physical_size(self):
        return int(self._logical_size[0] * self._pixel_ratio), int(self._logical_size[1] * self._pixel_ratio)
    
    def set_logical_size(self, width, height):
        self.css_width = f"{width}px"
        self.css_height = f"{height}px"
    
    def _request_draw(self):
        # todo: de-dupe
        ioloop = tornado.ioloop.IOLoop.current()
        ioloop.add_callback(self._draw_frame_and_present)
        
    def close(self):
        pass # todo: close
    
    def is_closed(self):
        pass # todo: is_closed
    
    def present(self, texture_view):
        device = texture_view._device
        size = texture_view.size
        bytes_per_pixel = 4
        data = device.queue.read_texture(
            {
            "texture": texture_view.texture,
                "mip_level": 0,
                "origin": (0, 0, 0),
            },
            {
                "offset": 0,
                "bytes_per_row": bytes_per_pixel * size[0],
                "rows_per_image": size[1],
            },
            size,
        )
        arr = np.frombuffer(data, np.uint8).reshape(size[1], size[0], 4)
        #print(arr.shape)
        self.send_frame(arr[:,:,:3])#.copy())
    

In [None]:
canvas = JupyterWgpuCanvas()
canvas

In [None]:
main(canvas)