In [None]:
# SPDX-FileCopyrightText: © 2024 Tenstorrent AI ULC

# SPDX-License-Identifier: Apache-2.0
#
# Test "user experience" scenarios, i.e. different ways to use the API to run things on TT hardware
# Each test intentionally creates everything from scratch and uses no verification env, so that each
# of these tests can be used as user examples.
# There's also no verification of correctness of data, as that's not the point of these tests.
#
# All of these tests will run on silicon, in concurrent mode, by default. However, setting 
# PYBUDA_DEVMODE=1 env variable will drop them into Golden+sequential mode.

import queue
import torch
import pybuda
import pytest
from pybuda.config import _get_global_compiler_config

from pybuda.schedulers import LearningRateScheduler
from pybuda.pybudaglobal import pybuda_reset
from pybuda._C.backend_api import BackendDevice, BackendType, DeviceMode 
from test.utils import download_model

# https://github.com/pytorch/pytorch/wiki/Autograd-and-Fork
mp_context = torch.multiprocessing.get_context('spawn')

def _safe_read(q):
    """
    Read a queue, but return None if an error was raised in the meantime, preventing a hang on error.
    """
    while True:
        try:
            data = q.get(timeout = 0.5)
            return data
        except queue.Empty as _:
            if pybuda.error_raised():
                raise RuntimeError("Error raised in pybuda")
        except KeyboardInterrupt:
            return None

# Sample PyBuda module. 이 PyBudaTestModule 단독으로는 아무 연산도 수행하지 않으며, 이는 PyBUDA의 기능을 시험하거나 다른 테스트/추론/학습 흐름에 투입하기 위한 "선언적 정의" 임.
class PyBudaTestModule(pybuda.PyBudaModule): # class 클래스를 정의한다는 키워드. '이름 붙여서(PyBudaTestModule을 말함) 하나의 설계도를 만들겠다'는 뜻. 따라서 실체 물건(객체)는 없는 상태.
    def __init__(self, name): # __init__ => 생성자constructor(클래스(설계도)를 기반으로 객체를 만들 때 자동으로 실행되는 특별한 함수). 객체의 초기 상태를 설정하는 역할. self는 이 클래스에서 만들어지는 실제 객체 자신"을 가리키는 예약어
        super().__init__(name)  # super()는 부모 클래스(상속한 클래스. 여기에서는 pybuda.PyBudaModule)의 기능을 불러오는 역할. 부모 클래스(pybuda.PyBudaModule) 기능을 먼저 불러온 후, 내 기능을 얹는 것
        self.weights1 = pybuda.Parameter(torch.rand(32, 32), requires_grad=True)
        self.weights2 = pybuda.Parameter(torch.rand(32, 32), requires_grad=True)

    def forward(self, act1, act2):
        m1 = pybuda.op.Matmul("matmul1", act1, self.weights1) # act1, act2 매 실행마다 사용자가 넣는 입력 데이터(예: 이미지 데이터, 문장 벡터, 센서 값 등)
        m2 = pybuda.op.Matmul("matmul2", act2, self.weights2) # 'Pre-Trained AI Model'을 사용하면, weights1과 weights2가 '사전 학습된 모델 로드 (pretrained weights)'를 사용함.
        return m1 + m2, m2 # class PyBudaTestModule(pybuda.PyBudaModule): => 이 모델이 m1 + m2, m2를 최종 출력으로 씀. 학습/디버깅/분석용: 중간값도 같이 봐야 할 수 있어서 두 개의 값으로 반환

# Sample PyBuda module  # 실행 환경: Tenstorrent NPU, 목적: PyBUDA 모델 작성 및 NPU 테스트, PyBUDA는 연산자 그래프로 분해하여 NPU에 최적화 실행
class PyBudaTestModuleOneOut(pybuda.PyBudaModule): # 이 클래스(설계도)가 출력값이 하나만 있는 버전이라는 걸 다른 사용자(개발자)에게 알림. PyBudaTestModuleOneOut은 추론-only 버전. 학습/디버깅/분석 필요없음때 사용 의미. 다른 사람이 볼 때 가독성 높이기 위해 사용한 개발자 편의용 라벨
    def __init__(self, name):
        super().__init__(name)
        self.weights1 = pybuda.Parameter(torch.rand(32, 32), requires_grad=True)
        self.weights2 = pybuda.Parameter(torch.rand(32, 32), requires_grad=True)

    def forward(self, act1, act2):
        m1 = pybuda.op.Matmul("matmul1", act1, self.weights1)
        m2 = pybuda.op.Matmul("matmul2", act2, self.weights2)
        return m1 + m2  # 실제 추론 배포용: 최종 결과만 필요할 때. 하나의 값으로 반환

