In [1]:
import os
import shutil
from email.headerregistry import Address

import numpy as np
import face_recognition
from sklearn.cluster import DBSCAN
from glob import glob
from PIL import Image

In [7]:
from PIL import Image
import face_recognition
import os
from glob import glob

PHOTO_DIR = "photos/"
SORTED_DIR = "sorted_photos/"
os.makedirs(SORTED_DIR, exist_ok=True)

face_encodings = []
face_image_paths = []

for photo_path in glob(os.path.join(PHOTO_DIR, "*.jpg")):
    try:
        # 先用 PIL 打开图片并转换格式
        with Image.open(photo_path) as img:
            img = img.convert("RGB")  # 确保是 RGB 格式
            
            img.save(photo_path)  # 保存转换后的图片

        # 再用 face_recognition 读取
        image = face_recognition.load_image_file(photo_path)
        encodings = face_recognition.face_encodings(image)

        for encoding in encodings:
            face_encodings.append(encoding)
            face_image_paths.append(photo_path)
    
    except Exception as e:
        print(f"❌ 处理 {photo_path} 时出错: {e}")

print("🎉 图片预处理完成，准备进行人脸分类！")
# 转换为 NumPy 数组
face_encodings = np.array(face_encodings)

# **1. DBSCAN 聚类**
if len(face_encodings) > 0:
    clustering = DBSCAN(eps=0.4, min_samples=1, metric="euclidean").fit(face_encodings)
    labels = clustering.labels_  # DBSCAN 输出的标签
else:
    labels = []
# **2. 归类照片**
for idx, label in enumerate(labels):
    if label == -1:
        person_folder = os.path.join(SORTED_DIR, "Unknown")  # 未分类人脸
    else:
        person_folder = os.path.join(SORTED_DIR, f"人物{label+1}")  # `label+1` 避免从0开始
    
    os.makedirs(person_folder, exist_ok=True)
    shutil.copy(face_image_paths[idx], person_folder)

print("🎉 照片分类完成！")


🎉 照片分类完成！


In [7]:
def classify_by_people(photo_path, known_faces, known_names):
    """
    识别人脸并分类到相应文件夹
    """
    image = face_recognition.load_image_file(photo_path)
    face_encodings = face_recognition.face_encodings(image)

    for encoding in face_encodings:
        matches = face_recognition.compare_faces(known_faces, encoding)
        if True in matches:
            matched_index = matches.index(True)
            person_name = known_names[matched_index]
        else:
            person_name = "Unknown"

        person_folder = os.path.join(SORTED_DIR, "People", person_name)
        os.makedirs(person_folder, exist_ok=True)
        shutil.copy(photo_path, person_folder)

photos/1.jpg -> Mode: RGB, Format: JPEG
photos/2.jpg -> Mode: RGB, Format: JPEG
photos/3.jpg -> Mode: RGB, Format: JPEG
photos/4.jpg -> Mode: RGB, Format: JPEG


In [20]:
next_label_id = 1

In [2]:
import os
import shutil
import json
from glob import glob
from PIL import Image
import face_recognition
import numpy as np

PHOTO_DIR = "photos/"
SORTED_DIR = "sorted_photos/"
ENCODINGS_FILE = "face_encodings.json"

os.makedirs(SORTED_DIR, exist_ok=True)

# ==========================
# 🌟 加载已有的人脸数据（JSON 文件）
# ==========================
def load_encodings():
    global next_label_id
    if not os.path.exists(ENCODINGS_FILE):
        print("📂 没有检测到现有的人脸数据库，初始化为空。")
        return {}, [], []

    with open(ENCODINGS_FILE, 'r') as f:
        data = json.load(f)
    
    known_face_dict = {}  # {label: encoding (np array)}
    known_face_encodings = []
    known_face_labels = []

    for label, encoding_list in data.items():
        label_int = int(label)
        encoding_array = np.array(encoding_list)
        known_face_dict[label_int] = encoding_array
        known_face_encodings.append(encoding_array)
        known_face_labels.append(label_int)
    
    next_label_id = max(known_face_dict.keys(), default=0) + 1
    print(f"✅ 成功加载 {len(known_face_dict)} 个已知人物数据。")
    return known_face_dict, known_face_encodings, known_face_labels

