In [None]:
##### STEP1 캠으로 video 촬영 후 저장하여 pose video로 변환

In [None]:
#### 필요 lib 호출

In [1]:
import collections
import time
from pathlib import Path

import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
import openvino as ov

# Fetch `notebook_utils` module
import requests

# Download the helper module used below (`utils.download_file`).
r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)
# Fail loudly on an HTTP error instead of saving an error page as a Python module.
r.raise_for_status()

# Write as UTF-8 (the encoding Python assumes for source files) and close the handle deterministically.
with open("notebook_utils.py", "w", encoding="utf-8") as module_file:
    module_file.write(r.text)
import notebook_utils as utils

In [None]:
#### pose estimation 사용을 위해 필요한 설정들

In [2]:
# Local root directory for downloaded models.
base_model_dir = Path("model")

# Open Model Zoo model name and the precision variant to fetch (FP32, FP16, FP16-INT8).
model_name = "human-pose-estimation-0001"
precision = "FP16-INT8"

# Location of the IR topology file; the weights (.bin) sit next to it.
model_path = base_model_dir / "intel" / model_name / precision / f"{model_name}.xml"

if not model_path.exists():
    model_url_dir = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/2022.1/models_bin/3/{model_name}/{precision}/"
    # Fetch the topology (.xml) first, then the weights (.bin), into the same directory.
    for ir_suffix in (".xml", ".bin"):
        utils.download_file(
            model_url_dir + model_name + ir_suffix,
            model_path.with_suffix(ir_suffix).name,
            model_path.parent,
        )

import ipywidgets as widgets

# Runtime handle used to enumerate devices for the dropdown below.
core = ov.Core()

# Dropdown listing every device OpenVINO reports plus "AUTO" (the default selection).
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value="AUTO",
    description="Device:",
    disabled=False,
)

# NOTE(review): a bare expression is only rendered when it is the last statement of a cell.
# If this runs in the same cell as the compile step below, `device.value` is still the
# default "AUTO" when it is read — confirm the intended cell boundaries.
device

# Initialize OpenVINO Runtime
core = ov.Core()
# Read the network from a file.
model = core.read_model(model_path)
# Let the AUTO device decide where to load the model (you can use CPU, GPU as well).
compiled_model = core.compile_model(model=model, device_name=device.value, config={"PERFORMANCE_HINT": "LATENCY"})

# Get the input and output names of nodes.
input_layer = compiled_model.input(0)
output_layers = compiled_model.outputs

# Get the input size.
# Dimensions 2 and 3 of the input shape are used as (height, width) when resizing frames later.
height, width = list(input_layer.shape)[2:]

# Input/output tensor names, shown for inspection when this is the last expression of a cell.
input_layer.any_name, [o.any_name for o in output_layers]

