# dynamic

> This code helps users automatically save, timestamp, and eventually source trace a specific set of data for publication.

In [None]:
#| default_exp dynamic

In [2]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import datetime
import os
import sys
import platform
from dotenv import load_dotenv
from pathlib import Path
import functools

load_dotenv()
def set_default_dir():
    print('Setting reproduce.work config dir to ./reproduce')
    return Path("./reproduce")

reproduce_dir = os.getenv("REPROWORKDIR", set_default_dir())
dev_image_tag = os.getenv("REPRODEVIMAGE")

def read_base_config():
    with open(Path(reproduce_dir, 'config.toml'), 'r') as f:
        base_config = toml.load(f)
    return base_config

def update_watched_files(add=[], remove=[]):
    base_config = read_base_config()
    existing_files = base_config['repro']['files']['watch']
    new_files = existing_files + [a for a in add if a not in existing_files]
    new_files = [f for f in new_files if f not in remove]
    base_config['repro']['files']['watch'] = new_files

    current_develop_script = base_config['repro']['stage']['develop']['script']
    current_develop_script
    # regex to replace content in string matching 'watcher \"{to_replace}\"'
    # with 'watcher \"{new_files}\"'
    # and replace 'build_cmd' with 'python reproduce_work.build()'
    import re
    new_develop_script = re.sub(
        r'watcher \"(.*?)\"', 
        f'watcher \"{",".join(new_files)}\"', 
        current_develop_script
    )
    base_config['repro']['stage']['develop']['script'] = new_develop_script

    with open(Path(reproduce_dir, 'config.toml'), 'w') as f:
        toml.dump(base_config, f)
        
    if base_config['repro']['verbose']:
        print(f"Updated watched files to {new_files}")
    return new_files

def validate_base_config(base_config):
    required_keys = ['authors', 'repro']
    for key in required_keys:
        if key not in base_config:
            print(toml.dumps(base_config))
            print(f"Error: Missing required field '{key}' in config.toml")
            return False
        if key=='repro':
            if 'stages' not in base_config['repro']:
                print(f"Error: Missing required field 'repro.stages' in reproduce.work configuration at {reproduce_dir}/config.toml")
                return False
            for stage in base_config['repro']['stages']:
                if (f'repro.stage.{stage}' not in base_config) and (stage not in base_config['repro']['stage']):
                    print(toml.dumps(base_config))
                    print(f"Error: Missing required field repro.stage.{stage} in reproduce.work configuration at {reproduce_dir}/config.toml")
                    return False
    return True

def requires_config(func):
    def wrapper(*args, **kwargs):
        config = read_base_config()
        if not validate_base_config(config):
            raise Exception("Your reproduce.work configuration is not valid.")
        return func(*args, **kwargs)
    return wrapper


VAR_REGISTRY = {
    'REPROWORK_REMOTE_URL': None,
    'REPROWORK_ACTIVE_NOTEBOOK': None
}

Setting reproduce.work config dir to ./reproduce


In [None]:
#| hide
def reproducible_old(var_assignment_func):
    """
    A decorator to register the line number and timestamp when a variable is assigned.
    """
    @functools.wraps(var_assignment_func)
    def wrapper(*args, **kwargs):
        # Extract value and var_name from args
        # Assumes the decorated function always takes at least two arguments: value and var_name
        value, var_name = args[0], args[1]

        # Extract metadata from kwargs or default to an empty dictionary
        metadata = kwargs.get('metadata', {})

        # Get the current frame and line number
        frame = inspect.currentframe()
        line_number = frame.f_back.f_lineno

        # Get the current timestamp
        timestamp = datetime.datetime.now().isoformat()

        # Get the filename of the caller
        filename = frame.f_back.f_code.co_filename

        # Execute the variable assignment function
        result = var_assignment_func(*args, **kwargs)

        # Register the variable name, line number, timestamp, and filename
        VAR_REGISTRY[var_name] = {
            "type": "string",
            "timestamp": timestamp,
        }

        if type(value) is not str:
            value = str(value)
            print(f"WARNING: value of {var_name} was not a string. Converted to string: {value}.")

        VAR_REGISTRY[var_name]['value'] = value

        metadata.update(VAR_REGISTRY[var_name])

        config = read_base_config()

        # check if dynamic file exists
        if not os.path.exists(Path(config['repro']['files']['dynamic'])):
            with open(Path(config['repro']['files']['dynamic']), 'w') as file:
                file.write(toml.dumps({}))
        with open(Path(config['repro']['files']['dynamic']), 'r') as file:
            dynamic_data = toml.load(file)

        dynamic_data[var_name] = metadata

        with open(Path(config['repro']['files']['dynamic']), 'w') as file:
            toml.dump(dynamic_data, file, encoder=ReproduceWorkEncoder())

        return result
    return wrapper

