In [6]:
# Install `multiprocess` into the kernel's environment (the version is not pinned).
# NOTE(review): none of the visible cells import `multiprocess` — confirm this
# install is still needed, and pin the version if it is.
%pip install multiprocess

Collecting multiprocess
  Downloading multiprocess-0.70.17-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill>=0.3.9
  Downloading dill-0.3.9-py3-none-any.whl (119 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.4/119.4 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dill, multiprocess
Successfully installed dill-0.3.9 multiprocess-0.70.17

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [1]:
import re
from unstructured.partition.md import partition_md
from unstructured.chunking.dispatch import chunk
from tecton_gen_ai.api import Agent
import pandas as pd
import os
import tqdm
from pydantic import BaseModel, Field
from tecton_gen_ai.testing import set_dev_mode

set_dev_mode()

# Structured-output schema passed to the `code_parser` Agent through `output_schema`.
# The extraction rules and the few-shot examples for the LLM live entirely in the
# field description below, so that string is runtime behavior, not documentation.
# NOTE(review): documented with `#` comments rather than a class docstring on purpose;
# pydantic may surface a class docstring in the generated schema — confirm before converting.
class Declarations(BaseModel):
    # Required field (`...` means there is no default). Each entry is expected to be a
    # [name, description] pair; `extract_declarations` reads them as d[0] and d[1].
    # NOTE(review): `list[str,str]` passes two type arguments to `list`, which takes one,
    # while the description talks about tuples — presumably `tuple[str, str]` was intended.
    # Confirm how pydantic and the LLM schema treat this before changing it.
    declarations: list[list[str,str]] = Field(..., description="""List of tuples of declarations.
Each tuple contains the object/function name and the description.

You should only extract:
- Tecton classes and decorated functions
- Tecton objects embedded in other objects (e.g. SnowflakeConfig in BatchSource, Attribute and Aggregate)
- Unit tests (set the first value in the tuple as "test")
                                              
Pay attention to the import statements at the beginning that tells you which objects and functions are imported from Tecton.

Don't extract declarations that are commented out

The description should be under 150 words


For example, with this code:

```from tecton import Entity, FeatureTable, Attribute
from tecton.types import String, Timestamp, Int64, Field
from fraud.entities import user
from datetime import timedelta


features = [
    Attribute('user_login_count_7d', Int64),
    Attribute('user_login_count_30d', Int64),
]

user_login_counts = FeatureTable(
    name='user_login_counts',
    entities=[user],
    features=features,
    online=True,
    offline=True,
    ttl=timedelta(days=7),
    owner='demo-user@tecton.ai',
    tags={'release': 'production'},
    description='User login counts over time.',
    timestamp_field='timestamp'
)
```

The declarations would be:

[("FeatureTable", "User login counts over time.")]

In this code

```python
fraud_detection_feature_service = FeatureService(
    name='fraud_detection_feature_service',
    prevent_destroy=False,  # Set to True for production services to prevent accidental destructive changes or downtime.
    features=[
        transaction_amount_is_higher_than_average,
        user_transaction_amount_metrics,
        user_transaction_counts,
        user_distinct_merchant_transaction_count_30d,
        merchant_fraud_rate
    ]
)

minimal_fs = FeatureService(
     name='minimal_fs',
     features=[
         transaction_amount_is_high
     ]
)
```

The declarations would be:

[
    ("FeatureService", "Fraud detection feature service"),
    ("FeatureService", "Whether transaction amount is higher")
]

In this code:

```
import math

from ads.features.on_demand_feature_views.user_query_embedding_similarity import user_query_embedding_similarity


# Testing the 'user_query_embedding_similarity' feature which takes in request data ('query_embedding')
# and a precomputed feature ('user_embedding') as inputs
def test_user_query_embedding_similarity():
    request = {'query_embedding': [1.0, 1.0, 0.0]}
    user_embedding = {'user_embedding': [0.0, 1.0, 1.0]}

    actual = user_query_embedding_similarity.test_run(request=request, user_embedding=user_embedding)

    # Float comparison.
    expected = 0.5
    assert math.isclose(actual['cosine_similarity'], expected)
```

The declarations would be:

[("test", "Testing the 'user_query_embedding_similarity' feature which takes in request data ('query_embedding') and a precomputed feature ('user_embedding') as inputs")]

In this code
                                              
```python
from tecton import BatchSource, SnowflakeConfig
from tecton.types import Field, Int64, String, Timestamp, Array

gaming_user_batch = BatchSource(
    name="gaming_users",
    batch_config=SnowflakeConfig(
      database="VINCE_DEMO_DB",
      schema="PUBLIC",
      table="ONLINE_GAMING_USERS",
      url="https://<your-cluster>.<your-snowflake-region>.snowflakecomputing.com/",
      warehouse="COMPUTE_WH",
      timestamp_field='TIMESTAMP',
    ),
)
```

(Pay attention that SnowflakeConfig is a configuration object embedded in the BatchSource object, we also need to extract that)

The declarations would be:

[("BatchSource", "Gaming users batch source"), ("SnowflakeConfig", "Gaming users batch source configuration")]  

In this code:
```
# The following defines several sliding time window aggregations over a user's transaction amounts
@stream_feature_view(
    source=transactions_stream,
    entities=[user],
    mode='pandas',
    batch_schedule=timedelta(days=1), # Defines how frequently batch jobs are scheduled to ingest into the offline store
    features=[
        Aggregate(input_column=Field('amt', Float64), function='sum', time_window=timedelta(hours=1)),
        Aggregate(input_column=Field('amt', Float64), function='max', time_window=timedelta(days=1)),
        Aggregate(input_column=Field('amt', Float64), function='min', time_window=timedelta(days=3)),
        Aggregate(input_column=Field('amt', Float64), function=approx_percentile(percentile=0.5, precision=100), time_window=timedelta(hours=1))
    ],
    timestamp_field='timestamp',
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 5, 1),
    tags={'release': 'production'},
    owner='demo-user@tecton.ai',
    description='Transaction amount statistics and total over a series of time windows, updated every 10 minutes.',
    aggregation_leading_edge=AggregationLeadingEdge.LATEST_EVENT_TIME
)
def user_transaction_amount_metrics(transactions):
    return transactions[['user_id', 'amt', 'timestamp']]
```
                                              
[("Aggregate", "sum of transaction amounts over the past hour"), ("Aggregate", "max of transaction amounts over the past day"), ("Aggregate", "min of transaction amounts over the past 3 days"), ("Aggregate", "50th percentile of transaction amounts over the past hour")]
""")

# Module-level agent used by `extract_declarations`: it receives the source of one
# Python file and answers in the shape described by the `Declarations` schema.
agent = Agent(
    name="code_parser",
    prompt="Extract the function, class or test declarations from the code.",
    output_schema=Declarations,
    # LLM settings forwarded as a plain dict: model id, sampling temperature,
    # request timeout and completion-token cap.
    llm=dict(
        model="openai/gpt-4o-2024-11-20",
        temperature=0,
        timeout=30,
        max_tokens=2000,
    ),
)

def get_py_files(directory):
    """Recursively collect the paths of every ``.py`` file below ``directory``.

    Args:
        directory: Root folder to walk with ``os.walk``.

    Returns:
        A list of paths (each one is the walked folder joined with the file
        name), in the order ``os.walk`` yields them. The list is empty when
        nothing matches.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(directory)
        for name in names
        if name.endswith('.py')
    ]

def extract_declarations(folders):
    """Run the ``code_parser`` agent over every ``.py`` file in ``folders``.

    Args:
        folders: Iterable of directory paths; each one is searched with
            ``get_py_files``.

    Returns:
        A list of dicts, one per declaration the agent reports. ``"text"`` is
        ``"Example of <name>. <description>"`` and ``"code"`` is the full
        source of the file the declaration came from, so a file with several
        declarations contributes several rows sharing the same code.
    """
    paths = [path for folder in folders for path in get_py_files(folder)]

    records = []
    # tqdm shows one progress tick per file; each tick is one agent call.
    for path in tqdm.tqdm(paths):
        with open(path, 'r') as handle:
            source_code = handle.read()
        records.extend(
            {"text": f"Example of {decl[0]}. {decl[1]}", "code": source_code}
            for decl in agent.invoke(source_code)["declarations"]
        )
    return records


def partition(path, **kwargs):
    """Yield the ``partition_md`` elements of the markdown file at ``path``.

    The file text first goes through ``extract_and_replace``, and every element
    is then passed through a ``CodeReplacer`` built from the snippets it
    returned. Presumably this swaps code blocks for placeholders before
    partitioning and restores them afterwards — verify against those helpers,
    which are defined outside this cell.

    Args:
        path: Path to the markdown file.
        **kwargs: Extra keyword arguments forwarded to ``partition_md``.

    Yields:
        Each element from ``partition_md`` after ``element.apply`` has been
        called with the replacer's ``apply`` method.
    """
    with open(path, 'r') as handle:
        raw_markdown = handle.read()

    placeholder_text, snippets = extract_and_replace(raw_markdown)
    restore = CodeReplacer(snippets).apply

    for element in partition_md(text=placeholder_text, languages=["eng"], **kwargs):
        element.apply(restore)
        yield element

def chunk_md(path, chunking_strategy, max_characters, **kwargs):
    """Partition a markdown file and yield its chunks as plain dicts.

    Args:
        path: Path to the markdown file, handed to ``partition``.
        chunking_strategy: Forwarded to ``chunk`` as ``chunking_strategy``.
        max_characters: Forwarded to ``chunk`` as ``max_characters``.
        **kwargs: Extra keyword arguments forwarded to ``partition``.

    Yields:
        ``{'text': <chunk text>, 'id': <chunk id>}`` for each chunk.
    """
    pieces = chunk(
        partition(path, **kwargs),
        chunking_strategy=chunking_strategy,
        max_characters=max_characters,
    )
    yield from ({'text': piece.text, "id": piece.id} for piece in pieces)

def clean_source_id(source: str) -> str:
    """Normalise a relative markdown path into a source id.

    Steps, in order:
      1. Drop one trailing ``.md`` extension, if present.
      2. Any path ending in ``/changelog`` collapses to ``"changelog"``.
      3. Repeated trailing segments are de-duplicated, so ``a/b/b`` becomes
         ``a/b``.

    Args:
        source: ``/``-separated path, with or without the ``.md`` suffix.

    Returns:
        The cleaned source id.
    """
    stem = source[:-3] if source.endswith(".md") else source
    if stem.endswith("/changelog"):
        return "changelog"

    segments = stem.split("/")
    # Collapse a tail such as ".../foo/foo" down to a single ".../foo".
    while len(segments) >= 2 and segments[-2] == segments[-1]:
        segments.pop()
    return "/".join(segments)

def generate_docs(version, base_path, chunking_strategy, max_characters, concurrency, url_func, **kwargs):
    """Build the full-text and chunk tables for the markdown docs under ``base_path``.

    Every file returned by ``get_md_files(base_path)``, plus the changelog at
    ``../../changelog.md`` (resolved against the current working directory),
    is read once and chunked once.

    Args:
        version: Value stored in the ``version`` column of the text table.
        base_path: Docs root; source ids are file paths relative to it.
        chunking_strategy: Forwarded to ``chunk_md``.
        max_characters: Forwarded to ``chunk_md``.
        concurrency: Currently unused (files are processed sequentially);
            kept so existing callers keep working.
        url_func: Callable taking a source id and returning its URL.
        **kwargs: Extra keyword arguments forwarded to ``chunk_md``.

    Returns:
        ``(texts, chunks)``. ``texts`` has one row per file with the columns
        ``source``, ``version``, ``url`` and ``text``. ``chunks`` is the
        concatenation of the per-file chunk frames, each with a ``source``
        column added.
    """
    # Build a fresh list so the list returned by get_md_files is not mutated.
    files = [*get_md_files(base_path), "../../changelog.md"]

    def process_file(file):
        """Return ``(text_df, chunk_df)`` for one file, or ``None`` if it is skipped."""
        source_id = clean_source_id(os.path.relpath(file, base_path))
        # Skip anything that has a path segment starting with an underscore.
        if any(x.startswith("_") for x in source_id.split("/")):
            return None
        with open(file, 'r') as f:
            markdown_text = f.read()
        df = pd.DataFrame(chunk_md(file, chunking_strategy=chunking_strategy, max_characters=max_characters, **kwargs))
        df["source"] = source_id
        return pd.DataFrame([{"source":source_id, "version":version, "url": url_func(source_id), "text":markdown_text}]), df

    raw = []
    for file in tqdm.tqdm(files):
        res = process_file(file)
        if res is not None:
            # Reuse the result computed above; calling process_file a second
            # time would read, partition and chunk every kept file twice.
            raw.append(res)

    chunks = pd.concat([chunk_df for _, chunk_df in raw])
    texts = pd.concat([text_df for text_df, _ in raw])
    return texts, chunks

def to_url(source_id, prefix) -> str:
    """Map a source id to its documentation URL.

    Args:
        source_id: Cleaned source id (see ``clean_source_id``).
        prefix: URL prefix that the source id is appended to as-is.

    Returns:
        The fixed changelog URL when ``source_id`` is ``"changelog"``,
        otherwise ``prefix + source_id``.
    """
    is_changelog = source_id == "changelog"
    return "https://docs.tecton.ai/changelog" if is_changelog else prefix + source_id

In [2]:
import pandas as pd

# Run the LLM-backed extraction over every .py file found under these folders.
# The paths are relative to the notebook's working directory. This makes one
# agent call per file, so it is slow (see the progress bar in the cell output).
res = extract_declarations(["../../rift", "../../spark", "../../../examples"])
# One row per extracted declaration, with the columns "text" and "code".
df=pd.DataFrame(res)
# Persist the result so it can be reloaded without repeating the extraction.
df.to_parquet("/tmp/examples.parquet")

100%|██████████| 168/168 [07:12<00:00,  2.57s/it]


In [5]:
# Number of rows in df, i.e. one per declaration returned by extract_declarations.
len(df)

462

In [6]:
import pandas as pd

# Raise the pandas display limits so up to 400 rows are rendered and each cell
# shows up to 150 characters. These options are global and stay in effect for
# every later cell.
pd.set_option('display.max_rows', 400)
pd.set_option('display.max_colwidth', 150)
# Render the first 400 rows for a manual review of the extracted declarations.
display(df.head(400))

Unnamed: 0,text,code
0,Example of Entity. An ad,"from tecton import Entity\nfrom tecton.types import Field, String, Int64\n\nad = Entity(\n name='ad',\n join_keys=[Field('ad_id', Int64)],\n..."
1,Example of Entity. Content ID,"from tecton import Entity\nfrom tecton.types import Field, String, Int64\n\nad = Entity(\n name='ad',\n join_keys=[Field('ad_id', Int64)],\n..."
2,Example of Entity. Auction ID,"from tecton import Entity\nfrom tecton.types import Field, String, Int64\n\nad = Entity(\n name='ad',\n join_keys=[Field('ad_id', Int64)],\n..."
3,Example of Entity. A user of the platform,"from tecton import Entity\nfrom tecton.types import Field, String, Int64\n\nad = Entity(\n name='ad',\n join_keys=[Field('ad_id', Int64)],\n..."
4,Example of Entity. The keyword describing the content this ad is being placed alongside.,"from tecton import Entity\nfrom tecton.types import Field, String, Int64\n\nad = Entity(\n name='ad',\n join_keys=[Field('ad_id', Int64)],\n..."
5,Example of FeatureTable. Precomputed ad embeddings pushed into Tecton.,"from tecton.types import Field, Timestamp, Array, Float64, Int64\nfrom tecton import FeatureTable, Attribute\nfrom datetime import timedelta\n\nfr..."
6,Example of FeatureTable. Precomputed user embeddings pushed into Tecton.,"from tecton.types import Field, String, Timestamp, Array, Float64\nfrom tecton import FeatureTable, Attribute\nfrom ads.entities import user\nfrom..."
7,Example of realtime_feature_view. Computes the cosine similarity between a precomputed ad embedding and a precomputed user embedding.,"from tecton import realtime_feature_view, Attribute\nfrom tecton.types import Float64\nfrom ads.features.feature_tables.user_embeddings import use..."
8,Example of RequestSource. Defines the schema for the request source.,"from tecton import RequestSource, realtime_feature_view, Attribute\nfrom tecton.types import Field, Array, Float64\nfrom ads.features.feature_tabl..."
9,Example of realtime_feature_view. Computes the cosine similarity between a query embedding and a precomputed user embedding.,"from tecton import RequestSource, realtime_feature_view, Attribute\nfrom tecton.types import Field, Array, Float64\nfrom ads.features.feature_tabl..."
