# BlueprintFlow: Bootstrap Example

This notebook serves as an example demonstrating how to systematically bootstrap data into BlueprintFlow for demonstration purposes.

## What You'll Bootstrap

- **Language Context**: A language context for Python ETL development
- **Coding Standards**: Rules that must be followed in the codebase and guidelines and coding standards for Python ETL development
- **Project Structure**: Source code structure for an ETL project
- **Design Patterns**: Common abstractions and design patterns useful in ETL processes
- **Tool Stack**: Preferred libraries and tools for ETL processes
- **Implementation Examples**: Example code implementations for extract, transform, and load operations

In [1]:
import inspect

In [2]:
from ipynb_util import pprint_lancedb_record

In [3]:
from blueprintflow.core.models.data_store import TableNameEnum
from blueprintflow.core.models.tasks import (
    CreateAstractionTask,
    CreateCodeTask,
    CreateGuidelineTask,
    CreateLanguageContextTask,
    CreatePreferenceTask,
    CreateRuleTask,
    CreateSrcStructureTask,
)
from blueprintflow.core.settings import load_settings
from blueprintflow.store.store_manager import StoreManager
from blueprintflow.utils.xdg.data import UserData

##### Initialize BlueprintFlow Components

This section sets up the necessary components for interacting with BlueprintFlow's database:
- Loads default settings
- Initializes the default user data storage
- Creates a store manager to handle database operations

In [4]:
default_settings = load_settings()
default_user_data = UserData(reset_if_exists=True)
store_manager = StoreManager(default_settings, default_user_data)

##### Define Language Context for Python ETL

Establishes the core language context for the blueprint.

In [5]:
lang_context_key = "python_etl"
create_lang_context_task = CreateLanguageContextTask(
    key=lang_context_key,
    language="python",
    context="etl",
    description="Python is a versatile and readable language ideal for ETL processes. "
    "It excels in handling data extraction from diverse sources, transforming data "
    "into required formats, and loading it into target systems. "
    "Its flexibility and extensive ecosystem make it a popular choice for building "
    "efficient data processing workflows and integration tasks.",
)
store_manager.create(create_lang_context_task)
lang_context = store_manager.lance_handler.get_by_key(
    TableNameEnum.LANG_CONTEXT, lang_context_key
)
pprint_lancedb_record(lang_context)

{'key': 'python_etl',
 'language': 'python',
 'context': 'etl',
 'description': 'Python is a versatile and readable language ideal for ETL '
                'processes. It excels in handling data extraction from diverse '
                'sources, transforming data into required formats, and loading '
                'it into target systems. Its flexibility and extensive '
                'ecosyste [...]',
 'embedding': [-0.00034710637, -0.02914067, 0.055726133]}


##### Define Rules and Standards

Establishes strict coding rules that must be followed in the ETL implementation including:

- Documentation standards (Google-style docstrings)
- Type safety requirements (static typing for functions)
- Code quality rules (no assert statements outside tests)

In [6]:
docstring_rule_key = "python_etl_google_docstrings"
create_docstring_rule_task = CreateRuleTask(
    key=docstring_rule_key,
    language_context_key="python_etl",
    name="Google convention docstrings",
    description="Always generate docstrings following the Google convention",
    rule_type="documentation",
    violations_action="require_fix",
)
store_manager.create(create_docstring_rule_task)
docstring_rule = store_manager.lance_handler.get_by_key(
    TableNameEnum.RULE, docstring_rule_key
)
pprint_lancedb_record(docstring_rule)

{'key': 'python_etl_google_docstrings',
 'language_context_key': 'python_etl',
 'name': 'Google convention docstrings',
 'description': 'Always generate docstrings following the Google convention',
 'rule_type': 'documentation',
 'violations_action': 'require_fix',
 'embedding': [-0.058820747, -0.042512137, -0.018018302]}


