# Starting workshop xD!

In [1]:
import dlt
import requests
import pandas as pd
from datetime import datetime

## 1. Line-by-Line mode Educational Explanation

In [2]:
url = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
response = requests.get(url)
data = response.json()

In [3]:
 # Convert to DataFrame
df = pd.DataFrame(data)
df['Trip_Pickup_DateTime'] = pd.to_datetime(df['Trip_Pickup_DateTime'])

In [4]:
df

Unnamed: 0,End_Lat,End_Lon,Fare_Amt,Passenger_Count,Payment_Type,Rate_Code,Start_Lat,Start_Lon,Tip_Amt,Tolls_Amt,Total_Amt,Trip_Distance,Trip_Dropoff_DateTime,Trip_Pickup_DateTime,mta_tax,store_and_forward,surcharge,vendor_name
0,40.742963,-73.980072,45.0,1,Credit,,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00,2009-06-14 23:23:00,,,0.0,VTS
1,40.740187,-74.005698,6.5,1,Credit,,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00,2009-06-18 17:35:00,,,1.0,VTS
2,40.718043,-74.004745,12.5,5,Credit,,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00,2009-06-10 18:08:00,,,1.0,VTS
3,40.739637,-73.985233,4.9,1,CASH,,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00,2009-06-14 23:54:00,,,0.5,VTS
4,40.730032,-73.852693,25.7,1,CASH,,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00,2009-06-13 13:01:00,,,0.0,VTS
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,40.742998,-73.919065,6.9,1,CASH,,40.743523,-73.918735,0.0,0.00,6.90,1.83,2009-06-10 06:23:00,2009-06-10 06:16:00,,,0.0,VTS
996,40.731953,-73.985330,7.3,1,CASH,,40.733143,-74.006408,0.0,0.00,7.80,1.59,2009-06-10 05:11:00,2009-06-10 05:02:00,,,0.5,VTS
997,40.712640,-73.998870,5.7,1,CASH,,40.711865,-74.010158,0.0,0.00,5.70,0.79,2009-06-13 12:45:00,2009-06-13 12:37:00,,,0.0,VTS
998,40.732998,-74.007113,7.3,2,CASH,,40.744658,-73.992063,0.0,0.00,7.80,1.87,2009-06-09 21:34:00,2009-06-09 21:25:00,,,0.5,VTS


In [5]:
# Define buckets, 
# - `cut` divide continuous values into bins, in this case datetime intervals.
df['tag'] = pd.cut(
df['Trip_Pickup_DateTime'],
    bins=[
        pd.Timestamp("2009-06-01"),
        pd.Timestamp("2009-06-10"),
        pd.Timestamp("2009-06-20"),
        pd.Timestamp("2009-06-30")
    ],
    labels=["first_10_days", "second_10_days", "last_10_days"],
    right=False
)



In [6]:
df['tag'].value_counts(dropna=False)

tag
first_10_days     481
second_10_days    295
last_10_days      222
NaN                 2
Name: count, dtype: int64

In [7]:
# Optional: fill NaN with 'out_of_range' for clarity
df['tag'] = df['tag'].cat.add_categories('out_of_range').fillna('out_of_range') 

In [8]:
# count by tag
df['tag'].value_counts(dropna=False)

tag
first_10_days     481
second_10_days    295
last_10_days      222
out_of_range        2
Name: count, dtype: int64

## Now lets see using @A decorator to wrap a function. 

- giving it extra functionality without modifying its internal code.

In [9]:
from dotenv import load_dotenv
load_dotenv()

True

In [10]:
# Step 1: Create DLT resource
@dlt.resource(write_disposition="replace", name="zoomcamp_data")
def zoomcamp_data():
    url = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
    response = requests.get(url)
    data = response.json()

    # Convert to DataFrame
    df = pd.DataFrame(data)
    df['Trip_Pickup_DateTime'] = pd.to_datetime(df['Trip_Pickup_DateTime'])

    # Define buckets
    df['tag'] = pd.cut(
        df['Trip_Pickup_DateTime'],
        bins=[
            pd.Timestamp("2009-06-01"),
            pd.Timestamp("2009-06-10"),
            pd.Timestamp("2009-06-20"),
            pd.Timestamp("2009-06-30")
        ],
        labels=["first_10_days", "second_10_days", "last_10_days"],
        right=False
    )

    # Drop rows not in the specified range
    # df = df[df['tag'].notnull()] #Lets see what happend with fill Nan to `out_of_range`
    yield df


In [11]:
# Step 2: Create Pipeline
pipeline = dlt.pipeline(
    pipeline_name="zoomcamp_pipeline",
    destination="duckdb",
    dataset_name="zoomcamp_tagged_data"
)

In [12]:
# Step 3: run the pipeline

load_info = pipeline.run(zoomcamp_data())

  - rate_code
  - mta_tax