# Sample PyBuda module  # Transformer 모델(입력을 벡터로 바꾼 후 전체 관계를 계산하는 AI 구조)의 핵심 연산 중 하나인 Self-Attention(문장 안에서 어떤 단어에 집중할지 자동으로 계산)의 Query-Key 계산을 PyBUDA로 구현한 예제. Value Weights는 선언되어 있지만 실제 연산에 사용하지 않았음을 주목하자. 이유는 이 모델(PyBudaTestQueryKeyModule)은 "Qeury-Key Attention Score 계산"만 보여주는 간단한 예제이기 때문. Value weights는 실제 모델에서는 반드시 사용됨. 이 예제는 학습용 또는 디버깅용 예제로 일부 연산만 보여 준 것임.
class PyBudaTestQueryKeyModule(pybuda.PyBudaModule):
    def __init__(self, name, hidden_dim = 128, num_heads = 4):  # hidden_dim = 128(“히든 차원”이라는 뜻으로, 벡터 길이를 나타냄. 가볍고 안정적인 테스트용, 실무에서도 자주 사용. 학습할 때도, 추론할 때도 모두 사용, NPU 서버 프리세일즈 관점: test는 128, PoC는 256-512 점진 확장. 고객 요구사항(정확도 vs 속도) 듣고 조정, 실제 추론 부하는 NPU 자원 평가와 함께 검증), mun_heads = 4 (Multi-Head Attention의 Head의 갯수. 4개의 Head 즉 attention(가중치)을 병렬로 동시에 수행하겠다는 뜻. Head1 문법적관계(주어-동사), Head2 시간정보(과거-현재), Head3 감정뉘앙스(긍정-부정), Head4 주체흐름(단어연결구조), 4값 빠른테스트, 4~8 PoC/데모, 8 실서비스, 12~16 고품질서비스(법률,금융 등등))
        super().__init__(name)
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads

        self.key_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True)   # Self-Attention(가중치) 계산을 위해 각 입력 벡터(예: 토큰 임베딩)를 세 가지로 나눔. key란?내가 가진 정보는 이런 특징이야-검색 대상. 키를 계산할 학습 가능한 가중치 생성(초기값은 무작위)(AI 모델이 하는 것임 사람이 하는게 아니라)
        self.query_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True) # Qeury란?나는 지금 어떤 정보를 찾고 싶다 - 질문 역할. 쿼리를 위한 가중치 생성(AI 모델이 하는 것임 사람이 하는게 아니라)
        self.value_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True) # Value란?실제 내용물-최종 결과에 반영될 값. 값을 위한 가중치 생성(AI 모델이 하는 것임 사람이 하는게 아니라)

    def forward(self, encoder_input):   # 이 모델(PyBudaTestQueryKeyModule)이 입력을 받아서 어떤 연산을 수행할지 정의하는 부분. encoder_input(의미를 갖고 있는 백터 인풋)
        query = pybuda.op.Matmul(f"mha_query", encoder_input, self.query_weights)   # f"mha_query"는 Python의 f-string 문법. "mha_query"는 그냥 이 행렬곱셈 연산(Matmul; Matrix Multiplication)에 이름을 붙여주는 String(Label)임. 즉, "이 행렬 곱 연산의 이름을 'mha_query'로 부르자"는 의미. 왜 이름을 붙이나?(연산추적,시각화도구에서그래프구성,테스트나로그에서식별자역할). mha는 Multi-Head Attention의 약자. 행렬곱셈을 통해 전체 정보 계산
        query = pybuda.op.HSlice(f"mha_query_slice", query, self.num_heads) # HSlice(PyBUDA에서 쓰이는 특수 연산. 이름 그대로 가로(Horizontal) 방향으로 자른다는 의미. 즉, 텐서(입력된 다차원 벡터, hidden_dim = 128)를 num_heads = 4 만큼 가로로 나누는 연산. 즉, query, key, value 벡터를 num_heads개로 가로 분할(split) 하는 연산. 나누는 이유: Multi-Head Attention 구조 때문에 Hidden_dim = 128을 4로 나눠 4개(32씩) 분할 되어 각기 다른 관점에서 attention을 계산하기 위해서 임). hidden_dim = 128 즉 벡터를 head 수(4) 만큼 쪼개어 Multi-Head Attention 계산 해 냄.

        key = pybuda.op.Matmul(f"mha_key", encoder_input, self.key_weights)
        key = pybuda.op.HSlice(f"mha_key_slice", key, self.num_heads)
        key = pybuda.op.Transpose(f"mha_key_transpose", key, 2, 3)  # Self-Attention 계산을 가능하게 만드는 핵심 연산 중 하나. 텐서(다차원 배열)의 차원(축, axis)을 서로 바꾸는 연산(예를 들어, 2번 차원과 3번 차원을 맞바꾸는 것). 왜 위치를 바꾸는가?(행과 열을 바꿔야 올바른 곱셈이 가능)

        attention_scores = pybuda.op.Matmul(f"mha_as", query, key)  # 위에서 나오는 2개의 query와 3개의 key 값이 다시 벡터 값이 되고 이를 행렬곱셈 연산으로 최종 attention_scores가 나오는 것.
        return attention_scores


class PyBudaTestForkWithThreeUsers(pybuda.PyBudaModule):    # PyBudaTestForkWithThreeUsers: PyBudaTest(이 클래스가 실제 모델 구현보다는 테스트용(test case) 으로 만들어졌다는 걸 암시), Fork(PyBUDA 연산 그래프 상에서 입력(encoder_input)이 세 개의 다른 연산 (Matmul)으로 분기(fork) 되는 구조이기 때문에 사용된 이름), WithThreeUsers("Users"는 입력 텐서를 사용하는 연산자(operator) 들을 뜻함. encoder_input이 다음 세 연산자(mm_a, mm_b, mm_c)에게 동시에 사용됨. 즉, 이 입력을 사용하는 사용자(user)가 세 명이라는 의미). "간단한 행렬 연산(Matmul + Add) 구조를 테스트"한다는 것은 단순한 연산 그 자체보다는 PyBUDA 컴파일러와 하드웨어 시스템 전체가 제대로 작동하는지를 검증하는 데 목적이 있음(데이터 흐름 최적화(Fork) 검증, PyBUDA 연산자 그래프 처리 정상 여부 확인, 파라미터 학습 가능성(Gradient Flow) 검증, Tenstorrent 하드웨어 매핑 테스트, PyBUDA Scheduler와 Compiler 정상 작동 여부 확인)
    def __init__(self, name, hidden_dim = 128, num_heads = 4):
        super().__init__(name)
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads

        self.mm_a_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True)  # 학습 가능한 파라미터(가중치 텐서)를 생성하는 구문, torch.rand(1, 1, hidden_dim, hidden_dim); Pytorch의 랜덤 텐서 생성 함수. 1, 1 (배치사이즈, num_heads). 배치사이즈: 한 번에 모델에 입력되는 데이터 샘플의 개수를 말함.(예; 1 한번에 이미지 1장 처리, 32 한번에 이미지 32장 처리. NPU 카드 및 서버 성능에 따라 숫자 늘릴 수 있음. Tenstorrent Loudbox의 경우 NPU 카드 4장일 시 32~128 가능능), num_heads의 경우 4인데도 불구하고 예제에서는 1만 사용되었는데, 단일 헤드로만 먼저 테스트하려는 의도로 보임. 테스트 예제라는 것임. 
        self.mm_b_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True)  # pybuda.Parameter(...); pybuda에서 이 텐서를 학습 대상 파라미터로 등록
        self.mm_c_weights = pybuda.Parameter(torch.rand(1, 1, hidden_dim, hidden_dim), requires_grad=True)  # requires_grad=True; 역전파(backpropagation) 과정에서 이 파라미터에 대해 기울기(gradient) 를 계산하게 해 줌. 즉, 학습 대상임을 명시하는 플래그

    def forward(self, encoder_input):
        a = pybuda.op.Matmul(f"mm_a", encoder_input, self.mm_a_weights)
        b = pybuda.op.Matmul(f"mm_b", encoder_input, self.mm_b_weights)
        c = pybuda.op.Matmul(f"mm_c", encoder_input, self.mm_c_weights)

        add_a_b = pybuda.op.Add(f"add_a_b", a, b)               # 텐서 간 덧셈. 같은 shape(모양; 텐서의 각 차원의 크기)이거나 broadcasting(서로 다른 shape을 가진 텐서끼리 연산할 때, 자동으로 크기를 맞춰주는 규칙) 가능한 경우 가능. Broadcasting이 가능한 조건(텐서의 뒤에서부터 차원을 맞춰 나감, 같은 차원이거나 한 쪽이 1인 경우, 그 차원은 자동으로 확장. 실전예시: a.shape = (32, 1, 10, 128) b.shape = (32, 1, 10, 128) ADD 가능 / a.shape = (32, 1, 10, 128) b.shape = (1, 1, 10, 128) b의 1이 32로 확장(Broadcasting)되어 두 텐서 크기가 맞아서 add가 가능하게 됨)
        add_a_b_c = pybuda.op.Add(f"add_a_b_c", add_a_b, c)     # PyBUDA에서는 기본적으로 Add 연산자(Binary Operator)가 2개의 입력만 받기 때문에 a + b + c를 직접 한 줄로 쓸 수 없어 두 개씩 나눠서 순차적으로 더해야 함(연산(노드) 분리. 그래픽,병렬화,디버깅,시각화 위해 필수). 또한 디버깅/시각화 목적에도 유리(연산자 이름(add_a_b, add_a_b_c)으로 그래프 추적이 쉬움. 각 단계별 출력을 따로 확인 가능). Tenstorrent NPU는 내부 연산 그래프를 기반으로 컴파일하기 때문이라도 명확한 연산 노드 분리가 필요
        return add_a_b_c