# ==========================
# 🌟 保存人脸编码到 JSON
# ==========================
def save_encodings(known_face_dict):
    data_to_save = {}
    for label, encoding_array in known_face_dict.items():
        data_to_save[label] = encoding_array.tolist()  # 转成 JSON 可保存的 list
    
    with open(ENCODINGS_FILE, 'w') as f:
        json.dump(data_to_save, f)
    
    print(f"💾 已保存 {len(data_to_save)} 个人物的编码数据到 {ENCODINGS_FILE}")

# ==========================
# 🌟 初始化已知人脸数据
# ==========================
known_face_dict, known_face_encodings, known_face_labels = load_encodings()

# ==========================
# 🌟 处理单张图片
# ==========================
def process_new_photo(photo_path):
    global next_label_id
    try:
        # 1. 打开并转换图片
        with Image.open(photo_path) as img:
            img = img.convert("RGB")
            image_array = np.array(img)
        
        # 2. 人脸识别
        encodings = face_recognition.face_encodings(image_array)

        if not encodings:
            print(f"⚠️ 图片 {photo_path} 不包含人脸，跳过处理。")
            return
        
        print(f"✅ 识别到 {len(encodings)} 张人脸，开始分类...")

        for encoding in encodings:
            if len(known_face_encodings) == 0:
                # 没有人脸数据，直接新建人物
                person_label = next_label_id
                next_label_id += 1

                known_face_dict[person_label] = encoding
                known_face_encodings.append(encoding)
                known_face_labels.append(person_label)
                print(f"🆕 未检测到已有人物，新建人物 {person_label}")

            else:
                # 比较与已知人脸的相似度
                distances = face_recognition.face_distance(known_face_encodings, encoding)
                min_distance = np.min(distances)
                best_match_index = np.argmin(distances)

                if min_distance < 0.4:  # 阈值可以调整
                    person_label = known_face_labels[best_match_index]
                    # print(f"👌 匹配到人物 {person_label}，距离为 {min_distance:.2f}")
                else:
                    # 新人物
                    person_label = next_label_id
                    next_label_id += 1

                    known_face_dict[person_label] = encoding
                    known_face_encodings.append(encoding)
                    known_face_labels.append(person_label)
                    # print(f"🆕 新建人物 {person_label}，距离为 {min_distance:.2f}")
            
            # 保存照片到分类目录
            person_folder = os.path.join(SORTED_DIR, f"人物{person_label}")
            os.makedirs(person_folder, exist_ok=True)
            shutil.copy(photo_path, person_folder)
        
        # 处理完图片，保存最新的 encodings
        save_encodings(known_face_dict)
        return person_label

    except Exception as e:
        print(f"❌ 处理 {photo_path} 时出错: {e}")

# ===============================
# 🌟 遍历已有的 PHOTO_DIR 图片，模拟上传流程
# ===============================
# all_photos = glob(os.path.join(PHOTO_DIR, "*.jpg"))
# 
# print(f"\n🚀 开始处理图片，检测照片数量: {len(all_photos)} 张\n")
# 
# for photo in all_photos:
#     process_new_photo(photo)
# 
# print("\n🎉 所有图片处理完成！")


✅ 成功加载 10 个已知人物数据。


In [3]:
from PIL import Image
import piexif
from datetime import datetime
from geopy.geocoders import Nominatim

def get_decimal_from_dms(dms, ref):
    """Convert GPS coordinates in DMS to decimal format."""
    degrees = dms[0][0] / dms[0][1]
    minutes = dms[1][0] / dms[1][1]
    seconds = dms[2][0] / dms[2][1]

    decimal = degrees + (minutes / 60.0) + (seconds / 3600.0)

    if ref in ['S', 'W']:
        decimal = -decimal
    return decimal

def reverse_geocode(lat, lon):
    """Use geopy to reverse geocode latitude and longitude to address."""
    geolocator = Nominatim(user_agent="photo_metadata_app")
    try:
        location = geolocator.reverse((lat, lon), exactly_one=True, language='en')
        if location:
            return location.address
        else:
            return "Address not found"
    except Exception as e:
        return f"Error retrieving address: {e}"

