In [1]:
## Q1 Refactoring

In [2]:
!pip freeze > requirements.txt

In [3]:
%%writefile batch.py
import sys
import pickle
import pandas as pd
import os

def read_data(filename, categorical):
    df = pd.read_parquet(filename)
    
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df

def main(year, month):
    input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    output_folder = "output"
    output_file = output_folder+f'/yellow_tripdata_{year:04d}-{month:02d}.parquet'

    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)

    categorical = ['PULocationID', 'DOLocationID']

    df = read_data(input_file, categorical)
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

    dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)

    print('Predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['predicted_duration'] = y_pred

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    df_result.to_parquet(output_file, engine='pyarrow', index=False)
    return

if __name__ == '__main__':
    year = int(sys.argv[1])
    month = int(sys.argv[2])
    main(year, month)

Overwriting batch.py


In [4]:
!python batch.py 2023 03

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
Predicted mean duration: 14.203865642696083


In [5]:
### Q2. Installing pytest

In [8]:
!pip install pipenv



In [10]:
!pipenv run pip install pytest

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
Collecting pytest
  Downloading pytest-8.4.1-py3-none-any.whl.metadata (7.7 kB)
Collecting iniconfig>=1 (from pytest)
  Downloading iniconfig-2.1.0-py3-none-any.whl.metadata (2.7 kB)
Collecting pluggy<2,>=1.5 (from pytest)
  Downloading pluggy-1.6.0-py3-none-any.whl.metadata (4.8 kB)
Downloading pytest-8.4.1-py3-none-any.whl (365 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m365.5/365.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0mm
[?25hDownloading iniconfig-2.1.0-py3-none-any.whl (6.0 kB)
Downloading pluggy-1.6.0-py3-none-any.whl (20 kB)
Installing collected packages: pluggy, iniconfig, pytest
Successfully installed iniconfig-2.

In [11]:
!pipenv graph

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
[1mannotated-types==0.7.0[0m
[1mcvlib==0.2.7[0m
[1m├── imageio [0m
[1m│   ├── numpy [0m
[1m│   └── pillow [0m
[1m├── imutils [0m
[1m├── numpy [0m
[1m├── pillow [0m
[1m├── progressbar [0m
[1m└── requests [0m
    ├── certifi 
    ├── charset-normalizer 
    ├── idna 
    └── urllib3 
[1mevidently==0.4.0[0m
[1m├── fastapi [0m
[1m│   ├── pydantic [0m
[1m│   │   └── typing_extensions [0m
[1m│   ├── starlette [0m
[1m│   │   ├── anyio [0m
[1m│   │   │   ├── idna [0m
[1m│   │   │   └── sniffio [0m
[1m│   │   └── typing_extensions [0m
[1m│   └── typing_extensions [0m
[1m├── iterative-telemetry [0m
[1m│   ├── appdirs [0m
[1m│   ├── di

In [12]:
!PIPENV_IGNORE_VIRTUALENVS=1

In [13]:
!pipenv install --dev pytest

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
[1mPipfile.lock not found, creating[0m[1;33m...[0m
Locking  dependencies[33m...[0m
[?25lBuilding requirements...
[2KResolving dependencies...es...
[2K✘ Locking Failed!g packages...
[32m⠧[0m Locking packages...CRITICAL:pipenv.patched.pip._internal.resolution.resolvelib.factory:Cannot 
install -r [35m/tmp/pipenv-7tx1hqyf-requirements/[0m[95mpipenv-u4k1yxij-constraints.txt[0m 
[1m([0mline [1;36m211[0m[1m)[0m and [33mprotobuf[0m==[1;36m6.32[0m.[1;36m0[0m because these package versions have conflicting 
dependencies.
[1m[[0m1m[1m[[0m[1m[[0m0mResolutionFailure[1m[[0m1m[1m][0m[1m[[0m0m:   File 
[1m[[0m32m"[35m/mnt/c/Users/shubham

In [14]:
!pipenv lock --pre

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
Locking  dependencies[33m...[0m
[?25lBuilding requirements...
[2KResolving dependencies...es...
[2K✘ Locking Failed!g packages...
[32m⠇[0m Locking packages...CRITICAL:pipenv.patched.pip._internal.resolution.resolvelib.factory:Cannot 
install -r [35m/tmp/pipenv-r3nmqfhc-requirements/[0m[95mpipenv-ztkwaiu0-constraints.txt[0m 
[1m([0mline [1;36m126[0m[1m)[0m and [33mprotobuf[0m==[1;36m6.32[0m.[1;36m0[0m because these package versions have conflicting 
dependencies.
[1m[[0m1m[1m[[0m[1m[[0m0mResolutionFailure[1m[[0m1m[1m][0m[1m[[0m0m:   File 
[1m[[0m32m"[35m/mnt/c/Users/shubham_yadav/Documents/COXA-ENIT/lab/venv/lib/python3.12/[0m

In [15]:
!pipenv run pip install pytest

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.


In [16]:
!pipenv --clear

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
[1mClearing caches...[0m


In [17]:
!pipenv lock --python 3.12

[32mCourtesy Notice[0m:
Pipenv found itself running within a virtual environment,  so it will 
automatically use that environment, instead of  creating its own for any 
project. You can set
[1;33mPIPENV_IGNORE_VIRTUALENVS[0m[1m=[0m[1;36m1[0m to force pipenv to ignore that environment and 
create  its own instead.
Locking  dependencies[33m...[0m
[?25lBuilding requirements...
[2KResolving dependencies...es...
[2K✘ Locking Failed!g packages...
[2K[32m⠸[0m Locking packages...CRITICAL:pipenv.patched.pip._internal.resolution.resolvelib.factory:Cannot 
install -r [35m/tmp/pipenv-yvmd8qq4-requirements/[0m[95mpipenv-rc2es4og-constraints.txt[0m 
[1m([0mline [1;36m225[0m[1m)[0m, -r [35m/tmp/pipenv-yvmd8qq4-requirements/[0m[95mpipenv-rc2es4og-constraints.txt[0m
[1m([0mline [1;36m240[0m[1m)[0m and [33mpydantic[0m==[1;36m1.10[0m.[1;36m22[0m because these package versions have conflicting
dependencies.
[1m[[0m1m[1m[[0m[1m[[0m0mResolutionFailure[1m[

In [19]:
%%writefile tests/test_batch.py
#Comment to enable creating the file

Writing tests/test_batch.py


In [20]:
%%writefile tests/__init__.py
#Comment to enable creating the file

Writing tests/__init__.py


In [21]:
%%writefile batch.py
import sys
import pickle
import pandas as pd
import os

def prepare_data(df, categorical):
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df

def read_data(filename, categorical):
    df = pd.read_parquet(filename)
    df = prepare_data(df, categorical)
    return df

def main(year, month):
    input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    output_folder = "output"
    output_file = output_folder+f'/yellow_tripdata_{year:04d}-{month:02d}.parquet'

    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)

    categorical = ['PULocationID', 'DOLocationID']

    df = read_data(input_file, categorical)
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

    dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)

    print('Predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['predicted_duration'] = y_pred

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    df_result.to_parquet(output_file, engine='pyarrow', index=False)
    return

if __name__ == '__main__':
    year = int(sys.argv[1])
    month = int(sys.argv[2])
    main(year, month)

Overwriting batch.py


In [5]:
%%writefile tests/test_batch.py
from datetime import datetime
import pandas as pd
import batch

def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)

def test_prepare_data():
    data = [
        (None, None, dt(1, 1), dt(1, 10)),
        (1, 1, dt(1, 2), dt(1, 10)),
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),      
    ]

    columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
    df = pd.DataFrame(data, columns=columns)

    categorical = ['PULocationID', 'DOLocationID']
    actual_result = batch.prepare_data(df, categorical)
    assert len(actual_result) == 2
    
    assert actual_result['duration'].between(1, 60).all()
    
    assert not actual_result[categorical].isna().any().any()
    
    for col in categorical:
        assert actual_result[col].dtype == object or str(actual_result[col].dtype) == 'string'

Overwriting tests/test_batch.py


In [6]:
%%writefile docker-compose.yaml
services:
  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=s3
      - DEBUG=1
      - AWS_DEFAULT_REGION=us-east-1
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    volumes:
      - "./localstack_data:/var/lib/localstack"

Overwriting docker-compose.yaml


In [None]:
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1

In [7]:
!docker-compose down

[1A[1B[0G[?25l[+] Running 0/1
 [33m⠋[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.1s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠙[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.2s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠹[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.3s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠸[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.4s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠼[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.5s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠴[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.6s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠦[0m Container modelbestpractices-localstack-1  Stopping                     [34m0.7s [0m
[?25h[1A[1A[0G[?25l[+] Runni

In [8]:
!docker-compose up -d

[1A[1B[0G[?25l[+] Running 1/2
 [32m✔[0m Network modelbestpractices_default         [32mCreated[0m                      [34m0.0s [0m
 [33m⠋[0m Container modelbestpractices-localstack-1  Creating                     [34m0.1s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 1/2
 [32m✔[0m Network modelbestpractices_default         [32mCreated[0m                      [34m0.0s [0m
 [33m⠙[0m Container modelbestpractices-localstack-1  Starting                     [34m0.2s [0m
[?25h[1A[1A[1A[0G[?25l[+] Running 1/2
 [32m✔[0m Network modelbestpractices_default         [32mCreated[0m                      [34m0.0s [0m
 [33m⠹[0m Container modelbestpractices-localstack-1  Starting                     [34m0.3s [0m
[?25h[1A[1A[1A[0G[?25l[34m[+] Running 2/2[0m
 [32m✔[0m Network modelbestpractices_default         [32mCreated[0m                      [34m0.0s [0m
 [32m✔[0m Container modelbestpractices-localstack-1  [32mStarted[0m                      [3

In [11]:
!aws configure set aws_access_key_id test
!aws configure set aws_secret_access_key test
!aws configure set default.region us-east-1

In [37]:
!aws --endpoint-url=http://localhost:4566 s3 mb s3://nyc-duration

make_bucket: nyc-duration


In [38]:
!set INPUT_FILE_PATTERN="s3://nyc-duration/in/yellow_tripdata_{year:04d}-{month:02d}.parquet"
!set OUTPUT_FILE_PATTERN="s3://nyc-duration/out/yellow_tripdata_{year:04d}-{month:02d}.parquet"

In [39]:
%%writefile batch.py
import sys
import pickle
import pandas as pd
import os

def get_input_path(year, month):
    default_input_pattern = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)

def get_output_path(year, month):
    default_output_pattern = 's3://nyc-duration-prediction-alexey/taxi_type=fhv/year={year:04d}/month={month:02d}/predictions.parquet'
    output_pattern = os.getenv('OUTPUT_FILE_PATTERN', default_output_pattern)
    return output_pattern.format(year=year, month=month)

def prepare_data(df, categorical):
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df

def read_data(filename, categorical):
    df = pd.read_parquet(filename)
    df = prepare_data(df, categorical)
    return df

def main(year, month):
    output_folder = "output"
    input_file = get_input_path(year, month)
    output_file = get_output_path(year, month)

    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)

    categorical = ['PULocationID', 'DOLocationID']

    df = read_data(input_file, categorical)
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

    dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)

    print('Predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['predicted_duration'] = y_pred

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    df_result.to_parquet(output_file, engine='pyarrow', index=False)
    return

if __name__ == '__main__':
    year = int(sys.argv[1])
    month = int(sys.argv[2])
    main(year, month)

Overwriting batch.py


In [40]:
!aws --endpoint-url=http://localhost:4566 s3 cp ./input/yellow_tripdata_2023-03.parquet s3://nyc-duration/in/yellow_tripdata_2023-03.parquet
!aws --endpoint-url=http://localhost:4566 s3 cp ./output/yellow_tripdata_2023-03.parquet s3://nyc-duration/out/yellow_tripdata_2023-03.parquet

!set S3_ENDPOINT_URL=http://localhost:4566 

upload: input/yellow_tripdata_2023-03.parquet to s3://nyc-duration/in/yellow_tripdata_2023-03.parquet
upload: output/yellow_tripdata_2023-03.parquet to s3://nyc-duration/out/yellow_tripdata_2023-03.parquet


In [41]:
%%writefile batch.py
import sys
import pickle
import pandas as pd
import os

def get_input_path(year, month):
    default_input_pattern = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)

def get_output_path(year, month):
    default_output_pattern = 's3://nyc-duration/taxi_type=yellow/year={year:04d}/month={month:02d}/predictions.parquet'
    output_pattern = os.getenv('OUTPUT_FILE_PATTERN', default_output_pattern)
    return output_pattern.format(year=year, month=month)

def prepare_data(df, categorical):
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()
    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df

def read_data(filename, categorical, options=None):
    # Only use storage_options for S3 URLs, not for HTTPS URLs
    if filename.startswith('s3://') and options:
        df = pd.read_parquet(filename, storage_options=options)
    else:
        df = pd.read_parquet(filename)
    
    df = prepare_data(df, categorical)
    return df

def main(year, month):
    S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
    
    # Only set up S3 options if we have an S3 endpoint URL
    options = None
    if S3_ENDPOINT_URL:
        options = {
            'client_kwargs': {
                'endpoint_url': S3_ENDPOINT_URL
            }
        }
    
    output_folder = "output"
    input_file = get_input_path(year, month)
    output_file = get_output_path(year, month)
    
    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)
    
    categorical = ['PULocationID', 'DOLocationID']
    
    # Read input data
    df = read_data(input_file, categorical, options)
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')
    
    # Make predictions
    dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)
    
    print('Predicted mean duration:', y_pred.mean())
    
    # Prepare results
    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['predicted_duration'] = y_pred
    
    # Create output directory if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    # Save results
    if output_file.startswith('s3://') and options:
        df_result.to_parquet(output_file, engine='pyarrow', index=False, storage_options=options)
    else:
        # For local files, save to output folder
        local_output = os.path.join(output_folder, f'predictions_{year:04d}_{month:02d}.parquet')
        df_result.to_parquet(local_output, engine='pyarrow', index=False)
        print(f'Results saved to: {local_output}')
    
    return

if __name__ == '__main__':
    year = int(sys.argv[1])
    month = int(sys.argv[2])
    main(year, month)

Overwriting batch.py


In [42]:
!python batch.py 2023 03

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
Predicted mean duration: 14.203865642696083
Results saved to: output/predictions_2023_03.parquet


In [43]:
!aws --endpoint-url=http://localhost:4566 s3 ls s3://nyc-duration/ --recursive

2025-09-05 06:31:45   23128257 in/yellow_tripdata_2023-03.parquet
2025-09-05 06:31:46   23128257 out/yellow_tripdata_2023-03.parquet


In [30]:
### Q5- test the data

In [46]:
!pip install s3fs

Collecting s3fs
  Downloading s3fs-2025.9.0-py3-none-any.whl.metadata (1.4 kB)
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Downloading aiobotocore-2.24.1-py3-none-any.whl.metadata (25 kB)
Collecting fsspec==2025.9.0 (from s3fs)
  Downloading fsspec-2025.9.0-py3-none-any.whl.metadata (10 kB)
Collecting aiohttp!=4.0.0a0,!=4.0.0a1 (from s3fs)
  Downloading aiohttp-3.12.15-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.7 kB)
Collecting aioitertools<1.0.0,>=0.5.1 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading aioitertools-0.12.0-py3-none-any.whl.metadata (3.8 kB)
Collecting botocore<1.39.12,>=1.39.9 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading botocore-1.39.11-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting multidict<7.0.0,>=6.0.0 (from aiobotocore<3.0.0,>=2.5.4->s3fs)
  Downloading multidict-6.6.4-cp312-cp312-man

In [69]:
%%writefile tests/integration-test.py
from datetime import datetime
import pandas as pd
import os

def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)

def get_input_path(year, month):
    # For integration testing, use S3 path instead of HTTPS
    default_input_pattern = 's3://nyc-duration/test-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)

# Create test data
data = [
    (None, None, dt(1, 1), dt(1, 10)),
    (1, 1, dt(1, 2), dt(1, 10)),
    (1, None, dt(1, 2, 0), dt(1, 2, 59)),
    (3, 4, dt(1, 2, 0), dt(2, 2, 1)),      
]

columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
df = pd.DataFrame(data, columns=columns)

# Set up S3 options for LocalStack
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")
options = {
    'client_kwargs': {
        'endpoint_url': S3_ENDPOINT_URL
    }
}

# Write test data to S3 (LocalStack)
input_file = get_input_path(2023, 1)
print(f"Writing test data to: {input_file}")

try:
    df.to_parquet(
        input_file,
        engine='pyarrow',
        compression=None,
        index=False,
        storage_options=options
    )
    print("Test data written successfully!")
    
    # Verify we can read it back
    df_read = pd.read_parquet(input_file, storage_options=options)
    print(f"Successfully read back {len(df_read)} rows")
    print(df_read.head())
    
except Exception as e:
    print(f"Error: {e}")
    print("Make sure LocalStack is running and the bucket exists:")
    print("aws --endpoint-url=http://localhost:4566 s3 mb s3://nyc-duration")

Overwriting tests/integration-test.py


In [70]:
!python tests/integration-test.py

Writing test data to: s3://nyc-duration/test-data/yellow_tripdata_2023-01.parquet
Test data written successfully!
Successfully read back 4 rows
   PULocationID  DOLocationID tpep_pickup_datetime tpep_dropoff_datetime
0           NaN           NaN  2023-01-01 01:01:00   2023-01-01 01:10:00
1           1.0           1.0  2023-01-01 01:02:00   2023-01-01 01:10:00
2           1.0           NaN  2023-01-01 01:02:00   2023-01-01 01:02:59
3           3.0           4.0  2023-01-01 01:02:00   2023-01-01 02:02:01


In [71]:
!aws --endpoint-url=http://localhost:4566 s3 ls s3://nyc-duration/ --recursive

2025-09-05 06:31:45   23128257 in/yellow_tripdata_2023-03.parquet
2025-09-05 06:31:46   23128257 out/yellow_tripdata_2023-03.parquet
2025-09-05 06:59:29       3215 test-data/yellow_tripdata_2023-01.parquet


In [52]:
####Question 6 - Integration tests

In [72]:
%%writefile batch.py
import sys
import pickle
import pandas as pd
import os

def get_input_path(year, month):
    default_input_pattern = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)

def get_output_path(year, month):
    default_output_pattern = 's3://nyc-duration/taxi_type=fhv/year={year:04d}/month={month:02d}/predictions.parquet'
    output_pattern = os.getenv('OUTPUT_FILE_PATTERN', default_output_pattern)
    return output_pattern.format(year=year, month=month)

def prepare_data(df, categorical):
    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    
    return df

def read_data(filename, categorical, options):
    df = pd.read_parquet(filename, storage_options=options)
    df = prepare_data(df, categorical)
    return df

def save_data(df, output_file, options):
    df.to_parquet(output_file, engine="pyarrow", index=False, storage_options=options)


def main(year, month):
    S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")
    options = {
        'client_kwargs': {
            'endpoint_url': S3_ENDPOINT_URL
        }
    }

    output_folder = "output"
    input_file = get_input_path(year, month)
    output_file = get_output_path(year, month)

    with open('model.bin', 'rb') as f_in:
        dv, lr = pickle.load(f_in)

    categorical = ['PULocationID', 'DOLocationID']

    df = read_data(input_file, categorical, options)
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

    dicts = df[categorical].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_pred = lr.predict(X_val)

    print('Predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['predicted_duration'] = y_pred

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    save_data(df_result, output_file, options)
    return

if __name__ == '__main__':
    year = int(sys.argv[1])
    month = int(sys.argv[2])
    main(year, month)

Overwriting batch.py


In [76]:
%%writefile tests/integration_test.py
from datetime import datetime
import pandas as pd
import os

def dt(hour, minute, second=0):
    return datetime(2023, 1, 1, hour, minute, second)

def get_input_path(year, month):
    # Use S3 for integration testing
    default_input_pattern = 's3://nyc-duration/test-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    input_pattern = os.getenv('INPUT_FILE_PATTERN', default_input_pattern)
    return input_pattern.format(year=year, month=month)

def get_output_path(year, month):
    default_output_pattern = 's3://nyc-duration/taxi_type=fhv/year={year:04d}/month={month:02d}/predictions.parquet'
    output_pattern = os.getenv('OUTPUT_FILE_PATTERN', default_output_pattern)
    return output_pattern.format(year=year, month=month)

def test_save_data():
    data = [
        (None, None, dt(1, 1), dt(1, 10)),
        (1, 1, dt(1, 2), dt(1, 10)),
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),      
    ]
    columns = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
    df = pd.DataFrame(data, columns=columns)
    
    S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")
    options = {
        'client_kwargs': {
            'endpoint_url': S3_ENDPOINT_URL
        }
    }
    
    input_file = get_input_path(2023, 1)
    print(f"Saving test data to: {input_file}")
    
    df.to_parquet(
        input_file,
        engine='pyarrow',
        compression=None,
        index=False,
        storage_options=options
    )
    
    print("Test data saved successfully!")
    
    # Set environment variables for the batch script
    os.environ['S3_ENDPOINT_URL'] = S3_ENDPOINT_URL
    os.environ['INPUT_FILE_PATTERN'] = 's3://nyc-duration/test-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    
    exit_code = os.system("python batch.py 2023 01")
    assert exit_code == 0

def test_read_output():
    S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL", "http://localhost:4566")
    options = {
        'client_kwargs': {
            'endpoint_url': S3_ENDPOINT_URL
        }
    }
    
    output_file = get_output_path(year=2023, month=1)
    print(f"Reading output from: {output_file}")
    
    df = pd.read_parquet(output_file, storage_options=options)
    total = df['predicted_duration'].sum()
    print(f"Total predicted duration: {total}")
    assert round(total, 2) == 36.28

if __name__ == '__main__':
    test_save_data()
    test_read_output()

Overwriting tests/integration_test.py


In [77]:
!python tests/integration_test.py

Saving test data to: s3://nyc-duration/test-data/yellow_tripdata_2023-01.parquet
Test data saved successfully!
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
Predicted mean duration: 18.138625226015364
Reading output from: s3://nyc-duration/taxi_type=fhv/year=2023/month=01/predictions.parquet
Total predicted duration: 36.27725045203073


In [78]:
!aws --endpoint-url=http://localhost:4566 s3 ls s3://nyc-duration/ --recursive

2025-09-05 06:31:45   23128257 in/yellow_tripdata_2023-03.parquet
2025-09-05 06:31:46   23128257 out/yellow_tripdata_2023-03.parquet
2025-09-05 07:02:27       1805 taxi_type=fhv/year=2023/month=01/predictions.parquet
2025-09-05 07:02:00       3215 test-data/yellow_tripdata_2023-01.parquet
