In [45]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 假设有一个 CSV 文件，包含配方参数和对应的发酵产量
data = pd.read_table('fermentation_data.csv')


# 将数据分为输入 (X) 和输出 (y)
X = data[['carbon_source', 'nitrogen_source', 'pH', 'temperature', 'stirring_speed', 'aeration_rate']].values
y = data['yield'].values  # 发酵产量

# 标准化输入数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 拆分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)


In [46]:
import torch
import torch.nn as nn
import torch.optim as optim

# 定义神经网络模型
class FermentationModel(nn.Module):
    def __init__(self):
        super(FermentationModel, self).__init__()
        self.fc1 = nn.Linear(6, 64)  # 输入层有 6 个变量
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)  # 输出层，预测发酵产量

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# 实例化模型
model = FermentationModel()
criterion = nn.MSELoss()  # 使用均方误差作为损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 转换数据为 PyTorch 张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

# 训练模型
epochs = 10000
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

Epoch [10/10000], Loss: 55901.7930
Epoch [20/10000], Loss: 55783.1445
Epoch [30/10000], Loss: 55606.1094
Epoch [40/10000], Loss: 55316.9102
Epoch [50/10000], Loss: 54889.9531
Epoch [60/10000], Loss: 54283.6211
Epoch [70/10000], Loss: 53450.5039
Epoch [80/10000], Loss: 52340.4336
Epoch [90/10000], Loss: 50905.5391
Epoch [100/10000], Loss: 49107.5273
Epoch [110/10000], Loss: 46920.3125
Epoch [120/10000], Loss: 44336.3477
Epoch [130/10000], Loss: 41370.9102
Epoch [140/10000], Loss: 38065.9766
Epoch [150/10000], Loss: 34492.8477
Epoch [160/10000], Loss: 30752.5762
Epoch [170/10000], Loss: 26972.6426
Epoch [180/10000], Loss: 23299.1309
Epoch [190/10000], Loss: 19884.6738
Epoch [200/10000], Loss: 16869.2461
Epoch [210/10000], Loss: 14358.5576
Epoch [220/10000], Loss: 12403.9814
Epoch [230/10000], Loss: 10991.4492
Epoch [240/10000], Loss: 10046.3311
Epoch [250/10000], Loss: 9456.2266
Epoch [260/10000], Loss: 9102.1826
Epoch [270/10000], Loss: 8884.4414
Epoch [280/10000], Loss: 8734.4707
Epoch

# 贝叶斯优化算法

In [51]:
from skopt import gp_minimize
from skopt.space import Real

# 定义目标函数，用深度学习模型预测产量
def objective_function(params):
    carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate = params
    
    # 将参数标准化
    input_data = scaler.transform([[carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate]])
    input_tensor = torch.tensor(input_data, dtype=torch.float32)

    # 使用模型预测产量
    model.eval()
    with torch.no_grad():
        predicted_yield = model(input_tensor).item()

    # 我们希望最大化产量，但优化工具通常是最小化问题，所以返回负值
    return -predicted_yield

# 定义搜索空间
search_space = [
    Real(0, 200, name='carbon_source'),       # 碳源浓度范围
    Real(0, 500, name='nitrogen_source'),      # 氮源浓度范围
    Real(0, 7.5, name='pH'),                 # pH 范围
    Real(0, 40, name='temperature'),          # 温度范围
    Real(0, 500, name='stirring_speed'),     # 搅拌速度范围
    Real(0, 2.0, name='aeration_rate')       # 通气量范围
]

# 使用贝叶斯优化进行最优配方搜索
result = gp_minimize(objective_function, search_space, n_calls=50, random_state=42)

# 输出最优配方
best_params = result.x
best_yield = -result.fun
print(f"最优配方: {best_params}")
print(f"预测的最优产量: {best_yield:.4f}")

最优配方: [200.0, 500.0, 7.5, 40.0, 500.0, 2.0]
预测的最优产量: 591.3811


In [55]:
# 打印最优配方
print(f"碳源浓度: {best_params[0]:.2f} g/L")
print(f"氮源浓度: {best_params[1]:.2f} g/L")
print(f"pH: {best_params[2]:.2f}")
print(f"温度: {best_params[3]:.2f} °C")
print(f"搅拌速度: {best_params[4]:.2f} rpm")
print(f"通气量: {best_params[5]:.2f} vvm")

