In [82]:
import pandas as pd
import numpy as np
from pydantic.dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Tuple

In [199]:
class Math(ABC):
    @abstractmethod
    def _latex(self):
        raise NotImplementedError()

    def latex(self):
        l = self._latex()
        if isinstance(l, str):
            return f"${self._latex()}$"
        elif isinstance(l, tuple) or isinstance(l, list):
            return "$$" + " \\\\ ".join(l) + "$$"
        else:
            raise TypeError(f"Expected _latex to return a string or tuple, got {type(l)}")
    
    def math(self):
        from IPython.display import Math
        return Math(self.latex())

class Effect(Math):
    def __add__(self, other):
        assert isinstance(other, Effect) or (other in [0, 1]) or isinstance(other, Variable)
        if other == 1:
            other = Intercept()
        elif other == 0:
            return self
        elif isinstance(other, Variable):
            other = Fixed(other)
        
        effects = []
        if isinstance(self, CompoundEffect):
            effects.extend(self.effects)
        else:
            effects.append(self)
        
        if isinstance(other, CompoundEffect):
            effects.extend(other.effects)
        else:
            effects.append(other)
        
        return CompoundEffect(tuple(effects))
    
    def __radd__(self, other):
        return self.__add__(other)
    
    def __or__(self, other):
        return CompoundEffect([self]).ref(other)
    
    def __ror__(self, other):
        return CompoundEffect([other]).ref(self)

class CompoundEffect(Effect):
    def __init__(self, effects):
        self.fixed_effects = []
        self.random_effects = []
        self.intercept = False

        effects = list(effects)
        while effects:
            effect = effects.pop()
            if isinstance(effect, CompoundEffect):
                effects.extend(effect.effects)
            elif isinstance(effect, Fixed):
                self.fixed_effects.append(effect)
            elif isinstance(effect, Random):
                self.random_effects.append(effect)
            elif effect == 1 or isinstance(effect, Intercept):
                self.intercept = True
            elif effect == 0:
                continue
            else:
                raise TypeError(f"Expected effect to be an Effect, got {type(effect)}")
        
        self.fixed_effects = tuple(sorted(self.fixed_effects))
        self.random_effects = tuple(sorted(self.random_effects))
    
    @property
    def effects(self):
        return ((Intercept(),) * self.intercept) + self.fixed_effects + self.random_effects
    
    def ref(self, other):
        assert len(self.effects) <= 2
        if isinstance(other, Fixed):
            other = other.name
        i = s = None
        for e in self.effects:
            if isinstance(e, Fixed):
                s = e.name
            elif isinstance(e, Intercept):
                i = True
            else:
                raise TypeError(f"RandomEffect must have only Fixed and Intercept on left hand side, got {type(e)}")
        return Random(group=varify(other), slope=s, intercept=not not i)
    
    def __repr__(self):
        return f"CompoundEffect({list(self.effects)})"

    def _latex(self):
        f_count = 0
        r_count = 0
        randoms = []
        ans = []
        for effect in self.effects:
            if isinstance(effect, Fixed):
                f_count += 1
                ans.append(effect._latex(f_count))
            elif isinstance(effect, Random):
                r_count += 1
                ans.append(effect._latex(r_count))
                if effect.slope:
                    randoms.append(effect._slope_latex(r_count) + "\sim Normal(0, \\sigma_{\omega," + str(r_count) + ", 1}^2)")
                if effect.intercept:
                    randoms.append(effect._slope_latex(r_count) + "\sim Normal(0, \\sigma_{\omega," + str(r_count) + ", 0}^2)")
            else:
                ans.append(effect._latex())

        return [" + ".join(ans), *randoms]

class Intercept(Effect):
    def __init__(self):
        pass
    
    def _latex(self):
        return "\\beta_0"
    
    def __repr__(self):
        return "Intercept()"

class Fixed(Effect):
    def __init__(self, name):
        self.name = varify(name)
    
    def __lt__(self, other):
        assert isinstance(other, Fixed)
        return self.name.name < other.name.name
    
    def __repr__(self):
        return f"Fixed({self.name})"

    def _latex(self, n=1):
        return "\\beta_{" + str(n) + "} \\text{" + self.name._latex() + "}"

class Random(Effect):
    def __init__(self, group, slope, intercept=True):
        assert isinstance(intercept, bool)
        self.group = varify(group)
        if slope is not None:
            self.slope = varify(slope)
        else:
            self.slope = None
        self.intercept = intercept
    
    def __lt__(self, other):
        assert isinstance(other, Random)
        return (self.group.name, self.slope.name, self.intercept) < (other.group.name, other.slope.name, other.intercept)
    
    def _slope_latex(self, n=1):
        return "\\omega_{\\text{" + self.group._latex() + "}}^{" + str(n) + ", 1}"
    
    def _intercept_latex(self, n=1):
        return "\\omega_{\\text{" + self.group._latex() + "}}^{" + str(n) + ", 0}"

    def _latex(self, n=1):
        ans = []
        if self.slope:
            slope_var = "\\text{" + self.slope._latex() + "}"
            ans.append(self._slope_latex(n) + slope_var)
        if self.intercept:
            ans.append(self._intercept_latex(n))
        return "+".join(ans)
    
    def __repr__(self):
        return f"Random(group={self.group}, slope={self.slope}, intercept={self.intercept})"

