In [80]:
import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from typing import List, Tuple

In [81]:
active_counts_per_epoch_min = 500
minimum_bout_length = 10
maximum_number_consec_inactive_epochs_in_bout = 3
min_non_wearing_length = 20 * 2  # Assuming 30 second epochs
filepath = "~/Desktop/Desktop/epidemiology_PhD/R/packages/walkboutr/tests/fixtures/"

In [85]:
ActivityEpoch = Tuple[int, int, int]


# General purpose activity sequence builders
def make_active_period(length: int = 1, is_bout: bool = True) -> ActivityEpoch:
    return [(active_counts_per_epoch_min, int(is_bout), 0)]*length

def make_inactive_period(length: int = 1, is_bout: bool = False, non_wearing: bool = False, complete_day: bool = False) -> ActivityEpoch:
    return [(0, int(is_bout), int(non_wearing), bool(complete_day))]*length

def add_date_and_format(counts_and_labels: List[ActivityEpoch]) -> pd.DataFrame:
    dates = pd.date_range(start='2012-04-07',freq="30S", periods = len(counts_and_labels))
    df = pd.DataFrame(data=counts_and_labels, columns=['activity_counts', 'bout', 'non_wearing', 'complete_day'])
    df['time'] = dates
    return df

# Higher level helpers

def make_smallest_bout_window() -> List[ActivityEpoch]:
    return make_active_period(minimum_bout_length)

def make_non_bout_window() -> List[ActivityEpoch]:
    return make_inactive_period(maximum_number_consec_inactive_epochs_in_bout + 1)

def make_smallest_nonwearing_window() -> List[ActivityEpoch]:
    return make_inactive_period(min_non_wearing_length, non_wearing=True)


# Test set makers

def make_smallest_bout():
    counts = (
        make_non_bout_window()
        + make_smallest_bout_window()
        + make_non_bout_window()
    )
    return add_date_and_format(counts)

def make_smallest_bout_with_largest_inactive_period():
    nbw = make_non_bout_window()
    inactive_period = make_inactive_period(maximum_number_consec_inactive_epochs_in_bout,is_bout=True)
    sbw = make_smallest_bout_window()
    halfway = len(sbw) // 2
    
    counts = (
        nbw 
        + sbw[:halfway] + inactive_period + sbw[halfway:]
        + nbw
    )
    return add_date_and_format(counts)

def make_smallest_bout_with_smallest_non_wearing_period():
    counts = (
        make_non_bout_window()
        + make_smallest_bout_window()
        + make_smallest_nonwearing_window()
    )
    return add_date_and_format(counts)
    


In [86]:
make_smallest_bout().to_csv(f'{filepath}smallest_bout.csv', index=False)
make_smallest_bout_with_largest_inactive_period().to_csv(f'{filepath}smallest_bout_with_largest_inactive_period.csv', index=False)
make_smallest_bout_with_smallest_non_wearing_period().to_csv(f'{filepath}smallest_bout_with_smallest_non_wearing_period.csv', index=False)

In [61]:
def generate_bout_activity_vector(bout_length: int,
                                 min_active_epochs: int, 
                                 seed: int = 12345):
    np.random.seed(seed)
    if bout_length < min_active_epochs: 
        raise ValueError("Bout length must be greater than number of minimum active epochs")
    dist = stats.bernoulli(p=.5)
    activity_sequence = [0,0,0,0,1]
    zero_counter = 0
    bout_length = bout_length - 2
    min_active_epochs = min_active_epochs - 2
    for i in range(bout_length):
        next_epoch = dist.rvs()
        if next_epoch == 0: 
            zero_counter = zero_counter + 1 
            if zero_counter == 4:
                next_epoch = 1 
                zero_counter = 0
        else: 
            zero_counter = 0
            min_active_epochs = min_active_epochs - 1
            if min_active_epochs == bout_length - i:
                remainder = [1]*(bout_length-i)
                activity_sequence.extend(remainder)
                break
        activity_sequence.append(next_epoch)
        
    activity_sequence.extend([1,0,0,0,0])
    return activity_sequence

In [63]:
# %%timeit
np.array(generate_bout_activity_vector(100, 14))

array([0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1,
       1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1,
       1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 0,
       0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0,
       1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0])

In [64]:
def generate_non_bout_activity_vector(vector_length: int,
                                      min_active_epochs:int,
                                      seed: int = 12345): 
    np.random.seed(seed)
    bout_break = [0,0,0,0]
    dist = stats.bernoulli(.75)
    activity_vector = list()
    while len(activity_vector) < vector_length:
        mini_activity_sequence = list(dist.rvs(min_active_epochs - 1))
        activity_vector.extend(mini_activity_sequence + bout_break)
    return activity_vector[:vector_length]
    

In [65]:
np.array(generate_non_bout_activity_vector(33, 10, 12345))

array([0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 0,
       0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1])

In [None]:
# def generate_person()

In [None]:
# def generate_dataset()

In [None]:
# format: assign times and accelemotry values to each of these 0s and 1s 

In [None]:
# final result: Dataset with X number of people, each with Y number of bouts and Z number of nonbouts, 
# that can be run through the walkbout code to label bouts