def extract_exif_data(image_path):
    # 打开图片文件
    img = Image.open(image_path)

    # 获取 EXIF 信息
    exif_data = img._getexif()

    # 提取拍摄时间
    timestamp = None
    if exif_data and 36867 in exif_data:
        timestamp = exif_data[36867]  # DateTimeOriginal
        timestamp = datetime.strptime(timestamp, '%Y:%m:%d %H:%M:%S')

    # 使用 piexif 加载更详细的 EXIF 数据
    exif_dict = piexif.load(img.info['exif']) if 'exif' in img.info else None

    # 提取相机/设备信息
    camera_model = None
    if exif_dict:
        model = exif_dict['0th'].get(piexif.ImageIFD.Model, None)
        make = exif_dict['0th'].get(piexif.ImageIFD.Make, None)

        camera_model = ""
        if make:
            camera_model += make.decode('utf-8') + " "
        if model:
            camera_model += model.decode('utf-8')

    # 提取 GPS 信息
    gps_info = exif_dict.get('GPS', None) if exif_dict else None
    latitude = longitude = None
    address = None
    if gps_info:
        gps_latitude = gps_info.get(piexif.GPSIFD.GPSLatitude)
        gps_latitude_ref = gps_info.get(piexif.GPSIFD.GPSLatitudeRef).decode('utf-8')
        gps_longitude = gps_info.get(piexif.GPSIFD.GPSLongitude)
        gps_longitude_ref = gps_info.get(piexif.GPSIFD.GPSLongitudeRef).decode('utf-8')

        if gps_latitude and gps_latitude_ref and gps_longitude and gps_longitude_ref:
            latitude = get_decimal_from_dms(gps_latitude, gps_latitude_ref)
            longitude = get_decimal_from_dms(gps_longitude, gps_longitude_ref)

            # 反向地理编码获取地址
            address = reverse_geocode(latitude, longitude)

    # 返回提取的信息
    return {
        'Timestamp': timestamp,
        'Latitude': latitude,
        'Longitude': longitude,
        'Address': address,
        'Camera/Device': camera_model
    }

# 示例：测试一张图片
image_file = 'photos/13.jpg'  # 替换为你自己的图片路径
metadata = extract_exif_data(image_file)

print("Metadata extracted from image:")
for key, value in metadata.items():
    print(f"{key}: {value}")



Metadata extracted from image:
Timestamp: None
Latitude: None
Longitude: None
Address: None
Camera/Device: None


In [4]:
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
import spacy
import torch

# 加载 BLIP 模型和处理器
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

In [5]:
# 生成图片描述
def generate_caption(image_path):
    try:
        raw_image = Image.open(image_path).convert('RGB')
        inputs = processor(raw_image, return_tensors="pt")
        out = model.generate(**inputs)
        caption = processor.decode(out[0], skip_special_tokens=True)
        return caption
    except Exception as e:
        print(f"Caption generation error for {image_path}: {e}")
        return "No description available."

caption = generate_caption("photos/11.jpg")
print(f"Image description: {caption}")


Image description: a plate of food with cuce and sauce on it


In [6]:
import spacy
from spacy.cli import download

# 自动下载模型
download("en_core_web_sm")

# 加载模型
nlp = spacy.load("en_core_web_sm")

[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [1]:
def extract_noun_tags(text):
    """
    提取英文文本中的关键词（名词和专有名词）

    参数:
        text (str): 输入的英文文本

    返回:
        tags (list): 去重且小写处理后的关键词列表
    """
    if not text:
        return []

    # 分词 & 词性标注
    doc = nlp(text)

    # 只保留名词（NOUN）和专有名词（PROPN）
    tags = [token.text for token in doc if token.pos_ in ['NOUN', 'PROPN']]

    # 去重 & 小写（可根据需求取消小写处理）
    tags = list(set(tag.lower() for tag in tags))
    
    print(f"Image event tags: {tags}")

    return tags


In [2]:
# 处理文件夹下所有图片
def process_images(img_path):
    # 提取元信息
    metadata = extract_exif_data(img_path)
    
    # 生成描述
    caption = generate_caption(img_path)

    #Aoto-tagging
    tags = extract_noun_tags(caption)
    
    #人脸识别 
    person_label = process_new_photo(img_path)
    
    # ✅ 平铺 JSON 格式
    photo_info = {
        'Timestamp': metadata.get('Timestamp'),
        'Latitude': metadata.get('Latitude'),
        'Longitude': metadata.get('Longitude'),
        'Address': metadata.get('Address'),
        'Camera/Device': metadata.get('Camera/Device'),
        'Caption': caption,
        'AutoTags': tags,
        'PersonLabel': person_label
    }
    
    print(f"Processed {img_path}: {caption} @ {photo_info.get('Address')} on {photo_info.get('Timestamp')}") 
    
    return photo_info

In [3]:
process_images("photos/6.jpg")

NameError: name 'extract_exif_data' is not defined

In [None]:
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import shutil
import os
from datetime import datetime


app = FastAPI()

# 上传图片接口
@app.post("/process_image")
async def process_image(file: UploadFile = File(...)):
    try:        
        # 调用处理函数
        result = process_images(file)

        # 返回 JSON
        return JSONResponse(content=result)

    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})


