In [None]:
# 基本post请求处理
# 用户注册接口
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# 定义请求体模型
class UserCreate(BaseModel):
    username: str
    password: str
    email: str
    level: int = 1  # 默认等级

@app.post("/api/users")
async def create_user(user: UserCreate):
    """创建新用户"""
    # 这里添加业务逻辑（如密码哈希、保存到数据库）
    return {
        "code": 200,
        "data": {
            "username": user.username,
            "email": user.email
        }
    }

In [None]:
# 文件上传 + 表单数据
# 上传曲谱（PDF + 元数据）
from fastapi import UploadFile, File, Form
from typing import Annotated

@app.post("/api/musicsheets")
async def upload_musicsheet(
    title: Annotated[str, Form()],
    pdf_file: UploadFile = File(...),
    audio_file: UploadFile = File(...)
):
    # 保存文件到磁盘
    pdf_path = f"storage/{pdf_file.filename}"
    with open(pdf_path, "wb") as f:
        f.write(await pdf_file.read())

    return {
        "title": title,
        "pdf_size": pdf_file.size,
        "audio_type": audio_file.content_type
    }

In [None]:
# 数据库集成
# 使用SQLAlchemy操作数据库
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True)
    hashed_password = Column(String(200))
    email = Column(String(100))


In [None]:
# 带数据库操作的POST接口

from fastapi import Depends
from sqlalchemy.orm import Session
from .database import SessionLocal, engine
from .models import User

# 创建数据库表
Base.metadata.create_all(bind=engine)

# 依赖项获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/api/users/advanced")
async def create_user_with_db(
    user: UserCreate,
    db: Session = Depends(get_db)
):
    # 密码哈希处理
    hashed_password = f"hashed_{user.password}"
    
    # 创建数据库记录
    db_user = User(
        username=user.username,
        hashed_password=hashed_password,
        email=user.email
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    
    return {"id": db_user.id, "username": db_user.username}

In [None]:
# 安全性增强
# 使用OAuth2进行token验证
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.post("/api/protected")
async def protected_route(
    token: str = Depends(oauth2_scheme)
):
    # 这里添加token验证逻辑
    return {"message": "Secure endpoint", "token": token}

In [6]:
# 获取文件
# ip/filename
# 118.89.82.46/test.txt

ip = "118.89.82.46"
filename = "test.md"


# # 下载文件
# import requests

# url = f"http://{ip}/{filename}"
# response = requests.get(url)

# with open(filename, "wb") as f:
#     f.write(response.content)

# print("File downloaded successfully.")  

# 上传文件
import requests
from http import HTTPStatus

url = f"http://{ip}:1412/upload"
files = {"file": open(filename, "rb")}
response = requests.post(url, files=files)
if response.status_code != HTTPStatus.OK:
    print(f"Failed to upload file: {response.text}")

print("File uploaded successfully.")
print(response.json())

File uploaded successfully.
[{'id': '92198f10-03f4-4863-93e1-4b86044cea27', 'message': 'File successfully uploaded', 'path': './file/92198f10-03f4-4863-93e1-4b86044cea27.md'}, 200]


In [25]:
import os
import requests
from fastapi import HTTPException
from dotenv import load_dotenv

load_dotenv()

host = os.getenv("DB_HOST")
url = f"http://{host}:1412/upload"

def upload_file(file_path: str) -> str:
    """
    上传文件到数据库, 返回网址
    """
    files = {"file": open(file_path, "rb")}
    response = requests.post(url, files=files)
    
    if response.status_code != 200:
        raise HTTPException(status_code=500, detail=f"上传文件失败: {response.text}")
    
    # 网址
    path = response.json()[0]["path"].split("/")[-1]
    db_path = f"http://{host}/{path}"
    return db_path

if __name__ == "__main__":
    file_path = "test.md"
    db_path = upload_file(file_path)
    print(db_path)


http://118.89.82.46/905b0638-5629-4f02-815e-86b0529b3a41.md


In [None]:
# 计算音频文件时长
import librosa
from mutagen.mp3 import MP3

audio1_path = "data/audio1.wav"
audio2_path = "data/我和我的祖国_爱给网_aigei_com.mp3"


duration1 = librosa.get_duration(path=audio2_path)
duration2 = MP3(audio2_path).info.length
print(duration1)
print(duration2)

43.55047619047619
43.59836734693877


In [9]:
file_path1 = 'data/我和我的祖国.mp3'
file_path2 = "http://118.89.82.46/94e43e47-5fbc-4e37-8df4-7a0eb97f705c.mp3"
y1, sr1 = librosa.load(file_path1)
y2, sr2 = librosa.load(file_path2)

print(y1.shape, sr1)
print(y2.shape, sr2)

  y2, sr2 = librosa.load(file_path2)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


FileNotFoundError: [Errno 2] No such file or directory: 'http://118.89.82.46/94e43e47-5fbc-4e37-8df4-7a0eb97f705c.mp3'