In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pprint
import os
class Battery:
    """Cycle-level accessor for one battery test record table.

    The table is loaded once into ``self.df``. The methods below rely on the
    columns ``Cycle_Index``, ``Current(A)``, ``Voltage(V)`` and
    ``Capacity(Ah)``; rows with positive current are treated as the charge
    stage and rows with negative current as the discharge stage.

    Attributes:
        path: The path the table was loaded from.
        df: The full record table as a pandas DataFrame.
        cycle_index: Sorted unique values of the ``Cycle_Index`` column.
        cycle_life: Number of distinct cycles in the table.
    """

    def __init__(self,path='../Dataset_1_NCA_battery/CY25-1_1-#1.csv'):
        """Load the record table at ``path`` and index its cycles.

        Args:
            path: Location of the record file. ``.csv`` files are parsed with
                ``pd.read_csv``; anything else is handed to ``pd.read_excel``.
        """
        self.path = path
        self.df = self._read_table(path)
        self.cycle_index = self._get_cycle_index()
        self.cycle_life = len(self.cycle_index)

    @staticmethod
    def _read_table(path):
        """Read ``path`` with the pandas reader that matches its extension."""
        # The default path is a CSV, which pd.read_excel cannot parse, so the
        # reader has to be chosen from the extension instead of being fixed.
        extension = os.path.splitext(str(path))[1].lower()
        if extension == '.csv':
            return pd.read_csv(path)
        return pd.read_excel(path)

    def _get_cycle_index(self):
        """Return the sorted unique cycle numbers found in ``Cycle_Index``."""
        return np.unique(self.df['Cycle_Index'].values)

    def _check(self,cycle=None,variable=None):
        """Validate a cycle number and/or a column name.

        Args:
            cycle: Cycle number to validate, or None to skip the check.
            variable: Column name to validate, or None to skip the check.

        Returns:
            True when every requested check passes.

        Raises:
            ValueError: If ``cycle`` is not one of the recorded cycles or
                ``variable`` is not a column of the table.
        """
        if cycle is not None and cycle not in self.cycle_index:
            raise ValueError('cycle should be in [{},{}]'.format(int(self.cycle_index.min()),int(self.cycle_index.max())))
        if variable is not None and variable not in self.df.columns:
            raise ValueError('variable should be in {}'.format(list(self.df.columns)))
        return True

    def get_cycle(self,cycle):
        """Return all rows belonging to ``cycle``.

        Raises:
            ValueError: If ``cycle`` is not a recorded cycle.
        """
        self._check(cycle=cycle)
        return self.df[self.df['Cycle_Index']==cycle]

    def get_degradation_trajectory(self):
        """Collect the per-cycle maximum capacity of both stages.

        Returns:
            A tuple ``(charge_capacity, discharge_capacity)`` of two lists, one
            entry per cycle in ``self.cycle_index`` order. Each entry is the
            maximum of ``Capacity(Ah)`` over the rows with positive
            (respectively negative) current; it is NaN when a cycle has no
            such rows.
        """
        charge_capacity = []
        discharge_capacity = []
        for cycle in self.cycle_index:
            cycle_df = self.get_cycle(cycle)
            current = cycle_df['Current(A)']
            charge_capacity.append(cycle_df[current > 0]['Capacity(Ah)'].max())
            discharge_capacity.append(cycle_df[current < 0]['Capacity(Ah)'].max())
        return charge_capacity,discharge_capacity

    def get_value(self,cycle,variable):
        """Return the values of column ``variable`` for ``cycle`` as an array.

        Raises:
            ValueError: If ``cycle`` or ``variable`` is unknown.
        """
        self._check(cycle=cycle,variable=variable)
        return self.get_cycle(cycle)[variable].values

    def get_charge_stage(self,cycle):
        """Return the rows of ``cycle`` whose current is positive.

        Raises:
            ValueError: If ``cycle`` is not a recorded cycle.
        """
        cycle_df = self.get_cycle(cycle)
        return cycle_df[cycle_df['Current(A)']>0]

    def get_qv_curve(self, cycle, voltage_range=None, num_samples=100):
        """Resample the charge-stage capacity on an evenly spaced voltage grid.

        Args:
            cycle: Cycle number to process.
            voltage_range: Optional ``(min_voltage, max_voltage)`` pair; only
                charge rows whose voltage lies inside it (bounds included) are
                used.
            num_samples: Number of grid points in the returned curve.

        Returns:
            An array of ``num_samples`` capacities interpolated at voltages
            evenly spaced between the smallest and largest selected voltage.
            The array is all NaN when no charge row is selected.

        Raises:
            ValueError: If ``cycle`` is not a recorded cycle.
        """
        charge_df = self.get_charge_stage(cycle)
        voltage = charge_df['Voltage(V)'].values
        capacity = charge_df['Capacity(Ah)'].values
        if voltage_range is not None:
            min_voltage, max_voltage = voltage_range
            mask = (voltage >= min_voltage) & (voltage <= max_voltage)
            voltage = voltage[mask]
            capacity = capacity[mask]
        if len(voltage) == 0:
            return np.full(num_samples, np.nan)
        # np.interp silently returns meaningless values when its x-coordinates
        # are not increasing, and measured voltage is not guaranteed to be
        # monotonic. A stable sort leaves already-ordered data untouched.
        order = np.argsort(voltage, kind='stable')
        voltage = voltage[order]
        capacity = capacity[order]
        sampled_voltage = np.linspace(voltage.min(), voltage.max(), num_samples)
        return np.interp(sampled_voltage, voltage, capacity)

    def extract_charge_discharge_currents(self):
        """Map each cycle to its first charge and first discharge current.

        Returns:
            A dict ``{cycle: (charge_current, discharge_current)}``. Each
            current is the first positive (respectively negative) sample of
            the cycle rounded to one decimal, or None when the cycle has no
            such sample.
        """
        current_dict = {}
        for cycle in self.cycle_index:
            current = self.get_cycle(cycle)['Current(A)']
            charge_currents = current[current > 0].values
            discharge_currents = current[current < 0].values
            charge_current = np.round(charge_currents[0], 1) if len(charge_currents) > 0 else None
            discharge_current = np.round(discharge_currents[0], 1) if len(discharge_currents) > 0 else None
            current_dict[cycle] = (charge_current, discharge_current)
        return current_dict

    def extract_cycle_data_to_csv(self, output_file='cycle_data.csv', voltage_range=None, num_samples=100, temperature=25):
        """Build one summary record per cycle and optionally write them to CSV.

        Args:
            output_file: Destination CSV path. When None, nothing is written
                and the records are returned instead.
            voltage_range: Forwarded to ``get_qv_curve``.
            num_samples: Forwarded to ``get_qv_curve``.
            temperature: Value stored in every record under ``'Temperture'``
                (presumably the test temperature - confirm the unit against
                the dataset description).

        Returns:
            The list of record dicts when ``output_file`` is None, otherwise
            None after the CSV has been written.
        """
        current_dict = self.extract_charge_discharge_currents()
        charge_capacity, discharge_capacity = self.get_degradation_trajectory()
        cycle_data = []
        for cycle, max_charge_capacity, max_discharge_capacity in zip(self.cycle_index, charge_capacity, discharge_capacity):
            qv_curve = self.get_qv_curve(cycle, voltage_range=voltage_range, num_samples=num_samples)
            charge_current, discharge_current = current_dict[cycle]
            cycle_data.append({
                'Cycle_Index': cycle,
                'QV_Curve': qv_curve.tolist(),
                'Charge_Current(A)': charge_current,
                # The misspelled key is kept as-is: it is the column name that
                # already-written files and their readers use.
                'Temperture': temperature,
                'Discharge_Current(A)': discharge_current,
                'Max_Charge_Capacity(Ah)': max_charge_capacity,
                'Max_Discharge_Capacity(Ah)': max_discharge_capacity
            })
        if output_file is None:
            return cycle_data
        result_df = pd.DataFrame(cycle_data)
        result_df.to_csv(output_file, index=False)
        print(f"数据已保存到 {output_file}")
        return None

