<a href="https://colab.research.google.com/github/miguelamev/data-pipelines/blob/main/ClickUp_Data_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Botrista ETL Pipeline

The goal of this project is to automate the ETL process of Botrista's sales data sources for easy analysis.


In [57]:
import os
from dotenv import load_dotenv

from google.colab import auth
from google.colab import drive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from google.auth.transport.requests import Request
from google.auth import default
import gspread

import pandas as pd
import numpy as np

import requests
from urllib.parse import urlparse
import re

import time
import progressbar
from tqdm import tqdm

In [2]:
# Mount Google Drive at /content/drive; later cells read the index CSVs and
# the .env key file from paths under this mount point.
drive.mount('/content/drive')

# Notebook-wide display settings: passing None removes pandas' row/column
# truncation, so displayed DataFrames are rendered in full.
pd.set_option('display.max_rows', None) # Show all rows
pd.set_option('display.max_columns', None) # Show all columns

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Reference ("index") tables used by the cleaning functions further down.
# The paths are absolute so the reads do not depend on the notebook's current
# working directory; they match the '/content/drive' mount point and the
# absolute path style used for the .env file.
INDEX_DIR = '/content/drive/MyDrive/02 Areas/Business Intelligence/Indexes'

# Area index (read by fill_region_by_state via its 'Abbreviation' and 'Region' columns)
area_path = f'{INDEX_DIR}/Area Index.csv'
area_index = pd.read_csv(area_path)

# Brand index (read by fill_vertical_by_brand via its 'Brand ID' and 'Vertical' columns)
brand_path = f'{INDEX_DIR}/Brand Index.csv'
brand_index = pd.read_csv(brand_path)

## ClickUp Tokens

In [59]:
# Load environment variables from the .env file stored on Google Drive.
# The key is kept out of the notebook itself; adjust the path if the file moves.
load_dotenv("/content/drive/MyDrive/02 Areas/Business Intelligence/Keys/clickup_key.env")  # Adjust the path if needed

# Read the ClickUp API key from the environment (None when the variable is not set)
API_TOKEN = os.getenv("CLICKUP_API_KEY")

# Report whether a non-empty key was found. This only prints a message: execution
# continues either way, and the key value itself is never printed.
if API_TOKEN:
    print("Successfully loaded API Key")
else:
    print("Error: API key not found.")

Successfully loaded API Key


In [60]:
# Request headers sent with every ClickUp API call made by the GET helpers below.
HEADERS = {
    "Authorization": API_TOKEN,
    "Content-Type": "application/json"
}

# === Key ID Definitions === #
# Identifiers are kept as strings; the helpers interpolate them into request URLs.

## TeamID
TEAM_ID = "12612276"

## SpaceIDs
Deployments_SPACEID = "18930510"

## FolderIDs
InstallBase_FOLDERID = "108028980"

## ListIDs
LiveMachines_LISTID = "170916222"
DecomSwap_LISTID = "164425124"
CurrentDeployments_LISTID = "180119627"
PlumbingJob_LISTID = "901400603529"
PostDeployments_LISTID = "901408037105"

## TaskIDs
# NOTE(review): presumably a sample task used for manual checks - confirm.
test_TASKID = "86b328wdy"

## Extract

In [5]:
# GET Function Definitions:

## Function to get team details
def get_teams():
    """Fetch team details from the ClickUp `/team` endpoint.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    response = requests.get("https://api.clickup.com/api/v2/team", headers=HEADERS)

    # Guard clause: anything other than 200 is reported as an error dict
    if response.status_code != 200:
        return {"error": f"Failed to fetch team: {response.status_code}", "details": response.text}

    return response.json()

## Function to get folder information
def get_folder(folder_id):
    """Fetch the non-archived lists of a folder (`/folder/{folder_id}/list`).

    Parameters:
        folder_id (str): ClickUp folder identifier.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    endpoint = f"https://api.clickup.com/api/v2/folder/{folder_id}/list?archived=false"
    response = requests.get(endpoint, headers=HEADERS)

    if response.status_code != 200:
        return {"error": f"Failed to fetch folder: {response.status_code}", "details": response.text}

    return response.json()

## Function to get list information
def get_list(list_id):
    """Fetch the details of a single list (`/list/{list_id}`).

    Parameters:
        list_id (str): ClickUp list identifier.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    endpoint = f"https://api.clickup.com/api/v2/list/{list_id}"
    response = requests.get(endpoint, headers=HEADERS)

    if response.status_code != 200:
        return {"error": f"Failed to fetch list: {response.status_code}", "details": response.text}

    return response.json()

## Function to get the custom fields allowed on a list:
def get_custom_fields_from_list(list_id):
    """Fetch the custom fields available on a list (`/list/{list_id}/field`).

    Parameters:
        list_id (str): ClickUp list identifier.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    endpoint = f"https://api.clickup.com/api/v2/list/{list_id}/field"
    response = requests.get(endpoint, headers=HEADERS)

    if response.status_code != 200:
        return {"error": f"Failed to fetch custom fields: {response.status_code}", "details": response.text}

    return response.json()

## Function to get all tasks from a list with a progress bar
def get_all_tasks(list_id):
    """Fetch every task of a list, page by page, showing a progress bar.

    Pages are requested starting at 0 until a page comes back without tasks.
    On a non-200 response the error is printed and paging stops, so the
    result may then be partial.

    Parameters:
        list_id (str): ClickUp list identifier.

    Returns:
        list: The task dictionaries collected from all fetched pages.
    """
    collected = []
    page_number = 0  # First page is 0

    with tqdm(desc="Fetching Tasks", unit="page") as pbar:
        while True:
            response = requests.get(
                f"https://api.clickup.com/api/v2/list/{list_id}/task?page={page_number}",
                headers=HEADERS,
            )

            if response.status_code != 200:
                print(f"Error: {response.status_code}, {response.text}")
                break  # Give up on the first failed page

            page_tasks = response.json().get("tasks", [])
            if not page_tasks:
                break  # An empty page marks the end of the list

            collected += page_tasks
            page_number += 1
            pbar.update(1)  # One tick per page fetched

    return collected

