In [1]:
import pandas as pd
import os
import argparse
from time import time
from prefect import flow, task
from sqlalchemy import create_engine


In [2]:
# Start a Prefect Flow in Jupyter Notebook

from prefect import flow, task

@task
def say_hello():
    return "Hello, Prefect!"

@flow
def hello_flow():
    result = say_hello()
    print(result)

hello_flow()


Hello, Prefect!


In [3]:
! pip install --upgrade prefect



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [1]:
import psycopg2

conn = psycopg2.connect(
    dbname="ny_taxi",
    user="root",
    password="root",
    host="172.18.0.2", 
    port="5432"
)
print("Connected successfully!")
conn.close()


Connected successfully!


In [None]:
import os
import pandas as pd
from time import time
from sqlalchemy import create_engine
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(log_prints=True, retires =3, cache_key_fn = task_input_hash, cache_expiration=timedelta(days=1))
def extract_data(url: str):
    # the backup files are gzipped, and it's important to keep the correct extension
    # for pandas to be able to open the file
    if url.endswith('.csv.gz'):
        csv_name = 'yellow_tripdata_2021-01.csv.gz'
    else:
        csv_name = 'output.csv'
    
    os.system(f"wget {url} -O {csv_name}")

    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
    df = next(df_iter)

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    return df