# Sample PyTorch module. 실행환경: CPU 또는 GPU (PyTorch)
class PyTorchTestModule(torch.nn.Module):   # torch.nn.Module(모델(모듈)을 만들 때 사용하는 기본 클래스)
    def __init__(self): # 
        super().__init__()
        self.weights1 = torch.nn.Parameter(torch.rand(32, 32), requires_grad=True)  # torch.nn.Parameter(학습 가능한 파라미터(=weight)), torch.rand(32, 32)(32x32짜리 랜덤 숫자 행렬; 딥러닝에서는 모델의 가중치(weight) 들을 학습을 통해 조금씩 조정해가며 정답을 예측. 학습을 시작할 때는 아직 아무것도 모르는 상태이기 때문에 일단은 무작위(random) 값으로 시작. 이렇게 시작한 뒤, 학습(Training)을 통해 이 랜덤 값이 의미 있는 값으로 바뀌어 감. 32×32는 단순한 우연이 아니라, 딥러닝 모델의 복잡도와 NPU/GPU 하드웨어 효율에 맞춘 일반적인 최소 단위 크기. 32x32 자주 사용하는 이유: 표현력(Expressiveness)이 충분, NPU/GPU의 병렬 처리 단위에 최적화, 딥러닝 모델 구조에서 흔히 쓰는 크기)
        self.weights2 = torch.nn.Parameter(torch.rand(32, 32), requires_grad=True)  # requires_grad=True(학습 중에 파라미터가 업데이트 되도록 함, backpropagation (역전파) 과 직접적으로 관련된 설정. 역전파(Backpropagation)를 통해 기울기(gradient)를 계산하겠다는 뜻). weight1, weight2 2개만 지정 이유: 두 개의 입력(act1, act2)에 대해 각각 **하나씩 대응하는 가중치(weights1, weights2)**를 적용하기 때문에 딱 2개의 가중치만 정의한 것

    def forward(self, act1, act2):  # forward() 입력 데이터를 넣었을 때 실제 연산이 일어나는 함수(순전파). act1, act2는 전처리된 입력 데이터 (Preprocessed Input) 혹은 이전 레이어의 출력(activation)
        m1 = torch.matmul(act1, self.weights1)  # 행렬 곱 (Matrix Multiplication)
        m2 = torch.matmul(act2, self.weights2)
        return m1 + m2, m1  # 중간 결과도 함께 확인하거나 사용할 경우. m1 + m2; 최종 연산 결과 (합계). m1; 	act1 × weights1 결과 (중간값), 만일 m2도 있다면, act2 × weights2 결과 (중간값)이 됨. m1 혹은 m2까지 반환계산 이유: 디버깅 및 로깅 용도, 실험적 분석 용도, 후속 계산에서 재사용. 주의: 반환된 값이 많아질수록 모델 인터페이스가 복잡해 짐.

# Sample PyTorch module. PyTorch로 작성된 3개의 간단한 신경망 모듈 클래스. 전체적으로 **모델 설계와 출력 처리, 손실 함수(loss function)의 흐름을 설명하려는 예제
class PyTorchTestModuleOneOut(torch.nn.Module): # torch.nn.Module(모델(모듈)을 만들 때 사용하는 기본 클래스). 이름의 "OneOut"은 출력이 하나라는 걸 의미
    def __init__(self):
        super().__init__()
        self.weights1 = torch.nn.Parameter(torch.rand(32, 32), requires_grad=True)  # torch.nn.Parameter; 학습 가능한 가중치(weight)를 정의하는 방법. torch.rand(32, 32); 무작위 숫자로 채운 32×32 행렬을 생성
        self.weights2 = torch.nn.Parameter(torch.rand(32, 32), requires_grad=True)  # requires_grad=True; 역전파(backpropagation)할 때 이 가중치를 학습하겠다는 의미

    def forward(self, act1, act2):  # 두 입력(act1, act2)을 받아 각각 행렬 곱 후, 결과를 더해 반환
        m1 = torch.matmul(act1, self.weights1)
        m2 = torch.matmul(act2, self.weights2)
        return m1 + m2  # 최종 결과만 필요한 경우. 출력이 하나라는 걸 의미

class PyTorchTestModuleOneInputAndOneOut(torch.nn.Module):  # 이 모듈은 입력이 하나, 출력도 하나
    def __init__(self):
        super().__init__()
        self.weights = torch.nn.Parameter(torch.rand(32, 32), requires_grad=True)   # 32×32 학습 가능한 가중치
    
    def forward(self, act): # 단일 입력 (예: 하나의 텐서)
        m = torch.matmul(act, self.weights) # act와 weights를 행렬 곱 (즉, 특징 변환)
        return m

class PyTorchLoss(torch.nn.Module): # 커스텀 손실 함수(Loss Function)
    def forward(self, input):
        return input.sum()  # 텐서의 모든 값을 더해서 하나의 숫자로 반환

#
# Run inference on module directly. PyBUDA와 PyTorch 모듈을 각각 실행(inference)해보는 예제. 각 함수는 입력 데이터를 생성하고, 모델을 실행한 뒤, 결과를 출력
#
def test_module_direct_pybuda():    # 모델을 한번 실행(inference pass), PyBUDA 모델 실행
    input1 = torch.rand(4, 32, 32)  # 배치 크기 (Batch size) → 입력 샘플 4개를 한 번에 처리하겠다는 뜻. 각 텐서 크기 32X32
    input2 = torch.rand(4, 32, 32)  # 두 번째 입력도 동일 크기로 생성

    # Run single inference pass on a PyBuda module directly
    output = PyBudaTestModule("direct").run(input1, input2) # 이 코드는 PyTorch가 아닌, PyBUDA 프레임워크용 객체를 직접 생성하는 것이고, Tenstorrent의 NPU에서 직접 실행 가능한 그래프를 구성하는 클래스 인스턴스. 생성한 입력을 넣고 모델을 한번 실행. "direct"는 이 모듈을 구별하기 위한 이름(tag) 역할
    print(output)   # 결과를 출력