# code from https://github.com/openvinotoolkit/open_model_zoo/blob/9296a3712069e688fe64ea02367466122c8e8a3b/demos/common/python/models/open_pose.py#L135
class OpenPoseDecoder:
    """Group per-joint heatmap peaks into poses using part affinity fields (PAFs).

    Calling an instance with ``(heatmaps, nms_heatmaps, pafs)`` returns
    ``(poses, scores)``: ``poses`` is a float32 array of shape
    ``(num_poses, 17, 3)`` holding ``(x, y, score)`` per keypoint, and
    ``scores`` holds one value per pose. Only batch size 1 is supported.
    """

    # Keypoint index pairs (a, b) forming each limb, in decoder keypoint order.
    BODY_PARTS_KPT_IDS = (
        (1, 2),
        (1, 5),
        (2, 3),
        (3, 4),
        (5, 6),
        (6, 7),
        (1, 8),
        (8, 9),
        (9, 10),
        (1, 11),
        (11, 12),
        (12, 13),
        (1, 0),
        (0, 14),
        (14, 16),
        (0, 15),
        (15, 17),
        (2, 16),
        (5, 17),
    )
    # First PAF channel of each limb above; the limb uses channels [c, c + 2).
    BODY_PARTS_PAF_IDS = (
        12,
        20,
        14,
        16,
        22,
        24,
        0,
        2,
        4,
        6,
        8,
        10,
        28,
        30,
        34,
        32,
        36,
        18,
        26,
    )

    def __init__(
        self,
        num_joints=18,
        skeleton=BODY_PARTS_KPT_IDS,
        paf_indices=BODY_PARTS_PAF_IDS,
        max_points=100,
        score_threshold=0.1,
        min_paf_alignment_score=0.05,
        delta=0.5,
    ):
        """Store decoding parameters.

        Args:
            num_joints: number of heatmap channels treated as keypoint types.
            skeleton: limb definitions as pairs of keypoint type indices.
            paf_indices: first PAF channel for each limb in ``skeleton``.
            max_points: maximum number of peak candidates kept per keypoint type.
            score_threshold: candidates with a score at or below this are dropped.
            min_paf_alignment_score: minimum per-sample PAF alignment counted as valid.
            delta: offset added to keypoint x/y before grouping (skipped when <= 0).
        """
        self.num_joints = num_joints
        self.skeleton = skeleton
        self.paf_indices = paf_indices
        self.max_points = max_points
        self.score_threshold = score_threshold
        self.min_paf_alignment_score = min_paf_alignment_score
        self.delta = delta

        # Number of sample points taken along each candidate limb, and their indices 0..9.
        self.points_per_limb = 10
        self.grid = np.arange(self.points_per_limb, dtype=np.float32).reshape(1, -1, 1)

    def __call__(self, heatmaps, nms_heatmaps, pafs):
        """Decode poses from network outputs.

        Args:
            heatmaps: array of shape (1, C, H, W) with C >= ``num_joints``.
            nms_heatmaps: heatmaps after non-maximum suppression, same shape.
            pafs: part affinity fields, channels-first (transposed to channels-last here).

        Returns:
            ``(poses, scores)``; empty arrays of shape (0, 17, 3) and (0,) when nothing is found.
        """
        batch_size, _, h, w = heatmaps.shape
        assert batch_size == 1, "Batch size of 1 only supported"

        keypoints = self.extract_points(heatmaps, nms_heatmaps)
        pafs = np.transpose(pafs, (0, 2, 3, 1))

        if self.delta > 0:
            # Shift coordinates by delta and keep them inside the heatmap.
            for kpts in keypoints:
                kpts[:, :2] += self.delta
                np.clip(kpts[:, 0], 0, w - 1, out=kpts[:, 0])
                np.clip(kpts[:, 1], 0, h - 1, out=kpts[:, 1])

        pose_entries, keypoints = self.group_keypoints(keypoints, pafs, pose_entry_size=self.num_joints + 2)
        poses, scores = self.convert_to_coco_format(pose_entries, keypoints)
        if len(poses) > 0:
            poses = np.asarray(poses, dtype=np.float32)
            poses = poses.reshape((poses.shape[0], -1, 3))
        else:
            poses = np.empty((0, 17, 3), dtype=np.float32)
            scores = np.empty(0, dtype=np.float32)

        return poses, scores

    def extract_points(self, heatmaps, nms_heatmaps):
        """Collect peak candidates for every keypoint type.

        Returns a list with one float32 array per keypoint type; each row is
        ``(x, y, score, global_keypoint_id)``.
        """
        batch_size, channels_num, h, w = heatmaps.shape
        assert batch_size == 1, "Batch size of 1 only supported"
        assert channels_num >= self.num_joints

        xs, ys, scores = self.top_k(nms_heatmaps)
        masks = scores > self.score_threshold
        all_keypoints = []
        keypoint_id = 0
        for k in range(self.num_joints):
            # Filter low-score points.
            mask = masks[0, k]
            x = xs[0, k][mask].ravel()
            y = ys[0, k][mask].ravel()
            score = scores[0, k][mask].ravel()
            n = len(x)
            if n == 0:
                all_keypoints.append(np.empty((0, 4), dtype=np.float32))
                continue
            # Apply quarter offset to improve localization accuracy.
            x, y = self.refine(heatmaps[0, k], x, y)
            np.clip(x, 0, w - 1, out=x)
            np.clip(y, 0, h - 1, out=y)
            # Pack resulting points.
            keypoints = np.empty((n, 4), dtype=np.float32)
            keypoints[:, 0] = x
            keypoints[:, 1] = y
            keypoints[:, 2] = score
            # Ids are unique across all keypoint types (running counter).
            keypoints[:, 3] = np.arange(keypoint_id, keypoint_id + n)
            keypoint_id += n
            all_keypoints.append(keypoints)
        return all_keypoints

    def top_k(self, heatmaps):
        """Return the highest-scoring positions of every heatmap channel.

        At most ``max_points`` positions are returned per channel, fewer when
        the heatmap has fewer cells. Results are sorted by descending score.

        Returns:
            ``(x, y, scores)`` arrays of shape (N, K, num_points).
        """
        N, K, _, W = heatmaps.shape
        heatmaps = heatmaps.reshape(N, K, -1)
        # argpartition raises when asked for more elements than the axis holds,
        # so never request more candidates than there are heatmap cells.
        num_points = min(self.max_points, heatmaps.shape[2])
        # Get positions with top scores.
        ind = heatmaps.argpartition(-num_points, axis=2)[:, :, -num_points:]
        scores = np.take_along_axis(heatmaps, ind, axis=2)
        # Keep top scores sorted.
        subind = np.argsort(-scores, axis=2)
        ind = np.take_along_axis(ind, subind, axis=2)
        scores = np.take_along_axis(scores, subind, axis=2)
        # Convert flat indices back to (row, column).
        y, x = np.divmod(ind, W)
        return x, y, scores

    @staticmethod
    def refine(heatmap, x, y):
        """Shift interior peaks by a quarter cell towards the larger neighbour.

        Points on the heatmap border are left unchanged. Returns float32 ``(x, y)``.
        """
        h, w = heatmap.shape[-2:]
        valid = np.logical_and(np.logical_and(x > 0, x < w - 1), np.logical_and(y > 0, y < h - 1))
        xx = x[valid]
        yy = y[valid]
        dx = np.sign(heatmap[yy, xx + 1] - heatmap[yy, xx - 1], dtype=np.float32) * 0.25
        dy = np.sign(heatmap[yy + 1, xx] - heatmap[yy - 1, xx], dtype=np.float32) * 0.25
        x = x.astype(np.float32)
        y = y.astype(np.float32)
        x[valid] += dx
        y[valid] += dy
        return x, y

    @staticmethod
    def is_disjoint(pose_a, pose_b):
        """Return True when two pose entries can be merged.

        Ignoring the last two bookkeeping slots, every slot must either be equal
        in both entries or unset (< 0) in at least one of them.
        """
        pose_a = pose_a[:-2]
        pose_b = pose_b[:-2]
        return np.all(np.logical_or.reduce((pose_a == pose_b, pose_a < 0, pose_b < 0)))

    def update_poses(
        self,
        kpt_a_id,
        kpt_b_id,
        all_keypoints,
        connections,
        pose_entries,
        pose_entry_size,
    ):
        """Fold the connections of one limb type into the list of pose entries.

        A pose entry is a float32 vector: one slot per keypoint type holding a
        global keypoint id (-1 when unset), then the accumulated score at ``[-2]``
        and a keypoint counter at ``[-1]``. ``pose_entries`` is modified in place
        and also returned.

        Args:
            kpt_a_id, kpt_b_id: keypoint type indices of the limb ends.
            all_keypoints: array of all keypoints; column 2 is the score.
            connections: iterable of ``(keypoint_a_id, keypoint_b_id, affinity_score)``.
            pose_entries: list of pose entry vectors.
            pose_entry_size: length of a newly created pose entry.
        """
        for connection in connections:
            pose_a_idx = -1
            pose_b_idx = -1
            # Find the poses (if any) that already own either end of the connection;
            # the last match wins.
            for j, pose in enumerate(pose_entries):
                if pose[kpt_a_id] == connection[0]:
                    pose_a_idx = j
                if pose[kpt_b_id] == connection[1]:
                    pose_b_idx = j
            if pose_a_idx < 0 and pose_b_idx < 0:
                # Create new pose entry.
                pose_entry = np.full(pose_entry_size, -1, dtype=np.float32)
                pose_entry[kpt_a_id] = connection[0]
                pose_entry[kpt_b_id] = connection[1]
                pose_entry[-1] = 2
                pose_entry[-2] = np.sum(all_keypoints[connection[0:2], 2]) + connection[2]
                pose_entries.append(pose_entry)
            elif pose_a_idx >= 0 and pose_b_idx >= 0 and pose_a_idx != pose_b_idx:
                # If the two poses are disjoint merge them, otherwise ignore the connection.
                pose_a = pose_entries[pose_a_idx]
                pose_b = pose_entries[pose_b_idx]
                if self.is_disjoint(pose_a, pose_b):
                    # Unset slots are -1, so for a slot set in only one pose,
                    # a + b + 1 restores the stored keypoint id.
                    pose_a += pose_b
                    pose_a[:-2] += 1
                    pose_a[-2] += connection[2]
                    del pose_entries[pose_b_idx]
            elif pose_a_idx >= 0 and pose_b_idx >= 0:
                # Adjust score of a pose.
                pose_entries[pose_a_idx][-2] += connection[2]
            elif pose_a_idx >= 0:
                # Add a new limb into pose.
                pose = pose_entries[pose_a_idx]
                if pose[kpt_b_id] < 0:
                    pose[-2] += all_keypoints[connection[1], 2]
                pose[kpt_b_id] = connection[1]
                pose[-2] += connection[2]
                pose[-1] += 1
            elif pose_b_idx >= 0:
                # Add a new limb into pose.
                pose = pose_entries[pose_b_idx]
                if pose[kpt_a_id] < 0:
                    pose[-2] += all_keypoints[connection[0], 2]
                pose[kpt_a_id] = connection[0]
                pose[-2] += connection[2]
                pose[-1] += 1
        return pose_entries

    @staticmethod
    def connections_nms(a_idx, b_idx, affinity_scores):
        """Keep, per endpoint, only the best-scoring connection.

        Connections are visited in descending score order; one is kept only if
        neither of its endpoints has been used by an earlier (higher-scoring) one.
        """
        # From all retrieved connections that share starting/ending keypoints leave only the top-scoring ones.
        order = affinity_scores.argsort()[::-1]
        affinity_scores = affinity_scores[order]
        a_idx = a_idx[order]
        b_idx = b_idx[order]
        idx = []
        has_kpt_a = set()
        has_kpt_b = set()
        for t, (i, j) in enumerate(zip(a_idx, b_idx)):
            if i not in has_kpt_a and j not in has_kpt_b:
                idx.append(t)
                has_kpt_a.add(i)
                has_kpt_b.add(j)
        idx = np.asarray(idx, dtype=np.int32)
        return a_idx[idx], b_idx[idx], affinity_scores[idx]

    def group_keypoints(self, all_keypoints_by_type, pafs, pose_entry_size=20):
        """Connect keypoints into poses by scoring candidate limbs against the PAFs.

        Args:
            all_keypoints_by_type: list of per-type keypoint arrays from ``extract_points``.
            pafs: channels-last part affinity fields, indexed as ``pafs[0, y, x, channel]``.
            pose_entry_size: length of each pose entry vector.

        Returns:
            ``(pose_entries, all_keypoints)`` where poses with fewer than 3 keypoints are dropped.
        """
        all_keypoints = np.concatenate(all_keypoints_by_type, axis=0)
        pose_entries = []
        # For every limb.
        for part_id, paf_channel in enumerate(self.paf_indices):
            kpt_a_id, kpt_b_id = self.skeleton[part_id]
            kpts_a = all_keypoints_by_type[kpt_a_id]
            kpts_b = all_keypoints_by_type[kpt_b_id]
            n = len(kpts_a)
            m = len(kpts_b)
            if n == 0 or m == 0:
                continue

            # Get vectors between all pairs of keypoints, i.e. candidate limb vectors.
            a = kpts_a[:, :2]
            a = np.broadcast_to(a[None], (m, n, 2))
            b = kpts_b[:, :2]
            vec_raw = (b[:, None, :] - a).reshape(-1, 1, 2)

            # Sample points along every candidate limb vector.
            steps = 1 / (self.points_per_limb - 1) * vec_raw
            points = steps * self.grid + a.reshape(-1, 1, 2)
            points = points.round().astype(dtype=np.int32)
            x = points[..., 0].ravel()
            y = points[..., 1].ravel()

            # Compute affinity score between candidate limb vectors and part affinity field.
            part_pafs = pafs[0, :, :, paf_channel : paf_channel + 2]
            field = part_pafs[y, x].reshape(-1, self.points_per_limb, 2)
            vec_norm = np.linalg.norm(vec_raw, ord=2, axis=-1, keepdims=True)
            # The epsilon avoids division by zero for coincident keypoints.
            vec = vec_raw / (vec_norm + 1e-6)
            affinity_scores = (field * vec).sum(-1).reshape(-1, self.points_per_limb)
            valid_affinity_scores = affinity_scores > self.min_paf_alignment_score
            valid_num = valid_affinity_scores.sum(1)
            # Mean alignment over the valid sample points only.
            affinity_scores = (affinity_scores * valid_affinity_scores).sum(1) / (valid_num + 1e-6)
            success_ratio = valid_num / self.points_per_limb

            # Get a list of limbs according to the obtained affinity score.
            valid_limbs = np.where(np.logical_and(affinity_scores > 0, success_ratio > 0.8))[0]
            if len(valid_limbs) == 0:
                continue
            # Candidates were laid out as (m, n), so the flat index splits into (b, a).
            b_idx, a_idx = np.divmod(valid_limbs, n)
            affinity_scores = affinity_scores[valid_limbs]

            # Suppress incompatible connections.
            a_idx, b_idx, affinity_scores = self.connections_nms(a_idx, b_idx, affinity_scores)
            connections = list(
                zip(
                    kpts_a[a_idx, 3].astype(np.int32),
                    kpts_b[b_idx, 3].astype(np.int32),
                    affinity_scores,
                )
            )
            if len(connections) == 0:
                continue

            # Update poses with new connections.
            pose_entries = self.update_poses(
                kpt_a_id,
                kpt_b_id,
                all_keypoints,
                connections,
                pose_entries,
                pose_entry_size,
            )

        # Remove poses with not enough points.
        pose_entries = np.asarray(pose_entries, dtype=np.float32).reshape(-1, pose_entry_size)
        pose_entries = pose_entries[pose_entries[:, -1] >= 3]
        return pose_entries, all_keypoints

    @staticmethod
    def convert_to_coco_format(pose_entries, all_keypoints):
        """Reorder pose entries into flat 17-keypoint ``(x, y, score)`` vectors.

        The decoder keypoint at index 1 has no target slot (``reorder_map`` holds -1)
        and is skipped. Keypoints missing from a pose are written as zeros.

        Returns:
            ``(keypoints, scores)``: an array with one 51-element row per pose and
            an array of per-pose scores.
        """
        num_joints = 17
        coco_keypoints = []
        scores = []
        for pose in pose_entries:
            if len(pose) == 0:
                continue
            keypoints = np.zeros(num_joints * 3)
            # Target index for each decoder keypoint; -1 means "drop".
            reorder_map = [0, -1, 6, 8, 10, 5, 7, 9, 12, 14, 16, 11, 13, 15, 2, 1, 4, 3]
            person_score = pose[-2]
            for keypoint_id, target_id in zip(pose[:-2], reorder_map):
                if target_id < 0:
                    continue
                cx, cy, score = 0, 0, 0  # keypoint not found
                if keypoint_id != -1:
                    cx, cy, score = all_keypoints[int(keypoint_id), 0:3]
                keypoints[target_id * 3 + 0] = cx
                keypoints[target_id * 3 + 1] = cy
                keypoints[target_id * 3 + 2] = score
            coco_keypoints.append(keypoints)
            scores.append(person_score * max(0, (pose[-1] - 1)))  # -1 for 'neck'
        return np.asarray(coco_keypoints), np.asarray(scores)