#@reproducible
#def publish_variable(value, var_name, metadata={}):
#    globals()[var_name] = value

In [None]:
#| export
from pathlib import Path
import hashlib
import inspect
import re
import toml
import io
import pandas as pd
import numpy as np

#def update_registry(var_name, value):
    

def get_cell_index():
    """
    Get the current cell index in a Jupyter notebook environment.
    If not in Jupyter, return None.
    """
    try:
        # Execute JavaScript to get the current cell index
        get_ipython().run_cell_magic('javascript', '', 'IPython.notebook.kernel.execute(\'current_cell_index = \' + IPython.notebook.get_selected_index())')
        return current_cell_index
    except:
        return None
    
def check_for_defintion_in_context(function_name='save'):
    assert function_name in ['save', 'assign'], "function_name must be either 'save' or 'assign'"
    
    from IPython import get_ipython
    ip = get_ipython()

    # Check if in Jupyter environment
    if ip is None:
        
        #fill this in 
        pass

    else:
        # Get the input history
        #lineno = inspect.stack()[0].lineno
        raw_hist = ip.history_manager.input_hist_raw
        current_cell = raw_hist[-1]


        matches = re.findall(rf"{function_name}\((.+?),", current_cell)
                
        if matches:
            # save call
            defined_var = matches[0].strip()
            definition_cell_content = ''
            
            for prior_cell in raw_hist[-2::-1]:
                #print(prior_cell)
                if f'{defined_var} =' in prior_cell or f'{defined_var}=' in prior_cell:
                    definition_cell_content = prior_cell
                    break
            
            # find the line number of the where the variable was defined
            # Give a window of 5 lines around the definition call
            def_cell_lines = definition_cell_content.split('\n')
            if len(def_cell_lines)>0:
                lineno = None
                for line_num, line in enumerate(def_cell_lines):
                    if defined_var in line:
                        lineno = line_num
                        break
                if lineno:
                    definition_context = (
                        '\n'.join(def_cell_lines[max(0, lineno-5):lineno]) + 
                        '\nFLAG' + def_cell_lines[lineno] + '\n' +
                        '\n'.join(def_cell_lines[lineno+1:min(len(def_cell_lines), lineno+5)])
                    )
                else:
                    definition_context = None

            else:
                definition_context = None

            
            save_cell_lines = current_cell.split('\n')
            if len(save_cell_lines)>0:
                save_lineno = None
                for line_num, line in enumerate(save_cell_lines):
                    if 'save(' in line:
                        save_lineno = line_num
                        break
                
                if save_lineno:
                    save_context = (
                        '\n'.join(save_cell_lines[max(0, save_lineno-5):save_lineno]) + 
                        '\nFLAG' + save_cell_lines[save_lineno] + '\n' +
                        '\n'.join(save_cell_lines[save_lineno+1:min(len(save_cell_lines), save_lineno+5)])
                    )
                else:
                    save_context = None
                
            else:
                save_context = None
            

        else:
            # not a save call
            save_context = None
            definition_context = None

        return(save_context, definition_context)


