# Score Morning Light
See what effect morning light has, and whether I can produce an aggregate score from it.

In [None]:
%reload_ext autoreload
%autoreload 2
import numpy as np
import pandas as pd
import run_yasa
import logging
import mne
import yasa
import os
import argparse
import os

import mne
import numpy as np
import pandas as pd
from brainflow.board_shim import BoardShim, BoardIds
from brainflow.data_filter import DataFilter

def log(msg):
    """Forward *msg* to the root logger at INFO level."""
    logging.info(msg)


# Load data

In [None]:
# Directory holding the locally stored brainwave data; stats.csv is read
# from directly inside it.
input_dir = "C:\\dev\\play\\brainwave-data"
stats_df = pd.read_csv(os.path.join(input_dir, "stats.csv"))


In [None]:

from sleep_events import connect_to_firebase

# def load_sleep_events(log, start_date, end_date, waking_start_time_tz, waking_end_time_tz):
# NOTE(review): the line above is a leftover signature; presumably this cell
# was meant to become a function — confirm before deleting.

# Get a database client from the project helper.
db = connect_to_firebase()

# Stream every document in the 'daysExperimental' collection.
docs = db.collection('daysExperimental').stream()

# Convert to list of dictionaries
records = [doc.to_dict() for doc in docs]

# One row per document; displayed as the cell output.
days = pd.DataFrame(records)
days

In [None]:
import pandas as pd
from pandas import json_normalize
import json

# `days` is the frame loaded from the database; its 'ml' column is expected
# to hold one nested record per row (TODO confirm the exact schema).
df = days
json_column = 'ml'

# Flatten the nested records so every nested key becomes its own column.
exploded_df = json_normalize(df[json_column])

# Put the day key next to the flattened columns, then keep only the first
# occurrence of any column name that shows up more than once.
combined = pd.concat([df['dayAndNightOf'], exploded_df], axis=1)
is_first_occurrence = ~combined.columns.duplicated()
result_df = combined.loc[:, is_first_occurrence]


In [None]:
# Display the flattened per-day frame as the cell output.
result_df

# Add score

I'm trying to move the last evening peak (LEP) of my core body temperature to 21:45. If the LEP is already close to the target and doesn't move, that's good. If the LEP is far from the target but moves towards it (particularly if it moves rapidly), that is also good. How can I come up with a single score that represents both goals?

In [None]:
def calculate_lep_score(current_lep_ssm, lep_delta_seconds, target_lep_ssm=78300):  # 21:45 = 21*3600 + 45*60 = 78300
    """Combine "how close is the LEP to target" and "is it moving towards target".

    Args:
        current_lep_ssm: Today's LEP in seconds since midnight (SSM).
        lep_delta_seconds: Today's LEP minus the previous day's LEP, in
            seconds (positive = later, negative = earlier).
        target_lep_ssm: Desired LEP in seconds since midnight.

    Returns:
        A float in [-40, 100]: 0.6 * proximity (0..100) plus
        0.4 * improvement (-100..100). NaN is returned when any input is
        missing (None or NaN), so unscored days can be filtered with isna().
    """
    # A missing input must not produce a score. Without this guard a NaN
    # delta falls through to min(100, nan), which evaluates to 100 and
    # silently awards full improvement credit.
    for value in (current_lep_ssm, lep_delta_seconds, target_lep_ssm):
        if value is None or value != value:  # NaN is the only value != itself
            return float("nan")

    # Constants
    SECONDS_PER_DAY = 24 * 60 * 60
    MAX_DIFFERENCE = SECONDS_PER_DAY // 12  # Maximum difference is 2 hours

    # Calculate proximity score (0 to 100). The distance is measured the
    # short way around the 24h clock, and anything further than
    # MAX_DIFFERENCE is clamped to 0 so far-off days cannot drag the score
    # below its documented range.
    raw_difference = abs(current_lep_ssm - target_lep_ssm)
    current_difference = min(raw_difference, SECONDS_PER_DAY - raw_difference)
    proximity_score = max(0.0, 100 * (1 - current_difference / MAX_DIFFERENCE))

    # Calculate improvement score (-100 to 100).
    # target_change is the signed shift (wrapped into [-12h, 12h)) that would
    # have taken the previous day's LEP exactly onto the target.
    previous_lep_ssm = current_lep_ssm - lep_delta_seconds
    half_day = SECONDS_PER_DAY // 2
    target_change = (target_lep_ssm - previous_lep_ssm + half_day) % SECONDS_PER_DAY - half_day

    if lep_delta_seconds == 0 or target_change == 0:
        improvement_score = 0
    else:
        # Fraction of the required shift that was actually covered, capped at 100%
        magnitude = min(100, abs(lep_delta_seconds / target_change) * 100)
        moved_towards_target = (lep_delta_seconds > 0) == (target_change > 0)
        improvement_score = magnitude if moved_towards_target else -magnitude

    # Combine scores (adjust weights as needed)
    proximity_weight = 0.6
    improvement_weight = 0.4
    final_score = proximity_weight * proximity_score + improvement_weight * improvement_score

    return final_score

