In [None]:
# Install required packages if needed. Refer to README for more info.
!pip install requests pandas gdown

In [None]:
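#COOKIES
# Upload a cookies.txt file from your machine; the last line of this cell moves
# it to ~/.cache/gdown/cookies.txt, which is where gdown reads its cookies from.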
!mkdir -p ~/.cache/gdown
from google.colab import files
uploaded = files.upload()
!mkdir -p ~/.cache/gdown && mv /content/cookies.txt ~/.cache/gdown/cookies.txt

In [None]:
#SUMMARY_CSV_COMPILER
import os
import time
import requests
import pandas as pd
import gdown
import re
import shutil
from google.colab import files

def list_files_in_folder(folder_id, api_key):
    """
    List every CSV file directly inside a Google Drive folder.

    Queries the Drive v3 ``files`` endpoint with an API key and keeps following
    ``nextPageToken`` until the listing is exhausted.

    Args:
        folder_id: ID of the Drive folder to list (passed in from main).
        api_key: Google API key, sent as the ``key`` query parameter.

    Returns:
        A list of dictionaries, each holding the 'id' and 'name' of one file.
        If a request fails (network error, timeout or non-200 status) a message
        is printed and whatever was collected so far is returned, which may be
        an empty or partial list.
    """
    files_list = []
    page_token = None
    base_url = "https://www.googleapis.com/drive/v3/files"
    query = f"'{folder_id}' in parents and mimeType='text/csv'"

    while True:
        params = {
            "q": query,
            "fields": "nextPageToken, files(id, name)",
            # None on the first page; requests leaves None-valued params out
            # of the query string.
            "pageToken": page_token,
            "key": api_key,
        }
        try:
            # Without a timeout an unresponsive server would block forever.
            response = requests.get(base_url, params=params, timeout=30)
        except requests.RequestException as exc:
            # Only the exception type is printed: the full message can contain
            # the request URL, which includes the API key.
            print(f"Error fetching file list ({type(exc).__name__}).")
            return files_list
        if response.status_code != 200:
            print(f"Error fetching file list (HTTP {response.status_code} {response.reason}).")
            return files_list

        data = response.json()
        files_list.extend(data.get("files", []))
        page_token = data.get("nextPageToken")
        if not page_token:
            break
    return files_list


def find_files_with_age_type_2(dataset_dir):
    """
    Return the names of the CSV files in a directory that contain 'age_type' == 2.

    Per the project notes, 'age_type' == 2 marks a cyclically aged battery,
    which is the data this analysis is interested in.

    Args:
        dataset_dir: Path of the directory holding the ';'-separated CSV files.

    Returns:
        A list of filenames (not full paths), in sorted order, for which at
        least one row has 'age_type' equal to 2. Files that lack the column or
        cannot be parsed are left out; unreadable files are reported on stdout.
    """
    age_type_2_files = []
    # os.listdir order is arbitrary; sorting keeps the result (and the summary
    # built from it) reproducible between runs.
    for filename in sorted(os.listdir(dataset_dir)):
        if not filename.endswith('.csv'):
            continue
        file_path = os.path.join(dataset_dir, filename)
        try:
            df = pd.read_csv(file_path, sep=';')
        except Exception as exc:
            # Still best-effort (one bad file must not abort the run), but say
            # so instead of dropping the file silently.
            print(f"Could not read {filename}; skipping ({type(exc).__name__}: {exc}).")
            continue
        if 'age_type' in df.columns and (df['age_type'] == 2).any():
            age_type_2_files.append(filename)
    return age_type_2_files

def collect_data(dataset_dir, file_list, eol_soh_cap=66.6666667):
    """
    Build the per-battery summary for every file that reaches end-of-life (EOL).

    For each CSV in file_list (in main this is the list of 'age_type' == 2
    files produced by find_files_with_age_type_2):
        1.) Find the rows where `soh_cap` <= eol_soh_cap. If there are none the
            file is skipped, because the cell never reached EOL and its
            end-of-life parameters cannot be quantified.
        2.) Record `num_cycles_op` from the first such row, i.e. the number of
            cycles taken before the battery reached EOL.
        3.) Average each of the analysis columns that is present in the file.

    The default threshold of 66.6666667 is the original author's mapping of
    80% capacity onto the dataset's normalised `soh_cap` scale.
    NOTE(review): confirm this mapping against the dataset documentation.

    Args:
        dataset_dir: Directory containing the ';'-separated CSV files.
        file_list: Filenames (relative to dataset_dir) to process.
        eol_soh_cap: `soh_cap` value at or below which a cell counts as EOL.

    Returns:
        A dictionary keyed by filename. Each value is a dictionary with
        'num_cycles_op' plus, for every analysis column found in the file, an
        'avg_<column>' entry (avg_v_min_target_V, avg_v_max_target_V,
        avg_age_temp, avg_age_chg_rate, avg_age_dischg_rate).
    """
    required_columns = ('soh_cap', 'num_cycles_op')
    averaged_columns = (
        'v_min_target_V',
        'v_max_target_V',
        'age_temp',
        'age_chg_rate',
        'age_dischg_rate',
    )

    result_dict = {}
    for filename in file_list:
        file_path = os.path.join(dataset_dir, filename)
        df = pd.read_csv(file_path, sep=';')

        # A file without these columns cannot be evaluated; skip it instead of
        # aborting the whole run with a KeyError.
        missing = [col for col in required_columns if col not in df.columns]
        if missing:
            print(f"Skipped {filename}: missing column(s) {', '.join(missing)}.")
            continue

        # Cells that never reach the EOL threshold are treated as outliers.
        mask = df['soh_cap'] <= eol_soh_cap
        if not mask.any():
            print(f"Skipped {filename}.")
            continue

        # First row (in file order) at or below the threshold marks EOL.
        file_result = {"num_cycles_op": df.loc[mask, 'num_cycles_op'].iloc[0]}
        for column in averaged_columns:
            if column in df.columns:
                file_result[f'avg_{column}'] = df[column].mean()

        result_dict[filename] = file_result

    return result_dict

def add_soc_window_columns(df):
    """
    Add 'SOC Window Min' and 'SOC Window Max' columns to the summary DataFrame.

    The DataFrame is the one main builds from the collect_data output. Each
    battery's average target voltages are matched against the reference
    voltages given by the dataset source (3.249, 2.500, 4.200, 4.092) to
    recover its state-of-charge (SOC) window:

        avg_v_min_target_V within 0.1 of 3.249   -> SOC Window Min = 10
        avg_v_min_target_V within 0.1 of 2.500   -> SOC Window Min = 0
        avg_v_max_target_V within 0.05 of 4.200  -> SOC Window Max = 100
        avg_v_max_target_V within 0.05 of 4.092  -> SOC Window Max = 90

    The tolerances allow for experimental error; the minimum-voltage tolerance
    is 0.05 larger because those values vary more. Any voltage that matches no
    reference gets 50, a sentinel meaning "SOC window unknown", so the user can
    find that row in summary.csv and decide manually whether to keep the
    battery or retune the tolerances. A column is only added when its source
    column exists. The DataFrame is modified in place and also returned.
    """
    unknown_window = 50  # sentinel for "no reference voltage matched"

    # (reference voltage, SOC percent) pairs, checked in this order.
    min_references = ((3.249, 10), (2.500, 0))
    max_references = ((4.200, 100), (4.092, 90))

    def classify(voltage, references, tolerance):
        # Return the SOC of the first reference within tolerance of voltage.
        for reference_voltage, soc in references:
            if abs(voltage - reference_voltage) <= tolerance:
                return soc
        return unknown_window

    columns = df.columns
    if 'avg_v_min_target_V' in columns:
        df['SOC Window Min'] = df['avg_v_min_target_V'].apply(
            lambda voltage: classify(voltage, min_references, 0.1)
        )
    if 'avg_v_max_target_V' in columns:
        df['SOC Window Max'] = df['avg_v_max_target_V'].apply(
            lambda voltage: classify(voltage, max_references, 0.05)
        )
    return df

def extract_folder_id(folder_link):
    """
    Pull the folder ID out of a Google Drive folder shareable URL.

    The ID is the run of letters, digits, '_' and '-' that follows 'folders/'
    in the link. If the link contains no such segment, an error is printed and
    an empty string is returned.
    """
    found = re.search(r'folders/([a-zA-Z0-9_-]+)', folder_link)
    if found is None:
        print("Error finding folder ID.")
        return ""
    return found.group(1)

def main():
    """
    Download all CSV files from a Google Drive folder, process them, and save
    the resulting summary to a CSV file.

    Steps:
    1. Prompt the user for a shareable Google Drive folder link and a Google API key.
    2. Extract the folder ID from the provided link (stop if none is found).
    3. Use the Drive API to fetch a list of all CSV files in the folder.
    4. Delete any existing temporary directory (/content/temp_csvs) and recreate it.
    5. Download each CSV file using gdown while printing the current
       file number, total files, and an estimated time remaining.
    6. Pause briefly after every download attempt to mitigate rate limiting.
    7. Process the downloaded files to select only those that have 'age_type' equal to 2.
    8. Collect relevant data from these files and add additional SOC window columns.
    9. Save the aggregated data as "summary.csv" in the /content directory and trigger its download.
    """
    folder_link = input("Enter your Google Drive folder shareable link (containing the CSV files): ").strip()
    api_key = input("Enter your Google API key (folder must be publicly shared): ").strip()
    folder_id = extract_folder_id(folder_link)
    if not folder_id:
        return

    print("Fetching file list from Google Drive API...")
    files_list = list_files_in_folder(folder_id, api_key)
    total_files = len(files_list)
    print(f"Found {total_files} CSV files in the folder.")

    # Start from an empty directory so files from an earlier run are not
    # picked up by the processing step.
    tmp_dir = '/content/temp_csvs'
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.makedirs(tmp_dir, exist_ok=True)

    download_times = []
    downloaded_count = 0
    for idx, file_info in enumerate(files_list, start=1):
        file_id = file_info['id']
        # Drive names are arbitrary remote text: keep only the final path
        # component so a name containing '/' or '..' cannot point outside
        # tmp_dir, and fall back to the file ID if nothing is left.
        file_name = os.path.basename(file_info['name']) or file_id
        if not file_name.lower().endswith('.csv'):
            file_name += '.csv'
        download_url = f"https://drive.google.com/uc?id={file_id}"
        output_path = os.path.join(tmp_dir, file_name)

        start_time = time.time()
        failure = None
        try:
            # Depending on the gdown version a failed download either raises
            # or returns None; both must count as a failure.
            if gdown.download(download_url, output_path, quiet=True, fuzzy=True) is None:
                failure = "gdown returned no file"
        except Exception as exc:
            failure = f"{type(exc).__name__}: {exc}"

        if failure is None:
            downloaded_count += 1
            download_times.append(time.time() - start_time)
            avg_time = sum(download_times) / len(download_times)
            est_time_remaining = (total_files - idx) * avg_time
            print(f"Downloaded {idx} / {total_files} CSV Files. Estimated Time Remaining: {est_time_remaining:.1f} seconds")
        else:
            print(f"Failed to download {file_name}: {failure}")

        # A 0.5 second delay after every attempt (successful or not) keeps us
        # under Google's rate/bandwidth limits. Otherwise Google stops serving
        # the files and the program fails.
        time.sleep(0.5)
    print(f"Finished downloading. Successfully downloaded {downloaded_count} files out of {total_files}.")

    filtered_files = find_files_with_age_type_2(tmp_dir)
    data_dict = collect_data(tmp_dir, filtered_files)
    print(f"{len(data_dict)} files meet all conditions.")
    if not data_dict:
        print("No files met the conditions.")
        return

    # One row per battery file, indexed by its filename.
    df = pd.DataFrame.from_dict(data_dict, orient='index')
    df.index.name = 'filename'
    df = add_soc_window_columns(df)

    summary_csv_path = os.path.join('/content', "summary.csv")
    df.to_csv(summary_csv_path)
    print(f"summary.csv saved to: {summary_csv_path}")
    # Colab helper that sends the file to the user's browser.
    files.download(summary_csv_path)

# Entry point: run the pipeline only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()