In [None]:
import os
import time
import requests
import pandas as pd
from datetime import datetime, timezone, timedelta

# =========================
# CONFIG
# =========================
SPACE_ID  = "90159483029"
# Correct folder id for 99 DEV | Templates (from enumeration):
FOLDER_ID = "901513517686"

TASK_TEMPLATE_ID = "t-86c7n8hr7"   # "DEV | Typical Task"
CSV_PATH = "OITILO_ClickUp_WBS__dependencies___full_task_name_.csv"  # <-- put your CSV filename/path here
LIST_NAME = "DEV | Blng typical plan"         # target list name inside the folder (created if missing)

# Use env var if you can; fallback to provided token
# In notebook: os.environ["CLICKUP_TOKEN"] = "pk_...."
CLICKUP_TOKEN = os.getenv("CLICKUP_TOKEN") or "pk_56660333_WATA0RDNID48ZA30VX30Z2CRSNB16SN3"

DRY_RUN = False

BASE = "https://api.clickup.com/api/v2"

# =========================
# HTTP helper (retry 429/5xx)
# =========================
sess = requests.Session()
# ClickUp Personal Token uses raw token (no "Bearer")
sess.headers.update({
    "Authorization": CLICKUP_TOKEN,
    "Accept": "application/json",
})

def req(method: str, url: str, *, params=None, json=None, ok=(200, 201), retries=6):
    if DRY_RUN:
        print(f"[DRY_RUN] {method} {url} params={params} json={json}")
        return {}
    last = None
    for i in range(retries):
        r = sess.request(method, url, params=params, json=json, timeout=60)
        if r.status_code in ok:
            return r.json() if r.text else {}
        if r.status_code == 429 or r.status_code >= 500:
            wait = min(60, 2 ** i)
            time.sleep(wait)
            last = (r.status_code, r.text[:400])
            continue
        # Improved diagnostics for non-retryable errors
        snippet = (r.text or "")[:400]
        raise RuntimeError(f"{method} {url} failed: {r.status_code} {snippet}")
    raise RuntimeError(f"{method} {url} failed after retries. Last error: {last}")

def date_to_ms_midday_utc(date_str: str) -> int:
    # noon UTC to avoid day shifting from timezone conversions
    d = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc) + timedelta(hours=12)
    return int(d.timestamp() * 1000)

def days_to_ms(days: float) -> int:
    return int(float(days) * 24 * 60 * 60 * 1000)

def split_deps(dep_cell) -> list[str]:
    if dep_cell is None or (isinstance(dep_cell, float) and pd.isna(dep_cell)):
        return []
    s = str(dep_cell).strip()
    if not s or s.lower() == "nan":
        return []
    # allow ; , greek semicolon, etc.
    parts = []
    for chunk in s.replace("；", ";").replace(",", ";").split(";"):
        c = chunk.strip()
        if c:
            parts.append(c)
    return parts

# =========================
# 1) Load CSV
# =========================
df = pd.read_csv(CSV_PATH)

required = ["TASK NAME", "dependencies", "start date", "due date", "task type", "estimated time"]
missing = [c for c in required if c not in df.columns]
if missing:
    raise ValueError(f"CSV missing columns: {missing}. Found: {list(df.columns)}")

df = df.copy()
df["TASK NAME"] = df["TASK NAME"].astype(str).str.strip()
df["task type"] = df["task type"].astype(str).str.strip().str.lower()
df["start date"] = df["start date"].astype(str).str.strip()
df["due date"] = df["due date"].astype(str).str.strip()

# Optional: sort by start date then name for nicer creation order
def safe_dt(s):
    try:
        return datetime.strptime(s, "%Y-%m-%d")
    except:
        return datetime(2100, 1, 1)

df = df.sort_values(by=["start date", "TASK NAME"], key=lambda col: col.map(safe_dt) if col.name=="start date" else col).reset_index(drop=True)

# =========================
# 2) Get or create target List inside Folder
# =========================
lists = req("GET", f"{BASE}/folder/{FOLDER_ID}/list", params={"archived": "false"}).get("lists", [])

list_id = None
for l in lists:
    if str(l.get("name", "")).strip().lower() == LIST_NAME.strip().lower():
        list_id = str(l["id"])
        break

if list_id is None:
    created = req("POST", f"{BASE}/folder/{FOLDER_ID}/list", json={"name": LIST_NAME})
    list_id = str(created["id"])