## Function to get task details
def get_task_details(task_id):
    """Fetch the full details of a single task (`/task/{task_id}`).

    Parameters:
        task_id (str): ClickUp task identifier.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    endpoint = f"https://api.clickup.com/api/v2/task/{task_id}"
    response = requests.get(endpoint, headers=HEADERS)

    if response.status_code != 200:
        return {"error": f"Failed to fetch task: {response.status_code}", "details": response.text}

    return response.json()

## Function to get tags available in a space
def get_tags_from_space(space_id):
    """Fetch the tags defined in a space (`/space/{space_id}/tag`).

    Parameters:
        space_id (str): ClickUp space identifier.

    Returns:
        dict: The decoded JSON body on HTTP 200. For any other status, a dict
              with an "error" message and the raw response text under "details".
    """
    url = f"https://api.clickup.com/api/v2/space/{space_id}/tag"
    response = requests.get(url, headers=HEADERS)

    if response.status_code == 200:
        return response.json()

    # The message names the resource that actually failed (tags, not a task)
    return {"error": f"Failed to fetch tags: {response.status_code}", "details": response.text}

In [6]:
# EXTRACTION Function Definition:

## Extracting TaskIDs from raw list data object.
def extract_task_ids(tasks):
    """Collect the "id" of every task that has one, preserving input order."""
    task_ids = []
    for task in tasks:
        if "id" in task:
            task_ids.append(task["id"])
    return task_ids

def extract_custom_fields(field_list):
    """Summarise the custom fields of a list as a DataFrame.

    Parameters:
        field_list (dict): Mapping with a 'fields' key holding field dicts,
            e.g. the result of get_custom_fields_from_list(list_id).

    Returns:
        pd.DataFrame: One row per field with the columns 'Name', 'Type',
            'Date_created' and 'Required' (None where a field lacks the key).
    """
    records = [
        {
            'Name': field.get('name'),
            'Type': field.get('type'),
            'Date_created': field.get('date_created'),
            'Required': field.get('required'),
        }
        for field in field_list['fields']
    ]
    return pd.DataFrame(records)

## Extracting task details with a progress bar
def extract_task_details(task_ids):
    """Fetch each task and build one flat-ish record per task.

    Every record holds the task id, name, status, URL and tag names, one key
    per custom field (named after the field), and finally the ids of the
    containing list, project, folder and space.

    A task whose fetch failed (get_task_details returned its error dict) still
    yields a record: it carries the task id and None for everything that could
    not be read, instead of aborting the whole extraction.

    Parameters:
        task_ids (list): Task identifiers to fetch.

    Returns:
        list: One dictionary per task id, in input order.
    """
    all_tasks_data = []  # One record per task

    # (output column, key of the parent object in the task payload)
    parent_columns = (
        ("List ID", 'list'),
        ("Project ID", 'project'),
        ("Folder ID", 'folder'),
        ("Space ID", 'space'),
    )

    with tqdm(total=len(task_ids), desc="Processing Tasks", unit="task") as pbar:
        for task_id in task_ids:
            taskdetails = get_task_details(task_id) or {}

            task_data = {
                "Task ID": task_id,
                "Task Name": taskdetails.get('name'),
                # `or {}` also covers a key that is present but null
                "Status": (taskdetails.get('status') or {}).get('status'),
                # The task payload exposes the link under the lowercase key 'url';
                # 'URL' is kept only as a fallback for backward compatibility.
                "ClickUp URL": taskdetails.get('url', taskdetails.get('URL')),
                "Tags": [tag.get('name') for tag in (taskdetails.get('tags') or [])],
            }

            # Each custom field becomes its own column, keyed by the field name
            for field in taskdetails.get('custom_fields') or []:
                key = field.get('name')
                value = field.get('value')

                if key and key.strip():  # Skip unnamed / blank-named fields
                    task_data[key] = value

            # Metadata goes last; a missing parent object yields None rather
            # than an AttributeError on `None.get`.
            for column, parent_key in parent_columns:
                task_data[column] = (taskdetails.get(parent_key) or {}).get('id')

            all_tasks_data.append(task_data)
            pbar.update(1)  # One tick per task processed

    return all_tasks_data

## Extracts tag names from raw tag information.
def extract_tag_names_from_space(space_id):
    """
    Returns the names of the tags reported for a space.

    Parameters:
        space_id (str): Space identifier passed to get_tags_from_space.

    Returns:
        list: The 'name' of every entry under the response's 'tags' key
              (empty when the response has no 'tags' key).
    """
    response = get_tags_from_space(space_id)

    names = []
    for tag in response.get('tags', {}):
        names.append(tag['name'])
    return names

## Identifies and flattens out any fields that have nested dictionaries or lists of dictionaries.
def flatten_nested_data(data_list):
    """Flatten each dictionary in `data_list` into a single-level dictionary.

    Rules, as implemented:
       - Non-dict entries of `data_list` are skipped.
       - A dict value is expanded recursively; each leaf is stored under its
         own sub-key (the parent key is NOT part of the output key).
       - A list made up only of dicts yields one "{key}_{sub_key}" entry per
         sub-key: a single non-None value is stored directly, otherwise the
         list of non-None values is stored. An empty list yields no entry.
       - Any other list is stored as its only element, or as a ", "-joined
         string when it has several elements.
       - Every other value is stored unchanged under its key.
       - When two values map to the same output key, the later one wins.

    Returns:
        list: One flattened dictionary per dict entry of `data_list`.
    """
    flattened_data = []

    for entry in data_list:  # Iterate over each dictionary in the list
        if not isinstance(entry, dict):
            continue  # Skip non-dictionary items

        flat_entry = {}  # Store flattened key-value pairs

        def flatten_value(key, value, parent_key=None):
            """Flatten one key/value pair into `flat_entry`.

            `parent_key` is built up during recursion but never used when
            writing to `flat_entry`, so nested leaves keep their bare key.
            """
            if isinstance(value, dict):  # If it's a dictionary
                for sub_key, sub_value in value.items():
                    flatten_value(sub_key, sub_value, parent_key=f"{parent_key}_{key}" if parent_key else key)
            elif isinstance(value, list):  # If it's a list
                # Note: all() is True for an empty list, so [] takes this branch
                # and, having no keys to iterate, writes nothing.
                if all(isinstance(item, dict) for item in value):  # List of dictionaries
                    for sub_key in set().union(*(d.keys() for d in value)):  # Get all keys in the list of dicts
                        extracted_values = [d.get(sub_key) for d in value if d.get(sub_key) is not None]
                        # If there's only one value, extract it directly; otherwise, keep the list
                        flat_entry[f"{key}_{sub_key}"] = extracted_values[0] if len(extracted_values) == 1 else extracted_values
                else:  # List of values (non-dictionaries)
                    flat_entry[key] = value[0] if len(value) == 1 else ", ".join(map(str, value))
            else:
                # For simple values, just store it
                flat_entry[key] = value

        for key, value in entry.items():
            flatten_value(key, value)  # Start flattening for each key-value pair

        flattened_data.append(flat_entry)  # Add to the result list

    return flattened_data  # Returns a list of dictionaries

## Transform

In [7]:
# TRANSFORMATION : Dealing with Null Values and Setting column names

def analyze_columns(df):
    """Profile every column of `df`, in alphabetical column order.

    Returns:
        pd.DataFrame: One row per column with its name, dtype, number of
            unique values, total length, null count and null percentage
            (rounded to two decimals).
    """
    profile_rows = []

    for name in sorted(df.columns):
        series = df[name]

        # Lists are unhashable; turn them into tuples so nunique() can count them
        hashable = series.apply(lambda cell: tuple(cell) if isinstance(cell, list) else cell)

        try:
            n_unique = hashable.nunique()
        except TypeError:
            # Other unhashable values (e.g. dicts): count distinct string forms instead
            n_unique = len({str(cell) for cell in series.dropna()})

        n_total = len(series)
        n_null = series.isna().sum()

        profile_rows.append({
            "Column Name": name,
            "Data Type": series.dtype,
            "Unique Values": n_unique,
            "Total Values": n_total,
            "Null Count": n_null,
            "Null Percentage": round((n_null / n_total) * 100, 2),
        })

    return pd.DataFrame(profile_rows)


## Removes Nan filled columns and rows
def lean_dataframe(df, min_fill_col_percentage, min_fill_row_percentage):
    """
    Drops sparsely filled columns, then sparsely filled rows.

    Columns are filtered first; the row fill rate is then measured over the
    remaining columns only.

    Parameters:
        df (pd.DataFrame): The input dataset (not modified).
        min_fill_col_percentage (float): Minimum percentage (0-100) of non-null values a column needs.
        min_fill_row_percentage (float): Minimum percentage (0-100) of non-null values a row needs.

    Returns:
        pd.DataFrame: The rows and columns that meet both thresholds.
    """
    column_fill = df.notnull().mean() * 100
    dense_columns = df.loc[:, column_fill >= min_fill_col_percentage]

    row_fill = dense_columns.notnull().mean(axis=1) * 100
    return dense_columns.loc[row_fill >= min_fill_row_percentage]

## One-hot encodes tag columns to yield True and False values.
def expand_tags_column(df, tag_column):
    """
    Expands a column of ", "-separated tags into one boolean column per tag.

    The new columns are named "[<tag>] Tag" and are appended in alphabetical
    tag order, so the resulting column layout is the same on every run.
    `df` is modified in place (missing values in `tag_column` become "") and
    also returned.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        tag_column (str): The column name containing the tags.

    Returns:
        pd.DataFrame: `df` with the additional boolean tag columns.
    """
    # Fill NaNs with empty strings so every cell can be split
    df[tag_column] = df[tag_column].fillna("")

    # Keep the per-row tag lists in a local Series rather than a temporary
    # DataFrame column, so an existing "Tag_List" column is never clobbered.
    tag_lists = df[tag_column].apply(lambda cell: cell.split(", ") if cell else [])

    # Sorted for a deterministic column order (plain set iteration order of
    # strings can differ between interpreter runs).
    unique_tags = sorted({tag for tags in tag_lists for tag in tags})

    for tag in unique_tags:
        df[f"[{tag}] Tag"] = tag_lists.apply(lambda tags, tag=tag: tag in tags)

    return df

In [8]:
# TRANSFORMATION : Data Standardization

def validate_email(email):
    """Return True when str(email) has the shape local@domain.tld (basic check)."""
    pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
    candidate = str(email)
    return re.match(pattern, candidate) is not None

def validate_phone(phone):
    """Return True when str(phone) looks like a phone number (basic check).

    Accepted: an optional leading "+", followed by 7 to 15 characters drawn
    from digits, parentheses, dots, hyphens and whitespace.
    """
    pattern = r"^\+?[0-9().\-\s]{7,15}$"
    candidate = str(phone)
    return re.match(pattern, candidate) is not None

def is_unix_timestamp(value):
    """Classify a number as a Unix timestamp in seconds or milliseconds.

    Accepts Python and NumPy integer/float scalars. Booleans, non-numeric
    values, NaN and +/-infinity are never timestamps.

    Returns:
        "seconds", "milliseconds", or False when `value` is not a number in
        either accepted range.
    """
    # bool is a subclass of int; exclude it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float, np.integer, np.floating)):
        return False

    # NaN and infinity cannot be converted with int(); reject them up front
    if isinstance(value, (float, np.floating)) and not np.isfinite(value):
        return False

    value = int(value)  # Truncate any fractional part before the range checks
    if 1000000000 <= value <= 4102444800:  # 2001-09-09 to 2100-01-01 (seconds)
        return "seconds"
    if 1000000000000 <= value <= 4102444800000:  # Same span in milliseconds
        return "milliseconds"
    return False

def standardize_column_types(dataset, column_names=None, column_types=None):
    """
    Standardizes and/or converts the columns in `dataset`.

    With both `column_names` and `column_types` given, each named column that
    exists in `dataset` is converted according to its declared type:
      - 'short_text', 'text', 'labels', 'users', 'tasks', 'formula': cast to str.
      - 'drop_down': numeric option index rendered as an integer string, "" when missing.
      - 'url' / 'email' / 'phone': value kept when it validates, NaN otherwise.
      - 'checkbox': real booleans; the strings "true"/"1"/"yes" (any case) are
        True, every other string and any missing value is False.
      - 'currency' / 'number': numeric, unparseable values become NaN.
      - 'date': Unix timestamps (seconds or milliseconds) become `date` objects,
        anything else becomes NaN.
    Columns with any other declared type are left untouched.

    Without them, every column that is not entirely null is standardized based
    on the most frequent Python type among its non-null values.

    `dataset` is modified in place and also returned.

    Parameters:
        dataset (pd.DataFrame): The dataset to standardize/convert.
        column_names (list or pd.Series, optional): The list of column names to convert. Defaults to None.
        column_types (list or pd.Series, optional): The corresponding data types for conversion. Defaults to None.

    Returns:
        pd.DataFrame: The modified dataset with updated and standardized data types.
    """

    def _checkbox_to_bool(value):
        # astype(bool) would turn the string "false" and NaN into True
        if isinstance(value, str):
            return value.strip().lower() in ("true", "1", "yes")
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return False
        return bool(value)

    def _timestamp_to_date(value):
        # Shared by both branches: seconds/milliseconds -> date, anything else -> NaN
        unit = is_unix_timestamp(value)
        if unit == "seconds":
            return pd.to_datetime(int(value), unit='s').date()
        if unit == "milliseconds":
            return pd.to_datetime(int(value), unit='ms').date()
        return np.nan

    if column_names is not None and column_types is not None:
        for col_name, col_type in zip(column_names, column_types):
            if col_name not in dataset.columns:
                continue  # Skip columns that are not in the dataset

            if col_type in ['short_text', 'text', 'labels', 'users', 'tasks', 'formula']:
                dataset[col_name] = dataset[col_name].astype(str)

            elif col_type == 'drop_down':
                dataset[col_name] = dataset[col_name].apply(lambda x: str(int(float(x))) if pd.notna(x) and x not in ["", None] else "")

            elif col_type == 'url':
                dataset[col_name] = dataset[col_name].apply(lambda x: x if urlparse(str(x)).scheme else np.nan)

            elif col_type == 'email':
                dataset[col_name] = dataset[col_name].apply(lambda x: x if validate_email(x) else np.nan)

            elif col_type == 'checkbox':
                dataset[col_name] = dataset[col_name].apply(_checkbox_to_bool).astype(bool)

            elif col_type == 'phone':
                dataset[col_name] = dataset[col_name].apply(lambda x: x if validate_phone(x) else np.nan)

            elif col_type in ['currency', 'number']:
                dataset[col_name] = pd.to_numeric(dataset[col_name], errors='coerce')

            elif col_type == 'date':
                # Timestamps may arrive as strings; make them numeric before classifying
                numeric_values = pd.to_numeric(dataset[col_name], errors='coerce')
                dataset[col_name] = numeric_values.apply(_timestamp_to_date)

    else:
        # Standardize column types across the dataset
        for col in dataset.columns:
            # Nothing can be inferred from a completely empty column; leave it as is
            if dataset[col].isna().all():
                continue

            # Convert object-type columns to numeric when every value parses;
            # otherwise keep the column unchanged (explicit replacement for the
            # deprecated errors='ignore').
            if dataset[col].dtype == "object":
                try:
                    dataset[col] = pd.to_numeric(dataset[col])
                except (ValueError, TypeError):
                    pass

            # Infer the dominant data type in the column
            inferred_type = dataset[col].dropna().map(type).mode()[0] if not dataset[col].dropna().empty else str

            # Standardize None and NaN values
            dataset[col] = dataset[col].replace({None: np.nan})

            # Apply appropriate conversion based on inferred type
            if inferred_type == str:
                dataset[col] = dataset[col].astype(str).replace('nan', np.nan)  # Restore NaN for missing values
            elif inferred_type in [int, float, np.number]:
                dataset[col] = pd.to_numeric(dataset[col], errors='coerce')

                # Numeric columns holding Unix timestamps are converted to dates
                if dataset[col].apply(is_unix_timestamp).any():
                    dataset[col] = dataset[col].apply(_timestamp_to_date)

            elif inferred_type == bool:
                dataset[col] = dataset[col].astype(bool)
            elif inferred_type == pd.Timestamp:
                dataset[col] = pd.to_datetime(dataset[col], errors='coerce')

    return dataset

In [9]:
# TRANSFORMATION : Dealing with indexed columns.

def get_coltype_columns(column_names, column_types, coltype):
    """
    Returns the column names whose declared type equals `coltype`.

    Parameters:
        column_names (list or pd.Series): The list of column names.
        column_types (list or pd.Series): The corresponding data types.
        coltype (str): The type to select, e.g. 'drop_down'.

    Returns:
        list: The matching column names, in input order.
    """
    matches = []
    for name, declared_type in zip(column_names, column_types):
        if declared_type == coltype:
            matches.append(name)
    return matches

## Build a lookup table (option index/id -> label) from the raw Custom Fields data.
def build_lookup(fields):
    """
    Builds option lookups for the drop-down and labels fields of a list.

    Parameters:
        fields (dict): Dictionary containing field metadata under 'fields'.
        E.g. get_custom_fields_from_list(list_id)

    Returns:
        dict: Keys are field names; values map an option key to its text.
              Drop-downs map str(orderindex) -> str(name), labels map
              str(id) -> str(label). A key missing from an option shows up
              as the string 'None'.
    """
    # field type -> (option key used as index, option key used as display text)
    option_keys_by_type = {
        'drop_down': ('orderindex', 'name'),
        'labels': ('id', 'label'),
    }

    lookup_table = {}

    for field in fields.get('fields', []):
        option_keys = option_keys_by_type.get(field.get('type'))
        if option_keys is None:
            continue  # Only drop-down and labels fields carry options

        index_key, text_key = option_keys
        options = field.get('type_config', {}).get('options', [])
        lookup_table[field['name']] = {
            str(option.get(index_key)): str(option.get(text_key))
            for option in options
        }

    return lookup_table

## Use LookUp Table to convert indexes into values
def replace_indexes(dataset, lookup_table):
    """
    Replaces indexed dropdown/label values with their labels using the lookup table.
    Handles both single values and ", "-separated multiple choices.

    Missing values (None/NaN) are preserved as missing instead of being turned
    into the literal strings "None"/"nan". Values with no entry in the mapping
    are kept as their string form.

    Parameters:
        dataset (pd.DataFrame): The dataset where dropdown columns have indexed values.
        lookup_table (dict): A dictionary where keys are dropdown column names,
                             and values are dictionaries mapping index -> label.

    Returns:
        pd.DataFrame: A copy of `dataset` with indexes replaced by their labels.
    """
    df = dataset.copy()  # Avoid modifying the original dataset

    def _is_missing(value):
        # pd.isna returns an array for list-likes, so only test scalars
        return pd.api.types.is_scalar(value) and pd.isna(value)

    def _translate(cell, mapping):
        if _is_missing(cell):
            return cell
        return ", ".join(mapping.get(part, part) for part in str(cell).split(", "))

    for col, mapping in lookup_table.items():
        if col not in df.columns:
            continue  # Lookup entries without a matching column are ignored
        df[col] = df[col].apply(lambda cell, mapping=mapping: _translate(cell, mapping))

    return df

## Clean

In [31]:
# Defining Cleaning functions

## STATUS
def process_status_column(df, column_name="Status"):
    """Convert the status strings in `column_name` to Title Case.

    Non-string cells (e.g. None/NaN for a task without status) are left
    unchanged instead of raising. `df` is modified in place and returned.
    """
    def clean_status(status):
        # Only strings have .title(); pass everything else through untouched
        return status.title() if isinstance(status, str) else status

    df[column_name] = df[column_name].apply(clean_status)
    return df

## CUSTOMER TYPE
def process_customer_type(df, column_name="Customer Type"):
    """Collapse raw customer-type labels into a small set of categories.

    The value read from `column_name` is matched case-insensitively:
    "channel" -> "Channel", "enterprise" -> "Enterprise", "smb" -> "Mid-Market";
    anything else, including missing values, becomes "Other". The first rule
    that matches wins, in that order.

    The result is always written to the "Customer Type" column, whatever
    `column_name` is. `df` is modified in place and returned.
    """
    # (substring looked for in the title-cased value, resulting category)
    rules = (
        ("Channel", "Channel"),
        ("Enterprise", "Enterprise"),
        ("Smb", "Mid-Market"),  # "SMB" becomes "Smb" after .title()
    )

    def categorize_customer_type(customer_type):
        # Always return one scalar per row so the result is a plain Series,
        # also when some rows are missing.
        if pd.api.types.is_scalar(customer_type) and pd.isna(customer_type):
            return "Other"

        label = str(customer_type).title()
        for needle, category in rules:
            if needle in label:
                return category
        return "Other"

    df["Customer Type"] = df[column_name].apply(categorize_customer_type)
    return df

## VERTICAL
def process_vertical_column(df, column_name="Vertical"):
    """Fold the SMB sub-segments into the single vertical "SMB".

    Every other value passes through unchanged; no validation against a list
    of allowed verticals is performed. `df` is modified in place and returned.
    """
    smb_aliases = {
        "High Traffic Chain/Business",
        "High Potential Growth Business",
        "Independent Business"
    }

    df[column_name] = df[column_name].apply(
        lambda vertical: "SMB" if vertical in smb_aliases else vertical
    )
    return df

## BRAND
def process_brand_column(df, column_name="Brand"):
    """Normalise brand names.

    Missing values become "Other"; everything else is converted to a string,
    stripped of surrounding whitespace, and relieved of a leading "SMB - " or
    "Channel - " prefix. `df` is modified in place and returned.
    """
    segment_prefix = re.compile(r"^(SMB|Channel) - ")

    def clean_brand(value):
        if pd.isna(value):
            return "Other"
        return segment_prefix.sub("", str(value).strip())

    df[column_name] = df[column_name].apply(clean_brand)
    return df

## MACHINE DELIVERY DATE
def fill_machine_delivery_date(df):
    """Fill "Machine delivery Date" from the first available date column.

    For every row the first non-missing value is taken, in priority order,
    from "Machine delivery Date", "Bot Delivery Date", "Install Date" and
    "Install Date (duplicated)" (only the columns that exist are considered).
    Rows with no value in any of them get NaT. The target column is created
    when it does not exist yet. `df` is modified in place and returned.
    """
    target = "Machine delivery Date"

    # Candidate source columns, highest priority first
    candidates = [
        target,
        "Bot Delivery Date",
        "Install Date",
        "Install Date (duplicated)"
    ]
    available_cols = [col for col in candidates if col in df.columns]

    if not available_cols:
        # No source column at all: the target is entirely missing
        df[target] = pd.NaT
        return df

    sources = df[available_cols]

    # Back-filling along the row pulls the first non-missing value into the
    # leftmost (highest-priority) position.
    first_value = sources.bfill(axis=1).iloc[:, 0]

    # Use NaT as the uniform marker for rows without any date
    df[target] = first_value.mask(sources.isna().all(axis=1), pd.NaT)

    return df

## SITE WORK
def format_site_work(df, column_name="site work"):
    """Translate raw site-work statuses into display labels.

    Values without an entry in the translation table (including missing
    values) are kept as they are. `df` is modified in place and returned.
    """
    display_labels = {
        "done": "Finished",
        "not required": "Not Required",
        "Botrista in progress": "In Progress by Botrista",
        "Customer in progress": "In Progress by Customer",
        "Ninja": "In Progress by Botrista"  # Assuming "Ninja" means Botrista is handling it
    }

    raw_values = df[column_name]
    translated = raw_values.map(display_labels)

    # Unmapped values come back as NaN from map(); restore the original there
    df[column_name] = translated.fillna(raw_values)
    return df

In [11]:
# Filling in empty VERTICAL, CUSTOMER TYPE, BRAND:

import pandas as pd
import numpy as np

def create_index_table(filled_df):
    """
    Builds a Brand -> {Vertical, Customer Type} lookup from the most frequent values.

    Parameters:
        filled_df (pd.DataFrame): DataFrame with 'Vertical', 'Customer Type', and 'Brand' columns.

    Returns:
        dict: {Brand: {'Vertical': X, 'Customer Type': Y}}, where X and Y are the
              most frequent value for that brand (NaN when the brand has none).
    """
    def most_frequent(values):
        modes = values.mode()
        return np.nan if modes.empty else modes.iloc[0]

    per_brand = filled_df.groupby('Brand')[['Vertical', 'Customer Type']]
    return per_brand.agg(most_frequent).to_dict(orient='index')

def find_brand_from_location(location_name, brands):
    """
    Finds the first known brand whose name occurs in the location name.

    The match is a case-insensitive substring test; brands are tried in the
    order given and the first hit is returned.

    Parameters:
        location_name (str): The name of the location.
        brands (list): List of known brand names.

    Returns:
        str or None: The first matching brand, or None when the location name
                     is missing or no brand matches.
    """
    if pd.isna(location_name):
        return None

    haystack = location_name.lower()
    return next((brand for brand in brands if brand.lower() in haystack), None)

def autocomplete_table(incomplete_df, index_table):
    """
    Autocomplete missing 'Vertical', 'Customer Type', and 'Brand' in an incomplete dataset.

    - Rows with a known Brand get their missing Vertical / Customer Type from
      the index table. Values that are already present are kept.
    - Rows with a missing Brand get one detected from 'Location Name' (when a
      known brand name occurs in it), followed by the same fill.

    A Vertical / Customer Type counts as missing when it is None/NaN or "".
    `incomplete_df` is modified in place and also returned.

    Parameters:
        incomplete_df (pd.DataFrame): DataFrame with missing values in 'Vertical', 'Customer Type', and 'Brand'.
        index_table (dict): Dictionary of Brand -> {'Vertical': X, 'Customer Type': Y}

    Returns:
        pd.DataFrame: The completed dataset with missing values filled.
    """
    def _is_blank(value):
        # Test for NA first: comparing pd.NA with "" has no boolean value
        return (pd.api.types.is_scalar(value) and pd.isna(value)) or value == ""

    def _fill_from_brand(row_label, row, brand):
        # Only fill the gaps; never overwrite a value that is already there
        for column in ('Vertical', 'Customer Type'):
            if _is_blank(row[column]):
                incomplete_df.at[row_label, column] = index_table[brand][column]

    brands = list(index_table.keys())

    for i, row in incomplete_df.iterrows():
        brand = row['Brand']
        if pd.notna(brand) and brand in index_table:
            _fill_from_brand(i, row, brand)
        elif pd.isna(brand):
            # Try finding the Brand from Location Name
            detected_brand = find_brand_from_location(row['Location Name'], brands)
            if detected_brand:
                incomplete_df.at[i, 'Brand'] = detected_brand
                _fill_from_brand(i, row, detected_brand)

    return incomplete_df

In [12]:
def format_title_case_in_dataframe(df, columns):
    """Title-case the text in the given columns, collapsing extra whitespace.

    - Strings have runs of whitespace collapsed to one space and are title-cased.
    - Lists are formatted element by element and joined with single spaces;
      missing elements are skipped.
    - None stays None; any other value (numbers, NaN, ...) is left unchanged.

    All columns are validated before anything is changed, so a bad column
    name never leaves `df` half-processed. `df` is modified in place and returned.

    Raises:
        ValueError: If any name in `columns` is not a column of `df`.
    """
    def format_title_case(input_value):
        # Lists first: pd.isna() on a multi-element list returns an array,
        # which cannot be used as a condition.
        if isinstance(input_value, list):
            formatted_items = (format_title_case(item) for item in input_value)
            return ' '.join(str(item) for item in formatted_items if item is not None)

        if isinstance(input_value, str):
            # split()/join collapses repeated and surrounding whitespace
            return ' '.join(input_value.split()).title()

        if input_value is None or (pd.api.types.is_scalar(input_value) and pd.isna(input_value)):
            return None

        # Neither text nor missing: return it as is
        return input_value

    # Validate up front so a failure cannot leave earlier columns already modified
    for column in columns:
        if column not in df.columns:
            raise ValueError(f"Column '{column}' not found in the DataFrame.")

    for column in columns:
        df[column] = df[column].apply(lambda x: format_title_case(x) if isinstance(x, (str, list, type(None))) else x)

    return df

In [13]:
### Index by Brand ###
def fill_vertical_by_brand(df, brand_index):
    """
    Fills missing 'Vertical' values from the brand index, using the first three
    characters of 'Location ID' as the Brand ID.

    A Vertical counts as missing when it is None/NaN or ''. Rows whose
    Location ID is not a string, or whose prefix is not in the brand index,
    keep their current Vertical. `df` is modified in place and returned.

    Parameters:
    df (pd.DataFrame): DataFrame containing 'Location ID' and 'Vertical' columns.
    brand_index (pd.DataFrame): DataFrame containing 'Brand ID' and 'Vertical' columns.

    Returns:
    pd.DataFrame: Updated DataFrame with missing or empty 'Vertical' values filled.
    """
    # Brand ID -> Vertical
    brand_map = brand_index.set_index('Brand ID')['Vertical'].to_dict()

    def _resolve(location_id, vertical):
        is_missing = (pd.api.types.is_scalar(vertical) and pd.isna(vertical)) or vertical == ''
        if not is_missing:
            return vertical
        # A missing / non-text Location ID has no prefix to look up
        if isinstance(location_id, str):
            return brand_map.get(location_id[:3], vertical)
        return vertical

    df['Vertical'] = [
        _resolve(location_id, vertical)
        for location_id, vertical in zip(df['Location ID'], df['Vertical'])
    ]

    return df

def fill_region_by_state(df, area_index):
    """
    Derives a 'Region' column from 'State' via the area index.

    Each state value is looked up among the 'Abbreviation' values of
    `area_index`; states that are missing or not found get NaN.
    `df` is modified in place and returned.

    Parameters:
    df (pd.DataFrame): DataFrame containing a 'State' column.
    area_index (pd.DataFrame): DataFrame containing 'Abbreviation' and 'Region' columns.

    Returns:
    pd.DataFrame: Updated DataFrame with new 'Region' column.
    """
    # Abbreviation -> Region
    regions_by_abbreviation = area_index.set_index('Abbreviation')['Region']
    region_lookup = regions_by_abbreviation.to_dict()

    states = df['State']
    df['Region'] = states.map(region_lookup)

    return df

In [32]:
# CLEAN : Live Machines

def clean_LiveMachines(df, colnames):
  """Run the cleaning steps for the Live Machines data and lay out its columns.

  Steps, in order: status casing, customer-type categories, vertical
  normalisation, brand clean-up, machine delivery date fill, site-work labels,
  vertical fill from the module-level `brand_index`, and region lookup from
  the module-level `area_index`.

  Parameters:
      df (pd.DataFrame): Flattened task data.
      colnames (dict): Source column name -> output column name.

  Returns:
      pd.DataFrame: The renamed columns in the order of `colnames`, followed
          by every column whose (pre-rename) name contains "Tag". Raises
          KeyError if an expected column is absent.
  """
  single_frame_steps = (
      process_status_column,
      process_customer_type,
      process_vertical_column,
      process_brand_column,
      fill_machine_delivery_date,
      format_site_work,
  )
  for step in single_frame_steps:
    df = step(df)

  df = fill_vertical_by_brand(df, brand_index)
  df = fill_region_by_state(df, area_index)

  # Tag columns are collected before renaming and appended after the mapped columns
  tag_columns = [column for column in df.columns if "Tag" in column]

  renamed = df.rename(columns=colnames)
  ordered_columns = list(colnames.values()) + tag_columns
  return renamed[ordered_columns]

# Column mapping for the Live Machines list: source column name -> output column name.
# clean_LiveMachines renames with this dict and then selects the output names in
# exactly this order, so the key order here defines the final column order.
LM_colnames = {
    "Location ID": "Location ID",
    "Task Name": "Location Name",
    "Vertical": "Vertical",
    "Customer Type": "Customer Type",
    "Brand": "Brand",
    "Launch Date": "Launch Date",
    "Install Date": "Install Date",
    "Machine delivery Date": "Machine Delivery Date",
    "Survey Date": "Survey Date",
    "Region": "Region",
    "State": "State",
    "formatted_address": "Formatted Address",
    "lat": "Latitude",
    "lng": "Longitude",
    "place_id": "Place ID",
    "Operator": "Operator",
    "Contract Type": "Contract Type",
    "site work": "Site Work",
    "Site GM Name": "Site GM Name",
    "Site GM number": "Site GM Number",
    "Site GM Email": "Site GM Email",
    "Sales_id": "Sales ID",
    "Sales_username": "Sales Name",
    "Sales_email": "Sales Email",
    "RAE_id": "RAE ID",
    "RAE_username": "RAE Name",
    "RAE_email": "RAE Email",
    "SOE_id": "SOE ID",
    "SOE_username": "SOE Name",
    "SOE_email": "SOE Email",
    "CC Agent_id": "CCA ID",
    "CC Agent_username": "CCA Name",
    "CC Agent_email": "CCA Email",
    "LO_id": "LO ID",
    "LO_username": "LO Name",
    "LO_email": "LO Email",
    "Status": "ClickUp Status",
    "School ID": "School ID",
    "Task ID": "Task ID",
    "List ID": "List ID",
    "Project ID": "Project ID",
    "Folder ID": "Folder ID",
    "Space ID": "Space ID"
}

In [33]:
# CLEAN : Decom & Swap

def clean_DecomSwap(df, colnames):
  """Clean the transformed Decom & Swap frame and shape it for export.

  Applies the shared cleaning helpers, fills Vertical/Region from the
  module-level `brand_index` / `area_index`, then back-fills empty
  Vertical, Customer Type and Brand cells through an index table built from
  those same three columns.

  Args:
    df: Transformed Decom & Swap DataFrame.
    colnames: Mapping of current column name -> output column name.

  Returns:
    DataFrame whose columns are the values of `colnames` (in mapping order)
    followed by every column whose name contains "Tag".
  """
  single_frame_steps = (
      process_status_column,
      process_customer_type,
      process_vertical_column,
      process_brand_column,
      fill_machine_delivery_date,
      format_site_work,
  )
  for step in single_frame_steps:
    df = step(df)

  df = fill_vertical_by_brand(df, brand_index)
  df = fill_region_by_state(df, area_index)

  # Index table over the three classification columns, used to autocomplete gaps
  classification_index = create_index_table(df[['Vertical', 'Customer Type', 'Brand']])
  df = autocomplete_table(df, classification_index)

  # Tag columns are collected before the rename and appended after the mapped ones
  tag_columns = [name for name in df.columns if "Tag" in name]
  output_order = [*colnames.values(), *tag_columns]

  return df.rename(columns=colnames)[output_order]

# Column map for the Decom & Swap dataset.
# Keys are the column names found in the cleaned frame; values are the names
# written to the output. clean_DecomSwap() renames with this dict and then
# keeps exactly these output columns, in this order, ahead of the tag columns.
DC_colnames = {
    "Location ID": "Location ID",
    "Task Name": "Location Name",
    "Vertical": "Vertical",
    "Customer Type": "Customer Type",
    "Brand": "Brand",
    "Decom Date": "Decom Date",
    "Launch Date": "Launch Date",
    "Install Date": "Install Date",
    "Machine delivery Date": "Machine Delivery Date",
    "Survey Date": "Survey Date",
    # NOTE(review): this key ends with a tab character; presumably the source
    # field name carries it — confirm before "cleaning" the key.
    "Reason for Decommission\t": "Decom Reason",
    "Region": "Region",
    "State": "State",
    "formatted_address": "Formatted Address",
    "lat": "Latitude",
    "lng": "Longitude",
    "place_id": "Place ID",
    "Contract Type": "Contract Type",
    "site work": "Site Work",
    "Franchise Owner Name": "Franchise Owner Name",
    "Franchise Owner Phone #": "Franchise Owner Phone",
    "Franchise Owner Email": "Franchise Owner Email",
    "Site GM Name": "Site GM Name",
    "Site GM number": "Site GM Number",
    "Site GM Email": "Site GM Email",
    "Sales_id": "Sales ID",
    "Sales_username": "Sales Name",
    "Sales_email": "Sales Email",
    "RAE_id": "RAE ID",
    "RAE_username": "RAE Name",
    "RAE_email": "RAE Email",
    "SOE_id": "SOE ID",
    "SOE_username": "SOE Name",
    "SOE_email": "SOE Email",
    "CC Agent_id": "CCA ID",
    "CC Agent_username": "CCA Name",
    "CC Agent_email": "CCA Email",
    "LO_id": "LO ID",
    "LO_username": "LO Name",
    "LO_email": "LO Email",
    "Status": "ClickUp Status",
    "Task ID": "Task ID",
    "List ID": "List ID",
    "Project ID": "Project ID",
    "Folder ID": "Folder ID",
    "Space ID": "Space ID"
}

# Clean with Other Data Sources: Contract Type, Area

In [43]:
# CLEAN : Current Deployments

def clean_CurrentDeployments(df, colnames):
  """Clean the transformed Current Deployments frame and shape it for export.

  Applies the shared cleaning helpers in order, fills Vertical from the
  module-level `brand_index` and Region from the module-level `area_index`,
  then renames and reorders the columns.

  Args:
    df: Transformed Current Deployments DataFrame.
    colnames: Mapping of current column name -> output column name.

  Returns:
    DataFrame whose columns are the values of `colnames` (in mapping order)
    followed by every column whose name contains "Tag".
  """
  cleaners = (
      process_status_column,
      process_customer_type,
      process_vertical_column,
      process_brand_column,
      fill_machine_delivery_date,
      format_site_work,
  )
  for cleaner in cleaners:
    df = cleaner(df)

  df = fill_vertical_by_brand(df, brand_index)
  df = fill_region_by_state(df, area_index)

  # Tag columns keep their names and trail the mapped columns
  tag_columns = [label for label in df.columns if "Tag" in label]
  renamed = df.rename(columns=colnames)

  return renamed[list(colnames.values()) + tag_columns]


# Column map for the Current Deployments dataset.
# Keys are the column names found in the cleaned frame; values are the names
# written to the output. clean_CurrentDeployments() renames with this dict and
# then keeps exactly these output columns, in this order, ahead of the tag columns.
CD_colnames = {
    "Location ID": "Location ID",
    "Task Name": "Location Name",
    "Vertical": "Vertical",
    "Customer Type": "Customer Type",
    "Brand": "Brand",
    "Tentative launch date": "Tentative Launch Date",
    "On Track For Install date": "On Track For Install Date",
    "Launch Date": "Launch Date",
    "Install Date": "Install Date",
    "Machine delivery Date": "Machine Delivery Date",
    "Survey Date": "Survey Date",
    "Region": "Region",
    "State": "State",
    "formatted_address": "Formatted Address",
    "lat": "Latitude",
    "lng": "Longitude",
    "place_id": "Place ID",
    "Operator": "Operator",
    "Site Survey Options": "Site Survey Options",
    "site work": "Site Work",
    "Site GM Name": "Site GM Name",
    "Site GM number": "Site GM Number",
    "Site GM Email": "Site GM Email",
    "Sales_id": "Sales ID",
    "Sales_username": "Sales Name",
    "Sales_email": "Sales Email",
    "RAE_id": "RAE ID",
    "RAE_username": "RAE Name",
    "RAE_email": "RAE Email",
    "SOE_id": "SOE ID",
    "SOE_username": "SOE Name",
    "SOE_email": "SOE Email",
    "CC Agent_id": "CCA ID",
    "CC Agent_username": "CCA Name",
    "CC Agent_email": "CCA Email",
    "LO_id": "LO ID",
    "LO_username": "LO Name",
    "LO_email": "LO Email",
    "Status": "ClickUp Status",
    "School ID": "School ID",
    "Task ID": "Task ID",
    "List ID": "List ID",
    "Project ID": "Project ID",
    "Folder ID": "Folder ID",
    "Space ID": "Space ID"
}

In [17]:
# CLEAN : Post Deployments

def clean_PostDeployments(df, colnames):
  """Clean the transformed Post Deployments frame and shape it for export.

  Applies the shared cleaning helpers in order, fills Vertical from the
  module-level `brand_index` and Region from the module-level `area_index`,
  then renames and reorders the columns.

  Args:
    df: Transformed Post Deployments DataFrame.
    colnames: Mapping of current column name -> output column name.

  Returns:
    DataFrame whose columns are the values of `colnames` (in mapping order)
    followed by every column whose name contains "Tag".
  """
  cleaners = (
      process_status_column,
      process_customer_type,
      process_vertical_column,
      process_brand_column,
      fill_machine_delivery_date,
      format_site_work,
  )
  for cleaner in cleaners:
    df = cleaner(df)

  df = fill_vertical_by_brand(df, brand_index)
  df = fill_region_by_state(df, area_index)

  # Tag columns keep their names and trail the mapped columns
  tag_columns = [label for label in df.columns if "Tag" in label]
  renamed = df.rename(columns=colnames)

  return renamed[list(colnames.values()) + tag_columns]


# Column map for the Post Deployments dataset.
# Keys are the column names found in the cleaned frame; values are the names
# written to the output. clean_PostDeployments() renames with this dict and
# then keeps exactly these output columns, in this order, ahead of the tag columns.
PD_colnames = {
    "Location ID": "Location ID",
    "Task Name": "Location Name",
    "Vertical": "Vertical",
    "Customer Type": "Customer Type",
    "Brand": "Brand",
    "Tentative launch date": "Tentative Launch Date",
    "On Track For Install date": "On Track For Install Date",
    "Launch Date": "Launch Date",
    "Install Date": "Install Date",
    "Machine delivery Date": "Machine Delivery Date",
    "Survey Date": "Survey Date",
    "Region": "Region",
    "State": "State",
    "formatted_address": "Formatted Address",
    "lat": "Latitude",
    "lng": "Longitude",
    "place_id": "Place ID",
    "Operator": "Operator",
    "Site Survey Options": "Site Survey Options",
    "site work": "Site Work",
    "Site GM Name": "Site GM Name",
    "Site GM number": "Site GM Number",
    "Site GM Email": "Site GM Email",
    "Sales_id": "Sales ID",
    "Sales_username": "Sales Name",
    "Sales_email": "Sales Email",
    "RAE_id": "RAE ID",
    "RAE_username": "RAE Name",
    "RAE_email": "RAE Email",
    "SOE_id": "SOE ID",
    "SOE_username": "SOE Name",
    "SOE_email": "SOE Email",
    "CC Agent_id": "CCA ID",
    "CC Agent_username": "CCA Name",
    "CC Agent_email": "CCA Email",
    "LO_id": "LO ID",
    "LO_username": "LO Name",
    "LO_email": "LO Email",
    "Status": "ClickUp Status",
    "School ID": "School ID",
    "Task ID": "Task ID",
    "List ID": "List ID",
    "Project ID": "Project ID",
    "Folder ID": "Folder ID",
    "Space ID": "Space ID"
}

In [18]:
# CLEAN : Plumbing Job

def clean_PlumbingJob(df, colnames):
  """Clean the transformed Plumbing Job frame and shape it for export.

  Status, Customer Type and Brand are always processed. Vertical is processed
  only when a "Vertical" column exists, and Region is filled from the
  module-level `area_index` only when a "State" column exists.

  Args:
    df: Transformed Plumbing Job DataFrame.
    colnames: Mapping of current column name -> output column name.

  Returns:
    DataFrame whose columns are the values of `colnames` (in mapping order)
    followed by every column whose name contains "Tag".
  """
  df = process_customer_type(process_status_column(df))

  # Optional column: only cleaned when the list actually provides it
  has_vertical = "Vertical" in df.columns
  if has_vertical:
    df = process_vertical_column(df)

  df = process_brand_column(df)

  # Optional column: Region can only be derived when State is present
  has_state = "State" in df.columns
  if has_state:
    df = fill_region_by_state(df, area_index)

  tag_columns = [label for label in df.columns if "Tag" in label]
  renamed = df.rename(columns=colnames)

  return renamed[list(colnames.values()) + tag_columns]

# Column map for the Plumbing Job dataset (bound to two names: PJ_colnames and
# plumbingjob_colnames refer to the same dict object).
# Keys are the column names found in the cleaned frame; values are the names
# written to the output. clean_PlumbingJob() renames with this dict and then
# keeps exactly these output columns, in this order, ahead of the tag columns.
# Note that the Coordinator_* columns are exported under the "LO" names.
PJ_colnames = plumbingjob_colnames = {
    "Task ID": "Task ID",
    "Task Name": "Location Name",
    "Customer Type": "Customer Type",
    "Brand": "Brand",
    "SS Report": "SS Report",
    "Plumbing Task": "Plumbing Task",
    "Other Plumbing job": "Other Plumbing Job",
    "Plumber type": "Plumber Type",
    "Quote": "Quote",
    "Initial Cost": "Initial Cost",
    "Final Cost": "Final Cost",
    "Plumber ETA": "Plumber ETA",
    "Install time ETA": "Install Time ETA",
    "place_id": "Place ID",
    "lat": "Latitude",
    "lng": "Longitude",
    "formatted_address": "Formatted Address",
    # Region is currently excluded from the Plumbing Job export:
    #"Region": "Region",
    "Status": "ClickUp Status",
    "Sales_id": "Sales ID",
    "Sales_username": "Sales Name",
    "Sales_email": "Sales Email",
    "Coordinator_id": "LO ID",
    "Coordinator_username": "LO Name",
    "Coordinator_email": "LO Email",
    "CC Agent_id": "CCA ID",
    "CC Agent_username": "CCA Name",
    "CC Agent_email": "CCA Email",
    "SOE_id": "SOE ID",
    "SOE_username": "SOE Name",
    "SOE_email": "SOE Email",
    "List ID": "List ID",
    "Project ID": "Project ID",
    "Folder ID": "Folder ID",
    "Space ID": "Space ID"
}

## Load

In [19]:
# GoogleDrive
def load_GoogleDrive(df, path):
    """Write `df` to `path` as a CSV file and return the path.

    The file is written without the index column and encoded as UTF-8 with a
    byte-order mark ('utf-8-sig'). A confirmation line is printed after the
    write.

    Args:
        df: DataFrame to export.
        path: Destination file path.

    Returns:
        The same `path` that was passed in.
    """
    df.to_csv(path_or_buf=path, encoding='utf-8-sig', index=False)
    confirmation = f"Saved to: {path}"
    print(confirmation)
    return path

In [20]:
# GoogleDrive
def load_GoogleDrive(df, path):
  """Write `df` to `path` as a CSV file and return the path.

  This is the second definition of `load_GoogleDrive` in the notebook and is
  the one in effect for every later cell. It used to drop the encoding, the
  confirmation message and the return value of the earlier definition; it now
  behaves the same way so that the result does not depend on which of the two
  cells ran last.

  Args:
    df: DataFrame to export.
    path: Destination file path.

  Returns:
    The same `path` that was passed in.
  """
  # 'utf-8-sig' writes a BOM so spreadsheet tools detect UTF-8 correctly
  df.to_csv(path, index=False, encoding='utf-8-sig')
  print(f"Saved to: {path}")
  return path

## ETL Functions

In [21]:
#EXTRACT

def extract_ClickUp(list_id):
  """Pull every task of a ClickUp list into a flat DataFrame.

  Pipeline: raw task list -> task ids -> per-task details -> flattened
  records -> DataFrame.

  Args:
    list_id: ClickUp list identifier passed to `get_all_tasks`.

  Returns:
    pandas DataFrame built from the flattened task records.
  """
  raw_tasks = get_all_tasks(list_id)

  # Only the ids are needed to request the full detail of each task
  task_ids = extract_task_ids(raw_tasks)
  task_details = extract_task_details(task_ids)

  # Nested structures are flattened before building the frame
  flat_records = flatten_nested_data(task_details)

  return pd.DataFrame(flat_records)

In [22]:
#TRANSFORM

def transform_ClickUp(df, list_id):
  """Turn an extracted ClickUp frame into a typed frame with readable values.

  Steps, in order: trim the frame with `lean_dataframe(df, 1, 10)`, fetch the
  list's custom-field definitions, expand the "Tags" column when present,
  standardize column types from the custom-field names/types, and replace
  indexed values through a lookup table built from the raw definitions.

  Args:
    df: DataFrame produced by the extract step.
    list_id: ClickUp list identifier used to fetch custom-field definitions.

  Returns:
    The transformed DataFrame returned by `replace_indexes`.
  """
  # The thresholds 1 and 10 are interpreted by lean_dataframe
  trimmed = lean_dataframe(df, 1, 10)

  fields_raw = get_custom_fields_from_list(list_id)
  fields = extract_custom_fields(fields_raw)

  # Tag expansion is skipped when the list carries no "Tags" column
  has_tags = "Tags" in trimmed.columns
  expanded = expand_tags_column(trimmed, "Tags") if has_tags else trimmed

  typed = standardize_column_types(
      expanded,
      column_names=fields['Name'],
      column_types=fields['Type'],
  )

  return replace_indexes(typed, build_lookup(fields_raw))

# Data Execution

#### Live Machines

In [23]:
LiveMachinesE = extract_ClickUp(LiveMachines_LISTID)

Fetching Tasks: 11page [00:31,  2.82s/page]
Processing Tasks: 100%|██████████| 1032/1032 [07:37<00:00,  2.26task/s]


In [24]:
LiveMachinesT = transform_ClickUp(LiveMachinesE, LiveMachines_LISTID)

In [25]:
LiveMachinesC = clean_LiveMachines(LiveMachinesT, LM_colnames)

In [26]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/LiveMachines.csv"
load_GoogleDrive(LiveMachinesC, path)

#### Decom & Swap

In [27]:
DecomSwapE = extract_ClickUp(DecomSwap_LISTID)

Fetching Tasks: 5page [00:12,  2.46s/page]
Processing Tasks: 100%|██████████| 496/496 [03:35<00:00,  2.30task/s]


In [35]:
DecomSwapT = transform_ClickUp(DecomSwapE, DecomSwap_LISTID)

In [37]:
DecomSwapC = clean_DecomSwap(DecomSwapT, DC_colnames)

  df["Machine delivery Date"] = df[available_cols].bfill(axis=1).iloc[:, 0]


In [44]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/DecomSwap.csv"
load_GoogleDrive(DecomSwapC, path)

#### Current Deployments

In [38]:
CurrentDeploymentsE = extract_ClickUp(CurrentDeployments_LISTID)

Fetching Tasks: 4page [00:10,  2.63s/page]
Processing Tasks: 100%|██████████| 319/319 [02:39<00:00,  2.00task/s]


In [39]:
CurrentDeploymentsT = transform_ClickUp(CurrentDeploymentsE, CurrentDeployments_LISTID)

In [45]:
CurrentDeploymentsC = clean_CurrentDeployments(CurrentDeploymentsT, CD_colnames)

  df["Machine delivery Date"] = df[available_cols].bfill(axis=1).iloc[:, 0]


In [46]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/CurrentDeployments.csv"
# Save the cleaned frame (CurrentDeploymentsC), like every other dataset in this
# section; the transformed frame (CurrentDeploymentsT) still has the raw column
# names and has not been through clean_CurrentDeployments().
load_GoogleDrive(CurrentDeploymentsC, path)

#### Post Deployments

In [53]:
PostDeploymentsE = extract_ClickUp(PostDeployments_LISTID)

Fetching Tasks: 0page [00:00, ?page/s]
Processing Tasks: 0task [00:00, ?task/s]


In [48]:
PostDeploymentsT = transform_ClickUp(PostDeploymentsE, PostDeployments_LISTID)

In [None]:
PostDeploymentsC = clean_PostDeployments(PostDeploymentsT, PD_colnames)

In [None]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/PostDeployments.csv"
load_GoogleDrive(PostDeploymentsC, path)

#### Plumbing Job

In [52]:
PlumbingJobE = extract_ClickUp(PlumbingJob_LISTID)

Fetching Tasks: 1page [00:01,  1.66s/page]
Processing Tasks: 100%|██████████| 58/58 [00:25<00:00,  2.29task/s]


In [None]:
PlumbingJobT = transform_ClickUp(PlumbingJobE, PlumbingJob_LISTID)

In [None]:
PlumbingJobC = clean_PlumbingJob(PlumbingJobT, PJ_colnames)

In [None]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/PlumbingJob.csv"
load_GoogleDrive(PlumbingJobC, path)

# Dataset Building


In [None]:
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns

In [None]:
# Prepare each cleaned frame for concatenation:
#   1. keep only the first occurrence of any repeated column label
#   2. renumber the rows so no row index is duplicated
LiveMachinesM = LiveMachinesC.loc[:, ~LiveMachinesC.columns.duplicated()].reset_index(drop=True)
DecomSwapM = DecomSwapC.loc[:, ~DecomSwapC.columns.duplicated()].reset_index(drop=True)
CurrentDeploymentsM = CurrentDeploymentsC.loc[:, ~CurrentDeploymentsC.columns.duplicated()].reset_index(drop=True)
PostDeploymentsM = PostDeploymentsC.loc[:, ~PostDeploymentsC.columns.duplicated()].reset_index(drop=True)
PlumbingJobM = PlumbingJobC.loc[:, ~PlumbingJobC.columns.duplicated()].reset_index(drop=True)

### AllClickUp Dataset

In [None]:
# === Merging all ClickUp datasets === #

# Stack the five prepared frames; columns missing from a frame are kept and
# left empty for that frame's rows (union of columns, original column order).
_frames_to_stack = [LiveMachinesM, CurrentDeploymentsM, DecomSwapM, PostDeploymentsM, PlumbingJobM]
AllClickUp = pd.concat(_frames_to_stack, ignore_index=True, sort=False)

# Optional: quick look at the size and columns of the combined DataFrame
for _summary in (AllClickUp.shape, AllClickUp.columns.tolist()):
    print(_summary)

In [None]:
# === Labeling for all ClickUp statuses === #
# NOTE(review): 'Enterprise Park ' carries a trailing space; presumably it
# mirrors the status text as it arrives from ClickUp — confirm.
live_statuses = ['Enterprise Park ', 'Mid Market', 'Enterprise Stage 3&4','Channel Partner', 'Enterprise Stage 2']

decom_statuses = ['Decommission', 'Removal', 'Removal Audit']

deploy_statuses = ['Not Scheduled', 'Scheduled For Install', 'Partially Installed', 'Equipment Delivered Only', 'To Be Invoiced']

swap_statuses = ['Swap', 'Swapped', 'Relocation']

plumbing_statuses = ['Scheduled For Work', 'Hold', 'In Progress', 'Cancelled', 'Quoted', 'Delayed']

# Now all leftovers go to "other"
all_statuses = set(AllClickUp['ClickUp Status'].unique())

categorized_statuses = set(
    live_statuses + decom_statuses + deploy_statuses + swap_statuses + plumbing_statuses
)

# key=str keeps the order unchanged for string statuses and stops sorted() from
# raising TypeError when a missing status (NaN, a float) is among the leftovers.
other_statuses = sorted(all_statuses - categorized_statuses, key=str)


# === Subsetting Task IDs by ClickUp status === #
def _task_ids_with_status(statuses):
    """Return the Task IDs of AllClickUp rows whose 'ClickUp Status' is in `statuses`."""
    return AllClickUp.loc[AllClickUp['ClickUp Status'].isin(statuses), 'Task ID'].tolist()


live_IDs = _task_ids_with_status(live_statuses)
decom_IDs = _task_ids_with_status(decom_statuses)
deploy_IDs = _task_ids_with_status(deploy_statuses)
swap_IDs = _task_ids_with_status(swap_statuses)
plumbing_IDs = _task_ids_with_status(plumbing_statuses)
other_IDs = _task_ids_with_status(other_statuses)

for _label, _ids in (
    ('Live', live_IDs),
    ('Decom', decom_IDs),
    ('Deploy', deploy_IDs),
    ('Swap', swap_IDs),
    ('Plumbing', plumbing_IDs),
    ('Other', other_IDs),
):
    print(f'{_label} ID elements: {len(_ids)}')

print(f'\nStatus ID objects are type: {type(other_IDs)}')

In [None]:
# === Labeling by Analytical Category === #

# Rules are evaluated top to bottom: a Task ID found in several ID lists gets
# the label of the first list that contains it; IDs in no list get ''.
_label_rules = [
    (live_IDs, 'Live'),
    (decom_IDs, 'Decommission'),
    (deploy_IDs, 'Deployment'),
    (swap_IDs, 'Swapped'),
    (plumbing_IDs, 'Plumbing'),
    (other_IDs, 'Other'),
]

_task_id = AllClickUp['Task ID']
AllClickUp['Status'] = np.select(
    [_task_id.isin(_ids) for _ids, _ in _label_rules],
    [_label for _, _label in _label_rules],
    default='',
)

AllClickUp['Status'].value_counts()

In [None]:
# Preferred column order for the combined AllClickUp export.
all_colorder = [

    # --- Core Identifiers ---
    'Location ID',
    'Location Name',
    'Vertical',
    'Customer Type',
    'Brand',
    'Status',

    # --- Geography ---
    'Region',
    'State',
    'Formatted Address',
    'Latitude',
    'Longitude',
    'Place ID',

    # --- Dates ---
    'Launch Date',
    'Install Date',
    'Machine Delivery Date',
    'Survey Date',
    'Tentative Launch Date',
    'On Track For Install Date',
    'Decom Date',
    'Plumber ETA',
    'Install Time ETA',

    # --- People Columns ---
    'Operator',
    'Site GM Name',
    'Site GM Number',
    'Site GM Email',
    'Sales ID',
    'Sales Name',
    'Sales Email',
    'RAE ID',
    'RAE Name',
    'RAE Email',
    'SOE ID',
    'SOE Name',
    'SOE Email',
    'CCA ID',
    'CCA Name',
    'CCA Email',
    'LO ID',
    'LO Name',
    'LO Email',
    'Franchise Owner Name',
    'Franchise Owner Phone',
    'Franchise Owner Email',

    # --- Other Fields ---
    'Decom Reason',
    'Contract Type',
    'Site Work',
    'SS Report',
    'Plumbing Task',
    'Other Plumbing Job',
    'Plumber Type',
    'Quote',
    'Initial Cost',
    'Final Cost',
    'Site Survey Options',

    # --- ClickUp IDs ---

    'ClickUp Status',
    'School ID',
    'Task ID',
    'List ID',
    'Project ID',
    'Folder ID',
    'Space ID',

    # --- Tags ---
    'Tags',
    '[can schedule site work] Tag',
    '[q2] Tag',
    '[partially installed] Tag',
    '[new construction] Tag',
    '[t1] Tag',
    '[post sq] Tag',
    '[backflow] Tag',
    '[case study] Tag',
    '[adjustment] Tag',
    '[cca order verified] Tag',
    '[need plumbing scheduled] Tag',
    '[pop delay] Tag',
    '[need to review sq] Tag',
    '[high priority] Tag',
    '[assignment test] Tag',
    '[incomplete] Tag',
    '[launch tracker] Tag',
    '[tentative install set] Tag',
    '[on track for launch] Tag',
    '[on track for install] Tag',
    '[risk of churn] Tag',
    '[can schedule install/launch] Tag',
    '[plumbing] Tag',
    '[no sq report] Tag',
    '[ice machine needed] Tag',
    '[email] Tag',
    '[important location] Tag',
    '[conversion] Tag',
    '[relaunch] Tag',
    '[delay install/launch] Tag',
    '[need follow up] Tag',
    '[setup] Tag',
    '[hold] Tag',
    '[not scheduled] Tag',
    '[new!] Tag',
    '[bib adaptor] Tag',
    '[compass] Tag',
    '[sales calendly] Tag',
    '[sq/vsq to be scheduled] Tag',
    '[site work in progress] Tag',
    '[cloudbar only] Tag',
    '[delivered only] Tag',
    '[r&d bot] Tag',
    '[hawaii] Tag',
    '[tlds] Tag',
    '[waiting for contract] Tag',
    '[ghosting franchisee] Tag',
    '[sq report in progress] Tag',
    '[order added] Tag',
    '[q4] Tag'
]

# Tag columns come and go with the tags used in ClickUp, so a listed column may
# be absent from a given extract. Selecting the full list directly would raise
# KeyError in that case; report the absent names and reorder with what exists.
_missing_cols = [col for col in all_colorder if col not in AllClickUp.columns]
if _missing_cols:
    print(f"Skipping columns not present in AllClickUp: {_missing_cols}")

AllClickUp = AllClickUp[[col for col in all_colorder if col in AllClickUp.columns]]

In [None]:
path = "drive/MyDrive/02 Areas/Business Intelligence/Datasets/AllClickUp.csv"
load_GoogleDrive(AllClickUp, path)

### LiveDecom Dataset

In [None]:
# === Building the << LiveDecom >> dataset === #
LiveDecom = AllClickUp[(AllClickUp['Task ID'].isin(live_IDs)) | (AllClickUp['Task ID'].isin(decom_IDs))]

In [None]:
# === Merging Live Machines with Decom & Swap === #

# Step 1 + 2: keep the first occurrence of any repeated column label, then
# renumber the rows so the concatenation has no duplicate row indices.
LiveMachinesM = LiveMachinesC.loc[:, ~LiveMachinesC.columns.duplicated()].reset_index(drop=True)
DecomSwapM = DecomSwapC.loc[:, ~DecomSwapC.columns.duplicated()].reset_index(drop=True)

# Columns present in both cleaned frames (kept for inspection)
common_cols = LiveMachinesC.columns.intersection(DecomSwapC.columns)

# Step 3: concatenate keeping ALL columns (cells are empty where a frame lacks
# the column). The shared-columns-only variant,
#   pd.concat([LiveMachinesC[common_cols], DecomSwapC[common_cols]], ignore_index=True)
# used to be computed here and was immediately overwritten by this line.
LiveDecom = pd.concat([LiveMachinesM, DecomSwapM], ignore_index=True, sort=False)

# Show the size and a preview instead of printing every row
print(LiveDecom.shape)
LiveDecom.head()

In [None]:
len(AllClickUp['ClickUp Status'])

In [None]:
AllClickUp['ClickUp Status'].unique()

In [None]:
# Columns exported for the LiveDecom dataset.
# The geography column is named 'Region' in the cleaned frames (see the
# *_colnames maps); the former 'Area' entry matched no column and made the
# selection below raise KeyError.
LD_cols = ['Location ID', 'Location Name', 'Vertical', 'Customer Type', 'Brand',
       'Decom Date','Launch Date', 'Install Date', 'Machine Delivery Date', 'Survey Date',
       'Region', 'State', 'Formatted Address', 'Latitude', 'Longitude','Place ID',
        'Contract Type', 'Site Work', 'Site GM Name','Sales Name',
       'RAE Name', 'SOE Name', 'CCA Name', 'LO Name', 'Franchise Owner Name',
        'ClickUp Status', 'School ID', 'Task ID', 'List ID', 'Project ID', 'Folder ID', 'Space ID']

LiveDecom = LiveDecom[LD_cols]

load_GoogleDrive(LiveDecom, "drive/MyDrive/02 Areas/Business Intelligence/Datasets/LiveDecom (02-04-2025).csv")

In [None]:
DecomSwapE['Vertical']

In [None]:
# === Concatenating Datasets === #

import pandas as pd

def clean_and_concat(dataframes, keep_all_columns=True):
    """
    Cleans multiple DataFrames by removing duplicate columns, resetting index,
    and concatenating them based on shared or all columns.

    Parameters:
        dataframes (list of pd.DataFrame): List of DataFrames to process.
        keep_all_columns (bool): If True, keep all columns; if False, keep only shared columns.

    Returns:
        pd.DataFrame: The concatenated DataFrame. An empty DataFrame is returned
        when `dataframes` is empty. With keep_all_columns=False the shared
        columns appear in the order they have in the first DataFrame.
    """
    # Step 1 + 2: keep the first occurrence of each column label, renumber rows
    cleaned_dfs = [
        df.loc[:, ~df.columns.duplicated()].reset_index(drop=True)
        for df in dataframes
    ]

    # Nothing to concatenate: pd.concat / set.intersection would raise on no input
    if not cleaned_dfs:
        return pd.DataFrame()

    if keep_all_columns:
        # Keep all columns (even if missing in one DataFrame)
        return pd.concat(cleaned_dfs, ignore_index=True, sort=False)

    # Use only shared columns. The selection must be a list: pandas does not
    # accept a set as an indexer, and a set would not give a stable column order.
    shared = set.intersection(*(set(df.columns) for df in cleaned_dfs))
    common_cols = [col for col in cleaned_dfs[0].columns if col in shared]
    return pd.concat([df[common_cols] for df in cleaned_dfs], ignore_index=True)

In [None]:
# NOTE(review): this rebuilds AllClickUp from four cleaned frames only —
# PlumbingJobC is not in the list, and the 'Status' label column assigned in
# the "Labeling by Analytical Category" cell is not recreated here. The next
# cell writes this frame to the same AllClickUp.csv path used earlier; confirm
# that overwriting the labelled export is intended.
df_list = [LiveMachinesC, DecomSwapC, CurrentDeploymentsC, PostDeploymentsC]
AllClickUp = clean_and_concat(df_list, keep_all_columns=True)  # Change to False to use only shared columns

In [None]:
load_GoogleDrive(AllClickUp, "drive/MyDrive/02 Areas/Business Intelligence/Datasets/AllClickUp.csv")

In [None]:
len(AllClickUp)

# Playground: Start Here

To Clean: Status, site work, Accessories & Toppings, Types of Accessories and Toppings, Customer Type, Brand, Vertical, Site Survey Options

Filled with NaN: Tentative Launch Date, Location ID

Remove: Launch Week

Keep: On Track for Install date, Menu, Survey Date, Launch Date

Fill in: Machine Type (fill with cloudbar)

In [None]:
LiveMachinesC[['Location Name', 'Formatted Address', 'Latitude', 'Longitude']].iloc[130:140]

In [None]:
CurrentDeploymentsT['LO_email']

In [None]:
import pandas as pd
import gspread
from google.colab import auth
from google.auth.transport.requests import Request
from google.auth import default

# Function to load DataFrame into a specific Google Sheets worksheet
def load_dataframe_to_google_sheets(df, spreadsheet_name, worksheet_name):
    """Replace a worksheet of a Google Sheets spreadsheet with the contents of a DataFrame.

    Every value is uploaded as text: missing and infinite values become empty
    strings and all remaining values are converted with `astype(str)`.
    An existing worksheet with the same name is deleted and recreated.

    Args:
        df: DataFrame to upload. It is not modified.
        spreadsheet_name: Title of the spreadsheet to open.
        worksheet_name: Title of the worksheet to (re)create.
    """
    # Authenticate the user (This step will prompt for authorization in Google Colab)
    auth.authenticate_user()

    # Use google-auth to get credentials and authorize the client
    creds, _ = default()  # This gets the default credentials
    client = gspread.authorize(creds)  # Authorize using google-auth credentials

    # Map +/-Inf to NaN, then blank out every missing value. NaN is used as the
    # replacement (not None) because the meaning of replace(..., None) differs
    # between pandas versions (older releases forward-fill instead).
    df_cleaned = df.replace([float('inf'), float('-inf')], float('nan'))
    df_cleaned = df_cleaned.fillna('')  # Replace NaN values with empty strings

    # Convert the entire dataframe to string to ensure no issues with non-serializable objects
    df_cleaned = df_cleaned.astype(str)

    # Open the spreadsheet
    sheet = client.open(spreadsheet_name)

    # Try to get the worksheet by name, if it exists, delete it
    try:
        worksheet = sheet.worksheet(worksheet_name)  # Try to access the existing worksheet
        sheet.del_worksheet(worksheet)  # Delete the worksheet if it exists
        print(f"Deleted existing worksheet: {worksheet_name}")
    except gspread.exceptions.WorksheetNotFound:
        print(f"Worksheet '{worksheet_name}' not found, proceeding to create it.")

    # Create a new worksheet sized to the DataFrame (+1 row for the header)
    new_worksheet = sheet.add_worksheet(title=worksheet_name, rows=df_cleaned.shape[0] + 1, cols=df_cleaned.shape[1])
    print(f"Created new worksheet: {worksheet_name}")

    # Upload header row followed by the data rows
    new_worksheet.update([df_cleaned.columns.values.tolist()] + df_cleaned.values.tolist())
    print(f"Uploaded new data to worksheet: {worksheet_name}")

# Example usage
spreadsheet_name = "Botrista Datasets"  # Replace with your actual spreadsheet name
worksheet_name = "Live Machines"  # Replace with your actual worksheet name

# Assuming 'LiveMachinesC' is your DataFrame
load_dataframe_to_google_sheets(LiveMachinesC, spreadsheet_name, worksheet_name)

In [None]:
import pandas as pd
import gspread
from google.colab import auth
from google.auth.transport.requests import Request
from google.auth import default

# Function to load DataFrame into a specific Google Sheets worksheet
def load_dataframe_to_google_sheets(df, spreadsheet_name, worksheet_name):
    """Replace a worksheet of a Google Sheets spreadsheet with the contents of a DataFrame.

    Datetime columns are uploaded as 'YYYY-MM-DD' text, missing and infinite
    values as empty strings; other values keep their type. An existing
    worksheet with the same name is deleted and recreated.

    Args:
        df: DataFrame to upload. It is not modified.
        spreadsheet_name: Title of the spreadsheet to open.
        worksheet_name: Title of the worksheet to (re)create.
    """
    # Authenticate the user (This step will prompt for authorization in Google Colab)
    auth.authenticate_user()

    # Use google-auth to get credentials and authorize the client
    creds, _ = default()  # This gets the default credentials
    client = gspread.authorize(creds)  # Authorize using google-auth credentials

    # Map +/-Inf to NaN. NaN is used as the replacement (not None) because the
    # meaning of replace(..., None) differs between pandas versions.
    df_cleaned = df.replace([float('inf'), float('-inf')], float('nan'))

    # Format datetime columns BEFORE blanking missing values: fillna('') turns a
    # datetime column that contains NaT into an object column, which the dtype
    # filter below would no longer match, leaving raw Timestamp objects behind.
    for col in df_cleaned.select_dtypes(include=['datetime', 'datetimetz']).columns:
        df_cleaned[col] = df_cleaned[col].dt.strftime('%Y-%m-%d')  # You can adjust the format if needed

    df_cleaned = df_cleaned.fillna('')  # Replace NaN values with empty strings

    # Open the spreadsheet
    sheet = client.open(spreadsheet_name)

    # Try to get the worksheet by name, if it exists, delete it
    try:
        worksheet = sheet.worksheet(worksheet_name)  # Try to access the existing worksheet
        sheet.del_worksheet(worksheet)  # Delete the worksheet if it exists
        print(f"Deleted existing worksheet: {worksheet_name}")
    except gspread.exceptions.WorksheetNotFound:
        print(f"Worksheet '{worksheet_name}' not found, proceeding to create it.")

    # Create a new worksheet sized to the DataFrame (+1 row for the header)
    new_worksheet = sheet.add_worksheet(title=worksheet_name, rows=df_cleaned.shape[0] + 1, cols=df_cleaned.shape[1])
    print(f"Created new worksheet: {worksheet_name}")

    # Upload header row followed by the data rows
    new_worksheet.update([df_cleaned.columns.values.tolist()] + df_cleaned.values.tolist())
    print(f"Uploaded new data to worksheet: {worksheet_name}")

# Example usage
spreadsheet_name = "Botrista Datasets"  # Replace with your actual spreadsheet name
worksheet_name = "Live Machines"  # Replace with your actual worksheet name

# Upload a three-column slice of the cleaned Live Machines frame.
# The geography column is 'Region' (LM_colnames has no 'Area' output column),
# so selecting 'Area' raised KeyError.
load_dataframe_to_google_sheets(LiveMachinesC[['Customer Type','Region','Launch Date']], spreadsheet_name, worksheet_name)

## Automation Functions

In [None]:
# AUTOMATION : Outreach and notifications through emails and messages.

# Function to create a new task
def create_task(list_id, task_name, description=""):
    """Create a task in a ClickUp list and return the decoded JSON response.

    Args:
        list_id: Identifier of the list that receives the task.
        task_name: Name of the new task.
        description: Optional task description (empty by default).

    Returns:
        The response body parsed with `response.json()`.
    """
    endpoint = f"https://api.clickup.com/api/v2/list/{list_id}/task"
    task_fields = dict(name=task_name, description=description)
    return requests.post(endpoint, json=task_fields, headers=HEADERS).json()


# Fetches all comments for a task
def get_comments(task_id):
  """Fetch all comments of a ClickUp task.

  Args:
    task_id: Identifier of the task whose comments are requested.

  Returns:
    The raw response body as text (`response.text`).
  """
  # The task id must be interpolated into the path; the URL previously
  # contained the literal text "task_id" and ignored the argument.
  url = f"https://api.clickup.com/api/v2/task/{task_id}/comment"
  response = requests.get(url, headers=HEADERS)

  return response.text

In [None]:
# GoogleSheets
def load_GoogleSheets(df, spreadsheet_name, worksheet_name):
    """Replace a worksheet of a Google Sheets spreadsheet with the contents of a DataFrame.

    Missing and infinite values are uploaded as empty strings; datetime and
    object columns are converted to text with `astype(str)`. An existing
    worksheet with the same name is deleted and recreated.

    Args:
        df: DataFrame to upload. It is not modified.
        spreadsheet_name: Title of the spreadsheet to open.
        worksheet_name: Title of the worksheet to (re)create.
    """
    # Authenticate the user (This step will prompt for authorization in Google Colab)
    auth.authenticate_user()

    # Use google-auth to get credentials and authorize the client
    creds, _ = default()  # This gets the default credentials
    client = gspread.authorize(creds)  # Authorize using google-auth credentials

    # Map +/-Inf to NaN, then blank out every missing value. NaN is used as the
    # replacement (not None) because the meaning of replace(..., None) differs
    # between pandas versions (older releases forward-fill instead).
    df_cleaned = df.replace([float('inf'), float('-inf')], float('nan'))
    df_cleaned = df_cleaned.fillna('')  # Replace NaN values with empty strings

    # Convert all datetime and object columns to strings
    for column in df_cleaned.select_dtypes(include=['datetime', 'object']).columns:
        df_cleaned[column] = df_cleaned[column].astype(str)

    # Open the spreadsheet
    sheet = client.open(spreadsheet_name)

    # Try to get the worksheet by name and delete it
    try:
        worksheet = sheet.worksheet(worksheet_name)  # Try to access the existing worksheet
        sheet.del_worksheet(worksheet)  # Delete the worksheet if it exists
        print(f"Deleted existing worksheet: {worksheet_name}")
    except gspread.exceptions.WorksheetNotFound:
        print(f"Worksheet {worksheet_name} not found, proceeding to create it.")

    # Create a new worksheet sized to the DataFrame (+1 row for the header)
    new_worksheet = sheet.add_worksheet(title=worksheet_name, rows=df_cleaned.shape[0] + 1, cols=df_cleaned.shape[1])
    print(f"Created new worksheet: {worksheet_name}")

    # Upload header row followed by the data rows
    new_worksheet.update([df_cleaned.columns.values.tolist()] + df_cleaned.values.tolist())
    print(f"Uploaded new data to worksheet: {worksheet_name}")

In [None]:
def filter_columns_by_null_percentage(df, percentage_list):
    """Print, for each percentage, how many columns have fewer nulls than that share of rows.

    Column statistics come from `analyze_columns(df)`; a column is counted for
    a given percentage when its 'Null Count' is strictly below
    `len(df) * percentage / 100`. Nothing is returned.

    Args:
        df: DataFrame to inspect.
        percentage_list: Iterable of percentages (0-100) to report on.
    """
    coldata = analyze_columns(df)
    row_count = len(df)

    for pct in percentage_list:
        # Maximum (exclusive) number of nulls a column may have at this percentage
        cutoff = row_count * (pct / 100)
        below_cutoff = coldata[coldata['Null Count'] < cutoff]
        print(f"Number of cols with <{pct}% null values: ", len(below_cutoff))


# Example usage:
# Assume 'your_dataframe' is your DataFrame
percentage_list = [99, 95, 90, 70, 50, 30, 20, 10, 5, 1]  # Example percentages
filter_columns_by_null_percentage(LiveMachines, percentage_list)

### Tags

In [None]:
# List every tag of the Deployments space with the project it belongs to
tags_raw = get_tags_from_space(Deployments_SPACEID)

for tag in tags_raw['tags']:
  print(tag['name'], tag['project_id'])

In [None]:
import pandas as pd

def extract_tag_names(space_id):
    """
    Collects the names of the tags returned for a space.

    Parameters:
        space_id: Identifier forwarded to get_tags_from_space.

    Returns:
        list: The 'name' of every entry under the response's 'tags' key,
        in response order; empty when that key is missing.
    """
    response = get_tags_from_space(space_id)
    # A missing 'tags' key is treated as "no tags".
    return [entry['name'] for entry in response.get('tags', ())]

# Show the tag names of the Deployments space as the cell output.
extract_tag_names(Deployments_SPACEID)

In [None]:
# Map each named custom field of the task to its value.
data = {}
for custom_field in LM_taskdetails['custom_fields']:
  field_name = custom_field['name']
  # Skip fields whose name is None, empty, or whitespace-only.
  if not (field_name and field_name.strip()):
    continue
  # A field that was never filled in may arrive without a 'value' key;
  # store None for it instead of failing with a KeyError.
  data[field_name] = custom_field.get('value')

data

In [None]:
# Print the name of every custom field on the task. A plain loop is used
# because a list comprehension built only for its print() side effect would
# also echo a list of None values as the cell output.
for custom_field in LM_taskdetails['custom_fields']:
  print(custom_field['name'])

The columns I need to extract:

* LocationName = tf['name']
* TaskID = tf['id']
* Status = tf['status']
* Tags = tf['tags']
* URL = tf['url']
* ListID = tf['list']
* ProjectID = tf['project']
* FolderID = tf['folder']
* SpaceID = tf['space']



* CustomField_NAME = tf['custom_fields'][mdata]['name']
* CustomField_TYPE = tf['custom_fields'][mdata]['type']
* CustomField_VALUE = tf['custom_fields'][mdata]['value']




* TaskID = list_of_tasks[n]['id']
* Status = list_of_tasks[n]['status']
* AVCustomFields = list_of_tasks[n]['custom_fields']

In [None]:
# Fetch the custom-field definitions of the Live Machines list.
CF = get_custom_fields_from_list(LiveMachines_LISTID)

# Inspect the first option of the field at position 8.
# NOTE(review): index 8 is hard-coded — presumably a drop-down style field; confirm.
first_option = CF['fields'][8]['type_config']['options'][0]

# Show both values as one tuple: a bare expression that is not the last line
# of a cell is evaluated but never displayed.
first_option['id'], first_option['label']

In [None]:
# Build one [position, name, type] entry per custom field found in tf.
custom_fields = [
  [position, field['name'], field['type']]
  for position, field in enumerate(tf['custom_fields'])
]

## Troubleshooting Code

In [None]:
# Check if API Token works:

# Read the token from the environment instead of pasting it into the notebook;
# CLICKUP_API_KEY is loaded from the .env file in the "ClickUp Tokens" section.
# NOTE(review): any key that was ever committed in this notebook should be
# revoked and regenerated in ClickUp.
API_TOKEN = os.getenv("CLICKUP_API_KEY")
if not API_TOKEN:
    raise RuntimeError("CLICKUP_API_KEY is not set - run the 'ClickUp Tokens' cell first.")

HEADERS = {
    "Authorization": API_TOKEN,
    "Content-Type": "application/json"
}

# Test API Access
response = requests.get("https://api.clickup.com/api/v2/user", headers=HEADERS)

print(response.status_code)
print(response.json())  # Check if authentication works


# Check if Task ID works:

TASK_ID = "86b328wdy"  # Task ID used for this check
# Request this single task from the ClickUp v2 API, reusing the HEADERS defined earlier.
url = f"https://api.clickup.com/api/v2/task/{TASK_ID}"
response = requests.get(url, headers=HEADERS)

print(response.status_code)
print(response.json())  # Check the response for errors


# Check if List ID works:

# Use the Live Machines list for this check, so LIST_ID holds the ID that is
# actually queried rather than a placeholder string.
LIST_ID = LiveMachines_LISTID
url = f"https://api.clickup.com/api/v2/list/{LIST_ID}/task"
response = requests.get(url, headers=HEADERS)

# Print the status code as well, matching the token and task checks.
print(response.status_code)
print(response.json())  # See if your task is listed

# Google Sheets


Data Sources:
*  Partnership Team CRM

Figure out all of the data sources and consolidate them in Python instead.

## Extract

In [None]:
# NOTE(review): pandas and drive are already imported, and Drive is already
# mounted, in the setup cells at the top of the notebook; this cell only
# matters when this section is run on its own.
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from google.colab import auth
# Authenticate the Colab user before requesting default credentials below.
auth.authenticate_user()

import gspread
from google.auth import default
# Keep the credentials; the second element returned by default() is not used here.
creds, _ = default()

# gspread client authorized with those credentials; later cells open spreadsheets through it.
gc = gspread.authorize(creds)

In [None]:
# CSV-export URL of the Partnership Team CRM Google Sheet.
PartnershipTeamCRM_URL = 'https://docs.google.com/spreadsheets/d/1AfTqoykzjpI6z6bD_XLeWsqe-git4bBKagpg_R7HmDI/export?format=csv'

In [None]:
# Open the Partnership Team CRM spreadsheet by URL. PartnershipTeamCRM_URL is
# used explicitly: the generic `url` variable is reassigned by the ClickUp
# troubleshooting cells and would point at an API endpoint here.
sheet = gc.open_by_url(PartnershipTeamCRM_URL)

# Get the data from the "Overall" worksheet
worksheet = sheet.worksheet("Overall")
data = worksheet.get_all_values()

# Convert the data to a pandas DataFrame: the first row is the header, the
# remaining rows are records. An empty result yields an empty DataFrame
# instead of an IndexError on data[0].
if data:
    df = pd.DataFrame(data[1:], columns=data[0])
else:
    df = pd.DataFrame()

In [None]:
# Preview only the first rows: the notebook sets display.max_rows to None,
# so a bare `df` would render every row of the sheet into the cell output.
df.head()