In [4]:
import requests
import json
import os
from fastapi import FastAPI
from shutil import copyfile

app = FastAPI()

OTA_SERVER = "http://172.30.1.39/static/{version_info['file_name']}"  # 클라우드 OTA 서버
LOCAL_STORAGE = "../"  # 엣지노드 내부 저장소
VERSION_FILE = f"{LOCAL_STORAGE}/version.json"

# OTA 정보 업데이트 (주기적으로 실행됨)
def update_ota_version():
    try:
        response = requests.get(f"{OTA_SERVER}/latest_version")
        version_info = response.json()

        local_version = load_local_version()
        if local_version["version"] != version_info["version"]:
            print(f"새로운 OTA 감지: {version_info['version']}, 다운로드 중...")

            # OTA 파일 다운로드
            firmware_url = version_info["download_url"]
            local_file = f"{LOCAL_STORAGE}/{version_info['file_name']}"
            download_file(firmware_url, local_file)

            # 최신 버전 정보 저장
            with open(VERSION_FILE, "w") as f:
                json.dump(version_info, f)

            print(f"OTA 업데이트 완료: {version_info['version']} 저장됨.")
        else:
            print("최신 버전이 이미 존재합니다.")

    except Exception as e:
        print(f"OTA 업데이트 실패: {e}")

# 로컬 버전 정보 로드
def load_local_version():
    try:
        with open(VERSION_FILE, "r") as f:
            return json.load(f)
    except:
        return {"version": "unknown", "file_name": ""}

# 파일 다운로드 함수
def download_file(url, local_path):
    response = requests.get(url, stream=True)
    with open(local_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)

# 차량이 최신 버전 요청 시 응답
@app.get("/latest_version")
def latest_version():
    version_info = load_local_version()
    return {
        "version": version_info["version"],
        "download_url": f"http://edge-node.local/{version_info['file_name']}"
    }

# 차량이 엣지노드에서 파일 다운로드
@app.get("/download")
def download():
    version_info = load_local_version()
    file_path = f"{LOCAL_STORAGE}/{version_info['file_name']}"
    if os.path.exists(file_path):
        return {"download_path": file_path}
    else:
        return {"error": "파일이 없습니다."}


In [5]:
from fastapi import FastAPI
from time import time
import httpx
import asyncio

app = FastAPI()

URL = "http://httpbin.org/uuid"

async def request(client):
    response = await client.get(URL)
    return response.text

async def task():
    async with httpx.AsyncClient() as client:
        tasks = [request(client) for i in range(100)]
        result = await asyncio.gather(*tasks)
        print(result)

@app.get('/')
async def f():
    start = time()
    await task()
    print("time:",time() - start)