In [990]:
import os
from PIL import Image
import pandas as pd
import numpy as np
import random
import time
import json
import requests as rq
from dotenv import load_dotenv
import os

## 清空数据

In [1009]:
import shutil
folders = ['images', 'metadata']
for folder in folders:
    shutil.rmtree(folder)
    os.mkdir(folder)    

## 生成模版

In [992]:
# 设置素材参数
W = 400
H = 400
EXTENSION = 'png'
PARTS = "./parts"
IMAGES = './images'
METADATA = './metadata'
AMOUNT = 39

load_dotenv()

PROXIES = {
  "http": "http://127.0.0.1:7890",
  "https": "http://127.0.0.1:7890",
}

# 加载上传配置，需要注册infura的ipfs获取PROJECT_ID和PROJECT_SECRET
PROJECT_ID = os.getenv('PROJECT_ID')
PROJECT_SECRET = os.getenv('PROJECT_SECRET')

In [993]:
# 遍历文件
files_path = []
for root, dirs, _ in os.walk(PARTS):
    if root != PARTS:
        for _, _, files in os.walk(root):
            for file in files:
                if file != ".DS_Store":
                    files_path.append(os.path.join(root,file))
                    
# 验证图片格式和尺寸
for path in files_path:
    assert path.split('.')[-1] == EXTENSION, f"{path} 的扩展名不是 {EXTENSION} " 
    im = Image.open(path)
    w, h = im.size
    assert w == W, f'{path} width not equal {W}'
    assert h == H, f'{path} height not equal {H}'
    
# 导出表格
attrs = [path.split('/')[2:] for path in files_path]
d = {'prop':[a[0].split('_')[1] for a in attrs], 'value': [a[1][:-4] for a in attrs],'ratio': 1}
df = pd.DataFrame(data=d)
df.to_csv('ratio.csv', index=False)

## 生成图片

In [994]:
# 修改表格后，读取比例
df_csv = pd.read_csv('./ratio.csv')
df_group = df_csv.groupby(['prop','value']).agg({'ratio': 'sum'})
df_pac = df_group.groupby(level=0).apply(lambda x: x / float(x.sum()))

# 随机选择生成属性
props = df_csv['prop'].unique()

def random_attr():
    attributes = []
    for prop in props:
        k = random.random()
        ratio_arr = df_pac.query(f"prop == '{prop}'").ratio.values
        cum_arr = np.cumsum(ratio_arr) - k
        first_index = next(x[0] for x in enumerate(cum_arr) if x[1] > 0)
        value = df_pac.loc[(prop), :].index[first_index]
        attributes.append({"value" :value, "trait_type": prop})
    return attributes

In [1010]:
def generate_images(df_csv:pd.DataFrame, amount:int, save_folder:str='./images', start_id:int=0) -> pd.DataFrame:
    used_attributes = []
    cols = ['imagehash', 'path'] + [i['trait_type'] for i in random_attr()]
    df_attr = pd.DataFrame(columns = cols)

    prop_count_df = df_csv.groupby('prop').count()
    max_count = prop_count_df['value'].values.cumprod()[-1]
    # 检验AMOUNT是否合理
    df_group = df_csv.groupby(['prop','value']).agg({'ratio': 'sum'})
    df_pac = df_group.groupby(level=0).apply(lambda x: x / float(x.sum()))
    assert np.min(df_pac['ratio']) * amount >= 1, '生成数量过少，不能反应最低概率，应该增加总量'
    assert amount <= max_count, '生成数量过多，会有重复生成，应该增加素材数或减少总量'

    ## 如果文件夹有png报错
    assert len(list(filter(lambda f:f.split('.')[1]=="png",os.listdir(save_folder)))) == 0,\
        f'{save_folder} 文件夹不为空，先备份原数据和表格'

    for i in range(amount):
        # 避免重复
        index=i+start_id
        attributes = random_attr()
        while attributes in used_attributes:
            attributes = random_attr()
        used_attributes.append(attributes)
        # 按叠加顺序获取要读取的图片
        sorted_paths = sorted([next(path for path in files_path if attr['trait_type']+'/'+attr['value'] in path) 
                                for attr in attributes])
        base_img = Image.new('RGBA', (W,H), (0, 0, 0, 0))
        for path in sorted_paths:
            img = Image.open(path, 'r')
            base_img.paste(img, (0,0), mask=img) 
        # 保存图片
        base_img.save(os.path.join(save_folder,str(index)+'.png')) 
        # 添加属性
        df_attr = df_attr.append(
            { 'imagehash' : None, 
             'path' : os.path.join(save_folder, str(index)+'.png')}
            | {i['trait_type']:i['value'] for i in attributes}, 
            ignore_index = True
        )
    df_attr[cols].to_csv(os.path.join(save_folder, 'attr.csv'), index = False)
    