def test_module_direct_pytorch():   # 모델을 한번 실행(inference pass), PyTorch 모델을 PyBUDA로 감싼 후 실행
    input1 = torch.rand(4, 32, 32)  # 배치 크기 (Batch size) → 입력 샘플 4개를 한 번에 처리하겠다는 뜻. 각 텐서 크기 32X32
    input2 = torch.rand(4, 32, 32)  # 두 번째 입력도 동일 크기로 생성. 

    # Run single inference pass on a PyTorch module, using a wrapper to convert to PyBuda first
    output = pybuda.PyTorchModule("direct_pt", PyTorchTestModule()).run(input1, input2) # PyTorchTestModule(); 앞에서 정의한 PyTorch 모듈 클래스 객체 생성. pybuda.PyTorchModule("direct_pt", PyTorchTestModule()); 이 PyTorch 모듈을 PyBUDA에서 실행 가능하도록 감쌈(wrap). "direct_pt"는 이 모듈을 구별하기 위한 이름(tag) 역할
    print(output)   # 결과를 출력

#
# Run inference through run_inference without placing on device. PyBUDA 환경에서 추론(inference)을 실행하는 예제. 차이점은 하나는 PyBUDA 전용 모델, 다른 하나는 PyTorch 모델을 PyBUDA로 감싼 형태
#
def test_run_inference_direct_pybuda(): # PyBUDA로 직접 만든 모델을 사용해 추론 실행
    input1 = torch.rand(4, 32, 32)  # 무작위 입력 데이터를 생성 (배치 4개)
    input2 = torch.rand(4, 32, 32)  # 무작위 입력 데이터를 생성 (배치 4개)

    # Run inference on a PyBuda module, with given inputs
    inputs = {"act2" : input2, "act1" : input1} # PyBUDA 모델의 입력값 이름에 맞게 딕셔너리로 정리
    output_q = pybuda.run_inference(PyBudaTestModule("run_direct"), inputs=[inputs])    # pybuda.run_inference()는 즉시 추론만 수행하는 함수(=디바이스에 올리지 않고 추론만 실행하는 "간단한 실행"). PyBudaTestModule(...); PyBUDA로 직접 만든 모델을 생성. pybuda.run_inference(...); 해당 모델을 이용해 추론 실행 (그래프를 컴파일하고 NPU에 할당하지는 않음)
    output = _safe_read(output_q)   # 출력값을 안전하게 읽음 (queue에서 꺼냄)
    print(output)   # 결과 출력

def test_run_inference_direct_pytorch():    # PyTorch 모델을 PyBUDA가 감싸서 추론 실행
    input1 = torch.rand(4, 32, 32)  # 무작위 입력 데이터를 생성 (배치 4개)
    input2 = torch.rand(4, 32, 32)  # 무작위 입력 데이터를 생성 (배치 4개)

    # Run inference, using a wrapper to convert PyTorch module to PyBuda, and with given inputs
    inputs = {"act2" : input2, "act1" : input1}
    output_q = pybuda.run_inference(pybuda.PyTorchModule("run_direct_pt", PyTorchTestModule()), inputs=[inputs])    # pybuda.run_inference()는 즉시 추론만 수행하는 함수(=디바이스에 올리지 않고 추론만 실행하는 "간단한 실행"). PyTorchTestModule(); PyTorch로 만든 모델 클래스 객체. pybuda.PyTorchModule(...); 	PyTorch 모델을 PyBUDA에서 쓸 수 있게 감쌈. 
    output = _safe_read(output_q)   # 출력값을 안전하게 읽음 (queue에서 꺼냄)
    print(output)   # 결과 출력


#
# Run inference by placing on device first. Tenstorrent NPU에 모델과 입력을 올려서(로딩) 추론(inference)을 실행하는 전체 절차를 보여주는 예제. 실제 디바이스 환경을 연동하는 구조
#
def test_run_inference_placed_pybuda(): # PYBUDA 전용 모델 사용
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)

    # Create a TT device
    tt0 = pybuda.TTDevice("tt0")    # TTDevice("tt0"); Tenstorrent NPU 장치를 소프트웨어적으로 지정

    # Place a module on the device
    tt0.place_module(PyBudaTestModule("placed"))    # place_module(); PyBuda 모델을 디바이스에 올림 (배치/컴파일). "placed"; 이름(String) 태그 임.

    # Push intputs to the device
    tt0.push_to_inputs((input1, input2))    # push_to_inputs(); 생성한 입력(input1, input2) 데이터를 디바이스에 전달

    # Run pipeline, and read the outputs
    output_q = pybuda.run_inference()   # 추론 실행 (디바이스에서 실제로 계산 시작)
    output = _safe_read(output_q)   # 결과 큐에서 출력값을 꺼내서 확인
    print(output) # 결과 출력

def test_run_inference_placed_pytorch():    # PyTorch 모델을 PyBUDA가 감싸서 실행
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)

    # Create a TT device
    tt0 = pybuda.TTDevice("tt0")    # TTDevice("tt0"); Tenstorrent NPU 장치를 소프트웨어적으로 지정

    # Place a module on the device, using a wrapper to convert PyTorch module to PyBuda
    tt0.place_module(pybuda.PyTorchModule("placed_pt", PyTorchTestModule()))    # place_module(); PyTorch 모델을 디바이스에 올림 (배치/컴파일)
    
    # Push intputs to the device
    tt0.push_to_inputs((input1, input2))    # push_to_inputs(); 생성한 입력(input1, input2) 데이터를 디바이스에 전달

    # Run pipeline, and read the outputs
    output_q = pybuda.run_inference()   # 추론 실행 (디바이스에서 실제로 계산 시작)
    output = _safe_read(output_q)   # 결과 큐에서 출력값을 꺼내서 확인
    print(output)   # 결과 출력

#
# Repeated calls to run inference on the same module. 같은 모델 또는 같은 디바이스에 대해 추론(inference)을 반복해서 실행하는 두 가지 방식의 예제. 즉, "같은 모듈을 다시 컴파일하지 않고 여러 번 실행하는 방법"을 학습하기 위한 코드. "재컴파일 없이 반복 실행"이 중요한 이유: 속도 향상(컴파일은 시간이 오래 걸림. 한 번만 하고 재활용하면 훨씬 빠름), 실시간 응답(반복 입력에 대해 빠르게 처리 가능), 리소스 절약(중복된 그래프 생성 방지, 메모리 절감)
#
def test_module_direct_repeated():  # PyBUDA 모듈을 Python 내에서 반복 실행
    module = PyBudaTestModule("direct") # PyBudaTestModule이라는 PyBUDA 기반 모델을 하나 생성. 이름 "direct"는 그냥 구분용 문자열(String)

    # Run on given inputs. 입력(input1, input2) 2개를 생성하고 .run()을 통해 첫 번째 추론을 실행
    input1 = torch.rand(4, 32, 32)  # 배치 크기 (Batch size) → 입력 샘플 4개를 한 번에 처리하겠다는 뜻. 각 텐서 크기 32X32
    input2 = torch.rand(4, 32, 32)
    output = module.run(input1, input2) # module.run(...); PyBUDA 모듈을 직접 실행 (캐시된 컴파일 결과 사용)
    print(output)   # 결과를 출력

    # Run again, without recompiling. 다시 실행해도 모델을 재컴파일하지 않는다는 뜻. 내부적으로 PyBUDA는 이미 컴파일된 상태를 캐시하고 있기 때문에 빠르게 반복 실행이 가능
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    output = module.run(input1, input2) # module.run(...); PyBUDA 모듈을 직접 실행 (캐시된 컴파일 결과 사용)
    print(output)

    # Run again, without recompiling. 이런 구조를 총 3번 반복
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    output = module.run(input1, input2) # module.run(...); PyBUDA 모듈을 직접 실행 (캐시된 컴파일 결과 사용)
    print(output)