Unless type hints are provided, these columns will not be materialized in the destination.
One way to provide type hints is to use the 'columns' argument in the '@dlt.resource' decorator.  For example:

@dlt.resource(columns={'rate_code': {'data_type': 'text'}})



In [13]:
print(pipeline.last_trace)

Run started at 2025-07-07 01:04:40.318881+00:00 and COMPLETED in 2.58 seconds with 4 steps.
Step extract COMPLETED in 2.09 seconds.

Load package 1751850280.5308163 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.03 seconds.
Normalized data for the following tables:
- zoomcamp_data: 1000 row(s)

Load package 1751850280.5308163 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.27 seconds.
Pipeline zoomcamp_pipeline load step completed in 0.12 seconds
1 load package(s) were loaded to destination duckdb and into dataset zoomcamp_tagged_data
The duckdb destination used duckdb:////home/radianv/projects/data_talk_club/local-llm-zoomcamp/workshops/dlt/code/zoomcamp_pipeline.duckdb location to store data
Load package 1751850280.5308163 is LOADED and contains no failed jobs

Step run COMPLETED in 2.58 seconds.
Pipeline zoomcamp_pipeline load step completed in 0.12 seconds
1 load

In [14]:
#Lets trying to retrieve the data that was previously loaded into destination (in this case, DuckDB) using the DLT pipeline interface.

dataset = pipeline.dataset().zoomcamp_data.df()



In [15]:
dataset

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,store_and_forward,surcharge,vendor_name,tag
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00,2009-06-14 23:23:00,,0.0,VTS,second_10_days
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00,2009-06-18 17:35:00,,1.0,VTS,second_10_days
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00,2009-06-10 18:08:00,,1.0,VTS,second_10_days
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00,2009-06-14 23:54:00,,0.5,VTS,second_10_days
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00,2009-06-13 13:01:00,,0.0,VTS,second_10_days
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,40.742998,-73.919065,6.9,1,CASH,40.743523,-73.918735,0.0,0.00,6.90,1.83,2009-06-10 06:23:00,2009-06-10 06:16:00,,0.0,VTS,second_10_days
996,40.731953,-73.985330,7.3,1,CASH,40.733143,-74.006408,0.0,0.00,7.80,1.59,2009-06-10 05:11:00,2009-06-10 05:02:00,,0.5,VTS,second_10_days
997,40.712640,-73.998870,5.7,1,CASH,40.711865,-74.010158,0.0,0.00,5.70,0.79,2009-06-13 12:45:00,2009-06-13 12:37:00,,0.0,VTS,second_10_days
998,40.732998,-74.007113,7.3,2,CASH,40.744658,-73.992063,0.0,0.00,7.80,1.87,2009-06-09 21:34:00,2009-06-09 21:25:00,,0.5,VTS,first_10_days


In [16]:
dataset["tag"].value_counts()

tag
first_10_days     481
second_10_days    295
last_10_days      222
Name: count, dtype: int64

# Lets load data into Cognee!

In [17]:
import cognee
from cognee.shared.logging_utils import get_logger, ERROR
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.api.v1.search import SearchType
from cognee.modules.engine.models import NodeSet
import os

#from dotenv import load_dotenv
#load_dotenv()


