## Example for callbacks + Memory profiling of verbs and workflow

In this example we show how to use the workflow callbacks to be able profile the memory usage of every verb

In [1]:
# Enable nesting asyncio since jupyter has a asyncio already running
import nest_asyncio

nest_asyncio.apply()


import json
import os

from typing import Any, Dict, Optional, List
from datashaper.execution.execution_node import ExecutionNode
from datashaper.table_store import TableContainer

from typing import List

from datashaper.workflow import Workflow

### Create a Callback class
This class needs to follows the WorkflowCallbacks Protocol, we use tracemalloc to create memory snapshots and trace the memory

In [2]:
import pandas as pd
import tracemalloc
import time
from collections import defaultdict


class MemoryProfilingCallbacks:
    def __init__(self):
        self._snapshots = defaultdict(list)
        self._peak_memory = defaultdict(list)
        self._timing = defaultdict(list)
        self._peak_memory = defaultdict(list)
        self._peak_start_workflow = 0
        self._peak_start_verb = 0
        self._workflow_start = 0
        self._verb_start = 0

    def on_workflow_start(self) -> None:
        """Called when the workflow starts."""
        tracemalloc.start()
        _, self._peak_start_workflow = tracemalloc.get_traced_memory()
        self._snapshots["all"].append(tracemalloc.take_snapshot())
        self._workflow_start = time.time()

    def on_step_start(self, node: ExecutionNode, inputs: Dict[str, Any]) -> None:
        """Called when a step starts."""
        # reset peak so we can get the peak during the verb execution
        self._snapshots[node.verb.name].append(tracemalloc.take_snapshot())
        _, self._peak_start_verb = tracemalloc.get_traced_memory()
        self._verb_start = time.time()

    def on_step_end(
        self, node: ExecutionNode, result: Optional[TableContainer]
    ) -> None:
        """Called when a step ends."""
        total_time = time.time() - self._verb_start
        self._timing[node.verb.name].append(total_time)
        self._snapshots[node.verb.name].append(tracemalloc.take_snapshot())
        # Get peak recorded during verb execution
        _, peak = tracemalloc.get_traced_memory()
        self._peak_memory[node.verb.name].append(peak - self._peak_start_verb)

    def on_workflow_end(self) -> None:
        """Called when the workflow ends."""
        total_time = time.time() - self._workflow_start
        self._timing["all"].append(total_time)
        self._snapshots["all"].append(tracemalloc.take_snapshot())
        _, peak = tracemalloc.get_traced_memory()
        self._peak_memory["all"].append(peak - self._peak_start_workflow)
        tracemalloc.stop()

    def get_snapshot_stats(self, sort_by="max"):
        stats = {}
        for verb, snapshots in self._snapshots.items():
            verb_stats = []
            for first, second in zip(snapshots[::2], snapshots[1::2]):
                stat_diff = second.compare_to(first, "lineno")
                diff_size = sum(stat.size_diff for stat in stat_diff)
                verb_stats.append(MemoryProfilingCallbacks.__bytes_to_mb(diff_size))
            stats[verb] = {
                "mean": sum(verb_stats) / len(verb_stats),
                "max": max(verb_stats),
                "min": min(verb_stats),
                "samples": len(verb_stats),
            }
        return pd.DataFrame(stats).transpose().sort_values(sort_by, ascending=False)

    def get_peak_stats(self, sort_by="max"):
        stats = {}
        for verb, peak in self._peak_memory.items():
            stats[verb] = {
                "mean": MemoryProfilingCallbacks.__bytes_to_mb(sum(peak) / len(peak)),
                "max": MemoryProfilingCallbacks.__bytes_to_mb(max(peak)),
                "min": MemoryProfilingCallbacks.__bytes_to_mb(min(peak)),
                "samples": len(peak),
            }
        return pd.DataFrame(stats).transpose().sort_values(sort_by, ascending=False)

    def get_time_stats(self, sort_by="max"):
        stats = {}
        for verb, times in self._timing.items():
            stats[verb] = {
                "mean": sum(times) / len(times),
                "max": max(times),
                "min": min(times),
                "samples": len(times),
            }
        return pd.DataFrame(stats).transpose().sort_values(sort_by, ascending=False)

    def get_detailed_view(self):
        df_json = defaultdict(list)
        for verb, snapshot in self._snapshots.items():
            for sample, (first, second) in enumerate(
                zip(snapshot[::2], snapshot[1::2])
            ):
                stat_diff = second.compare_to(first, "lineno")
                for stat in stat_diff:
                    df_json["verb"].append(verb)
                    df_json["size_diff"].append(stat.size_diff)
                    df_json["size"].append(stat.size)
                    df_json["count_diff"].append(stat.count_diff)
                    df_json["count"].append(stat.count)
                    df_json["filename"].append(stat.traceback[0].filename)
                    df_json["lineno"].append(stat.traceback[0].lineno)
                    df_json["sample"].append(sample)
        return pd.DataFrame(df_json)

    @staticmethod
    def __bytes_to_mb(bytes):
        return bytes / 1024**2