class ReproduceWorkEncoder(toml.TomlEncoder):
    def dump_str(self, v):
        """Encode a string."""
        if "\n" in v:
            return v  # If it's a multi-line string, return it as-is
        return super().dump_str(v)
    
    def dump_value(self, v):
        """Determine the type of a Python object and serialize it accordingly."""
        if isinstance(v, str) and "\n" in v:
            return '"""\n' + v.strip() + '\n' + '"""'
        return super().dump_value(v)


def serialize_to_toml(data, root=True):
    """Unified function to serialize various Python data types to TOML format."""
    toml_string = ""
    
    # Handle numpy array
    if isinstance(data, np.ndarray):
        toml_string += f"array = {data.tolist()}"
    
    # Handle pandas DataFrame
    if isinstance(data, pd.DataFrame):
        toml_string += "[dataframe]\n"
        for col in data.columns:
            values = data[col].tolist()
            if all(isinstance(val, (int, float)) for val in values):
                toml_string += f"{col} = {values}\n"
            else:
                values_str = ['"' + str(val) + '"' for val in values]
                toml_string += f"{col} = [{', '.join(values_str)}]\n"
        return toml_string
    
    # Handle dictionary
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, str):
                toml_string += f"{key} = \"{value}\"\n"
            elif isinstance(value, (int, float)):
                toml_string += f"{key} = {value}\n"
            elif isinstance(value, bool):
                toml_string += f"{key} = {str(value).lower()}\n"
            elif isinstance(value, (list, set, tuple)):
                values = ", ".join([str(v) for v in value])
                toml_string += f"{key} = [{values}]\n"
            elif value is None:
                toml_string += f"{key} = null\n"
            elif isinstance(value, (np.datetime64, pd.Timestamp)):
                toml_string += f"{key} = \"{str(value)}\"\n"
            elif isinstance(value, dict) or isinstance(value, pd.DataFrame):
                # Recursive call for nested dictionaries or DataFrames
                nested_str = serialize_to_toml(value, root=False)
                toml_string += f"[{key}]\n{nested_str}\n"
    
    # If it's the root call, remove any trailing newline
    if root:
        toml_string = toml_string.rstrip()
    return toml_string


class ReproduceWorkEncoder(toml.TomlEncoder):
    def dump_str(self, v):
        """Encode a string."""
        if "\n" in v:
            return v  # If it's a multi-line string, return it as-is
        return super().dump_str(v)
    
    def dump_value(self, v):
        """Determine the type of a Python object and serialize it accordingly."""
        if isinstance(v, str) and "\n" in v:
            return '"""\n' + v.strip() + '\n' + '"""'
        return super().dump_value(v)

