In [None]:
from pathlib import Path
# Parent of the notebook's working directory; every other path below hangs off it.
drive_dir = Path("../")

import sys
import os
# Put that directory first on sys.path.
# NOTE(review): presumably so the katspace imports below resolve to the local checkout - confirm.
module_path = os.path.abspath(str(drive_dir))
sys.path.insert(0, module_path)

# data_dir holds the master spreadsheet; results_dir is the directory handed to results_from_json.
data_dir = Path(drive_dir, "data-test/")
results_dir = Path(data_dir, "results/predict-286-gutenberg")

import katspace as ks
import katspace.core

from katspace.data import results_from_json

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import itertools
import seaborn as sns

import statsmodels.api as sm

from functools import reduce


In [3]:
# Label name -> integer class id used throughout the notebook.
label2id = {"perceived_space": 0, "action_space": 1, "visual_space": 2, "descriptive_space":3, "no_space":4}
# Inverse lookup: class id -> label name.
id2label = {class_id : label for label, class_id in label2id.items()}
# Live view of the label names, in class-id order.
space_types = label2id.keys()
# Row i is the one-hot vector of class i. np.int_ is the concrete integer dtype;
# passing the abstract np.integer as a dtype is deprecated by NumPy.
id_matrix = np.eye(len(label2id), dtype = np.int_)

In [None]:
def load_all_results_f_json(num_chunks = 20):
    """Load the stored classification results for every book in the master sheet.

    Reads "20231216_1_gutenberg_master.xlsx" from ``data_dir`` and calls
    ``results_from_json(filename, results_dir)`` for each entry of its
    ``filename`` column. A book is kept only if that call returned something
    other than ``None`` with at least ``num_chunks`` entries. Each kept entry's
    ``"label"`` is mapped to its class id and then to a one-hot row.

    Parameters
    ----------
    num_chunks : int, default 20
        Minimum number of classified entries a book needs in order to be kept.

    Returns
    -------
    results_a_dict : dict
        filename -> integer array with one one-hot row per classified entry.
    file_list : list
        The kept filenames, in the order they appear in ``results_a_dict``.
    gutenberg_df : pandas.DataFrame
        The master sheet as read from disk (unfiltered).
    """
    label2id = {"perceived_space": 0, "action_space": 1, "visual_space": 2, "descriptive_space":3, "no_space":4}
    # np.int_ rather than the abstract np.integer, which NumPy deprecates as a dtype.
    id_matrix = np.eye(len(label2id), dtype = np.int_)

    gutenberg_df = pd.read_excel(Path(data_dir, "20231216_1_gutenberg_master.xlsx"), index_col = 0)

    # Filter while loading so results of discarded books are not kept around.
    results_dic = {}
    for filename in gutenberg_df.filename:
        res = results_from_json(filename, results_dir)
        # Identity check: "!= None" would be evaluated element-wise by array-like results.
        if res is not None and len(res) >= num_chunks:
            results_dic[filename] = res

    # Fancy-indexing the identity matrix with the class ids yields the one-hot rows.
    results_a_dict = {
        filename : id_matrix[[label2id[result["label"]] for result in res]]
        for filename, res in results_dic.items()
    }

    # A plain list instead of a dict view, so the caller's file_list does not
    # pin the raw results dictionary in memory.
    file_list = list(results_a_dict)

    return results_a_dict, file_list, gutenberg_df
    
def chunk_data(results_a_dict, num_chunks = 20): 
    """Split every book's one-hot rows into ``num_chunks`` chunks and sum each chunk.

    Parameters
    ----------
    results_a_dict : dict
        filename -> sequence of one-hot rows (as built by load_all_results_f_json).
    num_chunks : int, default 20
        Number of chunks per book, forwarded to ``ks.data.chunk_lengths`` and
        ``ks.data.chunker``.

    Returns
    -------
    chunk_length_dict : dict
        filename -> result of ``ks.data.chunk_lengths(len(rows), num_chunks)``.
    results_chunked_dict : dict
        filename -> list holding ``sum(chunk)`` for every chunk that
        ``ks.data.chunker`` yields.
    """
    chunk_length_dict = {}
    results_chunked_dict = {}
    # Iterate over the argument itself: relying on the notebook-global file_list
    # made the function depend on hidden state and break on any other input.
    for filename, rows in results_a_dict.items():
        chunk_length_dict[filename] = ks.data.chunk_lengths(len(rows), num_chunks)
        results_chunked_dict[filename] = [sum(chunk) for chunk in ks.data.chunker(rows, num_chunks = num_chunks)]
    return chunk_length_dict, results_chunked_dict