# Convert a clock time to seconds since midnight (SSM)
def time_to_ssm(hours, minutes, seconds=0):
    """Return hours:minutes:seconds expressed as seconds since midnight."""
    from_hours = hours * 3600
    from_minutes = minutes * 60
    return from_hours + from_minutes + seconds

# Helper function to convert SSM to time string
def ssm_to_time_string(ssm):
    """Format seconds since midnight as "HH:MM:SS".

    Args:
        ssm: Seconds since midnight. Floats (as produced by pandas columns)
            are truncated to whole seconds first, because the ``d`` format
            code rejects float values.

    Returns:
        The zero-padded "HH:MM:SS" string.

    Raises:
        ValueError: If ``ssm`` is NaN or cannot be converted to an integer.
    """
    whole_seconds = int(ssm)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

# Example usage: score a single day and print the inputs next to the result
current_lep_ssm = time_to_ssm(22, 30)  # 22:30:00
lep_delta_seconds = -1800  # Moved 30 minutes earlier (negative delta = earlier, which here is towards the target)
target_lep_ssm = time_to_ssm(21, 45)  # 21:45:00
score = calculate_lep_score(current_lep_ssm, lep_delta_seconds, target_lep_ssm)
print(f"Current LEP: {ssm_to_time_string(current_lep_ssm)}")
print(f"Delta: {lep_delta_seconds} seconds")
print(f"Target LEP: {ssm_to_time_string(target_lep_ssm)}")
print(f"LEP Score: {score:.2f}")

# Additional examples; each call relies on the default target of 21:45.
# The second argument is the change versus the previous day, in seconds.
print("\nAdditional Examples:")
print(f"Score for 21:45:00 (no change): {calculate_lep_score(time_to_ssm(21, 45), 0):.2f}")
print(f"Score for 22:00:00 (15 min later): {calculate_lep_score(time_to_ssm(22, 0), 900):.2f}")
print(f"Score for 21:30:00 (15 min earlier): {calculate_lep_score(time_to_ssm(21, 30), -900):.2f}")
print(f"Score for 23:00:00 (30 min later): {calculate_lep_score(time_to_ssm(23, 0), 1800):.2f}")
print(f"Score for 21:45:00 (at target, moved 15 min later): {calculate_lep_score(time_to_ssm(21, 45), 900):.2f}")

In [None]:
import datetime

def apply_lep_score(row):
    """Score one row using its combined-LEP time and its vsDayMinus1 value.

    The vsDayMinus1 column is passed to calculate_lep_score as the delta in
    seconds; the default target time is used.
    """
    lep_ssm = row['circadian:combined:entries:LEP:datetimeSSM']
    lep_delta = row['circadian:combined:entries:LEP:datetimeSSM:vsDayMinus1']
    return calculate_lep_score(lep_ssm, lep_delta)

