# Concurrency with Pandas

`pd.DataFrame.aio` is an accessor that provides an async interface to OpenAI's API. It is designed to be used with Python's `asyncio` library, allowing you to make non-blocking concurrent requests to the OpenAI API.

In [2]:
import os
from enum import Enum

import pandas as pd
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field

from openaivec import pandas_ext

# Register the OpenAI clients and the default model names with the pandas
# extension before any accessor is used in the cells below.
# NOTE(review): no API key is passed here, so both clients presumably read
# their credentials from the environment -- confirm before running.
pandas_ext.set_client(OpenAI())  # synchronous client
pandas_ext.set_async_client(AsyncOpenAI())  # asynchronous client
pandas_ext.set_responses_model("gpt-4.1-mini")  # model name used for responses
pandas_ext.set_embeddings_model("text-embedding-3-small")  # model name used for embeddings

In [3]:
def to_module_name(path, src_root="../../src"):
    """Convert a source file path into a dotted module name.

    Args:
        path: Path of a file located somewhere below ``src_root``.
        src_root: Directory that contains the top-level package.

    Returns:
        The path relative to ``src_root`` with a trailing ``.py`` removed and
        the path separators replaced by dots, e.g.
        ``../../src/openaivec/_di.py`` -> ``openaivec._di``.
    """
    # relpath normalises separators and does not depend on how many
    # components the root happens to have, unlike split("/")[3:].
    relative = os.path.relpath(path, start=src_root)
    # Strip only the suffix: str.replace(".py", "") would also eat a ".py"
    # that appears in the middle of a name.
    return relative.removesuffix(".py").replace(os.sep, ".")


# Collect every file below the package directory, skipping any path that
# contains a double underscore (this drops __init__.py and __pycache__ entries).
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
    for file in files:
        path = os.path.join(root, file)
        if "__" not in path:
            files_dict["path"].append(path)

# One row per source file, with the importable module name next to its path.
implements_df = pd.DataFrame(files_dict).assign(
    module=lambda df: df["path"].map(to_module_name),
)

In [4]:
implements_df

Unnamed: 0,path,module
0,../../src/openaivec/_di.py,openaivec._di
1,../../src/openaivec/_provider.py,openaivec._provider
2,../../src/openaivec/_prompt.py,openaivec._prompt
3,../../src/openaivec/_responses.py,openaivec._responses
4,../../src/openaivec/_serialize.py,openaivec._serialize
5,../../src/openaivec/_util.py,openaivec._util
6,../../src/openaivec/pandas_ext.py,openaivec.pandas_ext
7,../../src/openaivec/spark.py,openaivec.spark
8,../../src/openaivec/_model.py,openaivec._model
9,../../src/openaivec/_log.py,openaivec._log


In [5]:
# Kind of code object that a documentation section describes.
# Inheriting from `str` as well as `Enum` makes every member a string whose
# value is the lowercase label on the right-hand side.
# NOTE(review): the class name looks like a typo of "ObjectType"; it is left
# unchanged here because `Section.type` refers to it by this name.
class OpjectType(str, Enum):
    FUNCTION = "function"
    CLASS = "class"
    VARIABLE = "variable"

# A single question/answer pair about one documented code section.
# Both fields are required strings; the Field descriptions are the only
# per-field guidance declared in this model.
class Question(BaseModel):
    question: str = Field(description="The specific question related to the code section.")
    answer: str = Field(description="The corresponding answer explaining the code aspect.")

# Documentation for one code object: its name, its kind, a summary, and a
# list of Q&A pairs.
# The Field descriptions name all three kinds that `OpjectType` allows
# (function, class, variable) so the text agrees with the enum.
# A `#` comment is used instead of a docstring because pydantic copies a
# model's docstring into its generated JSON schema.
class Section(BaseModel):
    name: str = Field(description="The name of the function, class, or variable being documented.")
    type: OpjectType = Field(description="The type of the code section: a function, a class, or a variable.")
    description: str = Field(
        description="A concise summary of the purpose and functionality of the function, class, or variable."
    )
    questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")

# Top-level structured output for one source file: a list of `Section`
# entries. This is the model passed as `response_format` when the code
# column is documented.
class Document(BaseModel):
    sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")

Note that `await` cannot be used inside a lambda function. Use `aio.assign` instead of `assign` when a lambda needs to call an async method such as `aio.responses`; the whole `aio.assign(...)` call is then awaited once.
Top-level `await` is allowed in notebook cells.

In [6]:
docs_df = await implements_df.aio.assign(
    code=lambda df: df["path"].map(lambda x: open(x).read()),
    doc=lambda df: df["code"].aio.responses(
        instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
        response_format=Document,
        batch_size=1,
    ),
)


Processing batches:   0%|          | 0/28 [00:00<?, ?item/s]

In [7]:
docs_df