def mk_time_df(df, exact = False, filename = None):
    """Build the harmonic time regressors used by the regression.

    Parameters
    ----------
    df : pandas.DataFrame
        With ``exact=False`` only its index is used: every index value is
        divided by the largest one and scaled to the range [0, pi].
        With ``exact=True`` the column ``(filename, "time_tf_n")`` is taken
        verbatim as the time values.
    exact : bool, default False
        Selects between the two modes described above.
    filename : hashable, optional
        First level of the column key; required when ``exact`` is true.

    Returns
    -------
    pandas.DataFrame
        Columns ``time_tf``, ``sin``, ``cos``, ``sin2``, ``cos2`` where the last
        four are sin/cos of ``time_tf`` and of ``2 * time_tf``.

    Raises
    ------
    ValueError
        If ``exact`` is true but no ``filename`` was given.
    """
    if exact:
        if filename is None:
            raise ValueError("mk_time_df: filename is required when exact=True")
        time_tf = df[(filename, "time_tf_n")]
    else:
        last_position = df.index.max()
        if last_position == 0:
            # Largest index value is 0 (e.g. a single-row frame): dividing would
            # give 0/0 = NaN regressors, so pin the time to 0 instead.
            time_tf = pd.Index(np.zeros(len(df.index)))
        else:
            time_tf = np.pi * (df.index / last_position)

    time_df = pd.DataFrame()
    time_df["time_tf"] = time_tf

    phase = time_df["time_tf"]
    # First harmonic.
    time_df["sin"] = np.sin(phase)
    time_df["cos"] = np.cos(phase)
    # Second harmonic.
    time_df["sin2"] = np.sin(2 * phase)
    time_df["cos2"] = np.cos(2 * phase)
    return time_df

In [5]:
# Load the one-hot encoded results of every kept book, the kept filenames, and the master sheet.
results_a_dict, file_list, gutenberg_df = load_all_results_f_json()

In [7]:
# Number of equal-position chunks each book is split into.
num_chunks = 20
# Pass the setting explicitly; previously chunk_data silently used its own
# default, so editing num_chunks here had no effect on the chunking.
chunk_length_dict, results_chunked_dict = chunk_data(results_a_dict, num_chunks = num_chunks) 

In [None]:
# One list of per-chunk values for every (filename, measure) pair.
data_dict = {}
for filename in file_list:
    chunk_counts = results_chunked_dict[filename]   # one summed one-hot vector per chunk
    bin_sizes = chunk_length_dict[filename]         # chunk lengths from ks.data.chunk_lengths

    # Per-chunk count of each individual label.
    for space_type in space_types:
        data_dict[(filename, space_type)] = [counts[label2id[space_type]] for counts in chunk_counts]

    # Everything in the chunk that was not labelled "no_space".
    data_dict[(filename, "all_space")] = [
        total - counts[label2id["no_space"]] for counts, total in zip(chunk_counts, bin_sizes)
    ]
    data_dict[(filename, "total")] = bin_sizes

    # Running total of the chunk lengths. accumulate() is a single linear pass,
    # whereas re-summing a growing slice for every chunk was quadratic.
    sent_count = list(itertools.accumulate(bin_sizes))
    data_dict[(filename, "sent_count")] = sent_count

# NOTE(review): assumes katspace.core.space_types_ext lists exactly the label names
# plus "all_space"; any name missing from data_dict becomes an all-NaN column - confirm.
index = pd.MultiIndex.from_product([file_list, list(katspace.core.space_types_ext) + ["total", "sent_count"]])
results_df = pd.DataFrame(data_dict, columns = index)

results_df

In [None]:
# Divide every column of a book by that book's "total" column and store the
# result under the same name with an "_n" suffix; columns are then sorted.
normed_df = pd.DataFrame({
    (filename, space_type + "_n") : results_df.loc[:, (filename, space_type)].div(results_df.loc[:, (filename, "total")])
    for filename, space_type in results_df.columns
}).sort_index(axis = "columns")

