In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
from PIL import Image
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import torch.optim as optim
from torchvision import  transforms
import matplotlib.pyplot as plt
from torchvision.transforms.functional import to_pil_image

In [None]:

def weights_init(m):
    """Re-initialise the parameters of a single layer in place.

    Meant to be passed to ``nn.Module.apply`` so it is called once per
    sub-module. The layer kind is decided from the class name:

    * class name contains ``'Conv'``: weights drawn from N(0, 0.02)
    * class name contains ``'BatchNorm'``: weights drawn from N(1, 0.02)
      and the bias set to zero
    * anything else is left untouched

    Args:
        m: the module whose parameters should be re-initialised.
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        # Convolution (regular or transposed): zero-mean weights, std 0.02.
        m.weight.data.normal_(0.0, 0.02)
        return

    if 'BatchNorm' in layer_kind:
        # Batch norm: scale centred on 1 (std 0.02), shift reset to 0.
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.zero_()

def blockUNet(in_c, out_c, name, transposed=False, bn=True, relu=True, size=4, pad=1, dropout=0.):
    """Build one U-Net stage as an ``nn.Sequential``.

    The stage is assembled in this order:

    1. activation: in-place ReLU if ``relu`` else in-place LeakyReLU(0.2)
    2. resampling convolution
       * ``transposed=False``: ``Conv2d`` with kernel ``size``, stride 2
       * ``transposed=True``: bilinear ``Upsample`` (x2) followed by a
         stride-1 ``Conv2d`` with kernel ``size - 1`` (an upsample + conv
         pair, not an actual transposed convolution)
    3. optional ``BatchNorm2d`` (``bn``)
    4. optional in-place ``Dropout2d`` (only when ``dropout > 0``)

    Args:
        in_c: number of input channels.
        out_c: number of output channels.
        name: prefix used for the names of the sub-modules.
        transposed: build a decoder (upsampling) stage instead of an encoder one.
        bn: append batch normalisation.
        relu: use ReLU (True) or LeakyReLU with slope 0.2 (False).
        size: convolution kernel size (reduced by one for decoder stages).
        pad: convolution padding.
        dropout: dropout probability; 0 disables the dropout layer.

    Returns:
        nn.Sequential holding the layers described above.
    """
    stages = []

    # The activation comes first and works in place on the stage's input.
    if relu:
        stages.append(('%s_relu' % name, nn.ReLU(inplace=True)))
    else:
        stages.append(('%s_leakyrelu' % name, nn.LeakyReLU(0.2, inplace=True)))

    if transposed:
        # Decoder: upsample, then convolve (note: old default was nearest neighbor).
        stages.append(('%s_upsam' % name, nn.Upsample(scale_factor=2, mode='bilinear')))
        # Kernel size is reduced by one for the upsampling (i.e. decoder) part.
        stages.append(('%s_tconv' % name,
                       nn.Conv2d(in_c, out_c, kernel_size=(size-1), stride=1, padding=pad, bias=True)))
    else:
        # Encoder: strided convolution halves the resolution.
        stages.append(('%s_conv' % name,
                       nn.Conv2d(in_c, out_c, kernel_size=size, stride=2, padding=pad, bias=True)))

    if bn:
        stages.append(('%s_bn' % name, nn.BatchNorm2d(out_c)))

    if dropout>0.:
        stages.append(('%s_dropout' % name, nn.Dropout2d( dropout, inplace=True)))

    block = nn.Sequential()
    for label, layer in stages:
        block.add_module(label, layer)
    return block
    
# generator model
class TurbNetG(nn.Module):
    """U-Net style generator mapping a 1-channel image to a 1-channel image.

    The encoder consists of seven stride-2 stages (``layer1`` .. ``layer6``,
    including ``layer2b``); the decoder mirrors it with six upsample+conv
    stages (``dlayer6`` .. ``dlayer2``) and a final transposed convolution
    (``dlayer1``). Every decoder stage except ``dlayer6`` receives the
    previous decoder output concatenated (along the channel axis) with the
    matching encoder output.

    Note: every stage built by ``blockUNet`` starts with an in-place
    activation. Encoder tensors reused as skip connections, and the
    bottleneck tensor returned by ``forward``, have therefore already been
    overwritten by the activation of the stage that consumed them.

    Args:
        channelExponent: base channel count is ``2 ** channelExponent``.
        dropout: dropout probability forwarded to every ``blockUNet`` stage.
    """

    def __init__(self, channelExponent=6, dropout=0.):
        super().__init__()
        # Base number of feature channels.
        width = int(2 ** channelExponent + 0.5)

        def down(tag, c_in, c_out, **overrides):
            # Encoder stage: LeakyReLU + strided conv, batch norm unless overridden.
            settings = dict(transposed=False, bn=True, relu=False, dropout=dropout)
            settings.update(overrides)
            return blockUNet(c_in, c_out, tag, **settings)

        def up(tag, c_in, c_out, **overrides):
            # Decoder stage: ReLU + upsample + conv + batch norm.
            settings = dict(transposed=True, bn=True, relu=True, dropout=dropout)
            settings.update(overrides)
            return blockUNet(c_in, c_out, tag, **settings)

        # ---- encoder ----
        # First stage is a bare convolution (no activation, no batch norm).
        self.layer1 = nn.Sequential()
        self.layer1.add_module(
            'layer1_conv',
            nn.Conv2d(1, width, kernel_size=4, stride=2, padding=1, bias=True),
        )
        self.layer2 = down('layer2', width, width * 2)
        self.layer2b = down('layer2b', width * 2, width * 2)
        self.layer3 = down('layer3', width * 2, width * 4)
        # layer4 had a kernel size of 2 in the original version
        # (cf https://arxiv.org/abs/1810.08217); it is size 4 here for
        # encoder/decoder symmetry. Change it to 2 to reproduce the old results.
        self.layer4 = down('layer4', width * 4, width * 8, size=4)
        self.layer5 = down('layer5', width * 8, width * 8, size=2, pad=0)
        self.layer6 = down('layer6', width * 8, width * 8, bn=False, size=2, pad=0)

        # ---- decoder ----
        # blockUNet reduces the kernel size by one for decoder stages.
        # Input widths from dlayer5 on are doubled by the skip concatenation.
        self.dlayer6 = up('dlayer6', width * 8, width * 8, size=2, pad=0)
        self.dlayer5 = up('dlayer5', width * 16, width * 8, size=2, pad=0)
        self.dlayer4 = up('dlayer4', width * 16, width * 4)
        self.dlayer3 = up('dlayer3', width * 8, width * 2)
        self.dlayer2b = up('dlayer2b', width * 4, width * 2)
        self.dlayer2 = up('dlayer2', width * 4, width)

        # Output stage: ReLU followed by a real transposed convolution
        # that produces the single output channel.
        self.dlayer1 = nn.Sequential()
        self.dlayer1.add_module('dlayer1_relu', nn.ReLU(inplace=True))
        self.dlayer1.add_module(
            'dlayer1_tconv',
            nn.ConvTranspose2d(width * 2, 1, kernel_size=4, stride=2, padding=1, bias=True),
        )

    def forward(self, x):
        """Run the full encoder/decoder pass.

        Args:
            x: input batch with one channel (the first layer is
               ``Conv2d(1, ...)``).

        Returns:
            tuple ``(decoded, bottleneck)``: the output of ``dlayer1`` and
            the output tensor of ``layer6`` (see the class note about
            in-place activations).
        """
        # Encoder path.
        e1 = self.layer1(x)
        e2 = self.layer2(e1)
        e2b = self.layer2b(e2)
        e3 = self.layer3(e2b)
        e4 = self.layer4(e3)
        e5 = self.layer5(e4)
        e6 = self.layer6(e5)

        # Decoder path; each step concatenates the matching encoder
        # features on the channel axis before decoding further.
        d6 = self.dlayer6(e6)
        d5 = self.dlayer5(torch.cat([d6, e5], 1))
        d4 = self.dlayer4(torch.cat([d5, e4], 1))
        d3 = self.dlayer3(torch.cat([d4, e3], 1))
        d2b = self.dlayer2b(torch.cat([d3, e2b], 1))
        d2 = self.dlayer2(torch.cat([d2b, e2], 1))
        d1 = self.dlayer1(torch.cat([d2, e1], 1))

        # Both the reconstruction and the bottleneck features are returned.
        return d1, e6

In [None]:
def load_and_preprocess_image(image_path):
    """Load an image file as a greyscale array scaled to [0, 1].

    Args:
        image_path: path of the image file to read.

    Returns:
        numpy array of the 8-bit greyscale ('L' mode) pixel values divided
        by 255.0.
    """
    # Open through a context manager so the underlying file handle is
    # released deterministically instead of waiting for garbage collection.
    with Image.open(image_path) as source:
        grey = source.convert('L')  # convert to greyscale
    airfoils_array = np.array(grey)
    return airfoils_array / 255.0  # normalise to [0, 1]

# Load and preprocess every image in the folder.
image_folder_path = 'airfoils_img'  # replace with the path of your image folder
# Keep regular files only. os.listdir returns entries in arbitrary order, so
# the names are sorted to make image_files / input_images reproducible.
image_files = sorted(
    f for f in os.listdir(image_folder_path)
    if os.path.isfile(os.path.join(image_folder_path, f))
)
input_images = [load_and_preprocess_image(os.path.join(image_folder_path, f)) for f in image_files]


In [None]:
# Custom dataset class
class UnlabeledImageDataset(Dataset):
    """Dataset yielding greyscale images from a flat directory, without labels.

    Args:
        image_dir: directory to read; only regular files directly inside it
            are used (sub-directories are ignored).
        transform: optional callable applied to each PIL image before it is
            returned.
    """

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting gives a
        # stable index -> file mapping across runs and machines.
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if os.path.isfile(os.path.join(image_dir, f))
        )

    def __len__(self):
        """Number of image files found in ``image_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return the ``idx``-th image as greyscale, transformed if requested."""
        img_name = os.path.join(self.image_dir, self.image_files[idx])
        # The context manager closes the file handle as soon as the pixel
        # data has been read by convert().
        with Image.open(img_name) as source:
            image = source.convert('L')  # convert to greyscale
        if self.transform:
            image = self.transform(image)
        return image

In [None]:
# Preprocessing applied to every image before it is fed to the network.
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # make sure the image is single-channel greyscale
    transforms.Resize((256, 256)),  # resize to 256 x 256
    transforms.ToTensor(),  # convert to a tensor with pixel values scaled to [0, 1]
])

# Create the dataset
dataset = UnlabeledImageDataset(image_dir='airfoils_img', transform=transform)

# Create the data loader
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# 3. Loss function
# TurbNetG emits a single-channel map. CrossEntropyLoss applies a softmax over
# the channel axis, which is constant for one channel, so that loss would be
# identically zero. A pixel-wise regression loss is used instead.
# NOTE(review): MSE reconstruction is assumed to be the intended objective - confirm.
criterion = nn.MSELoss()

# 4. Model and optimizer
model = TurbNetG()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [None]:
# 5. Training loop
num_epochs = 1
model.train()  # make sure BatchNorm/Dropout are in training mode
for epoch in range(num_epochs):
    for images in dataloader:
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass: the model returns (reconstruction, bottleneck features).
        reconstruction, _ = model(images)

        # Unsupervised loss: pixel-wise reconstruction error against the input.
        # NOTE(review): reconstruction MSE is assumed to be the intended
        # unsupervised objective - confirm or replace.
        loss = F.mse_loss(reconstruction, images)

        # Backward pass and parameter update. Without backward() the
        # gradients stay empty and optimizer.step() changes nothing.
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch+1}/{num_epochs}]')

In [None]:
# Run the model on a single image and show the result.
# Built with os.path.join: the original backslash literal contained the
# invalid escape sequence '\w' and only worked on Windows.
image_path = os.path.join('airfoils_img', 'waspsm.png')  # path of the image
with Image.open(image_path) as source:
    input_image = source.convert('L')
input_tensor = transform(input_image).unsqueeze(0)  # add the batch dimension

# Evaluate in eval mode so BatchNorm uses its running statistics instead of
# the statistics of this single image (and does not update them). The
# previous mode is restored afterwards so later training cells are unaffected.
was_training = model.training
model.eval()
with torch.no_grad():  # no gradients needed for inference
    output,para = model(input_tensor)
model.train(was_training)

print(output[0].shape)
print(type(output[0]))
print(para[0].shape)

# Convert the tensor to a PIL.Image. The network output is unbounded, while
# to_pil_image scales float tensors by 255 and casts to 8-bit, so values
# are clamped to [0, 1] first to avoid wrap-around artefacts.
image = to_pil_image(output[0].clamp(0, 1))

# Show the image
image.show()