In [3]:
FIXTURES_PATH = "../../../schema/fixtures/workflow"
TABLE_STORE_PATH = "../../../schema/fixtures/workflow_inputs"

memory_profiling_callbacks = MemoryProfilingCallbacks()


def get_verb_test_specs(root: str) -> List[str]:
    subfolders: List[str] = []
    for root, _, files in os.walk(root):
        if "workflow.json" in files:
            subfolders.append(root)
    return subfolders


def test_verbs_schema_input(fixture_path: str):
    with open(os.path.join(fixture_path, "workflow.json")) as schema:
        workflow = Workflow(
            schema=json.load(schema),
            input_path=TABLE_STORE_PATH,
        )

    workflow.run(workflow_callbacks=memory_profiling_callbacks)

In [4]:
for fixture_path in get_verb_test_specs(FIXTURES_PATH):
    test_verbs_schema_input(fixture_path)

  column_datetime = pd.to_datetime(column, errors="ignore")
  column_datetime = pd.to_datetime(column, errors="ignore")
  column_numeric = pd.to_numeric(column, errors="ignore")
  column_datetime = pd.to_datetime(column, errors="ignore")
  column_datetime = pd.to_datetime(column, errors="ignore")
  column_datetime = pd.to_datetime(column, errors="ignore")
  column_numeric = pd.to_numeric(column, errors="ignore")
  output = output.stack(dropna=False).reset_index()
  output = output.stack(dropna=False).reset_index()
  output = output.stack(dropna=False).reset_index()
  output = output.stack(dropna=False).reset_index()
  MergeStrategy.LastOneWins: lambda values, **kwargs: values.dropna().apply(
  MergeStrategy.LastOneWins: lambda values, **kwargs: values.dropna().apply(
  MergeStrategy.LastOneWins: lambda values, **kwargs: values.dropna().apply(
  MergeStrategy.LastOneWins: lambda values, **kwargs: values.dropna().apply(
  input_table.loc[i, col] = value
  input_table.loc[i, col] = value


Test print verb
    ID       Name  Employees     US
0    1  Microsoft     160000   True
1    2      Apple     150000   True
..  ..        ...        ...    ...
3    4     Amazon    1250000   True
4    5    Samsung     270000  False


In [5]:
memory_profiling_callbacks.get_snapshot_stats()

Unnamed: 0,mean,max,min,samples
all,0.016333,0.2077,0.005172,148.0
fold,0.029311,0.1175,0.009782,6.0
convert,0.014538,0.094051,0.003598,15.0
unfold,0.031106,0.08579,0.009594,4.0
print,0.063319,0.063319,0.063319,1.0
union,0.034214,0.061944,0.006483,2.0
merge,0.010663,0.044469,0.006132,15.0
binarize,0.015506,0.042662,0.009157,7.0
difference,0.039237,0.039237,0.039237,1.0
join,0.021702,0.03684,0.014729,7.0


In [6]:
memory_profiling_callbacks.get_peak_stats()

Unnamed: 0,mean,max,min,samples
all,0.023232,0.212424,0.005717,148.0
fold,0.041101,0.128269,0.020516,6.0
convert,0.015892,0.098181,0.004112,15.0
unfold,0.026965,0.079987,0.004741,4.0
union,0.045379,0.07321,0.017548,2.0
print,0.069079,0.069079,0.069079,1.0
join,0.044595,0.061277,0.03504,7.0
difference,0.054802,0.054802,0.054802,1.0
merge,0.01758,0.053172,0.010097,15.0
binarize,0.027253,0.05119,0.021309,7.0


In [7]:
memory_profiling_callbacks.get_time_stats()

Unnamed: 0,mean,max,min,samples
all,0.013173,0.059589,0.000828,148.0
join,0.042037,0.054187,0.032328,7.0
binarize,0.032697,0.052202,0.021101,7.0
merge,0.015433,0.039304,0.007336,15.0
fold,0.019102,0.033014,0.01266,6.0
convert,0.005588,0.030703,0.001391,15.0
filter,0.023549,0.028906,0.018666,7.0
difference,0.02784,0.02784,0.02784,1.0
unhot,0.02441,0.024635,0.024185,2.0
onehot,0.016577,0.021322,0.014472,4.0


In [8]:
dive_deep = memory_profiling_callbacks.get_detailed_view()

In [9]:
dive_deep[
    (dive_deep["verb"] == "unfold")
    & (dive_deep["sample"] == 0)
    & (dive_deep["filename"].str.contains("unfold"))
]

Unnamed: 0,verb,size_diff,size,count_diff,count,filename,lineno,sample
15228,unfold,1104,1104,2,2,/home/andresmor/Projects/datashaper/python/dat...,28,0
15288,unfold,464,464,1,1,/home/andresmor/Projects/datashaper/python/dat...,26,0
15338,unfold,424,424,1,1,/home/andresmor/Projects/datashaper/python/dat...,21,0
15368,unfold,408,408,1,1,/home/andresmor/Projects/datashaper/python/dat...,19,0
15410,unfold,144,144,2,2,/home/andresmor/Projects/datashaper/python/dat...,22,0
15430,unfold,96,96,2,2,/home/andresmor/Projects/datashaper/python/dat...,30,0