# 预测的最优发酵产量
print(f"最优配方下的预测产量: {best_yield:.4f}")

碳源浓度: 200.00 g/L
氮源浓度: 500.00 g/L
pH: 7.50
温度: 40.00 °C
搅拌速度: 500.00 rpm
通气量: 2.00 vvm
最优配方下的预测产量: 591.3811


# 遗传优化算法

In [56]:
import random
import numpy as np
import torch
from deap import base, creator, tools

# 假设深度学习模型和数据已经准备好（与之前相同）

# 1. 定义适应度函数，使用深度学习模型预测配方的发酵产量
def evaluate(individual):
    # individual 是遗传算法中的一个个体（即一个配方），包含6个参数
    carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate = individual
    
    # 将配方参数进行标准化
    input_data = scaler.transform([[carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate]])
    input_tensor = torch.tensor(input_data, dtype=torch.float32)

    # 使用训练好的深度学习模型预测产量
    model.eval()
    with torch.no_grad():
        predicted_yield = model(input_tensor).item()

    return predicted_yield,  # DEAP 要求返回一个元组

# 2. 初始化遗传算法的配置
creator.create("FitnessMax", base.Fitness, weights=(1.0,))  # 我们要最大化产量
creator.create("Individual", list, fitness=creator.FitnessMax)

# 定义个体和种群的生成规则
toolbox = base.Toolbox()
toolbox.register("attr_float", random.uniform, 0, 1)  # 随机生成 [0, 1] 范围内的浮点数
toolbox.register("individual", tools.initCycle, creator.Individual,
                 (lambda: random.uniform(0, 200),   # 碳源浓度
                  lambda: random.uniform(0, 500),    # 氮源浓度
                  lambda: random.uniform(0, 7.5),  # pH
                  lambda: random.uniform(0, 40),    # 温度
                  lambda: random.uniform(0, 500),  # 搅拌速度
                  lambda: random.uniform(0, 2.0)), # 通气量
                 n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# 注册遗传算法的操作
toolbox.register("mate", tools.cxBlend, alpha=0.5)  # 使用均匀交叉（blend crossover）
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=0.1, indpb=0.2)  # 使用高斯变异
toolbox.register("select", tools.selTournament, tournsize=3)  # 使用锦标赛选择
toolbox.register("evaluate", evaluate)

# 3. 设置遗传算法的参数和运行流程
def main():
    random.seed(42)
    
    # 生成初始种群
    population = toolbox.population(n=50)  # 50 个个体的种群
    ngen = 40  # 进化的代数
    cxpb = 0.5  # 交叉概率
    mutpb = 0.2  # 变异概率
    
    # 评估初始种群
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    
    # 进化过程
    for gen in range(ngen):
        print(f"-- Generation {gen+1} --")
        
        # 选择下一代个体
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))
        
        # 交叉操作
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < cxpb:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values
        
        # 变异操作
        for mutant in offspring:
            if random.random() < mutpb:
                toolbox.mutate(mutant)
                del mutant.fitness.values
        
        # 重新评估变异和交叉后的个体
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        
        # 替换种群
        population[:] = offspring
        
        # 输出当前种群中最好的个体
        top_ind = tools.selBest(population, 1)[0]
        print(f"Best individual: {top_ind}, Yield: {top_ind.fitness.values[0]:.4f}")
    
    # 最终最优配方
    best_individual = tools.selBest(population, 1)[0]
    print(f"\n最优配方: {best_individual}")
    print(f"最优配方的预测产量: {best_individual.fitness.values[0]:.4f}")

if __name__ == "__main__":
    main()