# Apply the function to each row (axis=1 hands one row at a time to
# apply_lep_score) and store the result in a new 'lep_score' column
result_df['lep_score'] = result_df.apply(apply_lep_score, axis=1)

def ssm_to_hhmm(ssm):
    """Format seconds since midnight as "HH:MM"; missing values give None.

    The value is truncated to whole seconds before formatting, and any
    leftover seconds within the minute are discarded.
    """
    if not pd.isna(ssm):
        whole_seconds = int(ssm)
        hours, leftover = divmod(whole_seconds, 3600)
        minutes = leftover // 60
        return f"{hours:02d}:{minutes:02d}"
    return None

# Add a human-readable HH:MM rendering of the combined LEP time
lep_ssm_column = 'circadian:combined:entries:LEP:datetimeSSM'
result_df['LEP_HHMM'] = result_df[lep_ssm_column].apply(ssm_to_hhmm)

# Show the LEP inputs beside the derived columns, limited to rows that
# actually received a score
results = result_df[[
    'circadian:combined:entries:LEP:datetimeSSM',
    'circadian:combined:entries:LEP:datetimeSSM:vsDayMinus1',
    'LEP_HHMM',
    'lep_score',
]]
results[results['lep_score'].notna()]

# Prepare data

In [None]:
# A 21:45 peak should have us asleep at about 22:15 and waking at about 06:15.
# The choice is somewhat arbitrary, but it is a good starting point.
# NOTE(review): this uses the 'basic' LEP column while the score above uses
# the 'combined' one — confirm that the difference is intentional.
ideal_lep_ssm = (21 * 60 + 45) * 60
minutes_off_ideal = (result_df['circadian:basic:entries:LEP:datetimeSSM'] - ideal_lep_ssm) / 60
result_df['lep_mins_from_ideal_lep'] = minutes_off_ideal
result_df['lep_mins_from_ideal_lep'].describe()

In [None]:
# List the sun/LEP/first-light columns, skipping the "DayMinus" comparison columns
keywords = ("sun", "LEP", "lightDuringFirstTimeOutside")
[col for col in result_df.columns
 if "DayMinus" not in col and any(word in col for word in keywords)]

In [None]:
# Columns treated as candidate causes in the correlation analysis
interesting_causes = [
    'sunrise:sunsetEndSSM',
    'sunExposure:firstEnteredOutsideSSM',
    'sunExposureCombined:betweenWakeAndFirstSun',
    'sunExposure:totalTimeOutsideSecs',
    'sunExposure:firstDurationOutside',
    'events:shower:lastSSM',
    'sunExposure:lightDuringFirstTimeOutside',
]

# Columns treated as effects (all LEP-related)
interesting_effects = [
    'circadian:basic:entries:LEP:prominence',
    'circadian:combined:entries:LEP:temp',
    'circadian:basic:entries:LEP:datetimeSSM',
    'circadian:basic:entries:LEP:datetimeSSM:vsDayMinus1',
    'lep_mins_from_ideal_lep',
]

# Narrow the frame to causes followed by effects and display it
filtered_df = result_df[[*interesting_causes, *interesting_effects]]
filtered_df

In [None]:
# Plot minutes-from-ideal for the rows that have a basic LEP time
has_lep_time = filtered_df['circadian:basic:entries:LEP:datetimeSSM'].notna()
filtered_df.loc[has_lep_time, 'lep_mins_from_ideal_lep'].plot()

In [None]:
# Summary statistics for the minutes-from-ideal column
filtered_df.loc[:, 'lep_mins_from_ideal_lep'].describe()

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.feature_selection import RFE
from sklearn.metrics import mean_squared_error

# Target: minutes between the measured LEP and the ideal LEP
target_column = 'lep_mins_from_ideal_lep'
y = result_df[target_column]

# Keep only the rows that have a target BEFORE judging column completeness.
# Otherwise a column is thrown away because of gaps in rows that are
# discarded anyway.
has_target = y.notna()
y = y[has_target]
X = result_df.loc[has_target].drop(columns=[target_column])