def test_run_inference_placed_repeated():   # 디바이스에 모델을 올리고 여러 입력을 차례대로 실행
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0 = pybuda.TTDevice("tt0")    # 디바이스 및 모델 초기화. 가상 NPU 디바이스를 생성 ("tt0").
    tt0.place_module(PyBudaTestModule("placed"))    # 디바이스 및 모델 초기화. 모델(PyBudaTestModule)을 디바이스에 올림(배치/로드)(1회만).

    # Push one input and run. 첫 번째 추론
    tt0.push_to_inputs((input1, input2))    # push_to_inputs(...); 입력 전송. 입력 텐서를 디바이스에 보냄.
    output_q = pybuda.run_inference()   # 추론 실행

    output = _safe_read(output_q)   # 결과를 안전하게 읽고
    print(output)   # 결과 출력

    # Push two more inputs, and run one more time on both inputs, without recompiling. 입력 2쌍 추가 → 2번 연속 실행
    for _ in range(2):  # 3번 중 첫 번째는 위에서 이미 실행했으므로, 남은 2개를 반복 처리하는 구조
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))    # push_to_inputs(...); 새 입력 2쌍을 디바이스 입력 큐에 추가. 입력 텐서를 디바이스에 보냄.

    pybuda.run_inference(input_count=2) # input_count=2은 “큐에 올라간 입력 2개 추론해 주세요”라는 뜻. 왜 명시적으로 input_count를 써야 하나? PyBUDA는 입력 큐에서 여러 개의 입력이 대기할 수 있기 때문에, run_inference(input_count=N)을 통해 몇 개만 처리할지 정확히 지정해야 함. 즉, 입력 큐의 크기와 실제 실행 횟수를 분리해서 관리할 수 있도록 설계된 구조

    for _ in range(2):
        output = _safe_read(output_q)   # 출력 큐에서 2개의 결과를 하나씩 읽어옵
        print(output)   # 결과 출력


#
# Run inference through setup + run_forward calls
#
def test_setup_forward_calls():
    tt0 = pybuda.TTDevice("tt0")
    tt0.place_module(PyBudaTestModule("placed"))

    # Compile & initialize the pipeline for inference, with given shapes
    output_q = pybuda.initialize_pipeline(training=False, sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32)))
        
    # Push & run_forward manually
    for _ in range(2):
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))
        pybuda.run_forward(input_count=1)

        print(_safe_read(output_q))


#
# Run inference in concurrent mode, then push more inputs afterwards (won't work on Golden)
#
def test_run_inference_delayed_push():
    
    #### Skip the test on golden
    import os
    if "PYBUDA_DEVMODE" in os.environ:
        pytest.skip()
    ####

    tt0 = pybuda.TTDevice("tt0")
    tt0.place_module(PyBudaTestModule("placed"))

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0.push_to_inputs((input1, input2))

    # Run with input count 3, although only one is pushed
    output_q = pybuda.run_inference(input_count=3)

    # Read one output that should've been produced
    output = _safe_read(output_q)
    print(output)

    # The inference thread is running in the background, waiting for data. Let's push two more.
    for _ in range(2):
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))

    # Read two more outputs
    for _ in range(2):
        output = _safe_read(output_q)
        print(output)

#
# Run inference on multiple devices - combinations of cpu / tt device
#
def test_cpu_tt_pipeline():

    cpu0 = pybuda.CPUDevice("cpu0")
    cpu0.place_module(pybuda.PyTorchModule("stage0", PyTorchTestModule()))
    tt1 = pybuda.TTDevice("tt1")
    tt1.place_module(PyBudaTestModule("stage1"))

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    cpu0.push_to_inputs((input1, input2))

    output_q = pybuda.run_inference()
    print(_safe_read(output_q))

def test_cpu_tt_pipeline_compact():

    cpu0 = pybuda.CPUDevice("cpu0", module=pybuda.PyTorchModule("stage0", PyTorchTestModule()))
    tt1 = pybuda.TTDevice("tt1", module=PyBudaTestModule("stage1"))

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    cpu0.push_to_inputs((input1, input2))

    output_q = pybuda.run_inference()
    print(_safe_read(output_q))

# Run training, read back checkpoints and loss
def test_training_read_back():
    pybuda.config.set_configuration_options(
            default_df_override=pybuda.DataFormat.Float16_b,
    )
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModuleOneOut("module"))
    tt0.place_loss_module(pybuda.op.loss.L1Loss("l1_loss"))

    loss_q = mp_context.Queue()
    checkpoint_q = mp_context.Queue()

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0.push_to_inputs((input1, input2))
    tt0.push_to_target_inputs(torch.rand(4, 32, 32))

    pybuda.run_training(checkpoint_queue = checkpoint_q, loss_queue=loss_q)

    print("checkpoint: ", _safe_read(checkpoint_q))
    print("loss: ", _safe_read(loss_q))

# Run training pipeline, with loss on CPU, read back checkpoints and loss
#@pytest.mark.skip(reason="Intermittent hangs on silicon")
def test_training_pipeline_read_back():
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModuleOneOut()))
    cpu1.place_loss_module(pybuda.PyTorchModule("l1loss", torch.nn.L1Loss()))

    loss_q = mp_context.Queue()
    checkpoint_q = mp_context.Queue()

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0.push_to_inputs((input1, input2))

    cpu1.push_to_target_inputs(torch.rand(4, 32, 32))

    pybuda.run_training(checkpoint_queue = checkpoint_q, loss_queue=loss_q)

    print("checkpoint: ", _safe_read(checkpoint_q))
    print("loss: ", _safe_read(loss_q))


#
# Run inference pipeline on a Transformers model
#
def test_transformers_pipeline_inference():

    from transformers import BertModel, BertTokenizer

    tokenizer = download_model(BertTokenizer.from_pretrained, "prajjwal1/bert-tiny")
    input_sentence = "BERT is a transformers model pretrained on a large corpus of English data in a self-supervised fashion. This means it was pretrained on the raw texts only, with no humans labelling them in any way (which is why it can use lots of publicly available data) with an automatic process to generate inputs and labels from those texts. More precisely, it was pretrained with two objectives: Masked language modeling (MLM): taking a sentence, the model randomly masks 15% of the words in the input then run the entire masked sentence through the model and has to predict the masked words. This is different from traditional recurrent neural networks (RNNs) that usually see the words one after the other, or from autoregressive models like GPT which internally mask the future tokens. It allows the model to learn a bidirectional representation of the sentence."
    input_tokens = tokenizer.encode(input_sentence, max_length=128, pad_to_max_length=True)

    model = download_model(BertModel.from_pretrained, "prajjwal1/bert-tiny", torchscript=False, add_pooling_layer=False)
    cpu0 = pybuda.CPUDevice("cpu0", module=pybuda.PyTorchModule("bert_embeddings", model.embeddings))
    tt0 = pybuda.TTDevice("tt1", module=pybuda.PyTorchModule("bert_encoder", model.encoder))

    cpu0.push_to_inputs(torch.Tensor(input_tokens).int().unsqueeze(0))
    output_q = pybuda.run_inference()

    print(_safe_read(output_q))