-- Generation 1 --
Best individual: [228.6140688655108, 222.8549703660248, 2.003791883648972, 24.55632268026442, 369.36552143615324, 0.4145133504225806], Yield: 571.1956
-- Generation 2 --
Best individual: [228.6140688655108, 222.8549703660248, 2.003791883648972, 24.55632268026442, 369.36552143615324, 0.4145133504225806], Yield: 571.1956
-- Generation 3 --
Best individual: [231.47490133650786, 158.01896035378402, 2.378600581475164, 25.646261728615766, 493.37452148614767, 0.20457744726756344], Yield: 598.4065
-- Generation 4 --
Best individual: [215.4929749261005, 213.4158363913748, 0.6261149181232968, 44.104828231267376, 631.0479668480585, 0.426092916472306], Yield: 616.9981
-- Generation 5 --
Best individual: [237.92726725380345, 203.82888795263477, 3.6390260947069906, 43.113641336755585, 556.4597657047425, 0.3753766172639556], Yield: 637.9508
-- Generation 6 --
Best individual: [228.3150506099568, 227.9446993871583, 4.4054741172901535, 44.743656013159296, 612.2660454895697, 0.3584363

# 遗传规划（Genetic Programming, GP）

In [57]:
import random
import operator
import numpy as np
import torch
from deap import base, creator, tools, gp

# 假设深度学习模型和数据已经准备好
# model: 经过训练的发酵模型
# scaler: 数据标准化的工具

# 1. 定义适应度函数，使用深度学习模型来评估每个表达式的产量
def evaluate(individual):
    # 将遗传规划生成的表达式转化为可执行函数
    func = toolbox.compile(expr=individual)
    
    # 假设 func 返回一个发酵配方的参数
    # 将生成的参数应用到发酵模型进行评估
    carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate = func()
    
    # 将配方参数进行标准化
    input_data = scaler.transform([[carbon_source, nitrogen_source, pH, temperature, stirring_speed, aeration_rate]])
    input_tensor = torch.tensor(input_data, dtype=torch.float32)

    # 使用深度学习模型预测产量
    model.eval()
    with torch.no_grad():
        predicted_yield = model(input_tensor).item()

    return predicted_yield,  # DEAP 需要返回元组

# 2. 初始化遗传规划
pset = gp.PrimitiveSetTyped("MAIN", [], tuple, "ARG")

# 添加运算符（加减乘除）
pset.addPrimitive(operator.add, [float, float], float)
pset.addPrimitive(operator.sub, [float, float], float)
pset.addPrimitive(operator.mul, [float, float], float)
pset.addPrimitive(operator.truediv, [float, float], float, name="div")

# 添加终端（常量），可以表示发酵配方的具体参数范围
pset.addTerminal(50.0, float)   # 碳源浓度的最小值
pset.addTerminal(200.0, float)  # 碳源浓度的最大值
pset.addTerminal(10.0, float)   # 氮源浓度的最小值
pset.addTerminal(50.0, float)   # 氮源浓度的最大值
pset.addTerminal(4.5, float)    # pH 值的最小值
pset.addTerminal(7.5, float)    # pH 值的最大值
pset.addTerminal(20.0, float)   # 温度的最小值
pset.addTerminal(40.0, float)   # 温度的最大值
pset.addTerminal(100.0, float)  # 搅拌速度的最小值
pset.addTerminal(500.0, float)  # 搅拌速度的最大值
pset.addTerminal(0.5, float)    # 通气量的最小值
pset.addTerminal(2.0, float)    # 通气量的最大值

# 3. 设置适应度和个体
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

# 注册遗传规划的操作
toolbox = base.Toolbox()
toolbox.register("expr", gp.genFull, pset=pset, min_=1, max_=3)  # 生成深度为 1 到 3 的表达式树
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# 4. 注册评估函数和遗传操作
toolbox.register("evaluate", evaluate)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)  # 单点交叉
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr, pset=pset)  # 变异操作
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)

# 5. 编译表达式
toolbox.register("compile", gp.compile, pset=pset)

# 6. 遗传规划的主循环
def main():
    random.seed(42)
    
    # 生成初始种群
    population = toolbox.population(n=100)
    ngen = 50  # 进化的代数
    cxpb = 0.5  # 交叉概率
    mutpb = 0.2  # 变异概率
    
    # 评估初始种群
    fitnesses = list(map(toolbox.evaluate, population))
    for ind, fit in zip(population, fitnesses):
        ind.fitness.values = fit
    
    # 进化过程
    for gen in range(ngen):
        print(f"-- Generation {gen+1} --")
        
        # 选择下一代个体
        offspring = toolbox.select(population, len(population))
        offspring = list(map(toolbox.clone, offspring))
        
        # 交叉操作
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < cxpb:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values
        
        # 变异操作
        for mutant in offspring:
            if random.random() < mutpb:
                toolbox.mutate(mutant)
                del mutant.fitness.values
        
        # 重新评估变异和交叉后的个体
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        
        # 替换种群
        population[:] = offspring
        
        # 输出当前种群中最好的个体
        top_ind = tools.selBest(population, 1)[0]
        print(f"Best individual: {top_ind}, Yield: {top_ind.fitness.values[0]:.4f}")
    
    # 最终最优配方
    best_individual = tools.selBest(population, 1)[0]
    print(f"\n最优配方表达式: {best_individual}")
    print(f"最优配方的预测产量: {best_individual.fitness.values[0]:.4f}")

