In [1]:
from datetime import datetime
import os

import requests

import pandas as pd

import dlt

import cognee
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models import NodeSet


[2m2025-07-07T02:06:02.740985[0m [[32m[1minfo     [0m] [1mLogging initialized           [0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m [36mcognee_version[0m=[35m0.2.0[0m [36mos_info[0m=[35m'Linux 6.11.0-26-generic (#26~24.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Apr 17 19:20:47 UTC 2)'[0m [36mpython_version[0m=[35m3.12.3[0m [36mstructlog_version[0m=[35m25.4.0[0m

[2m2025-07-07T02:06:02.742003[0m [[32m[1minfo     [0m] [1mWant to learn more? Visit the Cognee documentation: https://docs.cognee.ai[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m

[1mHTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"[0m


In [2]:
from dotenv import load_dotenv

load_dotenv()

cognee.config.set_llm_api_key(os.getenv('OPENAI_API_KEY'))
os.environ["GRAPH_DATABASE_PROVIDER"] = "kuzu"

In [3]:
# Step 1: Create DLT resource
@dlt.resource(write_disposition="replace", name="zoomcamp_data")
def zoomcamp_data():
    url = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
    response = requests.get(url)
    data = response.json()

    # Convert to DataFrame
    df = pd.DataFrame(data)
    df['Trip_Pickup_DateTime'] = pd.to_datetime(df['Trip_Pickup_DateTime'])

    # Define buckets
    df['tag'] = pd.cut(
        df['Trip_Pickup_DateTime'],
        bins=[
            pd.Timestamp("2009-06-01"),
            pd.Timestamp("2009-06-10"),
            pd.Timestamp("2009-06-20"),
            pd.Timestamp("2009-06-30")
        ],
        labels=["first_10_days", "second_10_days", "last_10_days"],
        right=False
    )

    # Drop rows not in the specified range
    df = df[df['tag'].notnull()]
    yield df

# Step 2: Create and run the pipeline
pipeline = dlt.pipeline(
    pipeline_name="zoomcamp_pipeline",
    destination="duckdb",
    dataset_name="zoomcamp_tagged_data"
)
load_info = pipeline.run(zoomcamp_data())
print(pipeline.last_trace)

  - rate_code
  - mta_tax

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'rate_code': {'data_type': 'text'}})


Run started at 2025-07-07 02:06:08.250062+00:00 and COMPLETED in 2.95 seconds with 4 steps.
Step extract COMPLETED in 2.58 seconds.

Load package 1751853968.3890276 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.03 seconds.
Normalized data for the following tables:
- zoomcamp_data: 998 row(s)

Load package 1751853968.3890276 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.22 seconds.
Pipeline zoomcamp_pipeline load step completed in 0.19 seconds
1 load package(s) were loaded to destination duckdb and into dataset zoomcamp_tagged_data
The duckdb destination used duckdb:////home/rajinder-mavi/code/nlp_sandbox/llm_zoomcamp/zoomcamp_pipeline.duckdb location to store data
Load package 1751853968.3890276 is LOADED and contains no failed jobs

Step run COMPLETED in 2.95 seconds.
Pipeline zoomcamp_pipeline load step completed in 0.19 seconds
1 load package(s) were loaded to 

In [4]:
dataset = pipeline.dataset().zoomcamp_data.df()

dataset

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,store_and_forward,surcharge,vendor_name,tag
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00,2009-06-14 23:23:00,,0.0,VTS,second_10_days
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00,2009-06-18 17:35:00,,1.0,VTS,second_10_days
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00,2009-06-10 18:08:00,,1.0,VTS,second_10_days
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00,2009-06-14 23:54:00,,0.5,VTS,second_10_days
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00,2009-06-13 13:01:00,,0.0,VTS,second_10_days
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
993,40.742998,-73.919065,6.9,1,CASH,40.743523,-73.918735,0.0,0.00,6.90,1.83,2009-06-10 06:23:00,2009-06-10 06:16:00,,0.0,VTS,second_10_days
994,40.731953,-73.985330,7.3,1,CASH,40.733143,-74.006408,0.0,0.00,7.80,1.59,2009-06-10 05:11:00,2009-06-10 05:02:00,,0.5,VTS,second_10_days
995,40.712640,-73.998870,5.7,1,CASH,40.711865,-74.010158,0.0,0.00,5.70,0.79,2009-06-13 12:45:00,2009-06-13 12:37:00,,0.0,VTS,second_10_days
996,40.732998,-74.007113,7.3,2,CASH,40.744658,-73.992063,0.0,0.00,7.80,1.87,2009-06-09 21:34:00,2009-06-09 21:25:00,,0.5,VTS,first_10_days


In [5]:
dataset["tag"].value_counts()

tag
first_10_days     481
second_10_days    295
last_10_days      222
Name: count, dtype: int64

In [6]:
async def main():
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Add the first 10 days
    df_set1 = dataset.loc[dataset["tag"] == "first_10_days"]
    df_set1.drop(columns=["tag"], inplace=True)
    df_set1 = df_set1.to_json(orient="records", lines=False)
    await cognee.add(df_set1, node_set=["first_10_days"])

    # Add the second 10 days
    df_set2 = dataset.loc[dataset["tag"] == "second_10_days"]
    df_set2.drop(columns=["tag"], inplace=True)
    df_set2 = df_set2.to_json(orient="records", lines=False)
    await cognee.add(df_set2, node_set=["second_10_days"])

    # Add the last 10 days
    df_set3 = dataset.loc[dataset["tag"] == "last_10_days"]
    df_set3.drop(columns=["tag"], inplace=True)
    df_set3 = df_set3.to_json(orient="records", lines=False)
    await cognee.add(df_set3, node_set=["last_10_days"])

    await cognee.cognify()

    visualization_path = "artifacts/graph_visualization.html"
    await visualize_graph(visualization_path)

In [7]:
await main()


[2m2025-07-07T02:06:11.935468[0m [[32m[1minfo     [0m] [1mCleared all data from graph while preserving structure[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m
[2m2025-07-07T02:06:11.966582[0m [[32m[1minfo     [0m] [1mDeleted Kuzu database files at /home/rajinder-mavi/code/nlp_sandbox/.venv/lib/python3.12/site-packages/cognee/.cognee_system/databases/cognee_graph_kuzu[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_set1.drop(columns=["tag"], inplace=True)

[1mLangfuse client is disabled since no public_key was provided as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'. See our docs: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client[0m[92m19:06:13 - LiteLLM:INFO[0m: utils.py:2929 - 
LiteLLM completion() model= gpt-4o-m

User 664676e7-9087-466e-ae88-07ae56e6589a has registered.



[2m2025-07-07T02:06:15.772064[0m [[32m[1minfo     [0m] [1mCoroutine task started: `resolve_data_directories`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T02:06:16.147109[0m [[32m[1minfo     [0m] [1mCoroutine task started: `ingest_data`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T02:06:17.170116[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `ingest_data`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T02:06:17.577180[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `resolve_data_directories`[0m [[0m[1m[34mrun_tasks_base[0m][0m
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_set2.drop(columns=["tag"], inplace=True)

[2m2025-07-07T02:06:18.691760[0m [[32m[1minfo     [0m] [1mPipeline run started: `146ea502-50bd-5c97-994f-2c0dfc93c74f`[0m [[0m[1m[

In [8]:
async def search_cognee(query, node_set, top_k = 5, query_type=SearchType.GRAPH_COMPLETION):
    answer = await cognee.search(
        query_text=query,
        query_type=query_type,
        node_type=NodeSet,
        node_name=node_set
    )
    return answer

In [9]:
results = await search_cognee(
    "What's in this knowledge graph?",
    node_set=["first_10_days"]
)
print(results[0])

[92m19:09:39 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:40 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:40 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:40 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:40 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for c

The knowledge graph contains data about trips on specific dates in June 2009, detailing trip pickup and drop-off times, locations (latitude and longitude), fare amounts, passenger counts, payment types, and associated vendor information. Key dates include June 2, 3, 4, 8, and 9, with connections indicating that various trip records are associated with these dates.


In [10]:
results = await search_cognee(
    "What kind of payment methods are there?",
    node_set=["first_10_days"]
)
print(results[0])

[92m19:09:45 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:45 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:45 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:45 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:09:45 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for c

The payment methods available include cash, which is accepted by taxi service vendors (vts).