In [7]:
typing_rule_key = "python_etl_static_typing"
create_typing_rule_task = CreateRuleTask(
    key=typing_rule_key,
    language_context_key="python_etl",
    name="Static typing for functions",
    description="Always generate static typing for function definitions to be "
    "evaluated with mypy later on",
    rule_type="typing",
    violations_action="require_fix",
)
store_manager.create(create_typing_rule_task)
typing_rule = store_manager.lance_handler.get_by_key(
    TableNameEnum.RULE, typing_rule_key
)
pprint_lancedb_record(typing_rule)

{'key': 'python_etl_static_typing',
 'language_context_key': 'python_etl',
 'name': 'Static typing for functions',
 'description': 'Always generate static typing for function definitions to be '
                'evaluated with mypy later on',
 'rule_type': 'typing',
 'violations_action': 'require_fix',
 'embedding': [-0.026746703, 0.0070767906, 0.05300787]}


In [8]:
assert_rule_key = "python_etl_no_assert_outside_tests"
create_assert_rule_task = CreateRuleTask(
    key=assert_rule_key,
    language_context_key="python_etl",
    name="No assert outside tests",
    description="Do not use assert outside of test functions",
    rule_type="style",
    violations_action="require_fix",
)
store_manager.create(create_assert_rule_task)
assert_rule = store_manager.lance_handler.get_by_key(
    TableNameEnum.RULE, assert_rule_key
)
pprint_lancedb_record(assert_rule)

{'key': 'python_etl_no_assert_outside_tests',
 'language_context_key': 'python_etl',
 'name': 'No assert outside tests',
 'description': 'Do not use assert outside of test functions',
 'rule_type': 'style',
 'violations_action': 'require_fix',
 'embedding': [-0.010566957, -0.016409732, 0.036240608]}


##### Establish Guidelines

Defines coding guidelines and best practices for Python ETL development including:

- The Zen of Python principles
- Style guidelines like proper boolean evaluation

In [9]:
zen_of_python_key = "python_etl_zen_of_python"
create_zen_guideline_task = CreateGuidelineTask(
    key=zen_of_python_key,
    language_context_key="python_etl",
    name="The Zen of Python",
    description=inspect.cleandoc(
        """
        Beautiful is better than ugly.
        Explicit is better than implicit.
        Simple is better than complex.
        Complex is better than complicated.
        Flat is better than nested.
        Sparse is better than dense.
        Readability counts.
        Special cases aren't special enough to break the rules.
        Although practicality beats purity.
        Errors should never pass silently.
        Unless explicitly silenced.
        In the face of ambiguity, refuse the temptation to guess.
        There should be one-- and preferably only one --obvious way to do it.
        Although that way may not be obvious at first unless you're Dutch.
        Now is better than never.
        Although never is often better than *right* now.
        If the implementation is hard to explain, it's a bad idea.
        If the implementation is easy to explain, it may be a good idea.
        Namespaces are one honking great idea -- let's do more of those!"""
    ),
    category="principles",
)
store_manager.create(create_zen_guideline_task)
zen_guideline = store_manager.lance_handler.get_by_key(
    TableNameEnum.GUIDELINE, zen_of_python_key
)
pprint_lancedb_record(zen_guideline)

{'key': 'python_etl_zen_of_python',
 'language_context_key': 'python_etl',
 'name': 'The Zen of Python',
 'description': 'Beautiful is better than ugly.\n'
                'Explicit is better than implicit.\n'
                'Simple is better than complex.\n'
                'Complex is better than complicated.\n'
                'Flat is better than nested.\n'
                'Sparse is better than dense.\n'
                'Readability counts.\n'
                "Special cases aren't special enough [...]",
 'category': 'principles',
 'examples': None,
 'embedding': [-0.07254717, -0.003949336, 0.05075067]}