@task(log_prints=True)
def transform_data(df):
    print(f"pre: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    df = df[df['passenger_count'] != 0]
    print(f"post: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    return df

@task(log_prints=True, retries=3)
def load_data(table_name, df):
    connection_block = SqlAlchemyConnector.load("postgres-connector")
    with connection_block.get_connection(begin=False) as engine:
        df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
        df.to_sql(name=table_name, con=engine, if_exists='append')

@flow(name="Subflow", log_prints=True)
def log_subflow(table_name: str):
    print(f"Logging Subflow for: {table_name}")

@flow(name="Ingest Data")
def main_flow(table_name: str = "yellow_taxi_trips"):

    csv_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-03.csv.gz"
    log_subflow(table_name)
    raw_data = extract_data(csv_url)
    data = transform_data(raw_data)
    load_data(table_name, data)

if __name__ == '__main__':
    main_flow(table_name = "yellow_trips")

--2025-02-06 01:33:48--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... 140.82.116.4
Connecting to github.com (github.com)|140.82.116.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/f6895842-79e6-4a43-9458-e5b0b454a340?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250206%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250206T013348Z&X-Amz-Expires=300&X-Amz-Signature=4eee63fdfac862ab6a6c528378fa74c551bfd60fccdd11e5b94a5c0b9a4bc947&X-Amz-SignedHeaders=host&response-content-disposition=attachment%3B%20filename%3Dyellow_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2025-02-06 01:33:48--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/f6895842-79e6-4a43-9458-e5b0b454a340?X-Am

  df = next(df_iter)


In [3]:
import pandas as pd
from sqlalchemy import create_engine

# Database connection details
user = "root"
password = "root"
host = "172.18.0.2"  
port = "5432"
db = "ny_taxi"

# Create connection
postgres_url = f'postgresql://{user}:{password}@{host}:{port}/{db}'
engine = create_engine(postgres_url)

# Test connection
try:
    with engine.connect() as conn:
        print("Connected to PostgreSQL successfully!")
except Exception as e:
    print(f"Connection failed: {e}")


Connected to PostgreSQL successfully!


In [4]:
# List Tables
query = "SELECT table_name FROM information_schema.tables WHERE table_schema='public'"
tables = pd.read_sql(query, con=engine)
tables


Unnamed: 0,table_name
0,yellow_taxi_trip
1,yellow_taxi_trips_20250206


In [32]:
from sqlalchemy import create_engine, text

# Drop the table query
drop_query = text("DROP TABLE IF EXISTS yellow_taxi_data")

# Execute the query
with engine.connect() as conn:
    conn.execute(drop_query)
    conn.commit()  # Ensure changes are saved
    print("Table yellow_taxi_data dropped successfully!")


Table yellow_taxi_data dropped successfully!


In [30]:
# If you're using a transactional database (e.g., PostgreSQL), the DROP TABLE command might require a commit to finalize the changes.
with engine.connect() as conn:
    conn.execute(text("DROP TABLE IF EXISTS yellow_taxi_data"))
    conn.commit()  # Ensure changes are saved

In [36]:
tables = pd.read_sql("SELECT table_name FROM information_schema.tables WHERE table_schema='public'", con=engine)
tables



Unnamed: 0,table_name
0,yellow_taxi_trip


In [34]:
# change table name

# Rename the table
old_table_name = "yellow_taxi_trips_20250205"
new_table_name = "yellow_taxi_trip"  # Set your new table name here
rename_query = text(f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}")

# Execute the query
with engine.connect() as conn:
    conn.execute(rename_query)
    conn.commit()  # Ensure changes are saved
    print(f"Table {old_table_name} renamed to {new_table_name} successfully!")


Table yellow_taxi_trips_20250205 renamed to yellow_taxi_trip successfully!


In [12]:
from prefect_sqlalchemy import SqlAlchemyConnector
from prefect.blocks.core import Block

user = "root"
password = "root"
host = "172.18.0.2"  # Or use "pg-database" if using Docker networking
port = "5432"
db = "ny_taxi"

# Create the actual connection URL using the variables
connection_url = f"postgresql://{user}:{password}@{host}:{port}/{db}"

# Initialize the SqlAlchemyConnector block with the connection URL directly in connection_info
db_block = SqlAlchemyConnector(connection_info=connection_url)

# Save the block with the name "postgres-connector"
db_block.save("postgres-connector", overwrite=True)



<coroutine object sync_compatible.<locals>.coroutine_wrapper.<locals>.ctx_call at 0x7efb99def5b0>

In [13]:
# Save the block
db_block.save("postgres-connector", overwrite=True)

# Load the block
loaded_block = SqlAlchemyConnector.load("postgres-connector")
print(loaded_block)


<coroutine object Block.aload at 0x7efb99cb29e0>


  db_block.save("postgres-connector", overwrite=True)


In [6]:
! pip install prefect_sqlalchemy

Collecting prefect_sqlalchemy
  Downloading prefect_sqlalchemy-0.5.2-py3-none-any.whl.metadata (2.3 kB)
Downloading prefect_sqlalchemy-0.5.2-py3-none-any.whl (14 kB)
Installing collected packages: prefect_sqlalchemy
Successfully installed prefect_sqlalchemy-0.5.2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [6]:
from sqlalchemy import create_engine

In [9]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [10]:
# Create the table schema
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [12]:
# Load the actual data into the table
df.to_sql(name='yellow_taxi_data', con=engine, index=False, if_exists='replace')

: 

In [6]:
! pip install prefect

Collecting prefect
  Using cached prefect-3.1.15-py3-none-any.whl.metadata (14 kB)
Collecting aiosqlite<1.0.0,>=0.17.0 (from prefect)
  Using cached aiosqlite-0.21.0-py3-none-any.whl.metadata (4.3 kB)
Collecting alembic<2.0.0,>=1.7.5 (from prefect)
  Using cached alembic-1.14.1-py3-none-any.whl.metadata (7.4 kB)
Collecting apprise<2.0.0,>=1.1.0 (from prefect)
  Using cached apprise-1.9.2-py3-none-any.whl.metadata (52 kB)
Collecting asyncpg<1.0.0,>=0.23 (from prefect)
  Using cached asyncpg-0.30.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.0 kB)
Collecting click<8.2,>=8.0 (from prefect)
  Using cached click-8.1.8-py3-none-any.whl.metadata (2.3 kB)
Collecting cryptography>=36.0.1 (from prefect)
  Using cached cryptography-44.0.0-cp39-abi3-manylinux_2_28_x86_64.whl.metadata (5.7 kB)
Collecting dateparser<2.0.0,>=1.1.1 (from prefect)
  Using cached dateparser-1.2.1-py3-none-any.whl.metadata (29 kB)
Collecting docker<8.0,>=4.0 (from prefect)
  Using cached docker