generate_images(df_csv, AMOUNT, start_id=12)

In [1011]:
df_attr

Unnamed: 0,imagehash,path,Background,First Latter,Second Letter
0,,./images/0.png,purple,C,R
1,,./images/1.png,red,E,Y
2,,./images/2.png,white,J,Q
3,,./images/3.png,purple,H,Q
4,,./images/4.png,blue,K,Y
5,,./images/5.png,white,D,T
6,,./images/6.png,blue,K,Z
7,,./images/7.png,red,C,A
8,,./images/8.png,purple,F,Q
9,,./images/9.png,white,E,Q


## 重命名图片

In [1012]:
# 通过计算hash去除画面一样的图
import hashlib, os

def remove_duplicate_images(root: str):
    file_list = list(filter(lambda f:f.split('.')[1]=="png",os.listdir(root)))
    duplicates = []
    hash_keys = dict()
    for index, filename in enumerate(file_list): 
        with open(os.path.join(root,filename), 'rb') as f:
            filehash = hashlib.md5(f.read()).hexdigest()
        if filehash not in hash_keys: 
            hash_keys[filehash] = index
        else:
            duplicates.append((index,hash_keys[filehash]))

    print(f'重复的图片有{len(duplicates)}张')
    for index in duplicates:
        os.remove(os.path.join(root,file_list[index[0]]))

remove_duplicate_images('./images')

重复的图片有0张


In [1013]:
# 支持手动删除部分图片

In [1026]:
# 重命名文件和csv
def rename_files(df:pd.DataFrame, root:str, old_root:str='./images') -> pd.DataFrame:
    df = pd.read_csv(os.path.join(root,'attr.csv'))
    start_id = int(df.iloc[0,1].split('/')[-1].split('.')[0])
    exist_paths = [os.path.join(root,r) for r in list(filter(lambda f:f.split('.')[1]=="png",os.listdir(root)))] 
    new_df = df[df['path'].apply(lambda i : i.replace(old_root,root)).isin(exist_paths)]
    for index,(idx,row) in enumerate(new_df.iterrows()):
        old_name = row.values[1]
        new_name = os.path.join(root, f'{start_id+index}.png')
        new_df.loc[idx,'path'] = new_name
        os.rename(old_name, new_name)
    
    new_df.to_csv(os.path.join(root,'attr.csv'), index = False)
    return new_df
       
renamed_df = rename_files(df_attr, IMAGES)
renamed_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_single_column(loc, value, pi)


Unnamed: 0,imagehash,path,Background,First Latter,Second Letter
0,,./images/12.png,blue,I,Z
1,,./images/13.png,red,F,T
2,,./images/14.png,blue,J,Z
3,,./images/15.png,blue,D,Y
4,,./images/16.png,green,G,p
5,,./images/17.png,green,I,Z
6,,./images/18.png,white,B,R
7,,./images/19.png,purple,J,Q
8,,./images/20.png,purple,H,A
9,,./images/21.png,purple,G,R


In [1027]:
# 显示各属性占比，如果不满意，可以删除部分或者重新生成
df_pac['actual'] = 0.0

for col in [i['trait_type'] for i in random_attr()]:
    array = renamed_df[col]
    uniques, counts = np.unique(array, return_counts=True)
    percentages = dict(zip(uniques, counts/ len(array)))
    for k,v in percentages.items():
        df_pac.loc[df_pac.index.get_level_values('value') == k,'actual'] = v

df_pac