In [10]:
bool_evaluation_key = "python_etl_bool_evaluation"
create_bool_evaluation_task = CreateGuidelineTask(
    key=bool_evaluation_key,
    language_context_key="python_etl",
    name="Variable bool evaluation",
    description="Use Python's truthiness directly instead of explicit comparisons to "
    "True and False",
    category="style",
    examples=[
        "if attr: # Good - check truthiness",
        "if not attr: # Good - check falsiness",
        "if attr is None: # Good - explicit None check",
        "if attr == True: # Bad - unnecessary comparison",
    ],
)
store_manager.create(create_bool_evaluation_task)
bool_evaluation_guideline = store_manager.lance_handler.get_by_key(
    TableNameEnum.GUIDELINE, bool_evaluation_key
)
pprint_lancedb_record(bool_evaluation_guideline)

{'key': 'python_etl_bool_evaluation',
 'language_context_key': 'python_etl',
 'name': 'Variable bool evaluation',
 'description': "Use Python's truthiness directly instead of explicit "
                'comparisons to True and False',
 'category': 'style',
 'examples': ['if attr: # Good - check truthiness',
              'if not attr: # Good - check falsiness',
              'if attr is None: # Good - explicit None check'],
 'embedding': [-0.052617468, 0.01749514, 0.062484514]}


##### Define Project Structure

Sets up the standard source code structure for a Python ETL application including:

- Root directory and package initialization
- Main entry point for the ETL process
- Separate modules for extract, transform, and load operations

In [11]:
src_root_key = "python_etl_src_root"
create_src_root_task = CreateSrcStructureTask(
    key=src_root_key,
    language_context_key="python_etl",
    path="src/foo",
    description="Root source directory for Python application",
    structure_type="directory",
)
store_manager.create(create_src_root_task)
src_root_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_root_key
)
pprint_lancedb_record(src_root_structure)

{'key': 'python_etl_src_root',
 'language_context_key': 'python_etl',
 'path': 'src/foo',
 'description': 'Root source directory for Python application',
 'structure_type': 'directory',
 'embedding': [-0.04562018, -0.018908845, 0.032933872]}


In [12]:
src_init_key = "python_etl_src_init"
create_src_init_task = CreateSrcStructureTask(
    key=src_init_key,
    language_context_key="python_etl",
    path="src/foo/__init__.py",
    description="Package initialization file",
    structure_type="file",
)
store_manager.create(create_src_init_task)
src_init_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_init_key
)
pprint_lancedb_record(src_init_structure)

{'key': 'python_etl_src_init',
 'language_context_key': 'python_etl',
 'path': 'src/foo/__init__.py',
 'description': 'Package initialization file',
 'structure_type': 'file',
 'embedding': [-0.036667243, -0.014609963, 0.014548799]}


In [13]:
src_main_key = "python_etl_src_main"
create_src_main_task = CreateSrcStructureTask(
    key=src_main_key,
    language_context_key="python_etl",
    path="src/foo/__main__.py",
    description="Main entry point for the ETL script",
    structure_type="file",
)
store_manager.create(create_src_main_task)
src_main_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_main_key
)
pprint_lancedb_record(src_main_structure)

{'key': 'python_etl_src_main',
 'language_context_key': 'python_etl',
 'path': 'src/foo/__main__.py',
 'description': 'Main entry point for the ETL script',
 'structure_type': 'file',
 'embedding': [-0.018054372, 0.0037061085, 0.010968876]}


In [14]:
src_extract_key = "python_etl_src_extract"
create_src_extract_task = CreateSrcStructureTask(
    key=src_extract_key,
    language_context_key="python_etl",
    path="src/foo/extract.py",
    description="Data extraction module",
    structure_type="file",
)
store_manager.create(create_src_extract_task)
src_extract_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_extract_key
)
pprint_lancedb_record(src_extract_structure)

{'key': 'python_etl_src_extract',
 'language_context_key': 'python_etl',
 'path': 'src/foo/extract.py',
 'description': 'Data extraction module',
 'structure_type': 'file',
 'embedding': [-0.016554547, -0.06822745, -0.016024876]}