#
# Run inference pipeline on a Transformers model, enabling cpu fallback on unsupported ops
#
def test_transformers_pipeline_fallback_inference():

    from transformers import BertModel, BertTokenizer

    compiler_cfg = pybuda.config._get_global_compiler_config() 

    tokenizer = download_model(BertTokenizer.from_pretrained, "prajjwal1/bert-tiny")
    input_sentence = "BERT is a transformers model pretrained on a large corpus of English data in a self-supervised fashion. This means it was pretrained on the raw texts only, with no humans labelling them in any way (which is why it can use lots of publicly available data) with an automatic process to generate inputs and labels from those texts. More precisely, it was pretrained with two objectives: Masked language modeling (MLM): taking a sentence, the model randomly masks 15% of the words in the input then run the entire masked sentence through the model and has to predict the masked words. This is different from traditional recurrent neural networks (RNNs) that usually see the words one after the other, or from autoregressive models like GPT which internally mask the future tokens. It allows the model to learn a bidirectional representation of the sentence."
    input_tokens = tokenizer.encode(input_sentence, max_length=128, pad_to_max_length=True)

    model = download_model(BertModel.from_pretrained, "prajjwal1/bert-tiny", torchscript=False, add_pooling_layer=False)
    tt0 = pybuda.TTDevice("tt0", module=pybuda.PyTorchModule("bert", model))

    for i in range(5):
        tt0.push_to_inputs(torch.Tensor(input_tokens).int().unsqueeze(0))
        output_q = pybuda.run_inference()
        print(_safe_read(output_q))

#
# Run training through setup + manual loop of fwd/bwd/opt
#
def test_training_manual_loop_with_cpu_fallback():
    from transformers import BertForMaskedLM, BertTokenizer, BertConfig 

    config = download_model(BertConfig.from_pretrained, "prajjwal1/bert-tiny")
    model = BertForMaskedLM(config)
    tt0 = pybuda.TTDevice("tt0", module=pybuda.PyTorchModule("bert", model), optimizer=pybuda.optimizers.SGD(learning_rate=0.1, device_params=True))
    tt0.place_loss_module(pybuda.PyTorchModule("CEL", torch.nn.CrossEntropyLoss()))

    sample_inputs = (torch.randint(config.vocab_size, (1,128)) ,)
    sample_targets = (torch.rand(1, config.vocab_size) ,)

    checkpoint_q = pybuda.initialize_pipeline(
            training=True, 
            sample_inputs=sample_inputs,
            sample_targets=sample_targets)


    for step in range(2):
        for acc_step in range(2):
            tt0.push_to_inputs(torch.randint(config.vocab_size, (1,128)))
            tt0.push_to_target_inputs(torch.rand(1, config.vocab_size).long())
            pybuda.run_forward(input_count = 1)
            pybuda.run_backward(input_count = 1, zero_grad = (acc_step == 0))

        pybuda.run_optimizer(checkpoint=True)

# Run training through run_training without placing on device
# Run training by placing on device first
# Repeated calls to run training
# Run training in concurrent mode, then push inputs afterwards
# Run training in concurrent mode, read checkpoints as they come out
# Run inference on multiple devices - combinations of cpu / tt device

#
# Run training through setup + manual loop of fwd/bwd/opt
#
def test_training_manual_loop():

    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"), optimizer=pybuda.optimizers.SGD(learning_rate=0.1, device_params=True))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModuleOneOut()),
            optimizer_f = lambda m: torch.optim.SGD(m.parameters(), lr=0.5))
    cpu1.place_loss_module(pybuda.PyTorchModule("l1loss", torch.nn.L1Loss()))
    
    # Compile & initialize the pipeline for training, with given shapes
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    checkpoint_q = pybuda.initialize_pipeline(
            training=True, 
            sample_inputs=(input1, input2),
            sample_targets=(torch.rand(4, 32, 32),))


    for step in range(2):
        for acc_step in range(2):
            tt0.push_to_inputs((input1, input2))
            cpu1.push_to_target_inputs(torch.rand(4, 32, 32))

            pybuda.run_forward(input_count = 1)
            pybuda.run_backward(input_count = 1, zero_grad = (acc_step == 0))

        pybuda.run_optimizer(checkpoint=True)

    print("Checkpoint: ", _safe_read(checkpoint_q))

#
# Run training through setup + manual loop of fwd/bwd, while copying final gradients
#
def test_training_manual_loop_no_opt():

    #### Skip the test on golden. It should work, need to debug why it doesn't.
    import os
    if "PYBUDA_DEVMODE" in os.environ:
        pytest.skip()
    ####

    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModuleOneOut()))
    cpu1.place_loss_module(pybuda.PyTorchModule("l1loss", torch.nn.L1Loss()))
    
    # Compile & initialize the pipeline for training, with given shapes
    pybuda.initialize_pipeline(
            training=True, 
            sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32)), 
            sample_targets=(torch.rand(4, 32, 32),))

    steps = 2

    for step in range(steps):
        for acc_step in range(1):
    
            input1 = torch.rand(4, 32, 32)
            input2 = torch.rand(4, 32, 32)
            tt0.push_to_inputs((input1, input2))

            cpu1.push_to_target_inputs(torch.rand(4, 32, 32))

            pybuda.run_forward(input_count = 1)
            pybuda.run_backward(input_count = 1, zero_grad = (acc_step == 0))

        print("Gradients on step ", step, ": ", pybuda.get_parameter_gradients())

#
# Run training and upload new weights from host
#
def test_training_weight_update_on_host():

    #### Skip the test on golden. It should work, need to debug why it doesn't.
    import os
    if "PYBUDA_DEVMODE" in os.environ:
        pytest.skip()
    ####

    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModuleOneOut()))
    cpu1.place_loss_module(pybuda.PyTorchModule("l1loss", torch.nn.L1Loss()))
    
    # Compile & initialize the pipeline for training, with given shapes
    pybuda.initialize_pipeline(training=True, 
            sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32)), 
            sample_targets=(torch.rand(4, 32, 32),))

    for _ in range(2):
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))

        cpu1.push_to_target_inputs(torch.rand(4, 32, 32))

    # Run fwd/bwd to calculate parameter gradients
    pybuda.run_forward(input_count = 1)
    pybuda.run_backward(input_count = 1, zero_grad = True)

    # Retrieve weights and gradients, and use host optimizer to update weights
    grads = pybuda.get_parameter_gradients(tt0)
    params = pybuda.get_parameter_checkpoint(tt0)
    for name in params[0]:
        params[0][name].value().grad = grads[0][name].value()
    opt = torch.optim.SGD([p.value() for p in params[0].values()], lr=10.0)
    opt.step()

    # Push new weights to the device
    pybuda.update_device_parameters(tt0, params)

    # Run again with new weights
    pybuda.run_forward(input_count = 1)
    pybuda.run_backward(input_count = 1, zero_grad = True)