[2m2025-07-07T01:04:56.976793[0m [[32m[1minfo     [0m] [1mLogging initialized           [0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m [36mcognee_version[0m=[35m0.2.0[0m [36mos_info[0m=[35m'Linux 6.11.0-29-generic (#29~24.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Jun 26 14:16:59 UTC 2)'[0m [36mpython_version[0m=[35m3.10.13[0m [36mstructlog_version[0m=[35m25.4.0[0m

[2m2025-07-07T01:04:56.977577[0m [[32m[1minfo     [0m] [1mWant to learn more? Visit the Cognee documentation: https://docs.cognee.ai[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m

[1mHTTP Request: GET https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json "HTTP/1.1 200 OK"[0m


In [None]:
#os.environ["LLM_API_KEY"] = os.environ.get('OPENAI_API_KEY') # it can be OpenAI API key


In [20]:
async def main():
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Add the first 10 days
    df_set1 = dataset.loc[dataset["tag"] == "first_10_days"]
    df_set1.drop(columns=["tag"], inplace=True)
    df_set1 = df_set1.to_json(orient="records", lines=False)
    await cognee.add(df_set1, node_set=["first_10_days"])

    # Add the second 10 days
    df_set2 = dataset.loc[dataset["tag"] == "second_10_days"]
    df_set2.drop(columns=["tag"], inplace=True)
    df_set2 = df_set2.to_json(orient="records", lines=False)
    await cognee.add(df_set2, node_set=["second_10_days"])

    # Add the last 10 days
    df_set3 = dataset.loc[dataset["tag"] == "last_10_days"]
    df_set3.drop(columns=["tag"], inplace=True)
    df_set3 = df_set3.to_json(orient="records", lines=False)
    await cognee.add(df_set3, node_set=["last_10_days"])

    await cognee.cognify()

    #visualization_path = "/content/.artifacts/graph_visualization.html"
    visualization_path = "graph_output/graph_visualization.html"
    await visualize_graph(visualization_path)

In [21]:
await main()



[2m2025-07-07T01:15:10.181046[0m [[32m[1minfo     [0m] [1mCleared all data from graph while preserving structure[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m
[2m2025-07-07T01:15:10.210703[0m [[32m[1minfo     [0m] [1mDeleted Kuzu database files at /home/radianv/anaconda3/envs/llm-zoomcamp/lib/python3.10/site-packages/cognee/.cognee_system/databases/cognee_graph_kuzu[0m [[0m[1m[34mcognee.shared.logging_utils[0m][0m
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_set1.drop(columns=["tag"], inplace=True)

[2m2025-07-07T01:15:10.616530[0m [[32m[1minfo     [0m] [1mPipeline run started: `8492166f-f591-5087-8636-82ce555d52ba`[0m [[0m[1m[34mrun_tasks(tasks: [Task], data)[0m][0m

User 9c5d371c-f82f-4c28-987d-bd637e8d3f4c has registered.



[2m2025-07-07T01:15:13.041544[0m [[32m[1minfo     [0m] [1mCoroutine task started: `resolve_data_directories`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T01:15:13.554362[0m [[32m[1minfo     [0m] [1mCoroutine task started: `ingest_data`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T01:15:14.309668[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `ingest_data`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T01:15:14.700868[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `resolve_data_directories`[0m [[0m[1m[34mrun_tasks_base[0m][0m
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_set2.drop(columns=["tag"], inplace=True)

[2m2025-07-07T01:15:15.565543[0m [[32m[1minfo     [0m] [1mPipeline run started: `8492166f-f591-5087-8636-82ce555d52ba`[0m [[0m[1m[


[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.



[92m19:18:43 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:18:44 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m
[2m2025-07-07T01:18:44.648530[0m [[32m[1minfo     [0m] [1mCoroutine task started: `summarize_text`[0m [[0m[1m[34mrun_tasks_base[0m][0m[92m19:18:45 - LiteLLM:INFO[0m: utils.py:2929 - 
LiteLLM completion() model= gpt-4o-mini; provider = openai
[1m
LiteLLM completion() model= gpt-4o-mini; provider = openai[0m[92m19:18:45 - LiteLLM:INFO[0m: utils.py:2929 - 
LiteLLM completion() model= gpt-4o-mini; provider = openai
[1m
LiteLLM completion() model= gpt-4o-mini; provider = openai[0m[92m19:18:45 - LiteLLM:INFO[0m: utils.py:2929 - 
LiteLLM completion() model= 


[1;31mGive Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new[0m
LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.



[92m19:18:51 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:18:51 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:18:53 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m
[2m2025-07-07T01:18:53.287218[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `add_data_points`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T01:18:53.716330[0m [[32m[1minfo     [0m] [1mCoroutine task completed: `summarize_text`[0m [[0m[1m[34mrun_tasks_base[0m][0m
[2m2025-07-07T01:18:54.127279[0m [[32m[1minfo     [0m] [1mCorout

In [22]:
async def search_cognee(query, node_set, query_type=SearchType.GRAPH_COMPLETION):
    answer = await cognee.search(
        query_text=query,
        query_type=query_type,
        node_type=NodeSet,
        node_name=node_set,
        top_k=5 # limit search for retrieval
    )
    return answer

In [23]:
results = await search_cognee(
    "What's in this knowledge graph?",
    node_set=["first_10_days"]
)

[92m19:43:57 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:43:57 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:43:57 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:43:57 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m19:43:57 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for c

In [24]:
print(results[0])

The knowledge graph contains nodes representing specific dates and ride details from a taxi service, including information like pickup and drop-off times, coordinates, fare amounts, payment types, and passenger counts. Key relationships include connections between dates and various trips, summarized by passenger counts (1 to 6). The main date in focus is '2009-06-09', which has multiple related trip entries.


In [25]:
results = await search_cognee(
    "What are the most payment method used?",
    node_set=["first_10_days"]
)

[92m20:17:10 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m20:17:10 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m20:17:10 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m20:17:10 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for cost calculation: openai/text-embedding-3-large[0m[92m20:17:10 - LiteLLM:INFO[0m: cost_calculator.py:655 - selected model name for cost calculation: openai/text-embedding-3-large
[1mselected model name for c

In [26]:
print(results[0])

The most used payment method mentioned in the context is cash, which is accepted and offered by the taxicab vendor company (vts).
