# aMapReduce Framework

Agentics enable scalable execution of LLM workflows by implementing a MapReduce framework which enable the async use of LLM blended with regular python code.

In [1]:
! uv pip install agentics-py


import os
from pathlib import Path
import sys
from getpass import getpass

from dotenv import find_dotenv, load_dotenv

CURRENT_PATH = ""

IN_COLAB = "google.colab" in sys.modules
print("In Colab:", IN_COLAB)


if IN_COLAB:
    CURRENT_PATH = "/content/drive/MyDrive/"
    # Mount your google drive
    load_dotenv("/content/drive/MyDrive/.env")
    from google.colab import drive

    drive.mount("/content/drive")
else:
    load_dotenv(find_dotenv())

if not os.getenv("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Enter your GEMINI_API_KEY:")

base = Path(CURRENT_PATH)

[2mUsing Python 3.12.9 environment at: /Users/gliozzo/Code/agentics0.0.6/Agentics/.venv[0m
[2K[2mResolved [1m249 packages[0m [2min 36ms[0m[0m                                        [0m
[2mUninstalled [1m2 packages[0m [2min 1ms[0m[0m
[2K[2mInstalled [1m1 package[0m [2min 1ms[0m[0m                                  [0m
 [31m-[39m [1magentics-py[0m[2m==0.0.4.post1.dev0+b2e926b (from file:///Users/gliozzo/Code/agentics0.0.6/Agentics)[0m
 [33m~[39m [1magentics-py[0m[2m==0.0.10[0m
In Colab: False


Let us first define an aType to represent StockMarket Data for the DowJones index, and populate it with historical data

In [7]:
from agentics import Agentics as AG
from typing import Optional
from pydantic import BaseModel, Field
from datetime import datetime
## Define the data model for stock market data


class StockMarketState(BaseModel):   
    ticker: str = Field(..., description="The stock ticker symbol.")
    date: Optional[datetime] = None
    open: Optional[float] = Field(None, ge=0)
    high: Optional[float] = None
    low: Optional[float] = None
    daily_range: Optional[float] = Field(None, 
        description="""The difference between the high and low prices for the day.""")
    News: Optional[list[str]] = Field([], 
        description="""A list of news headlines relevant to the stock for the day.""")
    Explanation_report: Optional[str] = Field(None,
        description="A detailed explanation of the stock market state for the day.")


       
    
## import the data
dj_data = AG.from_csv(base / "data/dow_jones.csv", atype=StockMarketState)

2025-09-05 09:39:54.462 | DEBUG    | agentics.core.agentics:from_csv:312 - Importing Agentics of type StockMarketState from CSV data/dow_jones.csv


## Amap

Amap functions enable async execution of functions over all the states of an AG. Agentics supports 1:1 maps that maps all states of an AG into states of the same type.

In the following example we define a simple function to compute the daily_range of the stock and we pass that to an amap fuction which applies that to all states asyncronously

In [8]:
## Note that input and output are both StockMarketState objects
async def get_daily_variation_percentage(state: StockMarketState) -> StockMarketState:
    state.daily_range = (float(state.High) - float(state.Low)) / float(state.Low) * 100
    return state

## Apply the function to all states using amap
dj_data.batch_size = 100
dj_data = await dj_data.amap(get_daily_variation_percentage)

for state in dj_data[:3]: 
    print(f"Date: {state.Date}, Daily Range: {state.daily_range}")

2025-09-05 09:40:58.753 | DEBUG    | agentics.core.agentics:amap:206 - Executing amap on function <function get_daily_variation_percentage at 0x126bb6d40>
2025-09-05 09:40:58.756 | DEBUG    | agentics.core.agentics:amap:231 - 100 states processed. 1.6341209411621095e-05 seconds average per state in the last chunk ...
2025-09-05 09:40:58.757 | DEBUG    | agentics.core.agentics:amap:231 - 200 states processed. 9.36269760131836e-06 seconds average per state in the last chunk ...
2025-09-05 09:40:58.759 | DEBUG    | agentics.core.agentics:amap:231 - 300 states processed. 9.570121765136719e-06 seconds average per state in the last chunk ...
2025-09-05 09:40:58.760 | DEBUG    | agentics.core.agentics:amap:231 - 400 states processed. 7.398128509521485e-06 seconds average per state in the last chunk ...
2025-09-05 09:40:58.761 | DEBUG    | agentics.core.agentics:amap:231 - 500 states processed. 9.279251098632813e-06 seconds average per state in the last chunk ...
2025-09-05 09:40:58.762 | DEBU

Date: 2016-07-01, Daily Range: 0.47703930117312493
Date: 2016-06-30, Daily Range: 1.2353831025172834
Date: 2016-06-29, Daily Range: 1.423521751672572


## aReduce

Reduce functions enable executing operations on the entire list of elements (states) within an Agentics group. Although reduce operations are intrinsically synchronous—since they consider all states at once—they are defined as async functions to allow for internal async calls (such as fetching news or running LLMs).

In the following example we will use a reduce function to analyze get the top 10 days with highest variation in the market

In [9]:
async def get_highest_volatility_days(states:list[StockMarketState]) -> list[StockMarketState]:
    
    # sort the states by volatility and return the top 10, define a new AG with these states
    return sorted(states, 
                key=lambda x: abs(x.daily_range) if x.daily_range is not None else 0, 
                reverse=True)[:10]

# apply the reduce function to get the top 10 days with highest volatility
highest_volatility_days = await dj_data.areduce(get_highest_volatility_days)
print(highest_volatility_days.pretty_print())

Atype : <class '__main__.StockMarketState'>
Date: '2008-10-10'
Open: 8568.669922
High: 8901.280273
Low: 7882.509766
Close: 8451.19043
Volume: 674920000
Adj_Close: null
daily_range: 12.924443321266926
News: null
Explanation_report: null

Date: '2008-11-13'
Open: 8281.139648
High: 8876.589844
Low: 7965.419922
Close: 8835.25
Volume: 476600000
Adj_Close: null
daily_range: 11.439069514507388
News: null
Explanation_report: null

Date: '2008-10-13'
Open: 8462.419922
High: 9427.990234
Low: 8462.179688
Close: 9387.610352
Volume: 399290000
Adj_Close: null
daily_range: 11.413259722782675
News: null
Explanation_report: null

Date: '2008-10-28'
Open: 8178.720215
High: 9082.080078
Low: 8174.72998
Close: 9065.120117
Volume: 372160000
Adj_Close: null
daily_range: 11.099450382090795
News: null
Explanation_report: null

Date: '2010-05-06'
Open: 10868.120117
High: 10879.759766
Low: 9869.620117
Close: 10520.320312
Volume: 459890000
Adj_Close: null
daily_range: 10.23483819058118
News: null
Explanation_repo

## Complex AMAPs

aMaps function can contain external API and LLM calls. This way we can use agentics as a scaleout frameworks for complex workflows. 

In [10]:
from ddgs import DDGS

## Define a function to get news for a given date using the DDGS search engine
## Note that the similar functionalities can be implemented using MCP tools in AGs
async def get_news(state):
    state.News=str(DDGS().text(f"What happended to the stock market and dow jones on {state.Date}", max_results=10))
    return state    

## set the batch size for the amap function to 5 (only 10 states will be processed)
highest_volatility_days.batch_size = 10

# Now get news for the top 10 days with highest volatility using amap
highest_volatility_days = await highest_volatility_days.amap(get_news)

# print the first result for brevity
print(f"Date: {highest_volatility_days[0].Date}, Daily Range: {highest_volatility_days[0].daily_range}, News: {highest_volatility_days[0].News[:200]}...")

2025-09-05 09:42:14.165 | DEBUG    | agentics.core.agentics:amap:206 - Executing amap on function <function get_news at 0x120491ee0>
2025-09-05 09:42:29.293 | DEBUG    | agentics.core.agentics:amap:231 - 10 states processed. 1.5126305103302002 seconds average per state in the last chunk ...


Date: 2008-10-10, Daily Range: 12.924443321266926, News: [{'title': 'Global financial crisis in October 2008 - Wikipedia', 'href': 'https://en.wikipedia.org/wiki/Global_financial_crisis_in_October_2008', 'body': 'The Dow ended the day losing only 128 points...


Now let's use self transduction to provide an explanation for the market volatility

In [11]:
from agentics.core.llm_connections import get_llm_provider
highest_volatility_days.instructions = """Explain the reasons why the market went down or up 
given the high volatility in the stock market on this day based on the news provided. 
Provide a concise summary."""
highest_volatility_days.llm= get_llm_provider() ## You can choose between "openai", "watsonx", "gemini", "vllm_crewai"
highest_volatility_explanations = await highest_volatility_days.self_transduction(
["Date", "Open", "High", "Low", "Close", "Volume", "daily_range", "News"],["Explanation_report"])

for state in highest_volatility_explanations: 
    print(f"Date: {state.Date}, Daily Range: {state.daily_range}\nExplanation: {state.Explanation_report}...")

2025-09-05 09:43:09.659 | DEBUG    | agentics.core.llm_connections:get_llm_provider:29 - No LLM provider specified. Using the first available provider.
2025-09-05 09:43:09.659 | DEBUG    | agentics.core.llm_connections:get_llm_provider:31 - Available LLM providers: ['watsonx', 'gemini', 'openai']. Using 'watsonx'
2025-09-05 09:43:09.660 | DEBUG    | agentics.core.agentics:__lshift__:518 - Executing task: Explain the reasons why the market went down or up 
given the high volatility in the stock market on this day based on the news provided. 
Provide a concise summary.
10 states will be transduced
2025-09-05 09:43:09.661 | DEBUG    | agentics.core.agentics:__lshift__:612 - transducer class: <class 'agentics.abstractions.pydantic_transducer.PydanticTransducerCrewAI'>
2025-09-05 09:43:17.024 | DEBUG    | agentics.core.agentics:__lshift__:648 - Processed 10 states in 7.3627002239227295 seconds
2025-09-05 09:43:17.024 | DEBUG    | agentics.core.agentics:__lshift__:700 - 10 states processed i

Date: 2008-10-10, Daily Range: 12.924443321266926
Explanation: The market went down on October 10, 2008, due to the global financial crisis. The Dow Jones Industrial Average fell by 7.3% to close at 8,451.19, its lowest level in five years. This decline was part of a larger trend of losses in the stock market, with the Dow having fallen by 18% over the course of the week and 40% from its record high in October 2007. The crisis was triggered by a speculative bubble in the housing market, which led to a surge in defaults on subprime mortgage-backed securities. As a result, banks and other financial institutions suffered significant losses, leading to a credit crunch and a decline in investor confidence. The situation was exacerbated by the collapse of Lehman Brothers and the subsequent bailout of American International Group (AIG). The news articles from the time period, including those from The New York Times, Wikipedia, and CNBC, all point to the same conclusion: the stock market crash

## Well Done
You are now fully equipped to work with agentics and apply it to your data.
Congratulations and please contribute back to the community if you feel this is exciting. 