In [5]:
import requests

# 第一步：获取上传 URL
url_get_upload_url = "https://api.immersity.ai/api/v1/get-upload-url"
headers = {"accept": "application/json"}

response = requests.get(url_get_upload_url, headers=headers)

# 检查是否成功返回上传 URL
if response.status_code == 200:
    upload_url = response.json().get('upload_url')
    if upload_url:
        print(f"上传 URL: {upload_url}")
        
        # 第二步：上传图片到服务器
        file_path = "/mnt/data/image.png"  # 你的图片文件路径

        files = {
            "file": open(file_path, "rb")
        }

        upload_response = requests.post(upload_url, files=files)
        files['file'].close()

        # 检查上传是否成功
        if upload_response.status_code == 200:
            print("图片上传成功。")
            uploaded_file_url = upload_response.json().get("file_url")
            print(f"文件 URL: {uploaded_file_url}")

            # 第三步：创建 3D 动画
            url_animation = "https://api.immersity.ai/api/v1/animation"
            headers = {
                "accept": "application/json",
                "content-type": "application/json"
            }

            data = {
                "image_url": uploaded_file_url  # 使用上传后的文件 URL
            }

            response_animation = requests.post(url_animation, json=data, headers=headers)

            # 打印 API 返回的响应，确认 3D 动画生成成功
            if response_animation.status_code == 200:
                print("3D 动画生成成功。")
                print("响应: ", response_animation.json())
            else:
                print("生成动画失败。")
                print("错误: ", response_animation.text)
        else:
            print("图片上传失败。")
            print(f"错误: {upload_response.text}")
    else:
        print("错误：没有返回上传 URL。")
else:
    print(f"获取上传 URL 失败。错误: {response.text}")


获取上传 URL 失败。错误: {"message":"Unauthorized","statusCode":401}


In [None]:
import requests

url = "https://auth.immersity.ai/auth/realms/immersity/protocol/openid-connect/token"

# 替换为你自己的 client_id 和 client_secret
client_id = "a549150e-3f5f-472d-9cae-ace8f9748c99"
client_secret = "gwAaBb5sTXZs4wqWnch3MOrgPU8LLmYe"

# 请求参数
payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret
}

# 请求头
headers = {
    "accept": "application/json",
    "content-type": "application/x-www-form-urlencoded"
}

# 发起 POST 请求
response = requests.post(url, data=payload, headers=headers)

# 查看返回值
print(response.status_code)
print(response.json())

In [2]:
import requests
import os

# Step 1: 获取 Access Token
url_auth = "https://auth.immersity.ai/auth/realms/immersity/protocol/openid-connect/token"

client_id = "a549150e-3f5f-472d-9cae-ace8f9748c99"
client_secret = "gwAaBb5sTXZs4wqWnch3MOrgPU8LLmYe"

payload = {
    "grant_type": "client_credentials",
    "client_id": client_id,
    "client_secret": client_secret
}

headers_auth = {
    "accept": "application/json",
    "content-type": "application/x-www-form-urlencoded"
}

response_auth = requests.post(url_auth, data=payload, headers=headers_auth)

if response_auth.status_code == 200:
    access_token = response_auth.json().get("access_token")
    print(f"✅ Access Token 获取成功: {access_token}\n")
else:
    print("❌ 获取 Access Token 失败。")
    print(f"错误信息: {response_auth.text}")
    exit()

# Step 2: 获取上传 URL
url_get_upload_url = "https://api.immersity.ai/api/v1/get-upload-url"
headers = {
    "accept": "application/json",
    "Authorization": f"Bearer {access_token}"
}

response = requests.get(url_get_upload_url, headers=headers)