In [15]:
src_transform_key = "python_etl_src_transform"
create_src_transform_task = CreateSrcStructureTask(
    key=src_transform_key,
    language_context_key="python_etl",
    path="src/foo/transform.py",
    description="Data transformation module",
    structure_type="file",
)
store_manager.create(create_src_transform_task)
src_transform_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_transform_key
)
pprint_lancedb_record(src_transform_structure)

{'key': 'python_etl_src_transform',
 'language_context_key': 'python_etl',
 'path': 'src/foo/transform.py',
 'description': 'Data transformation module',
 'structure_type': 'file',
 'embedding': [0.0045563127, -0.074894615, -0.011505007]}


In [16]:
src_load_key = "python_etl_src_load"
create_src_load_task = CreateSrcStructureTask(
    key=src_load_key,
    language_context_key="python_etl",
    path="src/foo/load.py",
    description="Data loading module",
    structure_type="file",
)
store_manager.create(create_src_load_task)
src_load_structure = store_manager.lance_handler.get_by_key(
    TableNameEnum.SRC_STRUCTURE, src_load_key
)
pprint_lancedb_record(src_load_structure)

{'key': 'python_etl_src_load',
 'language_context_key': 'python_etl',
 'path': 'src/foo/load.py',
 'description': 'Data loading module',
 'structure_type': 'file',
 'embedding': [0.00481641, -0.043022912, 0.008896195]}


##### Define Common Abstractions and Design Patterns

Establishes reusable abstractions and design patterns useful in ETL processing including:

- Common design patterns (Adapter, Mediator, Facade, Singleton)
- Data processing patterns (Data Enricher, Data Router, Dataset Splitter)

In [17]:
adapter_key = "python_etl_adapter"
create_adapter_task = CreateAstractionTask(
    key=adapter_key,
    language_context_key="python_etl",
    name="Adapter",
    description="Structural design pattern that allows objects with incompatible "
    "interfaces to collaborate",
    abstraction_type="design pattern",
    content="Adapter is a structural design pattern that allows objects with "
    "incompatible interfaces to collaborate. An adapter wraps one of the objects to "
    "hide the complexity of conversion happening behind the scenes.",
    tags=["structural", "compatibility", "interface"],
)
store_manager.create(create_adapter_task)
adapter_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, adapter_key
)
pprint_lancedb_record(adapter_abstraction)

{'key': 'python_etl_adapter',
 'language_context_key': 'python_etl',
 'name': 'Adapter',
 'description': 'Structural design pattern that allows objects with '
                'incompatible interfaces to collaborate',
 'abstraction_type': 'design pattern',
 'content': 'Adapter is a structural design pattern that allows objects with '
            'incompatible interfaces to collaborate. An adapter wraps one of '
            'the objects to hide the complexity of conversion happening behind '
            'the scenes.',
 'tags': ['structural', 'compatibility', 'interface'],
 'embedding': [0.0074661355, -0.008026236, -0.019360468]}


In [18]:
mediator_key = "python_etl_mediator"
create_mediator_task = CreateAstractionTask(
    key=mediator_key,
    language_context_key="python_etl",
    name="Mediator",
    description="Behavioral design pattern that reduces chaotic dependencies between "
    "objects by forcing collaboration through a mediator",
    abstraction_type="design pattern",
    content="Mediator is a behavioral design pattern that lets you reduce chaotic "
    "dependencies between objects. The pattern restricts direct communications between "
    "the objects and forces them to collaborate only via a mediator object.",
    tags=["behavioral", "decoupling", "communication"],
)
store_manager.create(create_mediator_task)
mediator_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, mediator_key
)
pprint_lancedb_record(mediator_abstraction)

