# Ingest a toy MNIST-like handwritten-digits dataset from scikit-learn (`load_digits`: 1797 samples × 64 features, i.e. 8×8 images)

### Determine run parameters

In [1]:
# ----------------- Parameters for interactive development --------------
# Defaults used when the notebook is run by hand. During automated runs the
# injected parameter cell further down reassigns P, replacing this dict.
# "run.retry_nr" is a string here (as in the injected cell) and is cast with
# int() where it is used.
P = {
    "pipeline.run_environment": "dev",
    "pipeline.data_lake_root": "/pipeline-outputs/data-lake",
    "run.retry_nr": "1",
}

In [2]:
# - During automated runs, parameters are injected into the cell below -

In [3]:
# Parameters
# Injected for this automated run; this assignment replaces the development
# defaults defined in the first parameter cell. Most values are strings
# (e.g. "run.retry_nr") and are cast by the code that reads them.
P = {
    "pipeline.data_lake_root": "/pipeline-outputs/data-lake",
    "pipeline.run_environment": "ci",
    "pipeline.pipeline_run_id": "711628db-7a55-4b5a-89fb-b41dd311f49f",
    "pipeline.github.repository": "pynb-dag-runner/mnist-digits-demo-pipeline",
    "pipeline.github.workflow": "Run automated tests, pipeline and deploy results to static reporting site",
    "pipeline.github.runner_name": "GitHub Actions 2",
    "pipeline.github.run_id": "2690097514",
    "pipeline.github.actor": "matiasdahl",
    "pipeline.github.job": "run-tests-pipeline-and-persist-pipeline-outputs",
    "pipeline.github.base_ref": "development",
    "pipeline.github.head_ref": "graph-parsing",
    "pipeline.github.sha": "8283f417a1f6964b1fa1c6cda8e5c5ec877446d4",
    "pipeline.github.ref": "refs/pull/57/merge",
    "pipeline.github.ref_type": "branch",
    "pipeline.github.ref_name": "57/merge",
    "pipeline.github.event_name": "pull_request",
    "task.notebook": "notebooks/ingest.py",
    "task.max_nr_retries": "15",
    "run.retry_nr": "10",
    "task.timeout_s": "10",
    "task.num_cpus": 1,
    "_opentelemetry_traceparent": "00-b0e446e89b17bdeb058dbb10a0b42255-1c9d8b576e915a7f-01",
}


In [4]:
# -----------------------------------------------------------------------

---

### Simulate different types of failures (for testing timeout and retry logic)

In [5]:
from pynb_dag_runner.tasks.task_opentelemetry_logging import PydarLogger

# Task logger built from the run parameters P; used further down to record
# values for this run via logger.log_value(...).
logger = PydarLogger(P)

2022-07-18 11:25:05,707	INFO worker.py:842 -- Connecting to existing Ray cluster at address: 172.17.0.2:6379


In [6]:
import time, random


def maybe_crash(retry_nr: int, run_environment: str):
    """Misbehave on early attempts so the pipeline's retry and timeout handling get exercised.

    Args:
        retry_nr: Attempt number of the current task run (the caller casts it
            from the "run.retry_nr" parameter).
        run_environment: Pipeline environment name, e.g. "dev" or "ci".

    Behaviour:
        * In "ci", retry number 2 first sleeps for 1e6 seconds, which
          effectively hangs the task until it is killed externally
          (presumably by the task timeout).
        * Every attempt below a threshold (3 in "dev", 10 in any other
          environment) then fails: with probability ~0.1 by hanging the same
          way, otherwise by raising ``Exception``.
        * Attempts at or above the threshold return ``None``.

    Raises:
        Exception: the simulated ingestion failure.
    """
    # CI only: retry number 2 always hangs, so the timeout path is hit.
    if run_environment == "ci" and retry_nr == 2:
        time.sleep(1e6)

    failing_below: int = 10 if run_environment != "dev" else 3
    if retry_nr < failing_below:
        # Roughly 1 in 10 failing attempts hang; the rest raise immediately.
        if random.random() >= 0.1:
            raise Exception("Simulated exception failure from ingestion step notebook!")
        time.sleep(1e6)


# "run.retry_nr" arrives as a string parameter, so cast it before comparing.
maybe_crash(
    retry_nr=int(P["run.retry_nr"]),
    run_environment=P["pipeline.run_environment"],
)

### Load the digits data and write it to the data lake

In [7]:
from sklearn import datasets

#
from common.io import datalake_root, write_numpy

In [8]:
digits = datasets.load_digits()

# Feature matrix and target labels from the sklearn Bunch.
X = digits.data
y = digits.target

# Validate the raw arrays before they are logged and persisted: downstream
# steps expect a 2-D feature matrix with exactly one label per row.
assert X.ndim == 2 and y.ndim == 1, f"unexpected shapes: X={X.shape}, y={y.shape}"
assert X.shape[0] == y.shape[0], f"row mismatch: {X.shape[0]} samples vs {y.shape[0]} labels"

In [9]:
# Record the dataset dimensions with the task logger, then display them.
logger.log_value("data_shape", [*X.shape])
logger.log_value("target_shape", [*y.shape])

(X.shape, y.shape)

 - Logging data_shape (json) : [1797, 64]
 - Logging target_shape (json) : [1797]


((1797, 64), (1797,))

In [10]:
# Persist the raw features and labels under <data lake root>/raw/ for
# downstream tasks (the root is derived from the run parameters P).
write_numpy(datalake_root(P) / "raw" / "digits.numpy", X)
write_numpy(datalake_root(P) / "raw" / "labels.numpy", y)