if __name__ == "__main__":
    main()




IndexError: The gp.generate function tried to add a primitive of type '<class 'tuple'>', but there is none available.

# 强化学习策略

In [58]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# 1. 创建一个发酵模拟环境 (这里使用 Gym 库来构建)
class FermentationEnv(gym.Env):
    def __init__(self):
        super(FermentationEnv, self).__init__()
        
        # 状态空间：假设有6个状态变量 (温度, pH, 搅拌速度等)
        self.observation_space = gym.spaces.Box(low=np.array([20, 4.5, 100, 0.5]), 
                                                high=np.array([40, 7.5, 500, 2.0]),
                                                dtype=np.float32)
        
        # 动作空间：假设每次可以调整4个参数（温度、pH、搅拌速度、通气量）
        self.action_space = gym.spaces.Discrete(8)
        
        # 初始状态 (温度, pH, 搅拌速度, 通气量)
        self.state = np.array([30.0, 6.0, 300.0, 1.0], dtype=np.float32)
        
        # 目标：最大化产量，初始化产量为0
        self.yield_production = 0
    
    def reset(self):
        # 重置环境状态
        self.state = np.array([30.0, 6.0, 300.0, 1.0], dtype=np.float32)
        self.yield_production = 0
        return self.state
    
    def step(self, action):
        # 根据动作调整状态，动作对应于不同的调整策略
        if action == 0:
            self.state[0] -= 1.0  # 降低温度
        elif action == 1:
            self.state[0] += 1.0  # 增加温度
        elif action == 2:
            self.state[1] -= 0.1  # 降低 pH
        elif action == 3:
            self.state[1] += 0.1  # 增加 pH
        elif action == 4:
            self.state[2] -= 10.0  # 降低搅拌速度
        elif action == 5:
            self.state[2] += 10.0  # 增加搅拌速度
        elif action == 6:
            self.state[3] -= 0.1  # 降低通气量
        elif action == 7:
            self.state[3] += 0.1  # 增加通气量
        
        # 限制状态变量在合理范围内
        self.state = np.clip(self.state, self.observation_space.low, self.observation_space.high)
        
        # 产量计算：这里用简单函数模拟产量与状态的关系
        self.yield_production = - (self.state[0] - 35) ** 2 - (self.state[1] - 5.5) ** 2 \
                                - (self.state[2] - 400) ** 2 - (self.state[3] - 1.5) ** 2
        reward = self.yield_production
        
        # 判断是否完成
        done = False
        
        return self.state, reward, done, {}
    
# 2. 构建 DQN 的神经网络模型
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# 3. 训练 DQN 模型
def train_dqn(env):
    # 定义超参数
    learning_rate = 0.001
    gamma = 0.99
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.01
    memory_size = 2000
    batch_size = 64
    target_update = 10  # 目标网络更新频率
    
    input_dim = env.observation_space.shape[0]
    output_dim = env.action_space.n
    
    # 创建 DQN 模型和目标网络
    policy_net = DQN(input_dim, output_dim)
    target_net = DQN(input_dim, output_dim)
    target_net.load_state_dict(policy_net.state_dict())
    
    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    memory = deque(maxlen=memory_size)
    
    def choose_action(state, epsilon):
        if random.random() < epsilon:
            return env.action_space.sample()  # 随机动作
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return torch.argmax(q_values).item()  # 选择最优动作
    
    def replay():
        if len(memory) < batch_size:
            return
        
        batch = random.sample(memory, batch_size)
        states, actions, rewards, next_states = zip(*batch)
        
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions).unsqueeze(1)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        
        q_values = policy_net(states).gather(1, actions)
        next_q_values = target_net(next_states).max(1)[0].unsqueeze(1)
        target_q_values = rewards + gamma * next_q_values
        
        loss = nn.MSELoss()(q_values, target_q_values)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # 训练过程
    episodes = 500
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        
        for t in range(200):  # 限定每个 episode 的最大步数
            action = choose_action(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            memory.append((state, action, reward, next_state))
            state = next_state
            total_reward += reward
            
            replay()  # 经验回放
            
            if done:
                break
        
        # 衰减探索率
        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        
        # 更新目标网络
        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())
        
        print(f"Episode {episode+1}, Total Reward: {total_reward:.2f}")
    
    print("训练完成！")