{'key': 'python_etl_mediator',
 'language_context_key': 'python_etl',
 'name': 'Mediator',
 'description': 'Behavioral design pattern that reduces chaotic dependencies '
                'between objects by forcing collaboration through a mediator',
 'abstraction_type': 'design pattern',
 'content': 'Mediator is a behavioral design pattern that lets you reduce '
            'chaotic dependencies between objects. The pattern restricts '
            'direct communications between the objects and forces them to '
            'collaborate only via a mediator object.',
 'tags': ['behavioral', 'decoupling', 'communication'],
 'embedding': [-0.015068922, -0.06496549, 0.020649796]}


In [19]:
facade_key = "python_etl_facade"
create_facade_task = CreateAstractionTask(
    key=facade_key,
    language_context_key="python_etl",
    name="Facade",
    description="Structural design pattern that provides a simplified interface to a "
    "complex subsystem",
    abstraction_type="design pattern",
    content="Facade is a structural design pattern that provides a simplified "
    "interface to a library, a framework, or any other complex set of classes. A "
    "facade provides limited functionality in comparison to working with the subsystem "
    "directly.",
    tags=["structural", "simplification", "interface"],
)
store_manager.create(create_facade_task)
facade_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, facade_key
)
pprint_lancedb_record(facade_abstraction)

{'key': 'python_etl_facade',
 'language_context_key': 'python_etl',
 'name': 'Facade',
 'description': 'Structural design pattern that provides a simplified '
                'interface to a complex subsystem',
 'abstraction_type': 'design pattern',
 'content': 'Facade is a structural design pattern that provides a simplified '
            'interface to a library, a framework, or any other complex set of '
            'classes. A facade provides limited functionality in comparison to '
            'working with the subsystem directly.',
 'tags': ['structural', 'simplification', 'interface'],
 'embedding': [-0.0037615807, -0.013434297, -0.030911405]}


In [20]:
singleton_key = "python_etl_singleton"
create_singleton_task = CreateAstractionTask(
    key=singleton_key,
    language_context_key="python_etl",
    name="Singleton",
    description="Creational design pattern that ensures a class has only one instance "
    "with global access",
    abstraction_type="design pattern",
    content="Singleton is a creational design pattern that lets you ensure that a "
    "class has only one instance, while providing a global access point to this "
    "instance. Make the default constructor private and create a static creation "
    "method that acts as a constructor.",
    tags=["creational", "instance", "global"],
)
store_manager.create(create_singleton_task)
singleton_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, singleton_key
)
pprint_lancedb_record(singleton_abstraction)

{'key': 'python_etl_singleton',
 'language_context_key': 'python_etl',
 'name': 'Singleton',
 'description': 'Creational design pattern that ensures a class has only one '
                'instance with global access',
 'abstraction_type': 'design pattern',
 'content': 'Singleton is a creational design pattern that lets you ensure '
            'that a class has only one instance, while providing a global '
            'access point to this instance. Make the default constructor '
            'private and create a static creation method that acts as a co '
            '[...]',
 'tags': ['creational', 'instance', 'global'],
 'embedding': [0.0033135426, 0.0075293877, 0.005304555]}


In [21]:
data_enricher_key = "python_etl_data_enricher"
create_data_enricher_task = CreateAstractionTask(
    key=data_enricher_key,
    language_context_key="python_etl",
    name="Data Enricher",
    description="Uses information from the incoming dataset to retrieve additional "
    "data from an external source and appends it to the dataset",
    abstraction_type="data processing pattern",
    content="The Data Enricher uses information from the incoming dataset "
    "(e.g., key fields) to retrieve data from an external source. After retrieval, "
    "it appends the additional data to the dataset.",
    tags=["transformation", "enrichment", "data-processing"],
)
store_manager.create(create_data_enricher_task)
data_enricher_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, data_enricher_key
)
pprint_lancedb_record(data_enricher_abstraction)