# Shared decoder instance with default parameters; used by process_results.
decoder = OpenPoseDecoder()

# 2D pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    2D Pooling

    Parameters:
        A: input 2D array
        kernel_size: int, the size of the window
        stride: int, the stride of the window
        padding: int, implicit zero paddings on both sides of the input
        pool_mode: string, 'max' or 'avg'

    Returns:
        2D array of pooled values with shape
        ((H + 2 * padding - kernel_size) // stride + 1,
         (W + 2 * padding - kernel_size) // stride + 1).

    Raises:
        ValueError: if pool_mode is neither 'max' nor 'avg'.
    """
    # Validate up front so an unsupported mode fails loudly instead of returning None.
    if pool_mode not in ("max", "avg"):
        raise ValueError(f"Unsupported pool_mode {pool_mode!r}; expected 'max' or 'avg'")

    # Padding
    A = np.pad(A, padding, mode="constant")

    # Window view of A
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    kernel_size = (kernel_size, kernel_size)
    # Each output cell views a kernel-sized window of the padded input.
    A_w = as_strided(
        A,
        shape=output_shape + kernel_size,
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides,
    )
    A_w = A_w.reshape(-1, *kernel_size)

    # Return the result of pooling.
    if pool_mode == "max":
        return A_w.max(axis=(1, 2)).reshape(output_shape)
    return A_w.mean(axis=(1, 2)).reshape(output_shape)


# non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    """Zero every heatmap value that differs from its pooled counterpart.

    Values equal to the pooled value at the same position are kept unchanged.
    """
    is_peak = heatmaps == pooled_heatmaps
    return heatmaps * is_peak


# Get poses from results.
def process_results(img, pafs, heatmaps):
    """Decode poses from the network outputs and scale them to the image size.

    This processing comes from
    https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py

    Args:
        img: image whose ``shape[0]`` / ``shape[1]`` (height / width) define the target scale.
        pafs: part affinity field output of the model.
        heatmaps: keypoint heatmap output of the model (batch of one).

    Returns:
        ``(poses, scores)`` from the module-level ``decoder``, with x/y in image coordinates.
    """
    # 3x3 max-pool each heatmap channel, then keep only values equal to their local maximum.
    pooled_channels = [pool2d(channel, kernel_size=3, stride=1, padding=1, pool_mode="max") for channel in heatmaps[0]]
    pooled_heatmaps = np.array([pooled_channels])
    nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)

    # Decode poses.
    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)

    # Ratio between the image size and the spatial size of the model's first output.
    out_dims = list(compiled_model.output(index=0).partial_shape)
    img_height, img_width = img.shape[0], img.shape[1]
    scale_x = img_width / out_dims[3].get_length()
    scale_y = img_height / out_dims[2].get_length()

    # Multiply coordinates by a scaling factor.
    poses[:, :, :2] *= (scale_x, scale_y)
    return poses, scores

# Drawing colors, one 3-tuple per keypoint (17 entries); draw_poses indexes this by keypoint index.
# NOTE(review): the tuples are passed straight to cv2 drawing calls, so presumably BGR order — confirm.
colors = (
    (255, 0, 0),
    (255, 0, 255),
    (170, 0, 255),
    (255, 0, 85),
    (255, 0, 170),
    (85, 255, 0),
    (255, 170, 0),
    (0, 255, 0),
    (255, 255, 0),
    (0, 255, 85),
    (170, 255, 0),
    (0, 85, 255),
    (0, 255, 170),
    (0, 0, 255),
    (0, 255, 255),
    (85, 0, 255),
    (0, 170, 255),
)

# Pairs of keypoint indices that draw_poses connects with a limb line (default `skeleton` argument).
default_skeleton = (
    (15, 13),
    (13, 11),
    (16, 14),
    (14, 12),
    (11, 12),
    (5, 11),
    (6, 12),
    (5, 6),
    (5, 7),
    (6, 8),
    (7, 9),
    (8, 10),
    (1, 2),
    (0, 1),
    (0, 2),
    (1, 3),
    (2, 4),
    (3, 5),
    (4, 6),
)


def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    """Draw joints and limbs of every pose onto ``img`` (modified in place) and return it.

    Args:
        img: image to draw on.
        poses: array of poses; each pose has one ``(x, y, score)`` row per keypoint.
        point_score_threshold: a keypoint is drawn only when its score exceeds this value.
        skeleton: pairs of keypoint indices to connect with limb lines.
    """
    if poses.size == 0:
        return img

    # Limbs go on a separate copy so they can be blended semi-transparently at the end.
    limb_layer = np.copy(img)
    for pose in poses:
        joints_xy = pose[:, :2].astype(np.int32)
        joint_scores = pose[:, 2]
        visible = joint_scores > point_score_threshold

        # Draw joints.
        for joint_idx, center in enumerate(joints_xy):
            if visible[joint_idx]:
                cv2.circle(img, tuple(center), 1, colors[joint_idx], 2)

        # Draw limbs; both ends must be visible.
        for start, end in skeleton:
            if not (visible[start] and visible[end]):
                continue
            cv2.line(
                limb_layer,
                tuple(joints_xy[start]),
                tuple(joints_xy[end]),
                color=colors[end],
                thickness=4,
            )

    # Blend 40% joints image with 60% limbs image, writing the result back into img.
    cv2.addWeighted(img, 0.4, limb_layer, 0.6, 0, dst=img)
    return img

In [None]:
#### video 촬영 후 저장

In [11]:
cap = cv2.VideoCapture(0)  # 0 selects the default webcam; use 1, 2, ... for other cameras.

if not cap.isOpened():
    cap.release()
    # Raise instead of calling exit(): exit() would shut down the notebook kernel.
    raise RuntimeError("웹캠을 열 수 없습니다.")

# Frame size reported by the capture device; the writer must use the same size.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Writer for the recording: mp4 container, 'mp4v' codec, 25 FPS.
out = cv2.VideoWriter('output.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 25, (frame_width, frame_height))

# Capture frames continuously, showing each one and appending it to the file.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("프레임을 읽을 수 없습니다.")
            break

        out.write(frame)
        cv2.imshow('Webcam', frame)

        # Press 'q' to stop recording.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the camera, finalize the video file and close the window,
    # even if the loop is interrupted or raises.
    cap.release()
    out.release()
    cv2.destroyAllWindows()


In [None]:
#### 촬영 한 video를 pose video 로 변환

In [12]:
import cv2
# Model outputs: part affinity fields and keypoint heatmaps.
pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
# Open the recorded video.
cap = cv2.VideoCapture('output.mp4')

if not cap.isOpened():
    cap.release()
    # Raise instead of calling exit(): exit() would shut down the notebook kernel.
    raise RuntimeError("동영상 파일을 열 수 없습니다.")

# Frame size of the source video; the pose video uses the same size.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Writer for the pose-only video (mp4 container, 'mp4v' codec, 25 FPS).
out = cv2.VideoWriter('pose_output.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 25, (frame_width, frame_height))

try:
    # Preview window sized to the video.
    cv2.namedWindow('Saved Video', cv2.WINDOW_NORMAL)
    cv2.resizeWindow('Saved Video', frame_width, frame_height)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Poses are drawn on a black canvas, not on the original frame.
        frame2 = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)

        # Resize to the model input size and reorder HWC -> NCHW.
        input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
        input_img = input_img.transpose((2, 0, 1))[np.newaxis, ...]
        results = compiled_model([input_img])
        pafs = results[pafs_output_key]
        heatmaps = results[heatmaps_output_key]
        poses, scores = process_results(frame2, pafs, heatmaps)
        frame2 = draw_poses(frame2, poses, 0.1)

        out.write(frame2)
        cv2.imshow('Saved Video', frame2)

        # Press 'q' to stop early.
        if cv2.waitKey(40) & 0xFF == ord('q'):
            break
finally:
    # Releasing the writer finalizes pose_output.mp4; without it the file may be unreadable.
    cap.release()
    out.release()
    cv2.destroyAllWindows()


In [None]:
#### pose video 확인

In [3]:
import cv2

# Open the generated pose video.
cap = cv2.VideoCapture('pose_output.mp4')

if not cap.isOpened():
    cap.release()
    # Raise instead of calling exit(): exit() would shut down the notebook kernel.
    raise RuntimeError("동영상 파일을 열 수 없습니다.")

# Frame size of the video, used to size the preview window.
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

try:
    cv2.namedWindow('Saved Video', cv2.WINDOW_NORMAL)
    cv2.resizeWindow('Saved Video', frame_width, frame_height)

    # Play the video frame by frame.
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        cv2.imshow('Saved Video', frame)

        # Wait 40 ms per frame; press 'q' to stop early.
        if cv2.waitKey(40) & 0xFF == ord('q'):
            break
finally:
    # Always release the file handle and close the window, even on error or interrupt.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
#### 캐릭터가 움직이는 동영상으로 변환
#### input = character , pose video , output = character video

In [None]:
from pathlib import Path
import requests


REPO_PATH = Path("Moore-AnimateAnyone")
if not REPO_PATH.exists():
    !git clone -q "https://github.com/itrushkin/Moore-AnimateAnyone.git"
%pip install -q "torch>=2.1" torchvision einops omegaconf "diffusers<=0.24" transformers av accelerate "openvino>=2024.0" "nncf>=2.9.0" "gradio>=4.19" --extra-index-url "https://download.pytorch.org/whl/cpu"
import sys

sys.path.insert(0, str(REPO_PATH.resolve()))
r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py",
)
open("skip_kernel_extension.py", "w").write(r.text)
%load_ext skip_kernel_extension

In [None]:
#### SHOULD_CONVERT 변수로 모든 파일이 있는지 확인하고 그렇지 않을시 다시 다운로드

In [None]:
# Directory holding the converted OpenVINO IR files, one per pipeline component.
MODEL_DIR = Path("models")
VAE_ENCODER_PATH = MODEL_DIR / "vae_encoder.xml"
VAE_DECODER_PATH = MODEL_DIR / "vae_decoder.xml"
REFERENCE_UNET_PATH = MODEL_DIR / "reference_unet.xml"
DENOISING_UNET_PATH = MODEL_DIR / "denoising_unet.xml"
POSE_GUIDER_PATH = MODEL_DIR / "pose_guider.xml"
IMAGE_ENCODER_PATH = MODEL_DIR / "image_encoder.xml"

# Frame size and clip length used for the example inputs in the conversion cells.
WIDTH = 112
HEIGHT = 128
VIDEO_LENGTH = 24

# True as soon as any converted model file is missing; gates the `%%skip` conversion cells.
SHOULD_CONVERT = any(
    not ir_path.exists()
    for ir_path in (
        VAE_ENCODER_PATH,
        VAE_DECODER_PATH,
        REFERENCE_UNET_PATH,
        DENOISING_UNET_PATH,
        POSE_GUIDER_PATH,
        IMAGE_ENCODER_PATH,
    )
)

In [None]:
from datetime import datetime
from typing import Optional, Union, List, Callable
import math

from PIL import Image
import openvino as ov
from torchvision import transforms
from einops import repeat
from tqdm.auto import tqdm
from einops import rearrange
from omegaconf import OmegaConf
from diffusers import DDIMScheduler
from diffusers.image_processor import VaeImageProcessor
from transformers import CLIPImageProcessor
import torch
import gradio as gr
import ipywidgets as widgets
import numpy as np

from src.pipelines.pipeline_pose2vid_long import Pose2VideoPipeline
from src.utils.util import get_fps, read_frames
from src.utils.util import save_videos_grid
from src.pipelines.context import get_context_scheduler

In [None]:
%%skip not $SHOULD_CONVERT
from pathlib import PurePosixPath
import gc
import warnings

from typing import Dict, Any
from diffusers import AutoencoderKL
from huggingface_hub import hf_hub_download, snapshot_download
from transformers import CLIPVisionModelWithProjection
import nncf

from src.models.unet_2d_condition import UNet2DConditionModel
from src.models.unet_3d import UNet3DConditionModel
from src.models.pose_guider import PoseGuider

In [None]:
#### base model 

In [None]:
%%skip not $SHOULD_CONVERT
# Fetch the Stable Diffusion v1.5 UNet config and weights unless they are already on disk.
local_dir = Path("./pretrained_weights/stable-diffusion-v1-5")
local_dir.mkdir(parents=True, exist_ok=True)
for hub_file in ["unet/config.json", "unet/diffusion_pytorch_model.bin"]:
    saved_path = local_dir / hub_file
    if not saved_path.exists():
        # The hub subfolder is the parent directory name; the filename is the last path component.
        hf_hub_download(
            repo_id="runwayml/stable-diffusion-v1-5",
            subfolder=PurePosixPath(saved_path.parent.name),
            filename=PurePosixPath(saved_path.name),
            local_dir=local_dir,
        )

In [None]:
#### image encoder 

In [None]:
%%skip not $SHOULD_CONVERT
# Fetch the image encoder config and weights unless they are already on disk.
local_dir = Path("./pretrained_weights")
local_dir.mkdir(parents=True, exist_ok=True)
for hub_file in ["image_encoder/config.json", "image_encoder/pytorch_model.bin"]:
    saved_path = local_dir / hub_file
    if not saved_path.exists():
        # The hub subfolder is the parent directory name; the filename is the last path component.
        hf_hub_download(
            repo_id="lambdalabs/sd-image-variations-diffusers",
            subfolder=PurePosixPath(saved_path.parent.name),
            filename=PurePosixPath(saved_path.name),
            local_dir=local_dir,
        )

In [None]:
#### 가중치

In [None]:
%%skip not $SHOULD_CONVERT
# Download the full VAE repository into ./pretrained_weights/sd-vae-ft-mse.
snapshot_download(
    repo_id="stabilityai/sd-vae-ft-mse", local_dir="./pretrained_weights/sd-vae-ft-mse"
)
# Download the AnimateAnyone repository contents directly into ./pretrained_weights.
snapshot_download(
    repo_id="patrolli/AnimateAnyone",
    local_dir="./pretrained_weights",
)

In [None]:
# Load the animation config from the cloned repository; it names the inference config,
# which is resolved relative to the repository root as well.
# NOTE(review): the directory name is hard-coded here and must match REPO_PATH.
config = OmegaConf.load("Moore-AnimateAnyone/configs/prompts/animation.yaml")
infer_config = OmegaConf.load("Moore-AnimateAnyone/" + config.inference_config)

In [None]:
%%skip not $SHOULD_CONVERT
# Instantiate the PyTorch models from the paths listed in `config`.
vae = AutoencoderKL.from_pretrained(config.pretrained_vae_path)
reference_unet = UNet2DConditionModel.from_pretrained(config.pretrained_base_model_path, subfolder="unet")
# The 3D denoising UNet is built from the 2D base model plus the motion module weights.
denoising_unet = UNet3DConditionModel.from_pretrained_2d(
    config.pretrained_base_model_path,
    config.motion_module_path,
    subfolder="unet",
    unet_additional_kwargs=infer_config.unet_additional_kwargs,
)
pose_guider = PoseGuider(320, block_out_channels=(16, 32, 96, 256))
image_enc = CLIPVisionModelWithProjection.from_pretrained(config.image_encoder_path)


# NOTE(review): this cell is skipped when SHOULD_CONVERT is False, so NUM_CHANNELS_LATENTS
# is undefined in that case — verify that later cells do not depend on it.
NUM_CHANNELS_LATENTS = denoising_unet.config.in_channels

In [None]:
%%skip not $SHOULD_CONVERT
# Load the AnimateAnyone checkpoints into the models created above.
# NOTE(review): torch.load unpickles the checkpoint files, which can execute arbitrary code;
# only load weights from a trusted source (consider `weights_only=True` where supported).
# strict=False: the denoising UNet checkpoint is allowed to not match every model key.
denoising_unet.load_state_dict(
    torch.load(config.denoising_unet_path, map_location="cpu"),
    strict=False,
)
reference_unet.load_state_dict(
    torch.load(config.reference_unet_path, map_location="cpu"),
)
pose_guider.load_state_dict(
    torch.load(config.pose_guider_path, map_location="cpu"),
)

In [None]:
#### torch script 청소

In [None]:
%%skip not $SHOULD_CONVERT
def cleanup_torchscript_cache():
    """
    Helper for removing cached model representation

    Clears TorchScript's class registry, replaces the recursive concrete-type
    store with a fresh one and clears the JIT class state. Relies on private
    torch APIs; called after each model conversion in this notebook.
    """
    torch._C._jit_clear_class_registry()
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
    torch.jit._state._clear_class_state()

In [None]:
%%skip not $SHOULD_CONVERT
# Silence torch.jit.TracerWarning for the model conversions that follow.
warnings.simplefilter("ignore", torch.jit.TracerWarning)

In [None]:
#### 모델을 py 형식에서 빠르게 돌리기 위한 최적화 , NNCF 압축

In [None]:
%%skip not $SHOULD_CONVERT
if not VAE_ENCODER_PATH.exists():
    class VaeEncoder(torch.nn.Module):
        """Wrapper exposing only the VAE encode path, returning the latent distribution mean."""

        def __init__(self, vae):
            super().__init__()
            self.vae = vae

        def forward(self, x):
            return self.vae.encode(x).latent_dist.mean

    vae.eval()
    with torch.no_grad():
        # Trace with the notebook's configured frame size (HEIGHT x WIDTH) rather than a
        # hard-coded 512x448, so the example input agrees with the other conversion cells.
        vae_encoder = ov.convert_model(VaeEncoder(vae), example_input=torch.zeros(1, 3, HEIGHT, WIDTH))
    # Compress weights with NNCF before saving the IR.
    vae_encoder = nncf.compress_weights(vae_encoder)
    ov.save_model(vae_encoder, VAE_ENCODER_PATH)
    del vae_encoder
    cleanup_torchscript_cache()

In [None]:
%%skip not $SHOULD_CONVERT
if not VAE_DECODER_PATH.exists():
    # Wrapper exposing only the VAE decode path, returning the decoded sample.
    class VaeDecoder(torch.nn.Module):
        def __init__(self, vae):
            super().__init__()
            self.vae = vae

        def forward(self, z):
            return self.vae.decode(z).sample
    vae.eval()
    with torch.no_grad():
        # Example latent: 4 channels at 1/8 of the configured frame size.
        vae_decoder = ov.convert_model(VaeDecoder(vae), example_input=torch.zeros(1,4,HEIGHT//8,WIDTH//8))
    # Compress weights with NNCF before saving the IR.
    vae_decoder = nncf.compress_weights(vae_decoder)
    ov.save_model(vae_decoder, VAE_DECODER_PATH)
    del vae_decoder
    cleanup_torchscript_cache()
# The PyTorch VAE is no longer needed after both conversions; drop it and collect garbage.
del vae
gc.collect()

In [None]:
%%skip not $SHOULD_CONVERT
class ReferenceUNetWrapper(torch.nn.Module):
    """Wrapper returning element [1] of the reference UNet's tuple output (a dict of features)."""

    def __init__(self, reference_unet):
        super().__init__()
        self.reference_unet = reference_unet

    def forward(self, sample, timestep, encoder_hidden_states):
        return self.reference_unet(sample, timestep, encoder_hidden_states, return_dict=False)[1]


# `ref_features_shapes` is also needed by the denoising UNet conversion cell. Compute it
# whenever either model still has to be converted, so that cell does not hit a NameError
# when reference_unet.xml already exists but denoising_unet.xml does not.
need_reference_unet = not REFERENCE_UNET_PATH.exists()
if need_reference_unet or not DENOISING_UNET_PATH.exists():
    sample = torch.zeros(2, 4, HEIGHT // 8, WIDTH // 8)
    timestep = torch.tensor(0)
    encoder_hidden_states = torch.zeros(2, 1, 768)
    reference_unet.eval()
    with torch.no_grad():
        wrapper = ReferenceUNetWrapper(reference_unet)
        example_input = (sample, timestep, encoder_hidden_states)
        # One PyTorch forward pass records the shape of every reference feature.
        ref_features_shapes = {k: v.shape for k, v in wrapper(*example_input).items()}
        if need_reference_unet:
            ov_reference_unet = ov.convert_model(
                wrapper,
                example_input=example_input,
            )
    if need_reference_unet:
        # Compress weights with NNCF before saving the IR.
        ov_reference_unet = nncf.compress_weights(ov_reference_unet)
        ov.save_model(ov_reference_unet, REFERENCE_UNET_PATH)
        del ov_reference_unet
    del wrapper
    cleanup_torchscript_cache()
# The PyTorch reference UNet is no longer needed; drop it and collect garbage.
del reference_unet
gc.collect()

In [None]:
%%skip not $SHOULD_CONVERT
if not DENOISING_UNET_PATH.exists():
    # Wrapper giving the denoising UNet a fixed positional signature for tracing.
    class DenoisingUNetWrapper(torch.nn.Module):
        def __init__(self, denoising_unet):
            super().__init__()
            self.denoising_unet = denoising_unet

        def forward(
            self,
            sample,
            timestep,
            encoder_hidden_states,
            pose_cond_fea,
            ref_features
        ):
            return self.denoising_unet(
                sample,
                timestep,
                encoder_hidden_states,
                ref_features,
                pose_cond_fea=pose_cond_fea,
                return_dict=False)

    # Key order must match the forward() parameter order: the values are passed positionally below.
    # `ref_features_shapes` comes from the reference UNet conversion cell.
    example_input = {
        "sample": torch.zeros(2, 4, VIDEO_LENGTH, HEIGHT // 8, WIDTH // 8),
        "timestep": torch.tensor(999),
        "encoder_hidden_states": torch.zeros(2,1,768),
        "pose_cond_fea": torch.zeros(2, 320, VIDEO_LENGTH, HEIGHT // 8, WIDTH // 8),
        "ref_features": {k: torch.zeros(shape) for k, shape in ref_features_shapes.items()}
    }

    denoising_unet.eval()
    with torch.no_grad():
        ov_denoising_unet = ov.convert_model(
            DenoisingUNetWrapper(denoising_unet),
            example_input=tuple(example_input.values())
        )
    # Pin static shapes on inputs 0 (sample), 2 (encoder_hidden_states) and 3 (pose_cond_fea).
    ov_denoising_unet.inputs[0].get_node().set_partial_shape(ov.PartialShape((2, 4, VIDEO_LENGTH, HEIGHT // 8, WIDTH // 8)))
    ov_denoising_unet.inputs[2].get_node().set_partial_shape(ov.PartialShape((2, 1, 768)))
    ov_denoising_unet.inputs[3].get_node().set_partial_shape(ov.PartialShape((2, 320, VIDEO_LENGTH, HEIGHT // 8, WIDTH // 8)))
    # Remaining inputs are paired with the reference feature shapes, in dict order, and set to f32.
    for ov_input, shape in zip(ov_denoising_unet.inputs[4:], ref_features_shapes.values()):
        ov_input.get_node().set_partial_shape(ov.PartialShape(shape))
        ov_input.get_node().set_element_type(ov.Type.f32)
    # Re-validate the graph after changing input shapes and types.
    ov_denoising_unet.validate_nodes_and_infer_types()
    # Compress weights with NNCF before saving the IR.
    ov_denoising_unet = nncf.compress_weights(ov_denoising_unet)
    ov.save_model(ov_denoising_unet, DENOISING_UNET_PATH)
    del ov_denoising_unet
    cleanup_torchscript_cache()
# The PyTorch denoising UNet is no longer needed; drop it and collect garbage.
del denoising_unet
gc.collect()

In [None]:
%%skip not $SHOULD_CONVERT
if not POSE_GUIDER_PATH.exists():
    pose_guider.eval()
    with torch.no_grad():
        # Example input: one clip of VIDEO_LENGTH 3-channel frames at the configured size.
        ov_pose_guider = ov.convert_model(pose_guider, example_input=torch.zeros(1, 3, VIDEO_LENGTH, HEIGHT, WIDTH))
    # Compress weights with NNCF before saving the IR.
    ov_pose_guider = nncf.compress_weights(ov_pose_guider)
    ov.save_model(ov_pose_guider, POSE_GUIDER_PATH)
    del ov_pose_guider
    cleanup_torchscript_cache()
# The PyTorch pose guider is no longer needed; drop it and collect garbage.
del pose_guider
gc.collect()

In [None]:
%%skip not $SHOULD_CONVERT
if not IMAGE_ENCODER_PATH.exists():
    image_enc.eval()
    with torch.no_grad():
        # `input=(1, 3, 224, 224)` is passed in addition to the example input to set the input shape.
        ov_image_encoder = ov.convert_model(image_enc, example_input=torch.zeros(1, 3, 224, 224), input=(1, 3, 224, 224))
    # Compress weights with NNCF before saving the IR.
    ov_image_encoder = nncf.compress_weights(ov_image_encoder)
    ov.save_model(ov_image_encoder, IMAGE_ENCODER_PATH)
    del ov_image_encoder
    cleanup_torchscript_cache()
# The PyTorch image encoder is no longer needed; drop it and collect garbage.
del image_enc
gc.collect()

In [None]:
# OpenVINO runtime used to list devices and compile the converted pipeline models.
core = ov.Core()

In [None]:
# Dropdown listing every device OpenVINO reports plus "AUTO" (the default selection).
# OVPose2VideoPipeline reads `device.value` as the default for its `device` argument.
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value="AUTO",
    description="Device:",
    disabled=False,
)

device

In [None]:
class OVPose2VideoPipeline(Pose2VideoPipeline):
    """Pose-to-video pipeline whose networks run through OpenVINO.

    The VAE encoder/decoder, image encoder, reference UNet, denoising UNet and
    pose guider are compiled from the given model files. Helpers that are not
    overridden here (``prepare_latents``, ``prepare_extra_step_kwargs``,
    ``interpolate_latents``, ``progress_bar``) come from the parent class.
    """

    def __init__(
        self,
        vae_encoder_path=VAE_ENCODER_PATH,
        vae_decoder_path=VAE_DECODER_PATH,
        image_encoder_path=IMAGE_ENCODER_PATH,
        reference_unet_path=REFERENCE_UNET_PATH,
        denoising_unet_path=DENOISING_UNET_PATH,
        pose_guider_path=POSE_GUIDER_PATH,
        device=device.value,
    ):
        """Compile all sub-models for ``device`` and create the pre-processors.

        Args:
            vae_encoder_path, vae_decoder_path, image_encoder_path,
            reference_unet_path, denoising_unet_path, pose_guider_path:
                model files handed to ``core.compile_model``.
            device: OpenVINO device name. The default is read from the
                ``device`` widget once, when this class statement executes;
                pass ``device=...`` explicitly to honour a later selection.

        The parent ``__init__`` is intentionally not invoked here; only the
        attributes assigned below are set up.
        """
        self.vae_encoder = core.compile_model(vae_encoder_path, device)
        self.vae_decoder = core.compile_model(vae_decoder_path, device)
        self.image_encoder = core.compile_model(image_encoder_path, device)
        self.reference_unet = core.compile_model(reference_unet_path, device)
        self.denoising_unet = core.compile_model(denoising_unet_path, device)
        self.pose_guider = core.compile_model(pose_guider_path, device)
        self.scheduler = DDIMScheduler(**OmegaConf.to_container(infer_config.noise_scheduler_kwargs))

        self.vae_scale_factor = 8
        self.clip_image_processor = CLIPImageProcessor()
        self.ref_image_processor = VaeImageProcessor(do_convert_rgb=True)
        self.cond_image_processor = VaeImageProcessor(do_convert_rgb=True, do_normalize=False)

    def decode_latents(self, latents):
        """Decode video latents into frames with values clamped to [0, 1].

        Args:
            latents: tensor laid out as ``(b, c, f, h, w)``.

        Returns:
            ``numpy`` float32 array laid out as ``(b, c, f, h, w)``.
        """
        video_length = latents.shape[2]
        latents = 1 / 0.18215 * latents
        latents = rearrange(latents, "b c f h w -> (b f) c h w")
        # Frames are decoded one at a time by the compiled VAE decoder.
        video = []
        for frame_idx in tqdm(range(latents.shape[0])):
            video.append(torch.from_numpy(self.vae_decoder(latents[frame_idx : frame_idx + 1])[0]))
        video = torch.cat(video)
        video = rearrange(video, "(b f) c h w -> b c f h w", f=video_length)
        video = (video / 2 + 0.5).clamp(0, 1)
        # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16
        video = video.cpu().float().numpy()
        return video

    def __call__(
        self,
        ref_image,
        pose_images,
        width,
        height,
        video_length,
        num_inference_steps=30,
        guidance_scale=3.5,
        num_images_per_prompt=1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        output_type: Optional[str] = "tensor",
        callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,
        callback_steps: Optional[int] = 1,
        context_schedule="uniform",
        context_frames=24,
        context_stride=1,
        context_overlap=4,
        context_batch_size=1,
        interpolation_factor=1,
        **kwargs,
    ):
        """Generate a video of ``ref_image`` following ``pose_images``.

        Args:
            ref_image: reference image; ``.resize`` is called on it, so a PIL
                image is expected.
            pose_images: iterable of pose frames, one per output frame.
            width, height, video_length: forwarded to ``prepare_latents`` and
                used to resize the reference and pose images.
            num_inference_steps: number of scheduler steps.
            guidance_scale: classifier-free guidance is enabled when > 1.0.
            num_images_per_prompt: multiplier for the latent batch size.
            eta, generator: forwarded to ``prepare_extra_step_kwargs``;
                ``generator`` also seeds the initial latents.
            output_type: ``"tensor"`` returns a ``torch`` tensor, anything else
                returns the ``numpy`` array from ``decode_latents``.
            callback: optional ``callback(step_idx, t, latents)``.
            callback_steps: the callback runs when the timestep index is a
                multiple of this value.
            context_schedule, context_frames, context_stride, context_overlap:
                forwarded to the context scheduler that yields frame windows.
            context_batch_size: number of frame windows per denoising call.
            interpolation_factor: when > 0 the latents are passed through
                ``interpolate_latents`` before decoding.
            **kwargs: accepted and ignored.

        Returns:
            Decoded video laid out as ``(b, c, f, h, w)``.
        """
        do_classifier_free_guidance = guidance_scale > 1.0
        cfg_repeats = 2 if do_classifier_free_guidance else 1

        # Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps)
        timesteps = self.scheduler.timesteps

        batch_size = 1

        # Prepare clip image embeds
        clip_image = self.clip_image_processor.preprocess(ref_image.resize((224, 224)), return_tensors="pt").pixel_values
        clip_image_embeds = self.image_encoder(clip_image)["image_embeds"]
        clip_image_embeds = torch.from_numpy(clip_image_embeds)
        encoder_hidden_states = clip_image_embeds.unsqueeze(1)
        uncond_encoder_hidden_states = torch.zeros_like(encoder_hidden_states)

        if do_classifier_free_guidance:
            encoder_hidden_states = torch.cat([uncond_encoder_hidden_states, encoder_hidden_states], dim=0)

        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            4,
            width,
            height,
            video_length,
            clip_image_embeds.dtype,
            torch.device("cpu"),
            generator,
        )

        # Prepare extra step kwargs.
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # Prepare ref image latents
        ref_image_tensor = self.ref_image_processor.preprocess(ref_image, height=height, width=width)
        ref_image_latents = self.vae_encoder(ref_image_tensor)[0]
        ref_image_latents = ref_image_latents * 0.18215  # (b, 4, h, w)
        ref_image_latents = torch.from_numpy(ref_image_latents)

        # Prepare a list of pose condition images
        pose_cond_tensor_list = []
        for pose_image in pose_images:
            pose_cond_tensor = self.cond_image_processor.preprocess(pose_image, height=height, width=width)
            pose_cond_tensor = pose_cond_tensor.unsqueeze(2)  # (bs, c, 1, h, w)
            pose_cond_tensor_list.append(pose_cond_tensor)
        pose_cond_tensor = torch.cat(pose_cond_tensor_list, dim=2)  # (bs, c, t, h, w)
        pose_fea = self.pose_guider(pose_cond_tensor)[0]
        pose_fea = torch.from_numpy(pose_fea)

        context_scheduler = get_context_scheduler(context_schedule)

        # denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Accumulators: summed predictions and, per frame, how many windows covered it.
                noise_pred = torch.zeros(
                    (
                        latents.shape[0] * cfg_repeats,
                        *latents.shape[1:],
                    ),
                    device=latents.device,
                    dtype=latents.dtype,
                )
                counter = torch.zeros(
                    (1, 1, latents.shape[2], 1, 1),
                    device=latents.device,
                    dtype=latents.dtype,
                )

                # 1. Forward reference image (only once; the features are reused for every timestep)
                if i == 0:
                    ref_features = self.reference_unet(
                        (
                            ref_image_latents.repeat(cfg_repeats, 1, 1, 1),
                            torch.zeros_like(t),
                            encoder_hidden_states,
                        )
                    ).values()

                # 2. Split the frame axis into (overlapping) windows and group them into batches.
                context_queue = list(
                    context_scheduler(
                        0,
                        num_inference_steps,
                        latents.shape[2],
                        context_frames,
                        context_stride,
                        context_overlap,
                    )
                )
                num_context_batches = math.ceil(len(context_queue) / context_batch_size)
                # A dedicated index name is used so the timestep index `i` is not overwritten;
                # the progress/callback logic below depends on it.
                global_context = [
                    context_queue[batch_idx * context_batch_size : (batch_idx + 1) * context_batch_size]
                    for batch_idx in range(num_context_batches)
                ]

                for context in global_context:
                    # 3.1 expand the latents if we are doing classifier free guidance
                    latent_model_input = torch.cat([latents[:, :, frame_ids] for frame_ids in context]).repeat(cfg_repeats, 1, 1, 1, 1)
                    latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
                    model_batch = latent_model_input.shape[0]
                    latent_pose_input = torch.cat([pose_fea[:, :, frame_ids] for frame_ids in context]).repeat(cfg_repeats, 1, 1, 1, 1)

                    pred = self.denoising_unet(
                        (
                            latent_model_input,
                            t,
                            encoder_hidden_states[:model_batch],
                            latent_pose_input,
                            *ref_features,
                        )
                    )[0]

                    for frame_ids in context:
                        noise_pred[:, :, frame_ids] = noise_pred[:, :, frame_ids] + pred
                        counter[:, :, frame_ids] = counter[:, :, frame_ids] + 1

                # Average the predictions of overlapping windows (with or without guidance).
                noise_pred = noise_pred / counter

                # perform guidance
                if do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample

                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        if interpolation_factor > 0:
            latents = self.interpolate_latents(latents, interpolation_factor, latents.device)
        # Post-processing
        images = self.decode_latents(latents)  # (b, c, f, h, w)

        # Convert to tensor
        if output_type == "tensor":
            images = torch.from_numpy(images)

        return images

In [None]:
# Pass the widget value explicitly: the class default for `device` was captured when the
# class cell ran, so a device picked in the dropdown afterwards would otherwise be ignored.
pipe = OVPose2VideoPipeline(device=device.value)

In [None]:
# Reference image that will be animated.
ref_image = Image.open("Moore-AnimateAnyone/configs/inference/ref_images/anyone-5.png").convert("RGB")

# Pose frames and frame rate of the pose video; only the first VIDEO_LENGTH frames are used.
pose_images = read_frames("pose_output.mp4")
src_fps = get_fps("pose_output.mp4")
pose_list = list(pose_images[:VIDEO_LENGTH])

In [None]:
# Seed the sampler so Restart & Run All reproduces the same video
# (42 matches the default seed of the Gradio demo further down).
video = pipe(
    ref_image,
    pose_list,
    width=WIDTH,
    height=HEIGHT,
    video_length=VIDEO_LENGTH,
    generator=torch.Generator().manual_seed(42),
)

In [None]:
# Build a three-row comparison (reference image, pose frames, generated video) and save it.
new_h, new_w = video.shape[-2:]
pose_transform = transforms.Compose([transforms.Resize((new_h, new_w)), transforms.ToTensor()])
pose_tensor_list = [pose_transform(pose_image_pil) for pose_image_pil in pose_images[:VIDEO_LENGTH]]

ref_image_tensor = pose_transform(ref_image)  # (c, h, w)
ref_image_tensor = ref_image_tensor.unsqueeze(1).unsqueeze(0)  # (1, c, 1, h, w)
ref_image_tensor = repeat(ref_image_tensor, "b c f h w -> b c (repeat f) h w", repeat=VIDEO_LENGTH)
pose_tensor = torch.stack(pose_tensor_list, dim=0)  # (f, c, h, w)
pose_tensor = pose_tensor.transpose(0, 1)
pose_tensor = pose_tensor.unsqueeze(0)
# Store the grid under its own name: overwriting `video` would make this cell
# non-idempotent (a second run would concatenate onto an already-built grid).
video_grid = torch.cat([ref_image_tensor, pose_tensor, video], dim=0)

save_dir = Path("./output")
save_dir.mkdir(parents=True, exist_ok=True)
# Sample the clock once so the date and time parts always belong to the same instant.
now = datetime.now()
date_str = now.strftime("%Y%m%d")
time_str = now.strftime("%H%M")
out_path = save_dir / f"{date_str}T{time_str}.mp4"
save_videos_grid(
    video_grid,
    str(out_path),
    n_rows=3,
    fps=src_fps,
)

In [None]:
from IPython.display import Video

# Show the saved comparison video inline; embed=True stores the video data in the notebook output.
Video(out_path, embed=True)

In [None]:
def generate(
    img,
    pose_vid,
    seed,
    guidance_scale,
    num_inference_steps,
    _=gr.Progress(track_tqdm=True),
):
    """Gradio callback: animate ``img`` with the poses from ``pose_vid``.

    Args:
        img: reference image (the interface delivers a PIL image).
        pose_vid: path of the pose video; its first ``VIDEO_LENGTH`` frames are used.
        seed: random seed; cast to ``int`` because sliders may deliver floats.
        guidance_scale: forwarded to the pipeline.
        num_inference_steps: number of denoising steps; cast to ``int``.
        _: Gradio progress tracker that mirrors ``tqdm`` bars in the UI.

    Returns:
        Path of the saved comparison video (reference | pose | result).

    Raises:
        gr.Error: if the pose video has fewer than ``VIDEO_LENGTH`` frames.
    """
    pose_list = read_frames(pose_vid)[:VIDEO_LENGTH]
    if len(pose_list) < VIDEO_LENGTH:
        # Fail early with a readable message instead of a tensor-shape error later on.
        raise gr.Error(f"The pose video must contain at least {VIDEO_LENGTH} frames, got {len(pose_list)}.")

    generator = torch.Generator().manual_seed(int(seed))
    video = pipe(
        img,
        pose_list,
        width=WIDTH,
        height=HEIGHT,
        video_length=VIDEO_LENGTH,
        generator=generator,
        guidance_scale=guidance_scale,
        num_inference_steps=int(num_inference_steps),
    )

    # Resize the reference image and pose frames to the generated resolution.
    new_h, new_w = video.shape[-2:]
    pose_transform = transforms.Compose([transforms.Resize((new_h, new_w)), transforms.ToTensor()])
    pose_tensor_list = [pose_transform(pose_image_pil) for pose_image_pil in pose_list]

    ref_image_tensor = pose_transform(img)  # (c, h, w)
    ref_image_tensor = ref_image_tensor.unsqueeze(1).unsqueeze(0)  # (1, c, 1, h, w)
    ref_image_tensor = repeat(ref_image_tensor, "b c f h w -> b c (repeat f) h w", repeat=VIDEO_LENGTH)
    pose_tensor = torch.stack(pose_tensor_list, dim=0)  # (f, c, h, w)
    pose_tensor = pose_tensor.transpose(0, 1)
    pose_tensor = pose_tensor.unsqueeze(0)
    video = torch.cat([ref_image_tensor, pose_tensor, video], dim=0)

    save_dir = Path("./output/gradio")
    save_dir.mkdir(parents=True, exist_ok=True)
    # One clock sample with second resolution, so two requests within the same
    # minute do not overwrite each other's output file.
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    out_path = save_dir / f"{timestamp}.mp4"
    save_videos_grid(
        video,
        str(out_path),
        n_rows=3,
        # Keep the frame rate of the pose video, as the non-interactive cell does.
        fps=get_fps(pose_vid),
    )
    return out_path


# Gradio UI around `generate`: reference image + pose video in, comparison video out.
demo = gr.Interface(
    generate,
    [
        gr.Image(label="Reference Image", type="pil"),
        gr.Video(label="Pose video"),
        # step=1 keeps seeds and step counts integral; without an explicit step
        # Gradio derives one from the slider range.
        gr.Slider(
            label="Seed",
            value=42,
            minimum=np.iinfo(np.int32).min,
            maximum=np.iinfo(np.int32).max,
            step=1,
        ),
        gr.Slider(label="Guidance scale", value=3.5, minimum=1.1, maximum=10),
        gr.Slider(label="Number of inference steps", value=30, minimum=15, maximum=100, step=1),
    ],
    "video",
    examples=[
        [
            "Moore-AnimateAnyone/configs/inference/ref_images/anyone-2.png",
            "Moore-AnimateAnyone/configs/inference/pose_videos/anyone-video-2_kps.mp4",
        ],
        [
            "Moore-AnimateAnyone/configs/inference/ref_images/anyone-10.png",
            "Moore-AnimateAnyone/configs/inference/pose_videos/anyone-video-1_kps.mp4",
        ],
        [
            "Moore-AnimateAnyone/configs/inference/ref_images/anyone-11.png",
            "Moore-AnimateAnyone/configs/inference/pose_videos/anyone-video-1_kps.mp4",
        ],
        [
            "Moore-AnimateAnyone/configs/inference/ref_images/anyone-3.png",
            "Moore-AnimateAnyone/configs/inference/pose_videos/anyone-video-2_kps.mp4",
        ],
        [
            "Moore-AnimateAnyone/configs/inference/ref_images/anyone-5.png",
            "Moore-AnimateAnyone/configs/inference/pose_videos/anyone-video-2_kps.mp4",
        ],
    ],
    allow_flagging="never",
)
# Try a local launch first; if that raises, retry with a public share link.
try:
    demo.queue().launch(debug=True)
except Exception:
    demo.queue().launch(debug=True, share=True)
# If you are launching remotely, specify server_name and server_port:
# demo.launch(server_name='your server name', server_port='server port in int')
# Read more in the docs: https://gradio.app/docs/