# 4. 运行强化学习
if __name__ == "__main__":
    env = FermentationEnv()
    train_dqn(env)


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")
  states = torch.FloatTensor(states)


Episode 1, Total Reward: -5805512.91
Episode 2, Total Reward: -4589139.64
Episode 3, Total Reward: -4232036.81
Episode 4, Total Reward: -2804409.10
Episode 5, Total Reward: -4718730.90
Episode 6, Total Reward: -3345145.17
Episode 7, Total Reward: -3185697.91
Episode 8, Total Reward: -4278255.15
Episode 9, Total Reward: -5869251.99
Episode 10, Total Reward: -934338.71
Episode 11, Total Reward: -567004.81
Episode 12, Total Reward: -1364695.51
Episode 13, Total Reward: -735861.78
Episode 14, Total Reward: -743409.04
Episode 15, Total Reward: -1019669.32
Episode 16, Total Reward: -2695855.09
Episode 17, Total Reward: -671659.38
Episode 18, Total Reward: -588143.35
Episode 19, Total Reward: -1745808.15
Episode 20, Total Reward: -1522140.50
Episode 21, Total Reward: -318957.84
Episode 22, Total Reward: -268103.03
Episode 23, Total Reward: -688899.27
Episode 24, Total Reward: -938979.51
Episode 25, Total Reward: -1415318.73
Episode 26, Total Reward: -421929.40
Episode 27, Total Reward: -77906

In [59]:
def evaluate_agent(env, policy_net, episodes=5):
    """
    评估智能体在强化学习完成后的表现，返回最优的参数和优化结果。
    
    :param env: 发酵模拟环境
    :param policy_net: 训练完成的 DQN 模型
    :param episodes: 评估的实验次数
    """
    total_rewards = []
    optimal_params = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        params_per_episode = []
        
        for t in range(200):  # 限定每个 episode 的最大步数
            # 选择动作 (不再使用随机探索，直接使用训练好的策略)
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            action = torch.argmax(q_values).item()
            
            # 执行动作
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            params_per_episode.append(state)  # 保存每一步的状态（参数）
            
            state = next_state
            
            if done:
                break
        
        total_rewards.append(episode_reward)
        optimal_params.append(params_per_episode[-1])  # 保存每个 episode 的最终参数
    
    # 输出结果
    best_episode_index = np.argmax(total_rewards)  # 找到最优的 episode
    best_params = optimal_params[best_episode_index]
    best_reward = total_rewards[best_episode_index]
    
    print(f"最优参数: {best_params}")
    print(f"最优产量: {best_reward:.2f}")
    
    return best_params, best_reward

# 评估强化学习智能体
best_params, best_reward = evaluate_agent(env, policy_net)


NameError: name 'policy_net' is not defined

In [60]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# 1. 创建一个发酵模拟环境
class FermentationEnv(gym.Env):
    def __init__(self):
        super(FermentationEnv, self).__init__()
        
        # 状态空间
        self.observation_space = gym.spaces.Box(low=np.array([20, 4.5, 100, 0.5]), 
                                                high=np.array([40, 7.5, 500, 2.0]),
                                                dtype=np.float32)
        
        # 动作空间
        self.action_space = gym.spaces.Discrete(8)
        
        # 初始状态
        self.state = np.array([30.0, 6.0, 300.0, 1.0], dtype=np.float32)
        
        # 目标：最大化产量，初始化产量为0
        self.yield_production = 0
    
    def reset(self):
        self.state = np.array([30.0, 6.0, 300.0, 1.0], dtype=np.float32)
        self.yield_production = 0
        return self.state
    
    def step(self, action):
        if action == 0:
            self.state[0] -= 1.0  # 降低温度
        elif action == 1:
            self.state[0] += 1.0  # 增加温度
        elif action == 2:
            self.state[1] -= 0.1  # 降低 pH
        elif action == 3:
            self.state[1] += 0.1  # 增加 pH
        elif action == 4:
            self.state[2] -= 10.0  # 降低搅拌速度
        elif action == 5:
            self.state[2] += 10.0  # 增加搅拌速度
        elif action == 6:
            self.state[3] -= 0.1  # 降低通气量
        elif action == 7:
            self.state[3] += 0.1  # 增加通气量
        
        self.state = np.clip(self.state, self.observation_space.low, self.observation_space.high)
        
        # 产量计算
        self.yield_production = - (self.state[0] - 35) ** 2 - (self.state[1] - 5.5) ** 2 \
                                - (self.state[2] - 400) ** 2 - (self.state[3] - 1.5) ** 2
        reward = self.yield_production
        
        done = False
        
        return self.state, reward, done, {}

