<a href="https://colab.research.google.com/github/tarumi283/tarumi/blob/main/Cosinor%20for%20fitting%20data_24well_version1.0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip -q install numpy scipy

In [3]:
# Colab-only module that provides browser upload/download helpers.
from google.colab import files
# Opens the Colab upload widget; the return value is not used by this notebook.
files.upload()

# Message (in Japanese): upload finished; set INPUT_PATH in the next cell
# (the file name alone is enough).
print("アップロード完了。次のセルで INPUT_PATH を書いてください（ファイル名だけでOK）。")


Saving 2025-0826-1048_Plate1_F0_N_D.csv to 2025-0826-1048_Plate1_F0_N_D.csv
アップロード完了。次のセルで INPUT_PATH を書いてください（ファイル名だけでOK）。


In [5]:
from pathlib import Path

# Name/path of the uploaded file (example value).
INPUT_PATH = "/content/2025-0826-1048_Plate1_F0_N_D.csv"

# Period scan settings: the periodogram fits every trial period from
# PERIOD_START to PERIOD_END in increments of PERIOD_STEP.
# NOTE(review): units are presumably hours, matching the Time(h) columns — confirm.
PERIOD_START = 18.0
PERIOD_END   = 30.0
PERIOD_STEP  = 0.1

# Plate CSV settings
ANALYZE_ALL_WELLS = True   # True: analyze every well found in the file
WELL_NAME = "A1"           # only used when ANALYZE_ALL_WELLS is False
EXCLUDE_BACKGROUND = True  # drop the "Background" series when analyzing all wells

# Directory that receives the per-well reports and the summary files.
OUTPUT_DIR = "cosinor_results"

# ---- Sanity check: fail early, before any analysis cell runs, if the input is missing.
p = Path(INPUT_PATH)
print("INPUT_PATH:", p)
print("Exists?:", p.exists())
if not p.exists():
    # Message (in Japanese): INPUT_PATH was not found; make sure it matches
    # the uploaded file name exactly.
    raise FileNotFoundError(
        "INPUT_PATH が見つかりません。アップロードしたファイル名と完全一致しているか確認してください。"
    )


INPUT_PATH: /content/2025-0826-1048_Plate1_F0_N_D.csv
Exists?: True


In [6]:
from __future__ import annotations

import math
from dataclasses import dataclass
from typing import List, Tuple

import numpy as np
from scipy.stats import f as f_dist


@dataclass
class PeriodRow:
    """Result of one cosinor fit at a single trial period of the periodogram."""
    # Trial period the model was fitted at (same time unit as the input t).
    period: float
    # F statistic of the cos+sin model against the intercept-only model.
    f_stat: float
    # Upper-tail probability of f_stat under F(2, n-3).
    p_value: float
    # Fitted intercept b0 of the cosinor model.
    mesor: float
    # sqrt(bc^2 + bs^2) of the fitted cosine/sine coefficients.
    amplitude: float
    # Phase phi of y = M + A*cos(wt + phi), in degrees; not wrapped, may be negative.
    acrophase_deg: float


def _fit_cosinor(t: np.ndarray, y: np.ndarray, period: float) -> Tuple[float, float, float, float, float]:
    """Fit y = b0 + bc*cos(wt) + bs*sin(wt). Return b0, bc, bs, SSE_full, SSE_null."""
    w = 2.0 * np.pi / period
    X = np.column_stack([np.ones_like(t), np.cos(w * t), np.sin(w * t)])
    beta, *_ = np.linalg.lstsq(X, y, rcond=None)

    y_hat = X @ beta
    resid = y - y_hat
    sse_full = float(np.sum(resid**2))

    y_mean = float(np.mean(y))
    sse_null = float(np.sum((y - y_mean) ** 2))
    return float(beta[0]), float(beta[1]), float(beta[2]), sse_full, sse_null


def _f_test(sse_full: float, sse_null: float, n: int) -> Tuple[float, float]:
    """F-test for adding cos+sin (df_num=2) vs intercept only (df_den=n-3)."""
    if n <= 3:
        return float("nan"), float("nan")
    df_den = n - 3
    if sse_full <= 0:
        return float("inf"), 0.0

    f_stat = ((sse_null - sse_full) / 2.0) / (sse_full / df_den)
    if f_stat < 0:
        f_stat = 0.0
    p = float(f_dist.sf(f_stat, 2, df_den))
    return float(f_stat), p