# 
# Run inference pipeline and provide mp queues for device-to-device data
#
def test_inference_device_to_device_data():
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModule()))
    cpu2 = pybuda.CPUDevice("cpu2", module=pybuda.PyTorchModule("stage2", PyTorchTestModuleOneOut()))
    
    # Compile & initialize the pipeline for inference, and provide d2d mp queues to store device-to-device data in for further analysis
    tt0_output_q = mp_context.Queue()
    cpu1_output_q = mp_context.Queue()
    pybuda.initialize_pipeline(training=False, d2d_fwd_queues=[tt0_output_q, cpu1_output_q], 
            sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32) ))

    for _ in range(2):
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))

    # Run fwd
    pybuda.run_forward(input_count = 1)

    # Read d2d queues
    print(_safe_read(tt0_output_q))
    print(_safe_read(cpu1_output_q))

# 
# Run training pipeline and provide mp queues for device-to-device data
#

def test_training_device_to_device_data():
    
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("stage0"))
    cpu1 = pybuda.CPUDevice("cpu1", module=pybuda.PyTorchModule("stage1", PyTorchTestModule()))
    cpu2 = pybuda.CPUDevice("cpu2", module=pybuda.PyTorchModule("stage2", PyTorchTestModuleOneOut()))
    cpu2.place_loss_module(pybuda.PyTorchModule("l1loss", torch.nn.L1Loss()))
    
    # Compile & initialize the pipeline for inference, and provide d2d mp queues to store device-to-device data in for further analysis
    tt0_output_q = mp_context.Queue()
    cpu1_output_q = mp_context.Queue()
    cpu1_bwd_output_q = mp_context.Queue()
    cpu2_bwd_output_q = mp_context.Queue()
    pybuda.initialize_pipeline(
            training=True, 
            d2d_fwd_queues=[tt0_output_q, cpu1_output_q], 
            d2d_bwd_queues=[cpu1_bwd_output_q, cpu2_bwd_output_q], 
            sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32)), 
            sample_targets=(torch.rand(4, 32, 32),))

    for _ in range(2):
        input1 = torch.rand(4, 32, 32)
        input2 = torch.rand(4, 32, 32)
        tt0.push_to_inputs((input1, input2))

        cpu2.push_to_target_inputs(torch.rand(4, 32, 32))

    # Run fwd/bwd 
    pybuda.run_forward()
    pybuda.run_backward(zero_grad = True)

    # Read d2d queues
    print(_safe_read(tt0_output_q))
    print(_safe_read(cpu1_output_q))
    print(_safe_read(cpu1_bwd_output_q))
    print(_safe_read(cpu2_bwd_output_q))
    pybuda.get_parameter_gradients(tt0)

#
# Override data formats
#
def test_data_formats_input_override():

    mod = PyBudaTestModule("mod")
    tt0 = pybuda.TTDevice("tt0", module=mod)

    # Explicitly set data formats for parameters and inputs
    mod.weights1.set_data_format(pybuda.DataFormat.Float16)
    mod.weights2.set_data_format(pybuda.DataFormat.Float16)
    input1 = torch.rand(4, 32, 32, dtype=torch.float16)
    input2 = torch.rand(4, 32, 32, dtype=torch.float16)
    tt0.push_to_inputs((input1, input2))

    pybuda.run_inference()

def test_data_formats_fp32_fallback():
    
    # On this device, fall back to Float16 wherever Float32 is used
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("mod"), fp32_fallback=pybuda.DataFormat.Float16)

    # Push Float32, which will be converted to Float16 due to fp32_fallback
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0.push_to_inputs((input1, input2))

    pybuda.run_inference()

def test_data_formats_op_override():
    
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule("mod"))

    # Use API to set manual data format override on an op
    pybuda.configure_mixed_precision(name_regex="matmul1", output_df=pybuda.DataFormat.Bfp8_b)
    
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    tt0.push_to_inputs((input1, input2))

    pybuda.run_inference()

class TorchSchedulerWithWarmupAndDecay(pybuda.torch_schedulers.TorchLearningRateScheduler):
    def __init__(self, optimizer):
        super().__init__(optimizer)
    
    def get_lr(self):
        return [self.optimizer.param_groups[0]["lr"] + 1]
    
    def step(self):
        super().step()
        print(f"Torch optimizer learning rate updated to {self.optimizer.param_groups[0]['lr']}")


class TestScheduler(LearningRateScheduler):
        def __init__(self, optimizer):
            super().__init__(optimizer)
        
        def get_lr(self):
            return self.optimizer.learning_rate + 1
        
        def step(self):
            super().step()
            print(f"Pybuda optimizer learning rate updated to {self.optimizer.learning_rate}")
        
        def get_pytorch_scheduler(self, optimizer: torch.optim.Optimizer):
            if self.torch_scheduler is None:
                self.torch_scheduler = TorchSchedulerWithWarmupAndDecay(
                    optimizer=optimizer
                )
            
            return self.torch_scheduler


# Run the learning rate scheduler across 100 steps to
# show how optimizer learning rate gets updated
def test_learning_rate_scheduler():
            
    lr = 1
    optimizer = pybuda.optimizers.SGD(learning_rate=lr, device_params=True)
    scheduler = TestScheduler(optimizer=optimizer)
    
    tt0 = pybuda.TTDevice(
        "tt0", 
        module=PyBudaTestModuleOneOut("stage0"), 
        optimizer=optimizer,
        scheduler=scheduler
    )
    cpu1 = pybuda.CPUDevice(
        "cpu1",
        module=pybuda.PyTorchModule(
            "stage1",
            PyTorchTestModuleOneInputAndOneOut()
        ),
        optimizer_f=lambda module: torch.optim.SGD(module.parameters(), lr=lr),
        scheduler_f=lambda optimizer: scheduler.get_pytorch_scheduler(optimizer)
    )
    cpu1.place_loss_module(
        pybuda.PyTorchModule(
            "loss",
            PyTorchLoss()
        )
    )

    sequential = True
    pybuda.initialize_pipeline(training=True, 
            sample_inputs=(torch.rand(4, 32, 32), torch.rand(4, 32, 32)), 
            sample_targets=(torch.rand(4, 32, 32),), _sequential=sequential)

    for _ in range(100):
        pybuda.run_schedulers(sequential)
    
    
    