# 2. 构建 DQN 模型
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# 3. 训练 DQN 模型
def train_dqn(env):
    learning_rate = 0.001
    gamma = 0.99
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.01
    memory_size = 2000
    batch_size = 64
    target_update = 10
    
    input_dim = env.observation_space.shape[0]
    output_dim = env.action_space.n
    
    policy_net = DQN(input_dim, output_dim)
    target_net = DQN(input_dim, output_dim)
    target_net.load_state_dict(policy_net.state_dict())
    
    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    memory = deque(maxlen=memory_size)
    
    def choose_action(state, epsilon):
        if random.random() < epsilon:
            return env.action_space.sample()  # 随机动作
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return torch.argmax(q_values).item()  # 选择最优动作
    
    def replay():
        if len(memory) < batch_size:
            return
        
        batch = random.sample(memory, batch_size)
        states, actions, rewards, next_states = zip(*batch)
        
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions).unsqueeze(1)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        
        q_values = policy_net(states).gather(1, actions)
        next_q_values = target_net(next_states).max(1)[0].unsqueeze(1)
        target_q_values = rewards + gamma * next_q_values
        
        loss = nn.MSELoss()(q_values, target_q_values)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    episodes = 500
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        
        for t in range(200):  # 限定每个 episode 的最大步数
            action = choose_action(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            memory.append((state, action, reward, next_state))
            state = next_state
            total_reward += reward
            
            replay()
            
            if done:
                break
        
        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        
        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())
        
        print(f"Episode {episode+1}, Total Reward: {total_reward:.2f}")
    
    print("训练完成！")
    return policy_net  # 返回训练后的模型

# 4. 评估智能体
def evaluate_agent(env, policy_net, episodes=5):
    total_rewards = []
    optimal_params = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        
        for t in range(200):
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            action = torch.argmax(q_values).item()
            
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            
            state = next_state
            
            if done:
                break
        
        total_rewards.append(episode_reward)
        optimal_params.append(state)  # 记录每个 episode 的最终参数
    
    best_episode_index = np.argmax(total_rewards)
    best_params = optimal_params[best_episode_index]
    best_reward = total_rewards[best_episode_index]
    
    print(f"最优参数: {best_params}")
    print(f"最优产量: {best_reward:.2f}")
    
    return best_params, best_reward

# 5. 运行强化学习
if __name__ == "__main__":
    env = FermentationEnv()
    policy_net = train_dqn(env)  # 训练并返回训练后的模型
    best_params, best_reward = evaluate_agent(env, policy_net)  # 评估智能体


Episode 1, Total Reward: -2499471.93
Episode 2, Total Reward: -2457329.07
Episode 3, Total Reward: -1795462.24
Episode 4, Total Reward: -4159304.46
Episode 5, Total Reward: -3635325.10
Episode 6, Total Reward: -5920159.36
Episode 7, Total Reward: -661494.13
Episode 8, Total Reward: -657810.72
Episode 9, Total Reward: -605850.16
Episode 10, Total Reward: -916545.68
Episode 11, Total Reward: -887898.37
Episode 12, Total Reward: -665502.09
Episode 13, Total Reward: -883909.48
Episode 14, Total Reward: -1225507.66
Episode 15, Total Reward: -469315.64
Episode 16, Total Reward: -546885.54
Episode 17, Total Reward: -715370.22
Episode 18, Total Reward: -385686.06
Episode 19, Total Reward: -1583913.79
Episode 20, Total Reward: -1299900.77
Episode 21, Total Reward: -1170742.90
Episode 22, Total Reward: -498404.05
Episode 23, Total Reward: -643611.39
Episode 24, Total Reward: -3158226.83
Episode 25, Total Reward: -2843086.57
Episode 26, Total Reward: -2539061.20
Episode 27, Total Reward: -3661685

In [3]:
from transformers import pipeline

classifier = pipeline("sentiment-analysis")
res = classifier("Today is a nice day.")
print(res)

No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision af0f99b (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.


[{'label': 'POSITIVE', 'score': 0.999871015548706}]