@requires_config
def publish_data(content, name, metadata={}, watch=True):
    """
    Save data to default pubdata.toml file and register metadata.
    """
    # Capture metadata
    timestamp = datetime.datetime.now().isoformat()
    inspect_filename = inspect.currentframe().f_back.f_code.co_filename
    python_version = sys.version.strip().replace('\n', ' ')
    platform_info = platform.platform()

    # generate cryptographic hash of file contents
    content_hash = hashlib.md5(str(content).encode('utf-8')).hexdigest()
    timed_hash = hashlib.md5((str(content) + timestamp).encode('utf-8')).hexdigest()
         
    # Store metadata
    new_metadata = {
        "type": "data",
        "timestamp": timestamp,
        "content_hash": content_hash,
        "timed_hash": timed_hash,
        #"python_version": python_version,
        #"platform_info": platform_info,
    }
    if VAR_REGISTRY['REPROWORK_REMOTE_URL']:
        metadata['published_url'] = f"{VAR_REGISTRY['REPROWORK_REMOTE_URL']}/{reproduce_dir}/pubdata.toml"

    if VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']:
        metadata['generating_script'] = VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']
    else:
        metadata['generating_script'] = inspect_filename

    '''
    # detect if content var is of matplotlib or seaborn object type
    if type(content).__name__ in ['Figure', 'AxesSubplot'] and 'savefig' in dir(content):
        print('Saving serialized plot to SVG as file and in local data registry.')
        # Serialize plot to SVG
        buffer = io.BytesIO()
        content.savefig(buffer, format='svg')
        svg_data = buffer.getvalue()
        buffer.close()

        # Save SVG to file
        svg_filename = filename.replace('.py', '.svg')
        with open(svg_filename, 'wb') as file:
            file.write(svg_data)

        # Save SVG to registry
        metadata['plot'] = svg_data.decode()
    '''

    base_config = read_base_config()
    metadata.update(new_metadata)

    metadata['value'] = content

    # Save content to the default pubdata.toml file
    #with open(Path(reproduce_dir, 'pubdata.toml'), 'a') as file:
    #    file.write(f'\n[{name}]\n')
    #    file.write(toml.dumps(content, encoder=ReproduceWorkEncoder()))


    # For this demo, let's return the metadata (in practice, you might want to log it, save it to another file, etc.)
    if watch:
        update_watched_files(add=[Path(reproduce_dir, 'pubdata.toml').resolve().as_posix()])

    # check if dynamic file exists
    if not os.path.exists(Path(base_config['repro']['files']['dynamic'])):
        with open(Path(base_config['repro']['files']['dynamic']), 'w') as file:
            file.write(toml.dumps({}))

    with open(Path(base_config['repro']['files']['dynamic']), 'r') as file:
        dynamic_data = toml.load(file)
        
    dynamic_data[name] = metadata

    with open(Path(base_config['repro']['files']['dynamic']), 'w') as file:
        toml.dump(dynamic_data, file, encoder=ReproduceWorkEncoder())

    #return metadata
    

@requires_config
def publish_file(filename, metadata={}, watch=True):
    """
    Save content to a file and register metadata.
    """

    # Capture metadata
    timestamp = datetime.datetime.now().isoformat()
    inspect_filename = inspect.currentframe().f_back.f_code.co_filename
    #python_version = sys.version.strip().replace('\n', ' ')
    #platform_info = platform.platform()

    # generate cryptographic hash of file contents

    with open(filename, 'r') as file:
        content = file.read()
    content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
    timed_hash = hashlib.md5((content + timestamp).encode('utf-8')).hexdigest()
         
    #save_context, definition_context = check_for_defintion_in_context(function_name='save')

    # Store metadata
    new_metadata = {
        "type": "file",
        "timestamp": timestamp,
        #"python_version": python_version,
        #"platform_info": platform_info,
        "content_hash": content_hash,
        "timed_hash": timed_hash,
        #"save_context": save_context,
        #"definition_context": definition_context
    }
    cell_index = get_cell_index()
    if cell_index:
        new_metadata["cell_index"] = cell_index

    if VAR_REGISTRY['REPROWORK_REMOTE_URL']:
        new_metadata['published_url'] = f"{VAR_REGISTRY['REPROWORK_REMOTE_URL']}/{filename}"

    if VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']:
        new_metadata['generating_script'] = VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']
    else:
        new_metadata['generating_script'] = inspect_filename

    base_config = read_base_config()
    #reproduce_work_watched_files = base_config['repro.files.watch']

    metadata.update(new_metadata)

    if watch:
        update_watched_files(add=[filename])

    # check if dynamic file exists
    if not os.path.exists(Path(base_config['repro']['files']['dynamic'])):
        with open(Path(base_config['repro']['files']['dynamic']), 'w') as file:
            file.write(toml.dumps({}))

    with open(Path(base_config['repro']['files']['dynamic']), 'r') as file:
        dynamic_data = toml.load(file)

    dynamic_data[filename] = metadata

    with open(Path(base_config['repro']['files']['dynamic']), 'w') as file:
        toml.dump(dynamic_data, file, encoder=ReproduceWorkEncoder())

    if 'verbosity' in base_config['repro'] and base_config['repro']['verbose']:
        print(f"Added metadata for file {filename} to dynamic file {base_config['repro']['files']['dynamic']}")

    #return metadata