{'key': 'python_etl_data_enricher',
 'language_context_key': 'python_etl',
 'name': 'Data Enricher',
 'description': 'Uses information from the incoming dataset to retrieve '
                'additional data from an external source and appends it to the '
                'dataset',
 'abstraction_type': 'data processing pattern',
 'content': 'The Data Enricher uses information from the incoming dataset '
            '(e.g., key fields) to retrieve data from an external source. '
            'After retrieval, it appends the additional data to the dataset.',
 'tags': ['transformation', 'enrichment', 'data-processing'],
 'embedding': [-0.031380683, -0.016995514, -0.035144974]}


In [22]:
data_router_key = "python_etl_data_router"
create_data_router_task = CreateAstractionTask(
    key=data_router_key,
    language_context_key="python_etl",
    name="Data Router",
    description="Examines dataset content and routes data to different processing "
    "paths based on field values or structure",
    abstraction_type="data processing pattern",
    content="The Data Router examines the dataset content and routes elements to "
    "different processing paths based on criteria such as field values, "
    "existence of fields, or data quality checks.",
    tags=["routing", "dataflow", "data-processing"],
)
store_manager.create(create_data_router_task)
data_router_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, data_router_key
)
pprint_lancedb_record(data_router_abstraction)

{'key': 'python_etl_data_router',
 'language_context_key': 'python_etl',
 'name': 'Data Router',
 'description': 'Examines dataset content and routes data to different '
                'processing paths based on field values or structure',
 'abstraction_type': 'data processing pattern',
 'content': 'The Data Router examines the dataset content and routes elements '
            'to different processing paths based on criteria such as field '
            'values, existence of fields, or data quality checks.',
 'tags': ['routing', 'dataflow', 'data-processing'],
 'embedding': [-0.009200101, -0.052700326, 0.024571396]}


In [23]:
splitter_key = "python_etl_splitter"
create_splitter_task = CreateAstractionTask(
    key=splitter_key,
    language_context_key="python_etl",
    name="Dataset Splitter",
    description="Breaks composite datasets into smaller subsets or individual records "
    "for separate processing",
    abstraction_type="data processing pattern",
    content="The Dataset Splitter breaks composite datasets (e.g., nested records or "
    "batched data) into smaller subsets or individual records for distributed or "
    "sequential processing.",
    tags=["decomposition", "data-processing", "transformation"],
)
store_manager.create(create_splitter_task)
splitter_abstraction = store_manager.lance_handler.get_by_key(
    TableNameEnum.ABSTRACTION, splitter_key
)
pprint_lancedb_record(splitter_abstraction)

{'key': 'python_etl_splitter',
 'language_context_key': 'python_etl',
 'name': 'Dataset Splitter',
 'description': 'Breaks composite datasets into smaller subsets or individual '
                'records for separate processing',
 'abstraction_type': 'data processing pattern',
 'content': 'The Dataset Splitter breaks composite datasets (e.g., nested '
            'records or batched data) into smaller subsets or individual '
            'records for distributed or sequential processing.',
 'tags': ['decomposition', 'data-processing', 'transformation'],
 'embedding': [-0.02178357, -0.04705736, -0.028762724]}


##### Define Preferred Libraries for ETL

Specifies preferred libraries for ETL operations in Python:

- Polars: A high-performance DataFrame library optimized for large datasets
- DuckDB: An embedded analytical database with SQL interface and efficient data processing

In [24]:
polars_key = "python_etl_polars"
create_polars_preference_task = CreatePreferenceTask(
    key=polars_key,
    language_context_key="python_etl",
    name="polars",
    description="A fast DataFrame library for Python designed as a powerful tool for "
    "ETL tasks. It excels in efficient data transformation and loading, powered by "
    "Rust for high performance with large datasets. Adopts an imperative programming "
    "approach, ideal for developers who prefer native data manipulation and control "
    "within Python.",
    tags=["dataframe", "performance", "big data", "Rust", "imperative"],
)
store_manager.create(create_polars_preference_task)
polars_preference = store_manager.lance_handler.get_by_key(
    TableNameEnum.PREFERENCE, polars_key
)
pprint_lancedb_record(polars_preference)

