# aMapReduce Framework

Agentics enables scalable execution of LLM workflows through a MapReduce framework that lets you run LLM calls asynchronously alongside regular Python code.

In [None]:
! uv pip install agentics-py

CURRENT_PATH=""
import os
from dotenv import load_dotenv, find_dotenv
import sys

IN_COLAB = 'google.colab' in sys.modules
print("In Colab:", IN_COLAB)

if IN_COLAB:
    CURRENT_PATH = "/content/drive/MyDrive/"
    # Mount your Google Drive first so the .env file stored on it can be read
    from google.colab import drive
    drive.mount('/content/drive')
    load_dotenv("/content/drive/MyDrive/.env")
else:
    load_dotenv(find_dotenv())

# Prompt for the key only if it was not loaded from the environment
if not os.getenv("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = input("Enter your GEMINI_API_KEY:")

In Colab: False


Let us first define an aType to represent stock market data for the Dow Jones index and populate it with historical data.

In [3]:
from agentics import Agentics as AG
from typing import Optional
from pydantic import BaseModel

## Define the data model for stock market data
class StockMarketState(BaseModel):
    Date: Optional[str] = None
    Open: Optional[float] = None
    High: Optional[float] = None
    Low: Optional[float] = None
    Close: Optional[float] = None
    Volume: Optional[int] = None
    Adj_Close: Optional[float] = None
    daily_range: Optional[float] = None
    News: Optional[str] = None
    Explanation_report: Optional[str] = None


## Import the data
dj_data = AG.from_csv("data/dow_jones.csv", atype=StockMarketState)

2025-09-05 07:20:05.023 | DEBUG    | agentics.core.llm_connections:<module>:90 - AGENTICS is connecting to the following LLM API providers:
2025-09-05 07:20:05.023 | DEBUG    | agentics.core.llm_connections:<module>:93 - 0 - WatsonX
2025-09-05 07:20:05.023 | DEBUG    | agentics.core.llm_connections:<module>:98 - 1 - Gemini
2025-09-05 07:20:05.024 | DEBUG    | agentics.core.llm_connections:<module>:102 - 2 - OpenAI
2025-09-05 07:20:05.024 | DEBUG    | agentics.core.llm_connections:<module>:104 - Please add API keys in .env file to add or disconnect providers.
2025-09-05 07:20:05.030 | DEBUG    | agentics.core.llm_connections:get_llm_provider:29 - No LLM provider specified. Using the first available provider.
2025-09-05 07:20:05.030 | DEBUG    | agentics.core.llm_connections:get_llm_provider:31 - Available LLM providers: ['watsonx', 'gemini', 'openai']. Using 'watsonx'
2025-09-05 07:20:05.033 | DEBUG    | agentics.core.agentics:from_csv:312 - Importing Agentics of type StockMarketState f

## aMap

aMap functions run a function asynchronously over all the states of an AG. Agentics supports 1:1 maps, which map each state of an AG to a state of the same type.

In the following example, we define a simple function that computes the daily_range of the stock and pass it to amap, which applies it to all states asynchronously.

In [4]:
## Note that input and output are both StockMarketState objects
async def get_daily_variation_percentage(state: StockMarketState) -> StockMarketState:
    state.daily_range = (float(state.High) - float(state.Low)) / float(state.Low) * 100
    return state

## Apply the function to all states using amap
dj_data.batch_size = 100
dj_data = await dj_data.amap(get_daily_variation_percentage)

for state in dj_data[:3]: 
    print(f"Date: {state.Date}, Daily Range: {state.daily_range}")

2025-09-05 07:20:05.045 | DEBUG    | agentics.core.agentics:amap:206 - Executing amap on function <function get_daily_variation_percentage at 0x10885df80>
2025-09-05 07:20:05.046 | DEBUG    | agentics.core.agentics:amap:231 - 100 states processed. 9.627342224121094e-06 seconds average per state in the last chunk ...
2025-09-05 07:20:05.047 | DEBUG    | agentics.core.agentics:amap:231 - 200 states processed. 9.059906005859375e-06 seconds average per state in the last chunk ...
2025-09-05 07:20:05.048 | DEBUG    | agentics.core.agentics:amap:231 - 300 states processed. 6.611347198486328e-06 seconds average per state in the last chunk ...
2025-09-05 07:20:05.049 | DEBUG    | agentics.core.agentics:amap:231 - 400 states processed. 5.6290626525878905e-06 seconds average per state in the last chunk ...
2025-09-05 07:20:05.050 | DEBUG    | agentics.core.agentics:amap:231 - 500 states processed. 6.568431854248047e-06 seconds average per state in the last chunk ...
2025-09-05 07:20:05.051 | DEB

Date: 2016-07-01, Daily Range: 0.47703930117312493
Date: 2016-06-30, Daily Range: 1.2353831025172834
Date: 2016-06-29, Daily Range: 1.423521751672572
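
To make the batching visible in the log above more concrete, here is a minimal sketch of a batched async map written with plain `asyncio`. It is not the Agentics implementation: `Row`, `add_daily_range`, and `batched_map` are hypothetical names introduced only for illustration, and the assumption that each batch is dispatched with `asyncio.gather` is ours. The three High/Low pairs are copied from rows printed later in this notebook.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Row:
    # Hypothetical stand-in for StockMarketState, reduced to the fields used here
    Date: str
    High: float
    Low: float
    daily_range: Optional[float] = None


async def add_daily_range(row: Row) -> Row:
    # Same formula as get_daily_variation_percentage: (High - Low) / Low * 100
    row.daily_range = (row.High - row.Low) / row.Low * 100
    return row


async def batched_map(
    rows: list[Row],
    func: Callable[[Row], Awaitable[Row]],
    batch_size: int,
) -> list[Row]:
    """Apply func to every row, awaiting one batch of calls at a time."""
    out: list[Row] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # gather schedules the whole batch together and preserves input order
        out.extend(await asyncio.gather(*(func(r) for r in batch)))
    return out


rows = [
    Row("2008-10-10", 8901.280273, 7882.509766),
    Row("2008-11-13", 8876.589844, 7965.419922),
    Row("2008-10-13", 9427.990234, 8462.179688),
]
# In a notebook cell, use `await batched_map(...)` instead of asyncio.run
mapped = asyncio.run(batched_map(rows, add_daily_range, batch_size=2))
for row in mapped:
    print(f"Date: {row.Date}, Daily Range: {row.daily_range:.4f}")
```

With `batch_size=2` the three rows are processed as one batch of two followed by one batch of one, which mirrors how a larger `batch_size` groups states into chunks.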


## aReduce

Reduce functions operate on the entire list of elements (states) within an Agentics group. Although a reduce operation is intrinsically synchronous, since it considers all states at once, reduce functions are defined as async so that they can make internal async calls (such as fetching news or running LLMs).

In the following example, we use a reduce function to get the 10 days with the highest variation in the market.

In [5]:
async def get_highest_volatility_days(states: list[StockMarketState]) -> list[StockMarketState]:
    # Sort the states by volatility and return the top 10, which define a new AG.
    # States without a daily_range are treated as 0 and therefore sort last.
    return sorted(
        states,
        key=lambda x: abs(x.daily_range) if x.daily_range is not None else 0,
        reverse=True,
    )[:10]

# apply the reduce function to get the top 10 days with highest volatility
highest_volatility_days = await dj_data.areduce(get_highest_volatility_days)
print(highest_volatility_days.pretty_print())

Atype : <class '__main__.StockMarketState'>
Date: '2008-10-10'
Open: 8568.669922
High: 8901.280273
Low: 7882.509766
Close: 8451.19043
Volume: 674920000
Adj_Close: null
daily_range: 12.924443321266926
News: null
Explanation_report: null

Date: '2008-11-13'
Open: 8281.139648
High: 8876.589844
Low: 7965.419922
Close: 8835.25
Volume: 476600000
Adj_Close: null
daily_range: 11.439069514507388
News: null
Explanation_report: null

Date: '2008-10-13'
Open: 8462.419922
High: 9427.990234
Low: 8462.179688
Close: 9387.610352
Volume: 399290000
Adj_Close: null
daily_range: 11.413259722782675
News: null
Explanation_report: null

Date: '2008-10-28'
Open: 8178.720215
High: 9082.080078
Low: 8174.72998
Close: 9065.120117
Volume: 372160000
Adj_Close: null
daily_range: 11.099450382090795
News: null
Explanation_report: null

Date: '2010-05-06'
Open: 10868.120117
High: 10879.759766
Low: 9869.620117
Close: 10520.320312
Volume: 459890000
Adj_Close: null
daily_range: 10.23483819058118
News: null
Explanation_repo
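
The contrast with a map is that a reduce function is called once with the full list of states rather than once per state. The sketch below illustrates that contract with plain `asyncio`. `Row`, `top_k_by_range`, and `reduce_all` are hypothetical names, not part of Agentics, and the row with a missing value is a made-up placeholder added to show how `None` is handled. The other `daily_range` values are copied from this notebook's output.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Row:
    # Hypothetical stand-in for StockMarketState
    Date: str
    daily_range: Optional[float] = None


async def top_k_by_range(rows: list[Row], k: int = 2) -> list[Row]:
    # Rows with no daily_range are ranked as 0, so they end up last
    return sorted(
        rows,
        key=lambda r: abs(r.daily_range) if r.daily_range is not None else 0,
        reverse=True,
    )[:k]


async def reduce_all(
    rows: list[Row],
    func: Callable[[list[Row]], Awaitable[list[Row]]],
) -> list[Row]:
    # A reduce awaits func exactly once, passing every row at the same time
    return await func(rows)


rows = [
    Row("2010-05-06", 10.23483819058118),
    Row("2008-10-10", 12.924443321266926),
    Row("2016-07-01", 0.47703930117312493),
    Row("n/a", None),  # hypothetical placeholder with a missing value
    Row("2008-11-13", 11.439069514507388),
]
# In a notebook cell, use `await reduce_all(...)` instead of asyncio.run
top = asyncio.run(reduce_all(rows, top_k_by_range))
print([r.Date for r in top])
```

For long lists, `heapq.nlargest(k, rows, key=...)` from the standard library selects the same rows without sorting the whole list.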

## Complex aMaps

aMap functions can contain external API and LLM calls. This way, we can use Agentics as a scale-out framework for complex workflows.

In [6]:
from ddgs import DDGS

## Define a function to get news for a given date using the DDGS search engine
## Note that similar functionality can be implemented using MCP tools in AGs
async def get_news(state):
    state.News = str(DDGS().text(f"What happened to the stock market and dow jones on {state.Date}", max_results=10))
    return state

## Set the batch size for the amap function to 10 (only 10 states will be processed)
highest_volatility_days.batch_size = 10

# Now get news for the top 10 days with highest volatility using amap
highest_volatility_days = await highest_volatility_days.amap(get_news)

# print the first result for brevity
print(f"Date: {highest_volatility_days[0].Date}, Daily Range: {highest_volatility_days[0].daily_range}, News: {highest_volatility_days[0].News[:200]}...")

2025-09-05 07:20:05.097 | DEBUG    | agentics.core.agentics:amap:206 - Executing amap on function <function get_news at 0x10888ef20>
2025-09-05 07:20:14.302 | DEBUG    | agentics.core.agentics:amap:231 - 10 states processed. 0.9204371929168701 seconds average per state in the last chunk ...


Date: 2008-10-10, Daily Range: 12.924443321266926, News: [{'title': 'How Are We Doing On Our Stimulus Plan Stocks So Far? -', 'href': 'https://www.altenergystocks.com/archives/2008/12/how_are_we_doing_on_our_stimulus_plan_stocks_so_far_1/', 'body': 'Charles...
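
One thing to keep in mind when putting external calls inside an `async def`: a synchronous client call blocks the event loop while it runs, so calls issued from the same loop may not overlap. Whether this matters here depends on how amap schedules functions, which this notebook does not show. The sketch below is a general `asyncio` pattern rather than anything Agentics-specific. It offloads a blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+). `blocking_search` is a hypothetical stand-in that sleeps instead of calling a real search client.

```python
import asyncio
import time


def blocking_search(query: str) -> str:
    # Hypothetical stand-in for a synchronous client call; sleeps instead of searching
    time.sleep(0.1)
    return f"results for {query}"


async def get_news_offloaded(date: str) -> str:
    # to_thread runs the blocking function in a worker thread,
    # leaving the event loop free to start the other lookups
    return await asyncio.to_thread(blocking_search, f"dow jones {date}")


async def main(dates: list[str]) -> tuple[list[str], float]:
    started = time.perf_counter()
    news = await asyncio.gather(*(get_news_offloaded(d) for d in dates))
    return list(news), time.perf_counter() - started


dates = ["2008-10-10", "2008-11-13", "2008-10-13", "2008-10-28", "2010-05-06"]
# In a notebook cell, use `await main(dates)` instead of asyncio.run
news, elapsed = asyncio.run(main(dates))
print(f"{len(news)} lookups in {elapsed:.2f}s")
```

Run one after another, five 0.1-second calls would take about half a second. Offloaded to threads, they can overlap.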


Now let's use self transduction to generate an explanation for the market volatility on each of these days.

In [7]:
from agentics.core.llm_connections import get_llm_provider
highest_volatility_days.instructions = """Explain the reasons why the market went down or up 
given the high volatility in the stock market on this day based on the news provided. 
Provide a concise summary."""
highest_volatility_days.llm = get_llm_provider()  ## You can choose between "openai", "watsonx", "gemini", "vllm_crewai"
highest_volatility_explanations = await highest_volatility_days.self_transduction(
    ["Date", "Open", "High", "Low", "Close", "Volume", "daily_range", "News"],  # source fields
    ["Explanation_report"],  # target fields
)

for state in highest_volatility_explanations:
    print(f"Date: {state.Date}, Daily Range: {state.daily_range}\nExplanation: {state.Explanation_report}...")

2025-09-05 07:20:14.307 | DEBUG    | agentics.core.llm_connections:get_llm_provider:29 - No LLM provider specified. Using the first available provider.
2025-09-05 07:20:14.307 | DEBUG    | agentics.core.llm_connections:get_llm_provider:31 - Available LLM providers: ['watsonx', 'gemini', 'openai']. Using 'watsonx'
2025-09-05 07:20:14.308 | DEBUG    | agentics.core.agentics:__lshift__:518 - Executing task: Explain the reasons why the market went down or up 
given the high volatility in the stock market on this day based on the news provided. 
Provide a concise summary.
10 states will be transduced
2025-09-05 07:20:14.309 | DEBUG    | agentics.core.agentics:__lshift__:612 - transducer class: <class 'agentics.abstractions.pydantic_transducer.PydanticTransducerCrewAI'>
2025-09-05 07:20:22.609 | DEBUG    | agentics.core.agentics:__lshift__:648 - Processed 10 states in 8.300536155700684 seconds
2025-09-05 07:20:22.610 | DEBUG    | agentics.core.agentics:__lshift__:700 - 10 states processed in

Date: 2008-10-10, Daily Range: 12.924443321266926
Explanation: The market went down on this day due to high volatility, which can be attributed to various news articles indicating a decline in investor confidence. The news articles provided suggest a bearish trend, with mentions of the collapse of large financial institutions, downturns in major stock markets, and a general sense of uncertainty. The article 'Bear Sterns Collapse how it happend and what it caused - Essay' specifically mentions the collapse of Bear Stearns and its impact on the global financial crisis, which likely contributed to the market's decline. Additionally, the article 'Royal Oasis Lender Lehman Brothers Collapses | Bahamaspress.com' discusses the collapse of Lehman Brothers, which would have further eroded investor confidence and contributed to the market's downturn. The '10 Signs Of A Stock Market Bottom' article suggests that the market may be nearing a bottom, but this is not enough to counteract the overall 
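
Conceptually, the call above reads a set of source fields from each state and fills in a set of target fields on that same state. The sketch below shows that data flow with the standard library only. It is an illustration under stated assumptions, not how Agentics implements `self_transduction`: `Row`, `fake_llm`, and `self_transduce` are hypothetical, and `fake_llm` is a stub that formats a string instead of calling a model.

```python
import asyncio
from dataclasses import asdict, dataclass, replace
from typing import Optional


@dataclass
class Row:
    # Hypothetical stand-in for StockMarketState
    Date: Optional[str] = None
    daily_range: Optional[float] = None
    News: Optional[str] = None
    Explanation_report: Optional[str] = None


async def fake_llm(instructions: str, source: dict) -> dict:
    # Stub: a real transduction would prompt an LLM with the instructions and source fields
    return {"Explanation_report": f"[stub] {source['Date']}: range {source['daily_range']:.2f}%"}


async def self_transduce(rows, source_fields, target_fields, instructions):
    async def one(row: Row) -> Row:
        # Only the source fields are shown to the (stub) model
        source = {k: v for k, v in asdict(row).items() if k in source_fields}
        generated = await fake_llm(instructions, source)
        # Only the target fields are written back; every other field is kept as-is
        updates = {k: generated[k] for k in target_fields if k in generated}
        return replace(row, **updates)

    return await asyncio.gather(*(one(r) for r in rows))


rows = [Row(Date="2008-10-10", daily_range=12.924443321266926, News="(search results)")]
# In a notebook cell, use `await self_transduce(...)` instead of asyncio.run
explained = asyncio.run(
    self_transduce(
        rows,
        ["Date", "daily_range", "News"],
        ["Explanation_report"],
        "Explain the volatility based on the news provided.",
    )
)
print(explained[0].Explanation_report)
```

Because `dataclasses.replace` returns a copy, the input rows are left unchanged in this sketch.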

## Well Done
You are now fully equipped to work with Agentics and apply it to your own data.
Congratulations! If you find this exciting, please contribute back to the community.