Unnamed: 0,path,module,code,doc
0,../../src/openaivec/_di.py,openaivec._di,from collections.abc import Callable\nfrom dat...,sections=[Section(name='CircularDependencyErro...
1,../../src/openaivec/_provider.py,openaivec._provider,import os\nimport warnings\n\nimport tiktoken\...,sections=[Section(name='_check_azure_v1_api_ur...
2,../../src/openaivec/_prompt.py,openaivec._prompt,"""""""\nThis module provides a builder for creati...","sections=[Section(name='Example', type=<Opject..."
3,../../src/openaivec/_responses.py,openaivec._responses,import warnings\nfrom dataclasses import datac...,sections=[Section(name='_handle_temperature_er...
4,../../src/openaivec/_serialize.py,openaivec._serialize,"""""""Refactored serialization utilities for Pyda...","sections=[Section(name='serialize_base_model',..."
5,../../src/openaivec/_util.py,openaivec._util,import asyncio\nimport functools\nimport re\ni...,sections=[Section(name='get_exponential_with_c...
6,../../src/openaivec/pandas_ext.py,openaivec.pandas_ext,"""""""Pandas Series / DataFrame extension for Ope...",sections=[Section(name='_df_rows_to_json_serie...
7,../../src/openaivec/spark.py,openaivec.spark,"""""""Asynchronous Spark UDFs for the OpenAI and ...","sections=[Section(name='setup', type=<OpjectTy..."
8,../../src/openaivec/_model.py,openaivec._model,"from dataclasses import dataclass, field\nfrom...","sections=[Section(name='PreparedTask', type=<O..."
9,../../src/openaivec/_log.py,openaivec._log,import functools\nimport json\nimport time\nim...,"sections=[Section(name='observe', type=<Opject..."


In [8]:
questions_df = await docs_df.aio.pipe(
    lambda df: df
    .drop(columns=["code"])
    .ai.extract("doc")
    .explode("doc_sections")
    .ai.extract("doc_sections")
    .explode("doc_sections_questions")
    .ai.extract("doc_sections_questions")
    .reset_index(drop=True)
    .aio.assign(
        doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
        embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
    )
)



Processing batches:   0%|          | 0/326 [00:00<?, ?item/s]

In [9]:
questions_df

Unnamed: 0,path,module,doc_sections_name,doc_sections_type,doc_sections_description,doc_sections_questions_question,doc_sections_questions_answer,embedding
0,../../src/openaivec/_di.py,openaivec._di,CircularDependencyError,class,Exception raised when the container detects a ...,What causes a CircularDependencyError to be ra...,It is raised when during resolution the contai...,"[-0.027163064, 0.033928107, 0.07552155, 0.0463..."
1,../../src/openaivec/_di.py,openaivec._di,CircularDependencyError,class,Exception raised when the container detects a ...,How can I handle a circular dependency in my s...,You should refactor your services to break the...,"[-0.04108237, -0.0004586271, 0.055678822, -0.0..."
2,../../src/openaivec/_di.py,openaivec._di,ProviderError,class,Exception raised when a provider function fail...,When is ProviderError raised?,It is raised if the provider callable throws a...,"[-0.06448649, -0.027017353, 0.0700168, 0.04274..."
3,../../src/openaivec/_di.py,openaivec._di,ProviderError,class,Exception raised when a provider function fail...,How can I debug a ProviderError?,Check the inner exception and the error messag...,"[-0.066516794, -0.025948327, 0.03345204, 0.011..."
4,../../src/openaivec/_di.py,openaivec._di,Provider,variable,Type alias for a callable that takes no argume...,What is a Provider in this context?,"A callable that, when invoked, creates and ret...","[-0.04972546, -0.061839428, 0.03576001, 0.0449..."
...,...,...,...,...,...,...,...,...
325,../../src/openaivec/task/nlp/dependency_parsin...,openaivec.task.nlp.dependency_parsing,DependencyParsing,class,Models the full dependency parsing output for ...,What information does 'dependencies' hold?,It holds a list of DependencyRelation instance...,"[-0.023963861, 0.019676488, 0.0939815, -0.0201..."
326,../../src/openaivec/task/nlp/dependency_parsin...,openaivec.task.nlp.dependency_parsing,DependencyParsing,class,Models the full dependency parsing output for ...,How is 'root_word' defined?,It identifies the root word of the sentence's ...,"[-0.009704481, 0.011894519, 0.08572626, 0.0383..."
327,../../src/openaivec/task/nlp/dependency_parsin...,openaivec.task.nlp.dependency_parsing,DependencyParsing,class,Models the full dependency parsing output for ...,What type of data is 'syntactic_structure'?,A string representing a tree-like structure th...,"[0.015374741, 0.029959805, 0.019000068, -0.040..."
328,../../src/openaivec/task/nlp/dependency_parsin...,openaivec.task.nlp.dependency_parsing,DEPENDENCY_PARSING,variable,A pre-configured PreparedTask instance for per...,What is PreparedTask?,It is a utility class (imported) designed to e...,"[-0.012449127, -0.009066647, 0.009980651, 0.00..."