{'key': 'python_etl_polars',
 'language_context_key': 'python_etl',
 'name': 'polars',
 'description': 'A fast DataFrame library for Python designed as a powerful '
                'tool for ETL tasks. It excels in efficient data '
                'transformation and loading, powered by Rust for high '
                'performance with large datasets. Adopts an imperative '
                'programming approach, ideal fo [...]',
 'tags': ['dataframe', 'performance', 'big data'],
 'embedding': [-0.020702168, 0.028024463, 0.0018910654]}


In [25]:
duckdb_key = "python_etl_duckdb"
create_duckdb_preference_task = CreatePreferenceTask(
    key=duckdb_key,
    language_context_key="python_etl",
    name="duckdb",
    description="An embedded, in-memory analytical database with columnar storage, "
    "optimized for fast analytical queries and seamless Python integration. Excels in "
    "ETL by efficiently processing diverse data formats such as CSV, Parquet, Delta, "
    "and JSON. Its lightweight design and high-speed batch processing make it ideal "
    "for cost-effective, lightweight ETL workflows. Supports a fully declarative "
    "SQL-based approach, enabling complex data operations using SQL syntax. Integrates "
    "smoothly with other tools, making it versatile for comprehensive data "
    "transformation and analysis.",
    tags=[
        "database",
        "columnar",
        "sql",
        "json",
        "ETL",
        "performance",
        "lightweight",
        "declarative",
    ],
)
store_manager.create(create_duckdb_preference_task)
duckdb_preference = store_manager.lance_handler.get_by_key(
    TableNameEnum.PREFERENCE, duckdb_key
)
pprint_lancedb_record(duckdb_preference)

{'key': 'python_etl_duckdb',
 'language_context_key': 'python_etl',
 'name': 'duckdb',
 'description': 'An embedded, in-memory analytical database with columnar '
                'storage, optimized for fast analytical queries and seamless '
                'Python integration. Excels in ETL by efficiently processing '
                'diverse data formats such as CSV, Parquet, Delta, and JSON. '
                'Its lig [...]',
 'tags': ['database', 'columnar', 'sql'],
 'embedding': [-0.04089327, -0.0075872457, 0.011690059]}


##### Implement Core ETL Functions

Provides concrete code implementation examples for key ETL operations including:

- Extract function to fetch data
- Transform function to process data using Polars DataFrame
- Load function to store processed data in SQLite database

In [26]:
extract_key = "python_etl_extract"
create_extract_task = CreateCodeTask(
    key=extract_key,
    language_context_key="python_etl",
    name="Extract Function",
    description="Fetches data from the English Wikipedia API for a given page title "
    "and returns a dictionary with the page title and extract text.",
    content="""def extract(page_title: str) -> dict[str, str]:
    \"\"\"Extract data from the English Wikipedia API for a given page title.

    Args:
        page_title (str): The exact title of the Wikipedia page to fetch.

    Returns:
        dict[str, str]: A dictionary containing the page title and the plain text
            extract.
    \"\"\"
    url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "format": "json",
        "prop": "extracts",
        "explaintext": True,
        "titles": page_title,
    }

    response = requests.get(url, params=params, timeout=10)
    response.raise_for_status()
    data = response.json()

    page = next(iter(data["query"]["pages"].values()))
    return {"title": page.get("title", ""), "extract": page.get("extract", "")}""",
    tags=["etl", "extract", "wikipedia", "requests"],
)
store_manager.create(create_extract_task)
extract_code = store_manager.lance_handler.get_by_key(TableNameEnum.CODE, extract_key)
pprint_lancedb_record(extract_code, fields_to_clip_at_first_line=["content"])