def _amp_acrophase_deg(bc: float, bs: float) -> Tuple[float, float]:
    """
    Model: y = M + A*cos(wt + phi)
      bc = A*cos(phi)
      bs = -A*sin(phi)
      phi = atan2(-bs, bc)
    Return amplitude and acrophase in degrees (NOT wrapped; negative allowed).
    """
    amp = math.sqrt(bc * bc + bs * bs)
    phi = math.atan2(-bs, bc)  # radians
    return amp, phi * 180.0 / math.pi


def cosinor_periodogram_exe_style(
    t: np.ndarray,
    y: np.ndarray,
    period_start: float,
    period_end: float,
    period_step: float,
) -> Tuple[PeriodRow, List[PeriodRow]]:
    """
    Scan a grid of trial periods with a single-component cosinor fit.

    Each trial period between period_start and period_end (spaced by
    period_step) is fitted to (t, y) and F-tested against a flat model.

    Returns:
        (best_row, all_rows). best_row is the row with the smallest p_value;
        ties are broken in favour of the larger f_stat.

    Raises:
        ValueError: if t and y differ in shape, period_step is not positive,
            or period_end is not greater than period_start.
    """
    t = np.asarray(t, float)
    y = np.asarray(y, float)

    if t.shape != y.shape:
        raise ValueError("t and y must have the same shape.")
    if period_step <= 0:
        raise ValueError("period_step must be > 0")
    if period_end <= period_start:
        raise ValueError("period_end must be > period_start")

    n_samples = int(t.size)
    # The tiny epsilon is there so the grid can reach period_end itself.
    trial_periods = np.arange(period_start, period_end + 1e-12, period_step, dtype=float)

    def _scan_one(trial):
        # Fit, test and summarise one trial period.
        mesor, coef_cos, coef_sin, sse_model, sse_flat = _fit_cosinor(t, y, trial)
        f_value, p_value = _f_test(sse_model, sse_flat, n=n_samples)
        amplitude, acrophase = _amp_acrophase_deg(coef_cos, coef_sin)
        return PeriodRow(
            period=trial,
            f_stat=f_value,
            p_value=p_value,
            mesor=mesor,
            amplitude=amplitude,
            acrophase_deg=acrophase,
        )

    all_rows: List[PeriodRow] = [_scan_one(float(P)) for P in trial_periods]
    best_row = min(all_rows, key=lambda row: (row.p_value, -row.f_stat))
    return best_row, all_rows


In [7]:
import csv
import re
from typing import Dict, Tuple, Optional

import numpy as np
from pathlib import Path


def looks_like_plate_reader_csv(path: Path) -> bool:
    """
    Heuristic sniff for plate-reader CSV exports.

    Reads at most the first 30 lines of `path` and reports whether both
    "Time(h)" and "RLU" appear in them. Files shorter than 30 lines are
    sniffed in full.

    Returns False (instead of raising) when the file cannot be opened or read.
    """
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as f:
            # zip() stops at whichever side runs out first, so a short file
            # simply yields fewer lines; calling next(f) 30 times would raise
            # StopIteration and make a short but valid export look invalid.
            head = "".join(line for _, line in zip(range(30), f))
    except (OSError, ValueError):
        # Missing or unreadable file: treat as "not a plate-reader CSV".
        return False
    return ("Time(h)" in head) and ("RLU" in head)