print(f"Using List: {LIST_NAME} (id={list_id})")

# =========================
# 3) Detect custom field "task type" (dropdown) on that List
#     and map 'task'/'milestone' labels -> option IDs
# =========================
task_type_field_id = None
task_type_option_ids = {}  # {"task": <opt_id>, "milestone": <opt_id>}

fields = req("GET", f"{BASE}/list/{list_id}/field").get("fields", [])
for f in fields:
    if str(f.get("name", "")).strip().lower() == "task type":
        task_type_field_id = str(f.get("id"))
        # dropdown options are typically in type_config.options
        opts = (f.get("type_config") or {}).get("options") or []
        for o in opts:
            label = str(o.get("name", "")).strip().lower()
            if label in ("task", "milestone"):
                task_type_option_ids[label] = o.get("id")
        break

if task_type_field_id:
    print(f'Found custom field "task type" id={task_type_field_id}, options={task_type_option_ids}')
else:
    print('Custom field "task type" not found on the List. (Will still set dates/time/deps; type will be skipped.)')

# =========================
# 4) Create tasks from template + set start/due/time estimate + task type
# =========================
name_to_id = {}

created_cnt = 0
updated_cnt = 0

for _, r in df.iterrows():
    name = r["TASK NAME"]

    # Create from template
    created = req(
        "POST",
        f"{BASE}/list/{list_id}/taskTemplate/{TASK_TEMPLATE_ID}",
        json={"name": name},
        ok=(200, 201),
    )

    task_id = str(created.get("id") or created.get("task", {}).get("id") or "")
    if not task_id:
        raise RuntimeError(f"Could not parse created task id for '{name}'. Response: {created}")

    name_to_id[name] = task_id
    created_cnt += 1

    # Update fields: start_date, due_date, time_estimate, + custom field "task type" (dropdown)
    payload = {}

    sd = r["start date"]
    dd = r["due date"]
    if sd and sd.lower() != "nan":
        payload["start_date"] = date_to_ms_midday_utc(sd)
        payload["start_date_time"] = False
    if dd and dd.lower() != "nan":
        payload["due_date"] = date_to_ms_midday_utc(dd)
        payload["due_date_time"] = False

    est = r["estimated time"]
    if est is not None and not (isinstance(est, float) and pd.isna(est)):
        try:
            payload["time_estimate"] = days_to_ms(float(est))
        except:
            pass

    # Set task type via custom field "task type" (dropdown)
    tt = r["task type"]
    if task_type_field_id and tt in task_type_option_ids:
        payload["custom_fields"] = [{
            "id": task_type_field_id,
            "value": task_type_option_ids[tt]
        }]

    if payload:
        req("PUT", f"{BASE}/task/{task_id}", json=payload, ok=(200,))
        updated_cnt += 1

print(f"Created: {created_cnt} | Updated (dates/time/type): {updated_cnt}")

# =========================
# 5) Add dependencies (depends_on) using FULL TASK NAME mapping
# =========================
deps_added = 0
deps_missing = 0
deps_already = 0

for _, r in df.iterrows():
    child_name = r["TASK NAME"]
    child_id = name_to_id.get(child_name)
    if not child_id:
        continue

    for parent_name in split_deps(r["dependencies"]):
        parent_id = name_to_id.get(parent_name)
        if not parent_id:
            deps_missing += 1
            continue

        try:
            req(
                "POST",
                f"{BASE}/task/{child_id}/dependency",
                json={"depends_on": parent_id},
                ok=(200, 201),
            )
            deps_added += 1
        except RuntimeError as e:
            # often ClickUp returns an error if dependency already exists
            if "already" in str(e).lower() or "exist" in str(e).lower():
                deps_already += 1
            else:
                raise

print(f"Dependencies added: {deps_added} | Missing-mapped deps: {deps_missing} | Already existed: {deps_already}")


Using List: DEV | Blng typical plan (id=901520251480)
Custom field "task type" not found on the List. (Will still set dates/time/deps; type will be skipped.)


In [13]:
# Quick diagnostics to verify token + IDs access
print("Token prefix:", (CLICKUP_TOKEN[:6] + "…"))

# 1) Sanity: team templates (you said this works)
try:
    team_templates = req("GET", f"{BASE}/team/{SPACE_ID}/taskTemplate")
    count = len(team_templates.get("task_templates", [])) if isinstance(team_templates, dict) else None
    print("Team task templates OK, count:", count)
