## HW 4

In [34]:
import duckdb
import pandas as pd
import requests
from pathlib import Path

Note: the Jupyter kernel runs from the following directory (the dbt project), not from the HW directory:

zoomcamp/data-engineering-zoomcamp/04-analytics-engineering/taxi_rides_ny

In [73]:
# Connect to the db.
# Open the dbt project's DuckDB database file (path is relative to the
# kernel's working directory noted above).
con = duckdb.connect(database="taxi_rides_ny.duckdb")

In [23]:
# Check all schemas.
# List every schema visible to the connection; note the `dev` and `prod`
# schemas used as dbt targets.
schemata_sql = "SELECT * FROM information_schema.schemata"
con.execute(schemata_sql).fetchdf()

Unnamed: 0,catalog_name,schema_name,schema_owner,default_character_set_catalog,default_character_set_schema,default_character_set_name,sql_path
0,system,information_schema,duckdb,,,,
1,system,main,duckdb,,,,
2,system,pg_catalog,duckdb,,,,
3,taxi_rides_ny,dev,duckdb,,,,
4,taxi_rides_ny,main,duckdb,,,,
5,taxi_rides_ny,prod,duckdb,,,,
6,temp,main,duckdb,,,,


---

### Question 1

Given a dbt project with the following structure:

<img src="supplements/images/hw4q1.png"
     alt="hw4q1."
     width="550">

If you run dbt run --select int_trips_unioned, what models will be built?

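Answer: `int_trips_unioned only`

`--select int_trips_unioned` with no graph operator selects just that node. To build its upstream models as well (e.g. `stg_green_tripdata`, `stg_yellow_tripdata`), the selector needs the `+` prefix: `dbt run --select +int_trips_unioned`.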

---

### Question 2

You've configured a generic test like this in your schema.yml:

<img src="supplements/images/hw4q2.png"
     alt="hw4q2."
     width="550">

Your model fct_trips has been running successfully for months. A new value 6 now appears in the source data.

What happens when you run dbt test --select fct_trips?

Answer: `dbt will fail the test, returning a non-zero exit code`

---

### Question 3

After running your dbt project, query the fct_monthly_zone_revenue model.

What is the count of records in the fct_monthly_zone_revenue model?

In [25]:
# Q3: total number of rows in the monthly zone revenue model.
revenue_count_sql = "SELECT COUNT(*) FROM prod.fct_monthly_zone_revenue"
con.execute(revenue_count_sql).fetchdf()

Unnamed: 0,count_star()
0,12184


Answer: `12,184`

---

### Question 4

Using the fct_monthly_zone_revenue table, find the pickup zone with the highest total revenue (revenue_monthly_total_amount) for Green taxi trips in 2020.

Which zone had the highest revenue?

In [26]:
# Q4: Green-taxi pickup zone with the largest summed monthly revenue in 2020.
top_green_zone_2020_sql = """
    SELECT 
        pickup_zone,
        SUM(revenue_monthly_total_amount) as total_revenue
    FROM prod.fct_monthly_zone_revenue
    WHERE service_type = 'Green'
        AND YEAR(revenue_month) = 2020
    GROUP BY pickup_zone
    ORDER BY total_revenue DESC
    LIMIT 1
"""
con.execute(top_green_zone_2020_sql).fetchdf()

Unnamed: 0,pickup_zone,total_revenue
0,East Harlem North,1817472.75


Answer: `East Harlem North`

---

### Question 5

Using the fct_monthly_zone_revenue table, what is the total number of trips (total_monthly_trips) for Green taxis in October 2019?

In [29]:
# Q5: total Green-taxi trips summed over all zones for October 2019.
green_trips_oct_2019_sql = """
    SELECT SUM(total_monthly_trips) as total_trips
    FROM prod.fct_monthly_zone_revenue
    WHERE service_type = 'Green'
        AND revenue_month = '2019-10-01'
"""
con.execute(green_trips_oct_2019_sql).fetchdf()

Unnamed: 0,total_trips
0,384624.0


Answer: `384,624`

---

### Question 6

Create a staging model for the For-Hire Vehicle (FHV) trip data for 2019.

1. Load the FHV trip data for 2019 into your data warehouse
2. Create a staging model stg_fhv_tripdata with these requirements:
  - Filter out records where dispatching_base_num IS NULL
  - Rename fields to match your project's naming conventions (e.g., PUlocationID → pickup_location_id)
  
What is the count of records in stg_fhv_tripdata?

In [66]:
# Release page hosting the monthly FHV `*.csv.gz` files.
BASE_URL = (
    "https://github.com/DataTalksClub/nyc-tlc-data"
    "/releases/download/fhv"
)

In [67]:
def download_and_convert_files(taxi_type, years=(2019, 2020), base_url=None):
    """Download monthly ``taxi_type`` trip CSVs and convert each one to Parquet.

    For every month 1..12 of every year in ``years``, fetches
    ``{base_url}/{taxi_type}_tripdata_{YYYY}-{MM}.csv.gz`` into
    ``data/{taxi_type}/``, converts it to Parquet with DuckDB, and deletes the
    CSV.gz. Months whose Parquet file already exists are skipped, so the
    function can be re-run to resume an interrupted download.

    Parameters
    ----------
    taxi_type : str
        File prefix (e.g. ``"fhv"``); also the sub-directory under ``data/``.
    years : iterable of int, default (2019, 2020)
        Years to fetch. Homework Q6 only needs ``(2019,)``.
    base_url : str, optional
        URL to download from; defaults to the notebook-level ``BASE_URL``.

    Raises
    ------
    requests.HTTPError
        If a download returns a non-success status (via ``raise_for_status``).
    """
    if base_url is None:
        base_url = BASE_URL

    data_dir = Path("data") / taxi_type
    data_dir.mkdir(exist_ok=True, parents=True)

    for year in years:
        for month in range(1, 13):
            stem = f"{taxi_type}_tripdata_{year}-{month:02d}"
            parquet_filename = f"{stem}.parquet"
            parquet_filepath = data_dir / parquet_filename

            if parquet_filepath.exists():
                print(f"Skipping {parquet_filename} (already exists)")
                continue

            csv_gz_filename = f"{stem}.csv.gz"
            csv_gz_filepath = data_dir / csv_gz_filename
            # Convert into a temp name and rename only on success. If COPY
            # wrote straight to the final .parquet, an interrupted run would
            # leave a truncated file that the exists() check above skips
            # forever (and that a later `*.parquet` glob would load).
            tmp_parquet_filepath = data_dir / f"{parquet_filename}.tmp"

            try:
                # Download CSV.gz file (streamed; the response is closed even on error).
                with requests.get(
                    f"{base_url}/{csv_gz_filename}", stream=True, timeout=60
                ) as response:
                    response.raise_for_status()
                    with open(csv_gz_filepath, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)

                print(f"Converting {csv_gz_filename} to Parquet...")
                # Escape single quotes so the paths are valid SQL string literals.
                csv_sql_path = str(csv_gz_filepath).replace("'", "''")
                tmp_sql_path = str(tmp_parquet_filepath).replace("'", "''")
                conversion_con = duckdb.connect()
                try:
                    # NOTE: ignore_errors=true silently drops rows DuckDB cannot
                    # parse, which can lower downstream record counts.
                    conversion_con.execute(f"""
                        COPY (SELECT * FROM read_csv_auto('{csv_sql_path}', strict_mode=false, ignore_errors=true))
                        TO '{tmp_sql_path}' (FORMAT PARQUET)
                    """)
                finally:
                    conversion_con.close()
                tmp_parquet_filepath.replace(parquet_filepath)
            finally:
                # Remove the CSV.gz (and any partial temp output) to save space,
                # whether or not the download/conversion succeeded.
                csv_gz_filepath.unlink(missing_ok=True)
                tmp_parquet_filepath.unlink(missing_ok=True)

            print(f"Completed {parquet_filename}")

In [68]:
def update_gitignore(gitignore_path=".gitignore", entry="data/"):
    """Ensure ``entry`` (default ``data/``) is listed in ``gitignore_path``.

    Appends a ``# Data directory`` comment followed by ``entry`` when no line
    of the file already equals ``entry`` (or its root-anchored form
    ``/entry``). Creates the file if it does not exist. Re-running is a no-op
    once the entry is present.

    Parameters
    ----------
    gitignore_path : str or Path, default ".gitignore"
        The ignore file to update.
    entry : str, default "data/"
        The ignore pattern to ensure.
    """
    gitignore_path = Path(gitignore_path)

    # Read existing content or start with empty string
    content = gitignore_path.read_text() if gitignore_path.exists() else ""

    # Compare whole lines. A substring test would wrongly treat `raw_data/`,
    # or a comment that mentions data/, as already ignoring `data/`.
    existing_lines = {line.strip() for line in content.splitlines()}
    if entry in existing_lines or f"/{entry}" in existing_lines:
        return

    block = f"# Data directory\n{entry}\n"
    if content:
        # Prefix a newline so the new entry never joins the file's last line.
        block = "\n" + block
    with open(gitignore_path, 'a') as f:
        f.write(block)

In [69]:
# Make sure the local data/ directory is git-ignored before downloading files into it.
update_gitignore()

In [70]:
# Fetch the monthly FHV CSVs and convert them to Parquet under data/fhv/;
# months that were already converted are skipped.
download_and_convert_files(taxi_type="fhv")

Converting fhv_tripdata_2019-01.csv.gz to Parquet...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Completed fhv_tripdata_2019-01.parquet
Converting fhv_tripdata_2019-02.csv.gz to Parquet...
Completed fhv_tripdata_2019-02.parquet
Converting fhv_tripdata_2019-03.csv.gz to Parquet...
Completed fhv_tripdata_2019-03.parquet
Converting fhv_tripdata_2019-04.csv.gz to Parquet...
Completed fhv_tripdata_2019-04.parquet
Converting fhv_tripdata_2019-05.csv.gz to Parquet...
Completed fhv_tripdata_2019-05.parquet
Converting fhv_tripdata_2019-06.csv.gz to Parquet...
Completed fhv_tripdata_2019-06.parquet
Converting fhv_tripdata_2019-07.csv.gz to Parquet...
Completed fhv_tripdata_2019-07.parquet
Converting fhv_tripdata_2019-08.csv.gz to Parquet...
Completed fhv_tripdata_2019-08.parquet
Converting fhv_tripdata_2019-09.csv.gz to Parquet...
Completed fhv_tripdata_2019-09.parquet
Converting fhv_tripdata_2019-10.csv.gz to Parquet...
Completed fhv_tripdata_2019-10.parquet
Converting fhv_tripdata_2019-11.csv.gz to Parquet...
Completed fhv_tripdata_2019-11.parquet
Converting fhv_tripdata_2019-12.csv.gz to

In [74]:
# Materialize every downloaded FHV Parquet file as a raw table in the `prod`
# schema (presumably the source read by the stg_fhv_tripdata dbt model).
# union_by_name=true aligns columns by name across the monthly files.
# NOTE: the glob also picks up any 2020 files that were downloaded; the Q6
# count below filters on YEAR(pickup_datetime) = 2019.
# Plain string (no placeholders); the trailing `;` hides the connection repr.
con.execute("""
    CREATE OR REPLACE TABLE prod.fhv_tripdata AS
    SELECT * FROM read_parquet('data/fhv/*.parquet', union_by_name=true)
""");

<_duckdb.DuckDBPyConnection at 0x12220adb0>

In [75]:
# Peek at a few raw rows to see the source column names before renaming them.
fhv_preview_sql = "SELECT * FROM prod.fhv_tripdata LIMIT 5"
con.execute(fhv_preview_sql).fetchdf()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00001,2019-01-01 00:30:00,2019-01-01 02:51:55,,,,B00001
1,B00001,2019-01-01 00:45:00,2019-01-01 00:54:49,,,,B00001
2,B00001,2019-01-01 00:15:00,2019-01-01 00:54:52,,,,B00001
3,B00008,2019-01-01 00:19:00,2019-01-01 00:39:00,,,,B00008
4,B00008,2019-01-01 00:27:00,2019-01-01 00:37:00,,,,B00008


In [76]:
# Inspect the inferred column types of the raw FHV table.
fhv_describe_sql = "DESCRIBE prod.fhv_tripdata"
con.execute(fhv_describe_sql).fetchdf()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,dispatching_base_num,VARCHAR,YES,,,
1,pickup_datetime,TIMESTAMP,YES,,,
2,dropOff_datetime,TIMESTAMP,YES,,,
3,PUlocationID,BIGINT,YES,,,
4,DOlocationID,BIGINT,YES,,,
5,SR_Flag,VARCHAR,YES,,,
6,Affiliated_base_number,VARCHAR,YES,,,


In [77]:
# Release the database file before running dbt in a separate process
# (DuckDB allows only one process to hold a read-write connection to a file).
con.close()

I then added the YAML files to the correct model folders and ran:

```bash
uv run dbt run --select stg_fhv_tripdata --target prod
```

In [79]:
# Reopen the database now that `dbt run` has finished writing to it.
con = duckdb.connect(database="taxi_rides_ny.duckdb")

In [80]:
# Q6: number of 2019 records in the stg_fhv_tripdata staging model.
stg_fhv_2019_count_sql = """
    SELECT COUNT(*) 
    FROM prod.stg_fhv_tripdata 
    WHERE YEAR(pickup_datetime) = 2019
"""
con.execute(stg_fhv_2019_count_sql).fetchdf()

Unnamed: 0,count_star()
0,43244693


Answer: `43,244,693`

---