normed_df

In [None]:
# Regressors shared by all books: the row position of results_df rescaled to [0, pi],
# plus sin/cos of it and of twice it. fit() and predict() below take this frame as
# the default value of their time_df parameter.
time_df = mk_time_df(results_df)

def fit(normed_df, space_type, time_df = time_df): 
    """Fit one pooled OLS model of ``space_type`` on the harmonic time regressors.

    The ``(filename, space_type)`` column of every book found on the first
    column level of ``normed_df`` is stacked into a single response vector, and
    the regressor block from ``time_df`` is repeated once per book to match.
    The model summary is printed as a side effect.

    Parameters
    ----------
    normed_df : pandas.DataFrame
        Two-level columns ``(filename, measure)``.
    space_type : str
        Second-level column name to model, e.g. ``"all_space_n"``.
    time_df : pandas.DataFrame
        Must provide the columns ``time_tf``, ``sin``, ``cos``, ``sin2``, ``cos2``.
        The default is whatever the global ``time_df`` was when this function
        was defined.

    Returns
    -------
    x : pandas.DataFrame
        Stacked regressors (without the constant).
    y : pandas.Series
        Stacked response, named ``space_type``.
    model
        The fitted statsmodels OLS results object.

    Raises
    ------
    ValueError
        If ``normed_df`` contains no books.
    """
    regressor_cols = ["time_tf", "sin", "cos", "sin2", "cos2"]
    filenames = normed_df.columns.get_level_values(0).unique()
    if len(filenames) == 0:
        raise ValueError("fit: normed_df contains no columns to fit")

    # Concatenate once instead of growing the frames inside a loop: that copied
    # everything on each iteration and started from empty objects, whose effect
    # on the resulting dtype pandas is deprecating.
    x = pd.concat([time_df[regressor_cols]] * len(filenames), axis = 0)
    y = pd.concat([normed_df[(filename, space_type)] for filename in filenames], axis = 0)
    # Give the response a stable name for the regression summary.
    y = y.rename(space_type)

    xx = sm.add_constant(x) 
    model = sm.OLS(y, xx).fit()

    print(model.summary())
    return x, y, model

def predict(model, time_df = time_df): 
    """Evaluate a fitted model on the regressors in ``time_df``.

    Parameters
    ----------
    model
        Fitted results object exposing ``predict`` (as returned by ``fit``).
    time_df : pandas.DataFrame
        Regressor columns the model was fitted on, including ``time_tf``. The
        default is whatever the global ``time_df`` was when this function was
        defined.

    Returns
    -------
    tuple
        ``(time_tf column, predictions)``.
    """
    # has_constant="add" forces the intercept column in. With the default
    # ("skip") a frame whose columns are all constant - e.g. a single time
    # point - gets no intercept and model.predict fails on a shape mismatch.
    design = sm.add_constant(time_df, has_constant = "add") 
    predictions = model.predict(design) 
    return design["time_tf"], predictions

In [None]:
# set_theme() applies its own default style, so a set_style("whitegrid") call
# placed before it was overridden and had no effect; it has been dropped.
# Use sns.set_theme(style = "whitegrid") if the white grid is actually wanted.
sns.set_theme()

space_col = "all_space_n"

x, y, model = fit(normed_df, space_col)

x_pred, predictions = predict(model, time_df)

# Mean and variance across books of the normalised value in each chunk.
per_book = normed_df.loc[:,(slice(None), space_col)]
err = per_book.var(axis = 1)
mean = per_book.mean(axis = 1)

fig, ax = plt.subplots()
ax.scatter(x["time_tf"], y, label = "data", s = 1, color = "#c44e52")
ax.plot(x_pred, predictions, label = "model", color = "#dd8452")
# x_pred holds one time value per chunk, so it always matches the length of
# `mean`; the former x["time_tf"][0:20] slice only worked for exactly 20 chunks.
ax.errorbar(x_pred, mean, yerr=err, uplims= True, lolims= True, linestyle='none', label = "mean (variance)", color = "#9467bd")
ax.set_xlabel("time_tf")
ax.set_ylabel(space_col)
ax.legend()
_ = ax.set_title("all_space")