def reproducible(var_assignment_func):
    """
    A decorator to register the line number and timestamp when a variable is assigned.
    """
    @functools.wraps(var_assignment_func)
    def wrapper(*args, **kwargs):
        # Extract value and var_name from args
        # Assumes the decorated function always takes at least two arguments: value and var_name
        value, var_name = args[0], args[1]

        # Extract metadata from kwargs or default to an empty dictionary
        metadata = kwargs.get('metadata', {})

        # Get the current frame and line number
        frame = inspect.currentframe()
        line_number = frame.f_back.f_lineno

        # Get the current timestamp
        timestamp = datetime.datetime.now().isoformat()

        # Get the filename of the caller
        filename = frame.f_back.f_code.co_filename

        # Execute the variable assignment function
        result = var_assignment_func(*args, **kwargs)

        # Register the variable name, line number, timestamp, and filename
        VAR_REGISTRY[var_name] = {
            "type": "string",
            "timestamp": timestamp,
        }

        if type(value) is not str:
            value = str(value)
            print(f"WARNING: value of {var_name} was not a string. Converted to string: {value}.")

        VAR_REGISTRY[var_name]['value'] = value

        metadata.update(VAR_REGISTRY[var_name])
        
        if VAR_REGISTRY['REPROWORK_REMOTE_URL']:
            metadata['published_url'] = f"{VAR_REGISTRY['REPROWORK_REMOTE_URL']}/{reproduce_dir}/pubdata.toml"

        if VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']:
            metadata['generating_script'] = VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']

        config = read_base_config()

        # check if dynamic file exists
        if not os.path.exists(Path(config['repro']['files']['dynamic'])):
            with open(Path(config['repro']['files']['dynamic']), 'w') as file:
                file.write(toml.dumps({}))
        with open(Path(config['repro']['files']['dynamic']), 'r') as file:
            dynamic_data = toml.load(file)

        dynamic_data[var_name] = metadata

        with open(Path(config['repro']['files']['dynamic']), 'w') as file:
            toml.dump(dynamic_data, file, encoder=ReproduceWorkEncoder())

        return result
    return wrapper

@reproducible
def publish_variable(value, var_name, metadata={}):
    globals()[var_name] = value


@requires_config
def register_notebook(notebook_name, notebook_dir='nbs'):
    """
    Register a notebook to the config.toml file.
    """
    notebook_path = notebook_dir + '/' + notebook_name
    base_config = read_base_config()
    
    # ensure notebook key exists
    if 'notebooks' not in base_config['repro']:
        base_config['repro']['notebooks'] = []

    if notebook_path not in base_config['repro']['notebooks']:
        base_config['repro']['notebooks'].append(notebook_path)
        with open(Path(reproduce_dir, 'config.toml'), 'w') as f:
            toml.dump(base_config, f)
        if base_config['repro']['verbose']:
            print(f"Registered notebook {notebook_path} in {reproduce_dir}/config.toml")
    else:
        if base_config['repro']['verbose']:
            print(f"Notebook {notebook_path} already registered in {reproduce_dir}/config.toml")

    if 'github_repo' in base_config['project']:
        remote_url_val = f"https://github.com/{base_config['project']['github_repo']}"
        notebook_new_val = f"{remote_url_val}/{notebook_path}"
    else:
        notebook_new_val = Path(notebook_path).resolve().as_posix()
    
    if VAR_REGISTRY['REPROWORK_REMOTE_URL']:
        print(f"Warning: {VAR_REGISTRY['REPROWORK_REMOTE_URL']} is already registered. Overwriting with {remote_url_val}")
    VAR_REGISTRY['REPROWORK_REMOTE_URL'] = remote_url_val

    if VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']:
        print(f"Warning: Notebook {VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK']} is already registered. Overwriting with {notebook_new_val}")
    VAR_REGISTRY['REPROWORK_ACTIVE_NOTEBOOK'] = notebook_new_val

    return True

# Test code