if response.status_code == 200:
    upload_url = response.json().get('url')
    if upload_url:
        print(f"✅ 获取上传 URL 成功: {upload_url}\n")

        # Step 3: 上传图片到服务器
        file_path = "photos/10.jpg"

        if not os.path.isfile(file_path):
            print(f"❌ 文件未找到: {file_path}")
            exit()

        with open(file_path, "rb") as f:
            upload_response = requests.put(upload_url, data=f)

        if upload_response.status_code in [200, 201]:
            print("✅ 图片上传成功！\n")

            # Step 4: 创建 3D 动画
            url_animation = "https://api.immersity.ai/api/v1/animation"
            headers_animation = {
                "accept": "application/json",
                "content-type": "application/json",
                "Authorization": f"Bearer {access_token}"
            }

            # uploaded_file_url 这里根据 API 文档确认，如果 upload_url 可以用来创建动画，就直接用。
            # 否则你需要 API 提供 file_url，如果没有返回，就可能需要你自己拼接 URL。
            data = {
                "image_url": upload_url.split("?")[0]  # 通常上传 URL 去掉参数部分就是文件 URL
            }

            print(f"➡️  发送创建动画请求，使用图片 URL: {data['image_url']}\n")

            response_animation = requests.post(url_animation, json=data, headers=headers_animation)

            if response_animation.status_code == 200:
                print("✅ 3D 动画生成成功！")
                print("响应内容: ", response_animation.json())
            else:
                print("❌ 生成动画失败。")
                print(f"错误信息: {response_animation.text}")
        else:
            print("❌ 图片上传失败。")
            print(f"错误信息: {upload_response.text}")
    else:
        print("❌ 获取上传 URL 成功，但未返回 URL。")
else:
    print(f"❌ 获取上传 URL 失败。错误信息: {response.text}")



✅ Access Token 获取成功: eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJVbDlpOTVQaVdhVXlsYmdQQVdZcERyS0NibWZhbUt3NnJWMk5jdWlwX09FIn0.eyJleHAiOjE3NDI1NzEyMzUsImlhdCI6MTc0MjU3MDMzNSwianRpIjoiZDViYzA3MTUtNDlhNi00ZjJjLTliODgtN2U5NDYxMTJjNWM5IiwiaXNzIjoiaHR0cHM6Ly9hdXRoLmltbWVyc2l0eS5haS9hdXRoL3JlYWxtcy9pbW1lcnNpdHkiLCJzdWIiOiJiOGFhNTliNC01OThjLTQ1NjYtYWVkNS1iYzA4MzdkMzA2MDAiLCJ0eXAiOiJCZWFyZXIiLCJhenAiOiJhNTQ5MTUwZS0zZjVmLTQ3MmQtOWNhZS1hY2U4Zjk3NDhjOTkiLCJzY29wZSI6ImxlaWEtYXBpLWdhdGV3YXkiLCJjbGllbnRIb3N0IjoiODIuMTUzLjEzNS4yMTMiLCJjbGllbnQtb3duZXItdXNlci1pZCI6ImEyYTVlMDNlLTJjZmEtNGU1NS1hMmNiLTQyZGY1OWE1YTM2ZiIsImNsaWVudElkIjoiYTU0OTE1MGUtM2Y1Zi00NzJkLTljYWUtYWNlOGY5NzQ4Yzk5IiwiY2xpZW50QWRkcmVzcyI6IjgyLjE1My4xMzUuMjEzIn0.GPupfrQ1WqYuq9BEh_uU3ybRJgme-l4AlEISIPE0O6J7MWHMiID7NwR-AndvgUW1Hq4VNZGIOFpQsAz9eqMzUgEHPFL1UVvYSPuH_ndf4UMp40q_jSaW84KDQuNv0n0p0HnvU1S6KYop2DdjuvP1zycazQ3gJMf98XdjO79IIyQq7fTKUC1N7WM5GPvkYysDEf6F1AH4e-eOu4vRUjECR0VC0EsU3ZJxUM-a0nyc2ojVeY3vl8liCR0O3FuxyCPvRSb2Sw-V3uYRn-lgZ1H

ValueError: I/O operation on closed file.