def parse_plate_reader_csv(path: Path) -> Tuple[Dict[str, Tuple[np.ndarray, np.ndarray]], Dict[str, str]]:
    """
    Parse a wide plate-reader CSV export into one time series per well.

    Expected layout: optional metadata lines of the form "key",value, then
    optionally a row of well labels within 12 lines above the header, then a
    header row containing Date plus repeated Time(h)/RLU column pairs,
    followed by the data rows.

    Returns:
      series_by_well: {well: (t_hours, y_rlu)}; t and y always have equal
        length and contain only finite values. Series with fewer than 4
        usable points are omitted.
      meta: metadata key/value pairs found above the header row.

    Raises:
      ValueError: if no header row is found in the first 200 lines, if the
        Time(h)/RLU columns cannot be paired, or if no series has at least
        4 usable points.
    """
    # utf-8-sig drops a leading BOM so the first metadata line still matches.
    lines = path.read_text(encoding="utf-8-sig", errors="ignore").splitlines()

    meta: Dict[str, str] = {}
    header_idx: Optional[int] = None

    # find header row and collect meta
    for i, ln in enumerate(lines[:200]):
        if ln.startswith('"Date","Time(h)"') or ln.startswith("Date,Time(h)") or ("Time(h)" in ln and "RLU" in ln and "Date" in ln):
            header_idx = i
            break
        m = re.match(r'^"([^"]+)"\s*,\s*(.*)$', ln)
        if m:
            meta[m.group(1).strip()] = m.group(2).strip().strip('"')

    if header_idx is None:
        raise ValueError("Could not find header row containing Date/Time(h)/RLU")

    # detect well-label row near header
    well_labels: list[str] = []
    well_pat = re.compile(r"^[A-H]\d+$")

    for back in range(max(0, header_idx - 12), header_idx):
        ln = lines[back]
        # cheap pre-filter: require at least one typical well token
        if not any(w in ln for w in ("A1", "A2", "B1", "C1", "D1")):
            continue
        try:
            row = next(csv.reader([ln]))
        except Exception:
            continue
        cand = [c.strip().strip('"') for c in row if c.strip().strip('"')]
        if any(well_pat.fullmatch(c) for c in cand):
            well_labels = [c for c in cand if (c == "Background" or well_pat.fullmatch(c))]
            break

    # Parse header + data from the same line list that header_idx indexes.
    # Re-reading the file with csv.reader and skipping header_idx rows would
    # skip too far whenever a quoted metadata field spans several lines.
    table = list(csv.reader(lines[header_idx:]))
    header = table[0]
    data_rows = table[1:]

    def clean(s: str) -> str:
        return s.strip().strip('"')

    time_cols = [i for i, c in enumerate(header) if clean(c) == "Time(h)"]
    rlu_cols = [i for i, c in enumerate(header) if clean(c) == "RLU"]
    if not time_cols or not rlu_cols or len(time_cols) != len(rlu_cols):
        raise ValueError("Unexpected header: cannot find paired Time(h)/RLU columns.")

    pairs = list(zip(time_cols, rlu_cols))

    # assign names: detected labels if there are enough, otherwise generic ones
    if well_labels and len(well_labels) >= len(pairs):
        names = well_labels[: len(pairs)]
    else:
        names = ["Background"] + [f"Well{i}" for i in range(1, len(pairs))]

    series_by_well: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
    for name, (ti, yi) in zip(names, pairs):
        t_vals: list[float] = []
        y_vals: list[float] = []
        for r in data_rows:
            if len(r) <= max(ti, yi):
                continue
            ts = r[ti].strip()
            ys = r[yi].strip()
            if ts == "" or ys == "":
                continue
            # Convert BOTH cells before storing either one; appending t first
            # would leave t and y with different lengths when y is unparsable.
            try:
                t_val = float(ts)
                y_val = float(ys)
            except ValueError:
                continue
            # float() accepts "nan"/"inf"; such samples would break the fit.
            if not (np.isfinite(t_val) and np.isfinite(y_val)):
                continue
            t_vals.append(t_val)
            y_vals.append(y_val)

        if len(t_vals) >= 4:
            series_by_well[name] = (np.asarray(t_vals, float), np.asarray(y_vals, float))

    if not series_by_well:
        raise ValueError("No valid series extracted from CSV.")
    return series_by_well, meta


In [8]:
from pathlib import Path

def safe_filename(name: str) -> str:
    """
    Turn `name` into a string that is safe to use as a file name.

    Surrounding whitespace is stripped and every run of characters other
    than ASCII letters, digits, "_", "-" and "." becomes a single "_".
    An empty result is replaced by "output".
    """
    cleaned = re.sub(r"[^A-Za-z0-9_\-\.]+", "_", name.strip())
    return cleaned if cleaned else "output"