Unnamed: 0_level_0,Unnamed: 1_level_0,ratio,actual
prop,value,Unnamed: 2_level_1,Unnamed: 3_level_1
Background,blue,0.2,0.210526
Background,green,0.2,0.263158
Background,purple,0.2,0.210526
Background,red,0.2,0.157895
Background,white,0.2,0.157895
First Latter,B,0.1,0.052632
First Latter,C,0.1,0.105263
First Latter,D,0.1,0.052632
First Latter,E,0.1,0.0
First Latter,F,0.1,0.210526


## 上传

In [1034]:
def upload_folder(folder_name:str, content_type:str='image/png') -> tuple[str,list[dict]]:
    files = []
    if content_type == 'image/png':
        files = [
            (folder_name.split('/')[-1], (file, open(os.path.join(folder_name,file),'rb'), content_type)) 
            for file 
            in list(filter(lambda i:i.split('.')[-1]=='png' , os.listdir(folder_name)))
        ]
    elif content_type == 'application/json':
        files = [
            (folder_name.split('/')[-1], (file, open(os.path.join(folder_name,file),'rb'), content_type)) 
            for file 
            in list(filter(lambda i:'.' not in i , os.listdir(folder_name)))
        ]
    response = rq.post(
        f"https://ipfs.infura.io:5001/api/v0/add?pin=false&recursive=true&wrap-with-directory=true",
        files=files, 
        auth=(PROJECT_ID,PROJECT_SECRET),
        proxies=proxies
    )
    
    upload_folder_res_list = response.text.split('\n')
    assert len(files)+2 == len(upload_folder_res_list), f"上传成功的文件和文件夹中的数量不同，需要{len(files)+2}，返回{len(upload_folder_res_list)}"
    try:
        folder_hash = json.loads([i for i in upload_folder_res_list if i != '' and json.loads(i)['Name'] == ''][0])['Hash']
    except:
        folder_hash = None
    images_dict_list = [json.loads(i) for i in upload_folder_res_list if i != '' and json.loads(i)['Name'] != '']
    return (folder_hash, images_dict_list)

In [1036]:
image_ipfs_root,image_ipfs_data = upload_folder(IMAGES)

In [1037]:
def generate_metadata_and_upload(
    df: pd.DataFrame, 
    image_ipfs_root: str, 
    image_ipfs_data:dict,
    start_count:int=0,
    image_folder:str=IMAGES,
    metadata_folder:str=METADATA
) -> tuple[str,int,int]:
    # 保存为文件
    for idx, row in df.iterrows():
        path = row['path']
        imagehash = row['imagehash']
        returnHash = ''
        index = idx+start_count
        if type(imagehash) == float and np.isnan(imagehash):
            print(path)
            image_dict = next(filter(lambda i: os.path.join(image_folder ,i['Name']) == path, image_ipfs_data), None)
            df.loc[idx,'imagehash'] = image_dict['Hash']
            cols = list(df.columns)[2:]
            attributes = [{"value":col,"trait_type":row[col]} for col in cols]
            info_dict = {
                "name": f"Double Letter #{index}",
                "description": "for test",
                "image": f"ipfs://{image_dict['Hash']}",
                "attributes": attributes
            }
            info_json = json.dumps(info_dict)
            with open(os.path.join(metadata_folder, str(index)),'w') as f:
                f.write(info_json)
        else:
            print(f"row {idx} has image hash, skip")
    print(f'save metadata complete')

    meta_root, _ = upload_folder(metadata_folder, 'application/json')
    print(f'upload metadatas complete')
    return (meta_root, start_count, start_count+len(df))
    
tokenurl_hash, start, end  = generate_metadata_and_upload(renamed_df,image_ipfs_root,image_ipfs_data,start_count=12)
tokenurl_hash, start, end

./images/12.png
./images/13.png
./images/14.png
./images/15.png
./images/16.png
./images/17.png
./images/18.png
./images/19.png
./images/20.png
./images/21.png
./images/22.png
./images/23.png
./images/24.png
./images/25.png
./images/26.png
./images/27.png
./images/28.png
./images/29.png
./images/30.png
save metadata complete
upload metadatas complete


('QmTpLD4zzLFPJQGUbgTCoh3pAE7m479xgLJM93SaqHxMVm', 12, 31)