In [1]:
# import sys
# import os
# import uuid
# import requests
# from get_access_token import *
#
# MEDIA_CLOUD_REST_API_BASE_URL = 'https://api.immersity.ai'
#
# DEFAULT_ORIGINAL_IMAGE_URL = 'https://images.pexels.com/photos/38771/pexels-photo-38771.jpeg?auto=compress&cs' \
#                              '=tinysrgb&w=1260&h=750&dpr=1'
# ORIGINAL_IMAGE_URL = os.getenv('ORIGINAL_IMAGE_URL', DEFAULT_ORIGINAL_IMAGE_URL)
# ORIGINAL_IMAGE_URL = DEFAULT_ORIGINAL_IMAGE_URL if ORIGINAL_IMAGE_URL == '' else ORIGINAL_IMAGE_URL
#
# THREE_MIN_IN_S = 3 * 60
#
#
# try:
#     print('Acquiring access token from Immersity AI Login...')
#     access_token = get_access_token()
#     print(f'\nImmersity AI Login AccessToken acquired: {access_token}')
#
#     correlation_id = str(uuid.uuid4())
#     print(f'\nGenerating Disparity with correlationId: {correlation_id}...')
#
#     response = requests.post(
#         f'{MEDIA_CLOUD_REST_API_BASE_URL}/api/v1/animation',
#         headers={
#             'Authorization': f'Bearer {access_token}'
#         },
#         json={
#             'correlationId': correlation_id,
#             'inputImageUrl': ORIGINAL_IMAGE_URL,
#             'inputDisparityUrl': disparity_url,
#             'animationLength': 5
#         },
#         timeout=THREE_MIN_IN_S
#     )
#     if not response.status_code == 201:
#         raise Exception(f"Request returned with an error {response.status_code}. "
#                         f"The full response is: {response.content}")
#
#     # The resulting file is accessible via the pre-signed GET URL, that you
#     # can find included in the response to the animation request:
#     get_mp4_presigned_url = response.json()['resultPresignedUrl']
#     print(f'\nMP4 Animation has been uploaded to the temporary storage. '
#           f'To download, please use this GET URL: {get_mp4_presigned_url}')
#
# 		# We omit the error handling in this example for simplicity, but
#     # you should always check for a returned status & errors from the API
#     # in real code.
#
# except Exception as e:
#     print('Error. Unhandled exception: ' + str(e), file=sys.stderr)


ModuleNotFoundError: No module named 'get_access_token'

In [17]:
from moviepy.editor import *
import os
import glob

def create_video(image_folder, audio_path, output_path, fps=24, duration_per_image=3, transition_duration=1):
    # 获取所有图片并按文件名排序
    image_files = sorted(glob.glob(os.path.join(image_folder, "*.*")))
    
    # 创建图片剪辑列表（统一调整为1920x1080分辨率）
    clips = []
    for img in image_files:
        clip = ImageClip(img).set_duration(duration_per_image).resize((1920, 1080))
        clip = clip.crossfadein(transition_duration).crossfadeout(transition_duration)
        clips.append(clip)

    # 拼接视频剪辑（使用过渡重叠）
    final_clip = concatenate_videoclips(
        clips,
        method="compose",
        padding=-transition_duration  # 负值表示重叠时间
    )

    # 添加背景音乐
    audio_clip = AudioFileClip(audio_path)
    if audio_clip.duration < final_clip.duration:
        # 循环音频
        audio_clip = audio_clip.fx(vfx.loop, duration=final_clip.duration)
    else:
        # 截取音频
        audio_clip = audio_clip.subclip(0, final_clip.duration)
    
    final_clip = final_clip.set_audio(audio_clip)

    # 输出视频
    final_clip.write_videofile(
        output_path,
        fps=fps,
        codec="libx264",
        audio_codec="aac",
        threads=8,
        preset="medium"
    )

if __name__ == "__main__":
    # 参数配置
    config = {
        "image_folder": "/path/to/your/images",  # 图片文件夹路径
        "audio_path": "/path/to/background_music.mp3",  # 背景音乐路径
        "output_path": "output_video.mp4",        # 输出视频路径
        "fps": 30,                                # 帧率
        "duration_per_image": 3,                  # 每张图片显示时间（秒）
        "transition_duration": 1                  # 过渡特效时间（秒）
    }
    
    create_video(**config)

ModuleNotFoundError: No module named 'moviepy.editor'

In [4]:
import cv2
import numpy as np
import os
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageSequenceClip

def resize_images(image_paths, target_size, temp_folder="temp_images"):
    os.makedirs(temp_folder, exist_ok=True)  # 创建临时文件夹存储调整大小的图片
    resized_images = []
    for i, img_path in enumerate(image_paths):
        img = cv2.imread(img_path)
        img_resized = cv2.resize(img, target_size)  # 调整图片大小
        temp_img_path = os.path.join(temp_folder, f"temp_{i}.jpg")
        cv2.imwrite(temp_img_path, img_resized)  # 保存调整后的图片
        resized_images.append(temp_img_path)
    return resized_images