def format_exe_report(best: PeriodRow, rows: List[PeriodRow], title: str, meta: Optional[dict] = None) -> str:
    """
    Render one well's cosinor result as a plain-text report.

    Layout: the title, the selected metadata entries (only when `meta` is
    non-empty), the best-period fit, then the full periodogram as
    tab-separated Period/F/p lines.

    Args:
        best: row chosen as the best period.
        rows: every scanned period, in the order they should be listed.
        title: first line of the report.
        meta: optional metadata; only the known plate-reader keys are printed.

    Returns:
        The report text, terminated by a newline.
    """
    out: List[str] = []
    out.append(title)
    if meta:
        for k in ("Plate format", "Date", "Time", "Integral time", "Interval time", "Measurement time", "Background"):
            if k in meta:
                out.append(f"{k}: {meta[k]}")
        out.append("")
    out.append("Cosinor (best period)")
    # State the period itself: without it the section reports a fit at an
    # unnamed period and the reader has to search the periodogram for it.
    out.append(f"Best period {best.period:.6g}")
    out.append(f"MESOR      {best.mesor:.6g}")
    out.append(f"Amplitude  {best.amplitude:.6g}")
    out.append(f"Acrophase  {best.acrophase_deg:.6g} deg")
    out.append(f"F          {best.f_stat:.6g}")
    out.append(f"p          {best.p_value:.6g}")
    out.append("")
    out.append("Periodogram")
    out.append("Period\tF\tp")
    for r in rows:
        out.append(f"{r.period:.6g}\t{r.f_stat:.6g}\t{r.p_value:.6g}")
    return "\n".join(out) + "\n"


# Resolve the input file and make sure the output directory exists.
inp = Path(INPUT_PATH)
outdir = Path(OUTPUT_DIR)
outdir.mkdir(parents=True, exist_ok=True)

# Paths of the report files written by this cell.
written = []

# Guard clause: this cell only handles plate-reader CSV exports.
# Message (in Japanese): this version is for plate reader CSVs (wide Time(h)/RLU CSV).
if not (inp.suffix.lower() == ".csv" and looks_like_plate_reader_csv(inp)):
    raise ValueError("この版は plate reader CSV 向けです（Time(h)/RLU のワイドCSV）。")

series_by_well, meta = parse_plate_reader_csv(inp)

if not ANALYZE_ALL_WELLS:
    # Single-well mode: match WELL_NAME case-insensitively.
    lut = {k.lower(): k for k in series_by_well.keys()}
    if WELL_NAME.lower() not in lut:
        raise KeyError(f"WELL_NAME '{WELL_NAME}' not found. Available: {', '.join(series_by_well.keys())}")
    key = lut[WELL_NAME.lower()]
    chosen = {key: series_by_well[key]}
else:
    # All-wells mode, optionally without the background series.
    chosen = dict(series_by_well)
    if EXCLUDE_BACKGROUND:
        chosen.pop("Background", None)

# One periodogram and one text report per selected well.
for well, (t, y) in chosen.items():
    best, rows = cosinor_periodogram_exe_style(t, y, PERIOD_START, PERIOD_END, PERIOD_STEP)
    title = f"Cosinor Periodogram Result  ({inp.name} / {well})"
    rep = format_exe_report(best, rows, title, meta=meta)
    out_path = outdir / f"{safe_filename(well)}.txt"
    out_path.write_text(rep, encoding="utf-8")
    written.append(str(out_path))

# List at most 30 of the written files, then the total if there were more.
print("[OK] wrote:")
for p in written[:30]:
    print(" -", p)
if len(written) > 30:
    print(f" ... ({len(written)} files) total")
print("Output dir:", outdir.resolve())


[OK] wrote:
 - cosinor_results/A1.txt
 - cosinor_results/A2.txt
 - cosinor_results/A3.txt
 - cosinor_results/A4.txt
 - cosinor_results/A5.txt
 - cosinor_results/A6.txt
 - cosinor_results/B1.txt
 - cosinor_results/B2.txt
 - cosinor_results/B3.txt
 - cosinor_results/B4.txt
 - cosinor_results/B5.txt
 - cosinor_results/B6.txt
 - cosinor_results/C1.txt
 - cosinor_results/C2.txt
 - cosinor_results/C3.txt
 - cosinor_results/C4.txt
 - cosinor_results/C5.txt
 - cosinor_results/C6.txt
 - cosinor_results/D1.txt
 - cosinor_results/D2.txt
 - cosinor_results/D3.txt
 - cosinor_results/D4.txt
 - cosinor_results/D5.txt
 - cosinor_results/D6.txt