In [None]:
publish_variable(67890, "test_var_timestamp_1")  # This should capture this line number and timestamp
publish_variable("Hello again!", "test_var_timestamp_2")  # And this line number and timestamp

VAR_REGISTRY

<IPython.core.display.Javascript object>



<IPython.core.display.Javascript object>

{'test_var_timestamp_1': {'type': 'string',
  'timestamp': '2023-10-02T11:20:32.483177',
  'value': '67890'},
 'test_var_timestamp_2': {'type': 'string',
  'timestamp': '2023-10-02T11:20:32.506332',
  'value': 'Hello again!'}}

In [None]:
# Test the serialize_to_toml function
data_sample = {
    'name': 'John',
    'age': 28,
    'is_student': False,
    'scores': [85, 90, 78, 92],
    'birthday': pd.Timestamp('2000-01-01'),
    'matrix': np.array([[1, 2], [3, 4]]),
    'df': pd.DataFrame({
        'A': [1, 2, 3],
        'B': ['a', 'b', 'c'],
        'date': [pd.Timestamp('2022-01-01'), pd.Timestamp('2022-01-02'), pd.Timestamp('2022-01-03')]
    }),
    'nested_dict': {
        'key1': 'value1',
        'sub_dict': {
            'sub_key': 'sub_value'
        }
    },
    'none_value': None
}

toml_representation = serialize_to_toml(data_sample)
print(toml_representation)

name = "John"
age = 28
is_student = False
scores = [85, 90, 78, 92]
birthday = "2000-01-01 00:00:00"
[df]
[dataframe]
A = [1, 2, 3]
B = ["a", "b", "c"]
date = ["2022-01-01 00:00:00", "2022-01-02 00:00:00", "2022-01-03 00:00:00"]

[nested_dict]
key1 = "value1"
[sub_dict]
sub_key = "sub_value"


none_value = null


In [9]:
config = read_base_config()
with open(config['repro']['files']['dynamic'], 'r') as file:
    dynamic_data = toml.load(file)
print(toml.dumps(dynamic_data))

[p_value_str]
description = "The p-value of the coefficient on the slope of the linear regression line."
type = "string"
timestamp = "2023-10-02T10:25:50.962908"
value = "0.068"

[x]
description = "The simulated X data"
units = "kilograms"
type = "data"
timestamp = "2023-10-02T10:25:51.146421"
content_hash = "38f13b81a58a7d931600e917d77dfe8f"
timed_hash = "1ff88e55c506ce6249051526f0071e20"
value = "array = [-0.15438854676085806, -0.5912841266673995, 1.3457620267806991, -0.3085476927297975, -0.35074090433304067, -1.343721369940541, -0.41860346256356656, 2.392890531248967, 0.22032854237060082, 0.7867023188803995, 0.08878384294999392, 0.6565087673201803, 0.2412729155438198, 0.6854353883101262, 2.153899580706892, 0.649925720150528, 1.127458119203137, -0.6357927443286684, 0.3077660698412044, -1.6328895355458346, 0.567227693439327, -0.21246173380662106, -0.7203897514131021, 0.5952129857137533, 0.18819499630282482, -0.8834998061258611, 0.7379945086294778, -0.8471634166162177, 0.07930983762624

In [17]:
# Test the save function
test_content = "This is a test content for the save function."

In [18]:
metadata = save(test_content, "saved_file.txt")
metadata

<IPython.core.display.Javascript object>

Updated watched files to ['reproduce/main.md', 'reproduce/data.toml', 'reproduce/latex/template.tex', 'saved_file.txt']


{'type': 'file',
 'timestamp': '2023-10-02T02:11:11.082654',
 'content_hash': 'd1866c6aa7d10eb57a35cc88a77802c5',
 'timed_hash': 'f9293765bd6cc1e991407203aa7da511'}

In [12]:
# Test
#x = 10
#y = "Hello"
#z = [1, 2, 3]
#save()

In [13]:
#| hide
import nbdev; nbdev.nbdev_export()