# Remove columns derived from the LEP itself. The target is a linear function
# of the LEP time, so leaving them in lets the model read the answer directly
# (target leakage). 'lep_score' is computed from the LEP as well.
leaky_columns = [col for col in X.columns if 'LEP' in col or col == 'lep_score']
X = X.drop(columns=leaky_columns)

# Convert non-numeric columns to numeric where possible, and drop the rest
X = X.apply(pd.to_numeric, errors='coerce')
X = X.dropna(axis=1, how='any')

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the model
model = LinearRegression()

# Use RFE for feature selection; never ask for more features than exist
n_features = min(10, X_train.shape[1])
selector = RFE(model, n_features_to_select=n_features, step=1)
selector = selector.fit(X_train, y_train)

# Get the selected features
selected_features = X_train.columns[selector.support_]

# Train the model with selected features
model.fit(X_train[selected_features], y_train)

# Predict and evaluate
y_pred = model.predict(X_test[selected_features])
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

# Display the selected features
print("Selected features:", selected_features)

In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt



# Pairwise correlations over the selected columns, reduced to a
# causes (rows) x effects (columns) grid
correlation_matrix = filtered_df.corr().loc[interesting_causes, interesting_effects]

# Draw the grid as an annotated heatmap with the colour scale centred on zero
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('Correlation Matrix between Causes and Effects')
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Collect the weather: and sunrise: columns, plus the light measured during
# the first time outside
light_column = 'sunExposure:lightDuringFirstTimeOutside'
filtered_columns = [col for col in result_df.columns if col.startswith(('weather:', 'sunrise:'))]
filtered_columns.append(light_column)

# Pairwise correlations among those columns
correlation_matrix = result_df[filtered_columns].corr()

# Correlation of the light column with every other selected column (its
# self-correlation removed), sorted ascending
lep_correlation = correlation_matrix[light_column].drop(light_column).sort_values()

# Horizontal bar chart of the sorted correlations
plt.figure(figsize=(14, 10))
lep_correlation.plot(kind='barh', color='skyblue')
plt.xlabel('Correlation')
plt.ylabel('Feature')
plt.title('Correlation of sunExposure:lightDuringFirstTimeOutside with weather: and sunrise: features')
plt.grid(True)
plt.show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Keep the two columns of interest and only the rows where both are present.
# NOTE: this rebinds filtered_df, replacing the causes/effects frame built earlier.
x_column = 'sunrise:sunsetDurationSecs'
y_column = 'sunExposure:lightDuringFirstTimeOutside'
filtered_df = result_df[[y_column, x_column]].dropna()

# Scatter plot with the sunrise/sunset duration on the x-axis and the light
# during the first time outside on the y-axis
plt.figure(figsize=(10, 6))
plt.scatter(filtered_df[x_column], filtered_df[y_column], color='skyblue')
plt.xlabel('sunrise:sunsetDurationSecs')
plt.ylabel('sunExposure:lightDuringFirstTimeOutside')
plt.title('sunrise:sunsetDurationSecs vs sunExposure:lightDuringFirstTimeOutside')
plt.grid(True)
plt.show()

# Train basic linear regression model

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.feature_selection import RFE
from sklearn.metrics import mean_squared_error

# Target: minutes between the measured LEP and the ideal LEP
target_column = 'lep_mins_from_ideal_lep'
y = result_df[target_column]

# Keep only the rows that have a target BEFORE judging column completeness.
# Otherwise a column is thrown away because of gaps in rows that are
# discarded anyway.
has_target = y.notna()
y = y[has_target]
X = result_df.loc[has_target].drop(columns=[target_column])

# Remove columns derived from the LEP itself. The target is a linear function
# of the LEP time, so leaving them in lets the model read the answer directly
# (target leakage). 'lep_score' is computed from the LEP as well.
leaky_columns = [col for col in X.columns if 'LEP' in col or col == 'lep_score']
X = X.drop(columns=leaky_columns)