def test_specific_chip_id():
    """
    Run inference on a specific chip on a multi-chip system
    """
    num_devices = len(pybuda.detect_available_devices())

    if num_devices < 2:
        pytest.skip("Need at least 2 devices to run chip-id test")

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)

    # Create a TT device, on last available chip
    tt0 = pybuda.TTDevice("tt0", chip_ids=[num_devices-1])

    # Place a module on the device
    tt0.place_module(PyBudaTestModule("last_chip"))

    # Push intputs to the device
    tt0.push_to_inputs((input1, input2))

    # Run pipeline, and read the outputs
    output_q = pybuda.run_inference()
    output = _safe_read(output_q)
    print(output)

def _run_on_chip(chip_id: int):

    # Each process needs to have its own temporary dir
    pybuda.set_configuration_options(backend_output_dir=f"tt_build/test_out_chip_{chip_id}")

    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)

    # Create a TT device, on last available chip
    tt0 = pybuda.TTDevice("tt0", chip_ids=[chip_id])

    # Place a module on the device
    tt0.place_module(PyBudaTestModule(f"chip_{chip_id}"))

    # Push intputs to the device
    tt0.push_to_inputs((input1, input2))

    # Run pipeline, and read the outputs
    output_q = pybuda.run_inference()
    output = _safe_read(output_q)
    print("From chip ", chip_id, ":", output)

    # Clean up the process so we can end it cleanly
    pybuda.shutdown()


def test_parallel_chips():
    """
    Run different models on multiple chips at the same time
    """
    pytest.skip("Appears to hang now")
    num_devices = len(pybuda.detect_available_devices())

    if num_devices < 2:
        pytest.skip("Need at least 2 devices to run parallel chip test")

    procs = []
    for i in range(num_devices):
        p = mp_context.Process(target=_run_on_chip, args=(i,))
        p.start()
        procs.append(p)

    for i, p in enumerate(procs):
        p.join()

def test_tti_inference_save_and_load():
    available_devices = pybuda.detect_available_devices()
    if available_devices and available_devices[0] == BackendDevice.Grayskull:
        tt0 = pybuda.TTDevice(
            "tt0",
            arch=BackendDevice.Grayskull,
            devtype=BackendType.Golden,
        )
    else:
        tt0 = pybuda.TTDevice(
            "tt0",
            arch=BackendDevice.Wormhole_B0,
            devtype=BackendType.Golden,
        )


    module = PyBudaTestModule("test_pybuda_module")
    tt0.place_module(module)

    # Saving to Archive
    input_shape = (1, 1, 32, 32)
    input1, input2  = torch.rand(*input_shape), torch.rand(*input_shape)
    device_img = tt0.compile_to_image(
        img_path="device_images/test_tt0.tti", 
        training=False,
        sample_inputs=(input1, input2),
    )
    pybuda_reset()  # flush the global state that lingers around for test

    # Loading from Archive
    tt1 = pybuda.TTDevice.load_image(img_path="device_images/test_tt0.tti")
    tt1.push_to_inputs((input1, input2))
    output_q = pybuda.run_inference()
    output = _safe_read(output_q)


@pytest.mark.parametrize("hoist_tms", [True, False])
def test_nop_insertion_api(hoist_tms):
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestQueryKeyModule(f"query_key_module_hoist_tms_{hoist_tms}"))

    # Use API to set manual data format override on an op
    pybuda.insert_nop("mha_key", "mha_as", hoist_tms=hoist_tms)
    microbatch_size, seq_len, hidden_dim = (1, 128, 128)
    encoder_input = torch.rand(microbatch_size, seq_len, hidden_dim)

    tt0.push_to_inputs((encoder_input))
    pybuda.run_inference()

@pytest.mark.parametrize("hoist_tms", [True, False])
def test_nop_fork_insertion_api(hoist_tms):
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestQueryKeyModule(f"forking_nop_insertion{hoist_tms}"))

    # Use API to set manual data format override on an op
    pybuda.insert_nop("encoder_input", ["mha_key", "mha_query"], hoist_tms=hoist_tms)
    microbatch_size, seq_len, hidden_dim = (1, 128, 128)
    encoder_input = torch.rand(microbatch_size, seq_len, hidden_dim)

    tt0.push_to_inputs((encoder_input))
    pybuda.run_inference()

@pytest.mark.parametrize("hoist_tms", [True, False])
def test_nop_daily_chain_insertion_api(hoist_tms):
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestForkWithThreeUsers(f"daisy_chain_nop_insertion{hoist_tms}"))

    # Use API to set manual data format override on an op
    pybuda.insert_nop("encoder_input", ["mm_a", "mm_b", "mm_c"], hoist_tms=hoist_tms)
    pybuda.insert_nop("buffer_0_encoder_input_mm_a", ["mm_b", "mm_c"], hoist_tms=hoist_tms)
    pybuda.insert_nop("buffer_0_buffer_0_encoder_input_mm_a_mm_b", ["mm_c"], hoist_tms=hoist_tms)
    microbatch_size, seq_len, hidden_dim = (1, 128, 128)
    encoder_input = torch.rand(microbatch_size, seq_len, hidden_dim)

    tt0.push_to_inputs((encoder_input))
    pybuda.run_inference()

def test_dram_channel_override():
    tt0 = pybuda.TTDevice("tt0", module=PyBudaTestModule(f"dram_channel_override"))

    # Use API to set manual data format override on an op
    input1 = torch.rand(4, 32, 32)
    input2 = torch.rand(4, 32, 32)
    pybuda.config.override_dram_queue_placement("e2e_matmul1_0", channel=0)
    pybuda.config.set_epoch_break("matmul2")

    tt0.push_to_inputs((input1, input2))
    pybuda.run_inference()

@pytest.mark.parametrize("loss", ["l1", "mse"])
def test_loss_module_on_ttdevice(loss):
    import torch.nn as nn
    class Lin(nn.Module):
        def __init__(self, d_model):
            super(Lin, self).__init__()
            self.input_linear = nn.Linear(1, d_model)

        def forward(self, src):
            output = self.input_linear(src)
            return output

    model = Lin(1)
    tt0 = pybuda.TTDevice(
        "tt0",
        module=pybuda.PyTorchModule("lin", model),
        optimizer=pybuda.optimizers.SGD(learning_rate=0.1, device_params=True)
    )
    if loss == "mse":
        tt0.place_loss_module(pybuda.PyTorchModule("mse_loss", nn.MSELoss()))
    else:
        tt0.place_loss_module(pybuda.PyTorchModule("l1_loss", nn.L1Loss()))

    inputs = torch.rand(1, 1)
    targets = torch.rand(1, 1)

    # Initialize pipeline
    checkpoint_q = pybuda.initialize_pipeline(
       training=True,
       sample_inputs=(inputs,),
       sample_targets=(targets,)
    )

    tt0.push_to_inputs(inputs)
    tt0.push_to_target_inputs(targets)
    pybuda.run_forward(input_count=1)
    pybuda.run_backward(input_count=1, zero_grad=True)
    pybuda.run_optimizer(checkpoint=True)