{'key': 'python_etl_extract',
 'language_context_key': 'python_etl',
 'name': 'Extract Function',
 'description': 'Fetches data from the English Wikipedia API for a given page '
                'title and returns a dictionary with the page title and '
                'extract text.',
 'content': 'def extract(page_title: str) -> dict[str, str]: [...]',
 'tags': ['etl', 'extract', 'wikipedia'],
 'embedding': [0.0044612205, 0.0029688897, 0.01244339]}


In [27]:
transform_key = "python_etl_transform"
create_transform_task = CreateCodeTask(
    key=transform_key,
    language_context_key="python_etl",
    name="Transform Function",
    description="Transforms extracted Wikipedia data into a Polars DataFrame and "
    "adds a column with the text length of each extract.",
    content="""def transform(data: list[dict[str, str]]) -> pl.DataFrame:
    \"\"\"Transform extracted Wikipedia data into a Polars DataFrame.

    Args:
        data (list[dict[str, str]]): A list of dictionaries with keys 'title' and
            'extract'.

    Returns:
        pl.DataFrame: A Polars DataFrame containing page titles and their text lengths.
    \"\"\"
    df = pl.DataFrame(data)
    df = df.with_columns([
        pl.col("extract").str.len_chars().alias("text_length")
    ])
    return df""",
    tags=["etl", "transform", "polars", "dataframe"],
)
store_manager.create(create_transform_task)
transform_code = store_manager.lance_handler.get_by_key(
    TableNameEnum.CODE, transform_key
)
pprint_lancedb_record(transform_code, fields_to_clip_at_first_line=["content"])

{'key': 'python_etl_transform',
 'language_context_key': 'python_etl',
 'name': 'Transform Function',
 'description': 'Transforms extracted Wikipedia data into a Polars DataFrame '
                'and adds a column with the text length of each extract.',
 'content': 'def transform(data: list[dict[str, str]]) -> pl.DataFrame: [...]',
 'tags': ['etl', 'transform', 'polars'],
 'embedding': [-0.024397362, -0.0014929816, -0.004153799]}


In [28]:
load_key = "python_etl_load"
create_load_task = CreateCodeTask(
    key=load_key,
    language_context_key="python_etl",
    name="Load Function",
    description="Loads transformed Wikipedia data from a Polars DataFrame into an "
    "SQLite database table named 'wikipedia'.",
    content="""def load(df: pl.DataFrame, db_path: str = "wikipedia.db") -> None:
    \"\"\"Load transformed data into an SQLite database.

    Args:
        df (pl.DataFrame): The Polars DataFrame containing Wikipedia data.
        db_path (str, optional): Path to the SQLite database file. Defaults to
            'wikipedia.db'.

    Returns:
        None
    \"\"\"
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute(\"\"\"
        CREATE TABLE IF NOT EXISTS wikipedia (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            extract TEXT,
            text_length INTEGER
        )
    \"\"\")

    cursor.executemany(
        "INSERT INTO wikipedia (title, extract, text_length) VALUES (?, ?, ?)",
        [(row["title"], row["extract"], row["text_length"]) for row in df.to_dicts()]
    )

    conn.commit()
    conn.close()""",
    tags=["etl", "load", "sqlite", "database"],
)
store_manager.create(create_load_task)
load_code = store_manager.lance_handler.get_by_key(TableNameEnum.CODE, load_key)
pprint_lancedb_record(load_code, fields_to_clip_at_first_line=["content"])

{'key': 'python_etl_load',
 'language_context_key': 'python_etl',
 'name': 'Load Function',
 'description': 'Loads transformed Wikipedia data from a Polars DataFrame into '
                "an SQLite database table named 'wikipedia'.",
 'content': 'def load(df: pl.DataFrame, db_path: str = "wikipedia.db") -> '
            'None: [...]',
 'tags': ['etl', 'load', 'sqlite'],
 'embedding': [0.035206925, 0.047198765, -0.023641724]}