In [None]:
import os
import pandas as pd

root_dir = 'raw-dataset'

# For every sub-folder of root_dir, extract the per-cycle records of each
# record file, drop incomplete records, and write them to one
# combined_data.csv inside that sub-folder.
for subdir, dirs, files in os.walk(root_dir):
    if subdir == root_dir:
        # Only sub-folders are processed; files directly under root_dir are ignored.
        continue
    combined_data = []
    # os.walk lists files in an OS-dependent order; sorting makes the row
    # order of combined_data.csv reproducible across machines and re-runs.
    sorted_files = sorted(files)
    first20cycle_files = [file for file in sorted_files if 'first20cycle' in file]
    other_files = [file for file in sorted_files if 'first20cycle' not in file and file.endswith('.xlsx')]

    # 'first20cycle' files go first so their cycles precede the remaining ones.
    for file in first20cycle_files + other_files:
        file_path = os.path.join(subdir, file)
        battery = Battery(path=file_path)
        data = battery.extract_cycle_data_to_csv(output_file=None, voltage_range=(3.6, 4.15), num_samples=50)
        combined_data.extend(data)

    # Keep only records with a fully populated Q-V curve and no missing scalar
    # field (pd.isna is also True for None, e.g. a cycle without a charge current).
    cleaned_data = []
    for row in combined_data:
        qv_curve_has_nan = any(pd.isna(value) for value in row['QV_Curve'])
        other_fields_have_nan = any(
            pd.isna(value)
            for key, value in row.items()
            if key != 'QV_Curve'
        )
        if not qv_curve_has_nan and not other_fields_have_nan:
            cleaned_data.append(row)

    if cleaned_data:
        output_file = os.path.join(subdir, 'combined_data.csv')
        pd.DataFrame(cleaned_data).to_csv(output_file, index=False)
    else:
        # Name the folder so an empty result can be traced instead of printing a blank line.
        print(f"No complete cycle records found in {subdir}; combined_data.csv was not written")