def create_video_from_images(image_folder, output_video, fps=2, music_path=None, transition=True):
    images = [img for img in os.listdir(image_folder) if img.endswith((".png", ".jpg", ".jpeg"))]
    images.sort()  # 按文件名排序
    
    if not images:
        print("No images found in the folder.")
        return
    
    image_paths = [os.path.join(image_folder, img) for img in images]
    
    # 获取第一张图片的尺寸作为标准
    first_image = cv2.imread(image_paths[0])
    target_size = (first_image.shape[1], first_image.shape[0])  # (width, height)
    
    # 调整所有图片大小
    resized_images = resize_images(image_paths, target_size)
    
    clip = ImageSequenceClip(resized_images, fps=fps)
    
    if transition:
        clip = apply_transition_effects(clip)
    
    if music_path and os.path.exists(music_path):
        audio = AudioFileClip(music_path).set_duration(clip.duration)
        clip = clip.set_audio(audio)
    
    clip.write_videofile(output_video, codec='libx264', fps=fps)

def apply_transition_effects(clip):
    clips = [ImageSequenceClip([frame], fps=clip.fps).set_duration(0.5) for frame in clip.iter_frames()]
    return concatenate_videoclips(clips, method="compose", padding=-0.25)

image_folder = "photos"  # 照片所在文件夹
output_video = "output.mp4"  # 输出视频
music_path = "111.mp3"  # 背景音乐
create_video_from_images(image_folder, output_video, fps=1, music_path=music_path, transition=True)


Moviepy - Building video output.mp4.
MoviePy - Writing audio in outputTEMP_MPY_wvf_snd.mp3


                                                       

MoviePy - Done.
Moviepy - Writing video output.mp4



                                                          

Moviepy - Done !
Moviepy - video ready output.mp4


In [16]:
import cv2
import numpy as np
import os
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageSequenceClip, ImageClip
import moviepy.video.fx as vfx  # 引入 moviepy 的特效模块

def resize_images(image_paths, target_size, temp_folder="temp_images"):
    os.makedirs(temp_folder, exist_ok=True)
    resized_images = []
    for i, img_path in enumerate(image_paths):
        img = cv2.imread(img_path)
        img_resized = cv2.resize(img, target_size)
        temp_img_path = os.path.join(temp_folder, f"temp_{i}.jpg")
        cv2.imwrite(temp_img_path, img_resized)
        resized_images.append(temp_img_path)
    return resized_images

def apply_transition_effects(image_paths, duration=2.0, transition_duration=0.5):
    """ 使用 vfx.fadein 和 vfx.fadeout 添加淡入淡出效果 """
    clips = []
    for img_path in image_paths:
        clip = ImageClip(img_path, duration=duration)
        clip = vfx.CrossFadeIn(clip)
        clip = vfx.CrossFadeOut(clip)
        clips.append(clip)
    
    return concatenate_videoclips(clips, method="compose")

def create_video_from_images(image_folder, output_video, fps=2, music_path=None, transition=True):
    images = [img for img in os.listdir(image_folder) if img.endswith((".png", ".jpg", ".jpeg"))]
    images.sort()
    
    if not images:
        print("No images found in the folder.")
        return
    
    image_paths = [os.path.join(image_folder, img) for img in images]
    
    first_image = cv2.imread(image_paths[0])
    target_size = (first_image.shape[1], first_image.shape[0])
    
    resized_images = resize_images(image_paths, target_size)
    
    if transition:
        clip = apply_transition_effects(resized_images, duration=2.0, transition_duration=0.5)
    else:
        clip = ImageSequenceClip(resized_images, fps=fps)
    
    if music_path and os.path.exists(music_path):
        audio = AudioFileClip(music_path).set_duration(clip.duration)
        clip = clip.set_audio(audio)
    
    clip.write_videofile(output_video, codec='libx264', fps=fps)
    
    # Cleanup temporary images
    for temp_img in resized_images:
        os.remove(temp_img)
    os.rmdir("temp_images")  # Remove the temporary folder

image_folder = "photos"
output_video = "output.mp4"
music_path = "111.mp3"
create_video_from_images(image_folder, output_video, fps=2, music_path=music_path, transition=True)


TypeError: unsupported operand type(s) for +: 'int' and 'CrossFadeIn'