Output dir: /content/cosinor_results


In [10]:
!pip -q install pandas openpyxl

import re
import pandas as pd
from pathlib import Path

def safe_filename(name: str) -> str:
    """
    Sanitize `name` for use as a file name.

    Leading/trailing whitespace is removed, each run of characters outside
    ASCII letters, digits, "_", "-" and "." is collapsed to one "_", and
    "output" is returned if nothing is left.
    """
    # NOTE(review): same behavior as the safe_filename defined in the earlier report cell.
    trimmed = name.strip()
    sanitized = re.sub(r"[^A-Za-z0-9_\-\.]+", "_", trimmed)
    if not sanitized:
        return "output"
    return sanitized

def format_exe_report(best: PeriodRow, rows: List[PeriodRow], title: str, meta: Optional[dict] = None) -> str:
    """
    Build the plain-text report for one well.

    Sections, in order: the title; the known metadata entries followed by a
    blank line (only when `meta` is non-empty); the best-period fit including
    the period itself; and the periodogram as tab-separated Period/F/p lines.

    Returns the report text terminated by a newline.
    """
    known_meta_keys = ("Plate format", "Date", "Time", "Integral time", "Interval time", "Measurement time", "Background")

    report: List[str] = [title]

    if meta:
        report.extend(f"{key}: {meta[key]}" for key in known_meta_keys if key in meta)
        report.append("")

    report += [
        "Cosinor (best period)",
        f"Best period {best.period:.6g}",  # state the period explicitly as well
        f"MESOR      {best.mesor:.6g}",
        f"Amplitude  {best.amplitude:.6g}",
        f"Acrophase  {best.acrophase_deg:.6g} deg",
        f"F          {best.f_stat:.6g}",
        f"p          {best.p_value:.6g}",
        "",
        "Periodogram",
        "Period\tF\tp",
    ]

    report.extend(f"{r.period:.6g}\t{r.f_stat:.6g}\t{r.p_value:.6g}" for r in rows)
    return "\n".join(report) + "\n"

# ---- run: per-well text reports plus one summary table (Excel + CSV)
inp = Path(INPUT_PATH)
outdir = Path(OUTPUT_DIR)
outdir.mkdir(parents=True, exist_ok=True)

written_txt = []
summary_rows = []   # one dict per analyzed well; becomes the Excel/CSV summary

# Explicit column layout so the summary frame has these columns even when no
# well was analyzed; pd.DataFrame([]) alone has no "Well" column to sort on.
SUMMARY_COLUMNS = ["Well", "BestPeriod_h", "MESOR", "Amplitude", "Acrophase_deg", "F", "p"]

# PERIOD_STEP=None falls back to the exe-compatible default step (0.1 h).
# Resolved once here because it does not depend on the well.
DEFAULT_EXE_STEP = 0.1
step = DEFAULT_EXE_STEP if PERIOD_STEP is None else PERIOD_STEP

if inp.suffix.lower() == ".csv" and looks_like_plate_reader_csv(inp):
    series_by_well, meta = parse_plate_reader_csv(inp)

    if ANALYZE_ALL_WELLS:
        chosen = dict(series_by_well)
        if EXCLUDE_BACKGROUND:
            chosen.pop("Background", None)
    else:
        # case-insensitive well match
        lut = {k.lower(): k for k in series_by_well.keys()}
        if WELL_NAME.lower() not in lut:
            raise KeyError(f"WELL_NAME '{WELL_NAME}' not found. Available: {', '.join(series_by_well.keys())}")
        key = lut[WELL_NAME.lower()]
        chosen = {key: series_by_well[key]}

    for well, (t, y) in chosen.items():
        best, rows = cosinor_periodogram_exe_style(t, y, PERIOD_START, PERIOD_END, step)

        # txt report
        title = f"Cosinor Periodogram Result  ({inp.name} / {well})"
        rep = format_exe_report(best, rows, title, meta=meta)
        out_path = outdir / f"{safe_filename(well)}.txt"
        out_path.write_text(rep, encoding="utf-8")
        written_txt.append(str(out_path))

        # summary row (Excel/CSV)
        summary_rows.append({
            "Well": well,
            "BestPeriod_h": best.period,
            "MESOR": best.mesor,
            "Amplitude": best.amplitude,
            "Acrophase_deg": best.acrophase_deg,  # kept as-is, may be negative
            "F": best.f_stat,
            "p": best.p_value,
        })