except Exception as e:
    print("Team task templates FAIL:", e)

# 2) Folder details (ensures token can see this folder)
try:
    folder = req("GET", f"{BASE}/folder/{FOLDER_ID}")
    print("Folder OK:", folder.get("name"), "(id=", folder.get("id"), ")")
except Exception as e:
    print("Folder FAIL:", e)

# 3) Lists in folder (this is what main flow uses)
try:
    lists = req("GET", f"{BASE}/folder/{FOLDER_ID}/list", params={"archived": "false"})
    print("Lists OK, count:", len(lists.get("lists", [])))
except Exception as e:
    print("Lists FAIL:", e)


Token prefix: pk_566…
Team task templates OK, count: 0
Folder OK: Oitilo (id= 901513316704 )
Lists OK, count: 1


In [14]:
# Enumerate spaces and folders accessible by this token
print("Token prefix:", (CLICKUP_TOKEN[:6] + "…"))
print("Workspace/Team id used (SPACE_ID var):", SPACE_ID)

# 1) List spaces under the team/workspace
try:
    spaces = req("GET", f"{BASE}/team/{SPACE_ID}/space", params={"archived": "false"}).get("spaces", [])
    print(f"Spaces visible: {len(spaces)}")
except Exception as e:
    print("Spaces FAIL:", e)
    spaces = []

# 2) For each space, list folders
for s in spaces:
    sid = str(s.get("id"))
    sname = s.get("name")
    print(f"\nSpace: {sname} (id={sid})")
    try:
        folders = req("GET", f"{BASE}/space/{sid}/folder", params={"archived": "false"}).get("folders", [])
        print(f"  Folders: {len(folders)}")
        for f in folders:
            fid = str(f.get("id"))
            fname = f.get("name")
            print(f"   - Folder: {fname} (id={fid})")
            # probe lists to ensure access
            try:
                lists = req("GET", f"{BASE}/folder/{fid}/list", params={"archived": "false"}).get("lists", [])
                print(f"       Lists: {len(lists)} (access OK)")
            except Exception as e:
                print(f"       Lists probe FAIL: {e}")
    except Exception as e:
        print("  Folders FAIL:", e)


Token prefix: pk_566…
Workspace/Team id used (SPACE_ID var): 90152198197
Spaces visible: 5

Space: OPS – Work Orders (id=90159268305)
  Folders: 2
   - Folder: ASSET O&M (id=901513201204)
       Lists: 6 (access OK)
   - Folder: PV O&M (id=901513226829)
       Lists: 6 (access OK)

Space: PROJECTS – CAPEX & Delivery (id=90159268307)
  Folders: 5
   - Folder: test_me (id=901513223215)
       Lists: 3 (access OK)
   - Folder: CAPEX & Upgrades (id=901513201222)
       Lists: 1 (access OK)
   - Folder: Projects – Dev & Construction (id=901513201233)
       Lists: 1 (access OK)
   - Folder: DC – 12MW IT / 19.5MW Total – Tier III N+1 – Northern Greece (id=901513254092)
       Lists: 6 (access OK)
   - Folder: Oitilo (id=901513316704)
       Lists: 1 (access OK)

Space: FINANCE + CONTROL (id=90159268308)
  Folders: 3
   - Folder: Finance Control (id=901513201239)
       Lists: 1 (access OK)
   - Folder: Governance (id=901513201246)
       Lists: 2 (access OK)
   - Folder: Vendor Management (i

In [12]:
# Override ClickUp token in-session and refresh headers
import os
try:
    sess
except NameError:
    import requests
    sess = requests.Session()
    sess.headers.update({"Accept": "application/json"})

# Paste token below or set env var CLICKUP_TOKEN before running
PASTE_TOKEN = ""  # <-- optionally paste token here, else leave blank to use env
if PASTE_TOKEN:
    os.environ["CLICKUP_TOKEN"] = PASTE_TOKEN

# Refresh variables and header
CLICKUP_TOKEN = os.getenv("CLICKUP_TOKEN") or CLICKUP_TOKEN
USE_BEARER = False  # set True if your org requires Bearer format
sess.headers["Authorization"] = (f"Bearer {CLICKUP_TOKEN}" if USE_BEARER else CLICKUP_TOKEN)
print("Auth header updated. Using Bearer:", USE_BEARER, "| Token prefix:", (CLICKUP_TOKEN[:6] + "…"))


Auth header updated. Using Bearer: False | Token prefix: pk_566…
