This notebook goes through an example showing how to use distributed_estimation with PHReg, first we generate the data.

In [None]:
import numpy as np

"""generate simulated data, taken from statsmodels PHReg tests"""

n = 500
p = 25

exog = np.random.normal(size=(5*n,p))
coef = np.random.normal(size=p) * np.random.randint(2, size=p)
lpred = np.dot(exog, coef)
expected_survival_time = np.exp(-lpred)

# Survival times are exponential
survival_time = -np.log(np.random.uniform(size=5*n))
survival_time *= expected_survival_time

# Set this to get a reasonable amount of censoring
expected_censoring_time = np.mean(expected_survival_time)

# Theses are the observation times.
censoring_time = -np.log(np.random.uniform(size=5*n))
censoring_time *= expected_censoring_time

# Entry times
entry_time = -np.log(np.random.uniform(size=5*n))
entry_time *= 0.5*expected_censoring_time

# 1=failure (death), 0=no failure (no death)
status = 1*(survival_time <= censoring_time)

# The censoring time of the failure time, whichever comes first
time = np.where(status==1, survival_time, censoring_time)

# Round time so that we have ties
time = np.around(time, decimals=1)

# Only take cases where the entry time is before the failure or
# censoring time.  Take exactly n such cases.
ii = np.flatnonzero(entry_time < time)
ii = ii[np.random.permutation(len(ii))[0:n]]
status = status[ii]
time = time[ii]
exog = exog[ii,:]
entry_time = entry_time[ii]

Now we create the data generators.  We also need a generator to produce the init_kwds (status) for each partition.

In [None]:
def _exog_gen(exog, partitions):
    """partitions exog data"""

    n_exog = exog.shape[0]
    n_part = np.ceil(n_exog / partitions)

    ii = 0
    while ii < n_exog:
        jj = int(min(ii + n_part, n_exog))
        yield exog[ii:jj, :]
        ii += int(n_part)

def _endog_gen(endog, partitions):
    """partitions endog data"""

    n_endog = endog.shape[0]
    n_part = np.ceil(n_endog / partitions)

    ii = 0
    while ii < n_endog:
        jj = int(min(ii + n_part, n_endog))
        yield endog[ii:jj]
        ii += int(n_part)
        
def _gen_init(data, init_key, partitions):
    """partitions init_kwds data"""

    n_data = data.shape[0]
    n_part = np.ceil(n_data / partitions)

    ii = 0
    while ii < n_data:
        jj = int(min(ii + n_part, n_data))
        yield {init_key : data[ii:jj]}
        ii += int(n_part)

Now we generate and fit the model

In [None]:
from statsmodels.base.distributed_estimation import _est_regularized_naive, _join_naive
from statsmodels.base.distributed_estimation import DistributedModel
from statsmodels.duration.hazard_regression import PHReg

# number of partitions
m = 5

debiased_mod = DistributedModel(m, model_class=PHReg)
debiased_fit = debiased_mod.fit(zip(_endog_gen(time, m), _exog_gen(exog, m)),
                                fit_kwds={"alpha": 0.1},
                                init_kwds_generator=_gen_init(status, "status", m))
naive_mod = DistributedModel(m, model_class=PHReg,
                             estimation_method=_est_regularized_naive,
                             join_method=_join_naive)
naive_fit = naive_mod.fit(zip(_endog_gen(time, m), _exog_gen(exog, m)),
                          fit_kwds={"alpha": 0.1},
                          init_kwds_generator=_gen_init(status, "status", m))
print(np.abs(debiased_fit.params - coef), np.abs(naive_fit.params - coef))