else:
    # Message (in Japanese): this version is for plate reader CSVs (wide Time(h)/RLU CSV).
    raise ValueError("この版は plate reader CSV（Time(h)/RLUのワイドCSV）向けです。")

# ---- write summary to Excel/CSV
df = pd.DataFrame(summary_rows, columns=SUMMARY_COLUMNS)
if df.empty:
    # e.g. only the Background series was usable and EXCLUDE_BACKGROUND is True
    print("[WARN] no wells were analyzed; the summary files will contain only the header row.")

# Rough plate-order sort (A1, A2, ...): row letter first, then numeric column.
df["__row"] = df["Well"].str.extract(r"^([A-H])")[0]
df["__col"] = df["Well"].str.extract(r"(\d+)$")[0].astype(float)
df = df.sort_values(["__row","__col"]).drop(columns=["__row","__col"])

xlsx_path = outdir / "cosinor_summary.xlsx"
csv_path  = outdir / "cosinor_summary.csv"
df.to_excel(xlsx_path, index=False)
df.to_csv(csv_path, index=False)

# List at most 20 of the written reports, then the total if there were more.
print("[OK] wrote txt:")
for p in written_txt[:20]:
    print(" -", p)
if len(written_txt) > 20:
    print(f" ... ({len(written_txt)} files) total")

print("\n[OK] wrote summary:")
print(" -", xlsx_path)
print(" -", csv_path)

df


[OK] wrote txt:
 - cosinor_results/A1.txt
 - cosinor_results/A2.txt
 - cosinor_results/A3.txt
 - cosinor_results/A4.txt
 - cosinor_results/A5.txt
 - cosinor_results/A6.txt
 - cosinor_results/B1.txt
 - cosinor_results/B2.txt
 - cosinor_results/B3.txt
 - cosinor_results/B4.txt
 - cosinor_results/B5.txt
 - cosinor_results/B6.txt
 - cosinor_results/C1.txt
 - cosinor_results/C2.txt
 - cosinor_results/C3.txt
 - cosinor_results/C4.txt
 - cosinor_results/C5.txt
 - cosinor_results/C6.txt
 - cosinor_results/D1.txt
 - cosinor_results/D2.txt
 ... (24 files) total

[OK] wrote summary:
 - cosinor_results/cosinor_summary.xlsx
 - cosinor_results/cosinor_summary.csv


Unnamed: 0,Well,BestPeriod_h,MESOR,Amplitude,Acrophase_deg,F,p
0,A1,23.1,-22.309052,808.293978,92.529965,627.566636,6.791792e-127
1,A2,22.8,-13.60885,774.99426,86.508165,576.066863,4.164466e-121
2,A3,23.0,-19.343437,812.153505,93.73084,556.387861,8.547563000000001e-119
3,A4,23.0,-17.07276,774.236234,90.981356,544.500606,3.010812e-117
4,A5,23.2,-20.779815,769.833631,95.236053,643.319738,1.8769870000000002e-128
5,A6,23.5,-23.771572,816.87151,95.537575,636.353775,1.046382e-127
6,B1,22.9,-39.185911,966.13955,88.050678,380.353585,6.563552e-95
7,B2,22.0,-4.708984,749.757835,71.627818,302.3432,5.294991e-82
8,B3,22.3,-3.827917,791.384034,81.005117,316.342033,1.857243e-84
9,B4,22.0,0.039291,696.843926,73.5836,268.737307,7.993748e-76


In [11]:

# Colab-only module that provides browser upload/download helpers.
from google.colab import files

# Offer the Excel summary written to OUTPUT_DIR as a browser download.
files.download(f"{OUTPUT_DIR}/cosinor_summary.xlsx")
# To download the CSV summary as well, uncomment:
# files.download(f"{OUTPUT_DIR}/cosinor_summary.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>