class Variable:
    def __init__(self, name):
        self.name = name

    def _latex(self):
        return "\\text{" + self.name + "}"
    
    def __repr__(self):
        return f"Variable({self.name})"
    
    def __add__(self, other):
        return Fixed(self) + other
    
    def __radd__(self, other):
        return self + other
    
    def __or__(self, other):
        return Fixed(self) | other
    
    def __ror__(self, other):
        return other | Fixed(self)

    def hat(self):
        return Outcome(self)

def varify(name):
    if isinstance(name, str):
        return Variable(name)
    if isinstance(name, Variable):
        return name
    raise TypeError(f"Expected string or Variable, got {type(name)}")

class Outcome(Math):
    def __init__(self, var):
        self.var = varify(var)

    def _latex(self):
        return self.var._latex()

    def __eq__(self, other):
        assert isinstance(other, Effect)
        return Model(self, other)

class Model(Math):
    def __init__(self, outcome, effect):
        if isinstance(outcome, Outcome):
            outcome = outcome.var
        assert isinstance(outcome, Variable)
        assert isinstance(effect, Effect)
        self.outcome = outcome
        self.effect = CompoundEffect([effect])
    
    def __repr__(self):
        return f"Model({self.outcome}, {self.effect})"

    def _latex(self):
        el = self.effect._latex()
        return [
            f"{self.outcome._latex()} = {el[0]} + \epsilon",
            "\epsilon \sim Normal(0, \\sigma_\epsilon^2)",
            *el[1:]
            ]

x = Variable("X")
g = Variable("G")
y = Variable("Y")

(y.hat() == 1 + x + (x + 1 | g)).math()

<IPython.core.display.Math object>

In [192]:
("foo",) * False

()

In [78]:
true_treat_effect = 1
true_intercept = 1
true_eps_sigma = 0.5
true_g_intercepts = np.array([1, 2, 3])
true_g_slopes = np.array([0.5, 0.75, 1])

n = 10
np.random.seed(0)

n_g = len(true_g_intercepts)
treatment = np.array([0, 1]).repeat(n // 2)
groups = np.random.choice(np.arange(n_g), n)

each_g_intercept = true_g_intercepts[groups]
each_g_effect = true_g_slopes[groups] * treatment
each_treat_effect = true_treat_effect * treatment
each_noise = np.random.normal(0, true_eps_sigma, n)

outcome = true_intercept + each_treat_effect + each_g_effect + each_g_intercept + each_noise

df = pd.DataFrame({
    "Y": outcome,
    "X": treatment,
    "G": groups
})
df

Unnamed: 0,Y,X,G
0,1.913465,0,0
1,2.119174,0,1
2,1.956163,0,0
3,3.68344,0,1
4,3.562657,0,1
5,5.820502,1,2
6,4.110304,1,0
7,5.330252,1,2
8,3.714187,1,0
9,3.438268,1,0


In [80]:
# make general model
s, i = np.polyfit(df["X"], df["Y"], 1)
print(s, i)
resid = df["Y"] - (s * df["X"] + i)
print(resid)

resid = np.zeros(n)
for g in df["G"].unique():
    print("g =", g)
    mask = df["G"] == g
    s_g, i_g = np.polyfit(df[mask]["X"], df[mask]["Y"], 1)
    print("\t", s_g, i_g)
    g_resid = df[mask]["Y"] - (s_g * df[mask]["X"] + i_g)
    print("\t", g_resid)
    mask_ind = np.where(mask)[0]
    resid[mask_ind] = g_resid


1.8357228076984635 2.6469799139311707
0   -0.733515
1   -0.527806
2   -0.690816
3    1.036460
4    0.915677
5    1.337800
6   -0.372399
7    0.847550
8   -0.768516
9   -1.044434
dtype: float64
g = 0
	 1.819438718818719 1.9348143341113473
	 0   -0.021349
2    0.021349
6    0.356051
8   -0.040066
9   -0.315985
dtype: float64
g = 1
 ** On entry to DLASCLS parameter number  4 had an illegal value
 ** On entry to DLASCLS parameter number  4 had an illegal value
 ** On entry to DLASCLS parameter number  4 had an illegal value
 ** On entry to DLASCLS parameter number  4 had an illegal value
 ** On entry to DLASCLS parameter number  5 had an illegal value
 ** On entry to DLASCLS parameter number  4 had an illegal value


  lhs /= scale


LinAlgError: SVD did not converge in Linear Least Squares

In [71]:
true_g_slopes.mean() + true_treat_effect

1.75

In [76]:
np.where(np.array([True, False, True]))

(array([0, 2]),)