# Convert non-numeric columns to numeric where possible, and drop the rest
X = X.apply(pd.to_numeric, errors='coerce')
X = X.dropna(axis=1, how='any')

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the model
model = LinearRegression()

# Use RFE for feature selection; never ask for more features than exist
n_features = min(10, X_train.shape[1])
selector = RFE(model, n_features_to_select=n_features, step=1)
selector = selector.fit(X_train, y_train)

# Get the selected features
selected_features = X_train.columns[selector.support_]

# Train the model with selected features
model.fit(X_train[selected_features], y_train)

# Predict and evaluate
y_pred = model.predict(X_test[selected_features])
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

# Display the selected features
print("Selected features:", selected_features)

# Train Catboost model

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from catboost import CatBoostRegressor

# Target: minutes between the measured LEP and the ideal LEP
target_column = 'lep_mins_from_ideal_lep'
y = result_df[target_column]

# Keep only the rows that have a target BEFORE judging column completeness.
# Otherwise a column is thrown away because of gaps in rows that are
# discarded anyway.
has_target = y.notna()
y = y[has_target]
X = result_df.loc[has_target].drop(columns=[target_column])

# Remove columns derived from the LEP itself. The target is a linear function
# of the LEP time, so leaving them in lets the model read the answer directly
# (target leakage). 'lep_score' is computed from the LEP as well.
leaky_columns = [col for col in X.columns if 'LEP' in col or col == 'lep_score']
X = X.drop(columns=leaky_columns)

# Convert non-numeric columns to numeric where possible, and drop the rest
X = X.apply(pd.to_numeric, errors='coerce')
X = X.dropna(axis=1, how='any')

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize the CatBoost model
model = CatBoostRegressor(
    eval_metric='RMSE',  # Root Mean Squared Error
    loss_function='RMSE',  # Loss function for regression
    iterations=1000,  # Number of boosting iterations
    learning_rate=0.03,  # Learning rate
    depth=6,  # Depth of the tree
    l2_leaf_reg=3,  # L2 regularization term on weights
    early_stopping_rounds=50  # Early stopping rounds
)

# Train the model
# NOTE(review): the test split doubles as the early-stopping eval_set, so the
# RMSE reported below is optimistic; consider a separate validation split.
model.fit(X_train, y_train, eval_set=(X_test, y_test), verbose=100)

# Predict and evaluate
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")

# Display the features whose importance exceeds 1
feature_importances = model.get_feature_importance()
selected_features = X_train.columns[feature_importances > 1]
print("Selected features:", selected_features)

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

def importances(md):
    """Plot the features whose importance exceeds 1 for a fitted model.

    Args:
        md: A fitted model exposing ``get_feature_importance()``. Passing
            ``None`` falls back to the notebook-global ``model`` so that
            existing ``importances(None)`` calls keep working.

    Feature names are read from the notebook-global ``X_train``, so ``md`` is
    expected to have been fitted on those columns.
    """
    fitted = model if md is None else md

    # Get feature importances from the model that was actually passed in
    feature_importances = fitted.get_feature_importance()

    # Pair each feature name with its importance and keep the notable ones,
    # most important first
    importance_df = pd.DataFrame({
        'Feature': X_train.columns,
        'Importance': feature_importances,
    })
    importance_df = importance_df[importance_df['Importance'] > 1]
    importance_df = importance_df.sort_values(by='Importance', ascending=False)

    # Use the model's own name when it has one, otherwise its class name
    label = getattr(fitted, 'name', None) or type(fitted).__name__

    # Plot the feature importances
    plt.figure(figsize=(12, 8))
    plt.barh(importance_df['Feature'], importance_df['Importance'], color='skyblue')
    plt.xlabel('Importance')
    plt.ylabel('Feature')
    plt.title(f'{label} Feature Importances')
    plt.gca().invert_yaxis()  # Invert y-axis to have the most important feature at the top
    plt.show()

importances(model)