update example CLI
nick-roberson committed Apr 22, 2024
1 parent bdd3958 commit cc0dcc1
Showing 6 changed files with 147 additions and 102 deletions.
90 changes: 6 additions & 84 deletions README.md
@@ -16,88 +16,10 @@ Poetry is used to manage dependencies. To install the dependencies, run the foll

## Usage

This example shows how the framework can be used to organize and run computation blocks on an input DataFrame. The snippet below builds a sequential runner that performs the following operations, running some of them in parallel where possible:
```python
import pandas as pd
from src.blocks.average.average import AverageBlock, AverageBlockParams
from src.blocks.block_base import BlockBase
from src.blocks.prepare.prepare import PrepareBlock
from src.runners.parallel_runner import ParallelRunner
from src.runners.sequential_runner import SequentialRunner

# Define test data
TEST_DATA = pd.DataFrame(
{
"ColumnA": list(range(10)),
"ColumnB": list(range(10, 20))
}
)


# Setup for test blocks
class AddFiveBlock(BlockBase):
""" Dummy block that adds 5 to all values in ColumnA """
def __call__(self, input_df: pd.DataFrame):
# Adds 5 to all values in ColumnA
result_df = input_df.copy()
result_df["column_a"] += 5
return result_df


class MultiplyTwoBlock(BlockBase):
""" Dummy block that multiplies all values in ColumnB by 2 """
def __call__(self, input_df: pd.DataFrame):
# Multiplies all values in ColumnB by 2
result_df = input_df.copy()
result_df["column_b"] *= 2
return result_df

def sample():
""" Sample function to display the usage of the framework. """
# Create a sequential runner to perform some operations
sequential_runner = SequentialRunner(
block_map={
# Create id column, convert ColumnA to column_a, ColumnB to column_b
0: PrepareBlock(),
# Add 5 to ColumnA in parallel
2: ParallelRunner(
block=AddFiveBlock(),
chunk_size=5, # Splitting the data into two chunks
use_thread_pool=True,
),
# Multiply ColumnB by 2 in parallel
3: ParallelRunner(
block=MultiplyTwoBlock(),
chunk_size=5, # Splitting the data into two chunks
use_thread_pool=True,
),
4: SequentialRunner(block_map={
1: AverageBlock(
params=AverageBlockParams(column_mapping={"column_a": "column_a_avg"})
)
}),
}
)
return sequential_runner(TEST_DATA)

# Run sample and preview the result
result = sample()
print(f"Cols: {result.columns}")
""" Output:
Cols: Index(['id', 'column_a', 'column_b', 'column_a_avg'], dtype='object')
"""
print(f"Head: {result.head(10)}")
""" Output:
column_a column_b id
0 5 20 73d9d92cd0c229d2df3fb4db4c0eed7ab0977b05a7d102...
1 6 22 9bc4cd485fe277f9c2c16a9bc1cfea81db2a3ed294cc4b...
2 7 24 d362ce1155cb0aedb2a9fa3203cb614326ce2cec98bb82...
3 8 26 6f322ab92247df40b028834dd641302b08134a4afad791...
4 9 28 8fb0ab3bf31047c1d3fe24eb7fd15e13a7d1043939f60e...
5 10 30 89512ae1404ffa1a8ad68993f8a1fb2f2d243814c6ff23...
6 11 32 fb88400d2b8ff1c86694ab22d884309ae39f3da8ef26a3...
7 12 34 c1cf06a78a87c1ab962a7c8df3c13385c8205c3f751179...
8 13 36 554e301e92851d827f2705991e7e377687223ec1c9f653...
9 14 38 bc24c3a5ce2dbf73d9c548ca238b54efd2e8ecea655fcd...
"""
To view an example of this framework in action, run the following command:
```bash
% poetry run python example.py
```
This will run the example.py file, which demonstrates how to use the framework and comes with a few simple and more complex examples.

To view the code, just open the example.py file in a text editor.
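
For orientation, the blocks in these examples subclass BlockBase from src/blocks/block_base.py, which is not touched by this commit. A minimal sketch of the interface the examples imply (a Pydantic-style model with an optional params field and a __call__ that maps a DataFrame to a DataFrame) follows; the real base class may differ:

```python
# Hypothetical sketch of the BlockBase interface implied by the examples above.
# The actual src/blocks/block_base.py may differ; this is for orientation only.
import pandas as pd
from pydantic import BaseModel


class BlockBase(BaseModel):
    """Base class for computation blocks: each block maps a DataFrame to a DataFrame."""

    def __call__(self, input_df: pd.DataFrame) -> pd.DataFrame:
        # Concrete blocks override __call__ with their own transformation.
        raise NotImplementedError
```
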
102 changes: 102 additions & 0 deletions example.py
@@ -0,0 +1,102 @@
import logging

import pandas as pd
import typer
from pydantic import BaseModel

from src.blocks.average.average import AverageBlock, AverageBlockParams
from src.blocks.block_base import BlockBase
from src.blocks.prepare.prepare import PrepareBlock
from src.runners.parallel_runner import ParallelRunner
from src.runners.sequential_runner import SequentialRunner

app = typer.Typer()


def init_logging():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)


class AddNBlockParams(BaseModel):
n: int


class AddNBlock(BlockBase):
params: AddNBlockParams

def __call__(self, input_df: pd.DataFrame):
logging.info(f"Adding {self.params.n} to all values in 'column_a'")
result_df = input_df.copy()
result_df["column_a"] += self.params.n
return result_df


class MultiplyByNBlockParams(BaseModel):
n: int


class MultiplyByNBlock(BlockBase):
params: MultiplyByNBlockParams

def __call__(self, input_df: pd.DataFrame):
logging.info(f"Multiplying all values in 'column_b' by {self.params.n}")
result_df = input_df.copy()
result_df["column_b"] *= self.params.n
return result_df


@app.command()
def example():
"""Run a simple example of a computation workflow."""
init_logging()
logging.info("Starting the computation sequence.")

test_data = pd.DataFrame(
{"ColumnA": list(range(10)), "ColumnB": list(range(10, 20))}
)
logging.info(f"Running computation on data:")
logging.info(f"{test_data.head(10)}")

sequential_runner = SequentialRunner(
block_map={
0: PrepareBlock(),
2: ParallelRunner(
block=AddNBlock(params=AddNBlockParams(n=5)),
chunk_size=5,
use_thread_pool=True,
),
3: ParallelRunner(
block=MultiplyByNBlock(params=MultiplyByNBlockParams(n=2)),
chunk_size=5,
use_thread_pool=True,
),
4: SequentialRunner(
block_map={
1: AverageBlock(
params=AverageBlockParams(
column_mapping={"column_a": "column_a_avg"}
)
)
}
),
}
)

# Run the computation
result = sequential_runner(test_data)

# Cleanup before viewing
result = result.sort_values(by="column_a")
result = result.reset_index(drop=True)

logging.info("Completed all computations.")
print(f"Cols: {result.columns}")
print(f"Head: {result.head(10)}")


if __name__ == "__main__":
app()
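
One way to exercise the new command (not part of this commit) is Typer's test runner; a minimal sketch, assuming example.py is importable as a module named example and that Typer exposes the single registered command as the default entry point:

```python
# Hypothetical test sketch, not part of this commit. Assumes example.py is
# importable as `example` and that the single @app.command() runs by default.
from typer.testing import CliRunner

from example import app

runner = CliRunner()


def test_example_command_runs():
    result = runner.invoke(app, [])
    assert result.exit_code == 0
    # The command prints the resulting columns before exiting.
    assert "Cols:" in result.output
```
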
16 changes: 0 additions & 16 deletions main.py

This file was deleted.

30 changes: 29 additions & 1 deletion poetry.lock


1 change: 1 addition & 0 deletions pyproject.toml
@@ -14,6 +14,7 @@ pydantic = "^2.6.4"
typing-extensions = "^4.10.0"
coverage = "^7.4.4"
pytest = "^8.1.1"
typer = "^0.12.3"


[build-system]
10 changes: 9 additions & 1 deletion src/blocks/prepare/prepare.py
@@ -8,6 +8,8 @@
# My Imports
from src.blocks.block_base import BlockBase

HASH_LENGTH: int = 16


def to_snake_case(s: str) -> str:
"""Converts a string to snake_case."""
@@ -16,7 +18,8 @@ def to_snake_case(s: str) -> str:

def hash_row_values(row: pd.Series) -> str:
"""Hashes the values of a DataFrame row using SHA-256 and returns the hash as a hex string."""
return hashlib.sha256("".join(str(row.values)).encode()).hexdigest()
hash_val = hashlib.sha256("".join(str(row.values)).encode()).hexdigest()
return hash_val[:HASH_LENGTH]


def convert_columns_to_snake_case(df: pd.DataFrame) -> pd.DataFrame:
@@ -53,4 +56,9 @@ def run(self, input_df: pd.DataFrame) -> pd.DataFrame:
# Delete duplicate rows
output_df.drop_duplicates(inplace=True)

# Reorder cols to put ID_COL first
cols = output_df.columns.tolist()
cols.remove(self.ID_COL)
output_df = output_df[[self.ID_COL] + cols]

return output_df
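
For reference, the effect of the new HASH_LENGTH constant can be reproduced in isolation; a minimal sketch using only hashlib and pandas (the reorder above then simply moves the generated id column to the front):

```python
# Standalone sketch of the truncated row hash added above. Mirrors the new
# HASH_LENGTH = 16 in prepare.py, but is not the module itself.
import hashlib

import pandas as pd

HASH_LENGTH = 16


def hash_row_values(row: pd.Series) -> str:
    """SHA-256 over the row's stringified values, truncated to HASH_LENGTH hex characters."""
    return hashlib.sha256("".join(str(row.values)).encode()).hexdigest()[:HASH_LENGTH]


row = pd.Series({"column_a": 5, "column_b": 20})
print(len(hash_row_values(row)))  # 16
```
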
