# Q1. Refactoring

Now we need to create the "main" block from which we'll invoke the main function. What does the if statement that we use for this look like?

`if __name__ == "__main__":`

Code `batch.py` for Q1
<details>
<summary>Click to show/hide code</summary>

```python
#!/usr/bin/env python
# coding: utf-8

import warnings
import sys
import pickle
import pandas as pd
import click

warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')


def generate_output_file_path(year, month):
    """Return the local parquet path for the predictions of one month.

    The year is zero-padded to four digits and the month to two, e.g.
    ``./output/yellow_tripdata_2023-04.parquet``.
    """
    period = f'{year:04d}-{month:02d}'
    return f'./output/yellow_tripdata_{period}.parquet'


def read_data(filename, categorical):
    """Read a trip parquet file and prepare it for scoring.

    Args:
        filename: Path or URL handed to ``pd.read_parquet``.
        categorical: Names of the columns to encode as strings.

    Returns:
        A ``(df, categorical)`` tuple. ``df`` holds only the trips lasting
        between 1 and 60 minutes (inclusive) plus a ``duration`` column in
        minutes; ``categorical`` is passed back unchanged.
    """
    trips = pd.read_parquet(filename)

    # Trip length in minutes, derived from the pickup/dropoff timestamps.
    elapsed = trips.tpep_dropoff_datetime - trips.tpep_pickup_datetime
    trips['duration'] = elapsed.dt.total_seconds() / 60

    within_range = trips.duration.between(1, 60)
    trips = trips[within_range].copy()

    # Missing IDs become -1 before the int -> str conversion.
    filled = trips[categorical].fillna(-1)
    trips[categorical] = filled.astype('int').astype('str')

    return trips, categorical

@click.command()
@click.option('--year', type=int, required=True, help='Year of the trip data')
@click.option('--month', type=int, required=True, help='Month of the trip data')
def main(year, month):
    """Score one month of yellow-taxi trips and write the predictions to parquet.

    Unpickles the ``(dv, lr)`` pair from ``./model.bin``, reads the month's
    trip file from the public URL, predicts a duration per trip, prints the
    mean prediction and saves ``ride_id`` / ``predicted_duration`` to the
    path given by ``generate_output_file_path``.
    """
    with open('./model.bin', 'rb') as model_file:
        dv, lr = pickle.load(model_file)

    source_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    df, categorical = read_data(source_url, categorical=['PULocationID', 'DOLocationID'])

    # ride_id is "<year>/<month>_<row index>".
    id_prefix = f'{year:04d}/{month:02d}_'
    df['ride_id'] = id_prefix + df.index.astype('str')

    records = df[categorical].to_dict(orient='records')
    y_pred = lr.predict(dv.transform(records))

    print('predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame({'ride_id': df['ride_id']})
    df_result['predicted_duration'] = y_pred

    destination = generate_output_file_path(year, month)
    df_result.to_parquet(destination, engine='pyarrow', index=False)


# Run the CLI only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
```

</details>

In [1]:
! mkdir -p output

In [4]:
! python batch.py --year 2023 --month 4

predicted mean duration: 14.292282936862449


In [5]:
! ls ./output -a

.  ..  yellow_tripdata_2023-04.parquet


# Q2. Installing pytest

Next, create a folder tests and create two files. One will be the file with tests. We can name it test_batch.py.

What should be the other file?

 `__init__.py`

Install `pytest` as a pipenv dev dependency:
```bash
pipenv install --dev pytest
```

In [7]:
! mkdir -p tests

In [8]:
! touch tests/__init__.py tests/test_batch.py

In [9]:
! ls ./tests -a

.  ..  __init__.py  test_batch.py


# Q3. Writing first unit test

How many rows should there be in the expected dataframe?

* 1
* 2 ✅
* 3
* 4

Code `batch.py` for Q3:
<details>
<summary>Click to show/hide code</summary>

```python
#!/usr/bin/env python
# coding: utf-8

import warnings
import sys
import pickle
import pandas as pd
import click

warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')


def generate_output_file_path(year, month):
    """Build the local output path of the predictions parquet for a year/month.

    Year is rendered with four digits and month with two (zero-padded).
    """
    template = './output/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    return template.format(year=year, month=month)


def prepare_data(df, categorical):
    """Compute trip durations, drop out-of-range trips and encode categoricals.

    The input frame is left untouched: the result is built on a filtered copy,
    so callers (and tests) can keep using the DataFrame they passed in.

    Args:
        df: Trips with ``tpep_pickup_datetime`` and ``tpep_dropoff_datetime``
            columns plus every column named in ``categorical``.
        categorical: Names of the columns to convert to strings.

    Returns:
        A ``(prepared, categorical)`` tuple. ``prepared`` contains only trips
        lasting between 1 and 60 minutes (inclusive), with a ``duration``
        column in minutes and the categorical columns as strings (missing
        values become ``'-1'``). ``categorical`` is returned unchanged.
    """
    elapsed = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    duration = elapsed.dt.total_seconds() / 60
    keep = (duration >= 1) & (duration <= 60)

    # Copy the surviving rows before adding columns so `df` is not mutated.
    prepared = df[keep].copy()
    prepared['duration'] = duration[keep]

    prepared[categorical] = prepared[categorical].fillna(-1).astype('int').astype('str')
    return prepared, categorical


def read_data(filename, categorical):
    """Load a parquet file and return the result of ``prepare_data`` on it.

    Args:
        filename: Path or URL handed to ``pd.read_parquet``.
        categorical: Column names forwarded to ``prepare_data``.

    Returns:
        Whatever ``prepare_data`` returns for the loaded frame.
    """
    raw_trips = pd.read_parquet(filename)
    prepared = prepare_data(raw_trips, categorical)
    return prepared


@click.command()
@click.option('--year', type=int, required=True, help='Year of the trip data')
@click.option('--month', type=int, required=True, help='Month of the trip data')
def main(year, month):
    """Predict trip durations for one month and store them as parquet.

    Steps: unpickle ``(dv, lr)`` from ``./model.bin``, load and prepare the
    month's trips via ``read_data``, attach a ``ride_id``, predict, print the
    mean prediction, and write ``ride_id`` / ``predicted_duration`` to the
    path from ``generate_output_file_path``.
    """
    with open('./model.bin', 'rb') as model_file:
        dv, lr = pickle.load(model_file)

    trips_url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    feature_columns = ['PULocationID', 'DOLocationID']
    df, categorical = read_data(trips_url, categorical=feature_columns)

    # ride_id is "<year>/<month>_<row index>".
    df['ride_id'] = f'{year:04d}/{month:02d}_' + df.index.astype('str')

    feature_dicts = df[categorical].to_dict(orient='records')
    y_pred = lr.predict(dv.transform(feature_dicts))

    print('predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame({'ride_id': df['ride_id']})
    df_result['predicted_duration'] = y_pred

    target_path = generate_output_file_path(year, month)
    df_result.to_parquet(target_path, engine='pyarrow', index=False)


# Run the CLI only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
    
```
</details>

In [10]:
! python batch.py --year 2023 --month 4

predicted mean duration: 14.292282936862449


Code for `tests/test_batch.py`

<details>
<summary>Click to show/hide code</summary>

```python
import pandas as pd
from datetime import datetime
import numpy as np

from batch import prepare_data


def dt(hour, minute, second=0):
    """Return a datetime on 2023-01-01 at the given hour, minute and second."""
    return datetime(year=2023, month=1, day=1, hour=hour, minute=minute, second=second)


def prepare_test_data():
    """Check that ``prepare_data`` filters by duration and encodes location IDs.

    Four trips are fed in; only the two lasting between 1 and 60 minutes are
    expected back, with missing location IDs rendered as ``'-1'`` and
    durations expressed in minutes.

    Raises:
        AssertionError: If the prepared frame differs from the expectation.
    """
    data = [
        (None, None, dt(1, 1), dt(1, 10)),      # 9 min, missing IDs -> kept
        (1, 1, dt(1, 2), dt(1, 10)),            # 8 min -> kept
        (1, None, dt(1, 2, 0), dt(1, 2, 59)),   # 59 s -> dropped (< 1 min)
        (3, 4, dt(1, 2, 0), dt(2, 2, 1)),       # 60 min 1 s -> dropped (> 60 min)
    ]

    columns_test_df = ['PULocationID', 'DOLocationID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
    test_df = pd.DataFrame(data, columns=columns_test_df)

    categorical = ['PULocationID', 'DOLocationID']

    prepared_test_df, categorical = prepare_data(test_df, categorical)
    print(prepared_test_df)

    expected_rows = [
        ('-1', '-1', 9.),
        ('1', '1', 8.),
    ]
    columns_expected_df = ['PULocationID', 'DOLocationID', 'duration']
    expected_prepared_test_df = pd.DataFrame(expected_rows, columns=columns_expected_df)

    assert len(prepared_test_df) == len(expected_prepared_test_df)

    # Compare by position: the filtered frame keeps its original index labels,
    # which need not line up with the freshly built expected frame.
    categorical_cols = ['PULocationID', 'DOLocationID']
    for col in categorical_cols:
        assert prepared_test_df[col].tolist() == expected_prepared_test_df[col].tolist()

    float_cols = ['duration']
    epsilon = 1e-9
    for col in float_cols:
        # The result of allclose must be asserted, otherwise a mismatch goes unnoticed.
        assert np.allclose(
            prepared_test_df[col].to_numpy(),
            expected_prepared_test_df[col].to_numpy(),
            atol=epsilon,
        )


# pytest only collects functions whose names start with "test"; expose the
# check under such a name while keeping the original name importable.
test_prepare_data = prepare_test_data

```
</details>

In [11]:
from tests.test_batch import prepare_test_data

In [17]:
prepare_test_data()

  PULocationID DOLocationID tpep_pickup_datetime tpep_dropoff_datetime  \
0           -1           -1  2023-01-01 01:01:00   2023-01-01 01:10:00   
1            1            1  2023-01-01 01:02:00   2023-01-01 01:10:00   

   duration  
0       9.0  
1       8.0  


# Q4. Mocking S3 with Localstack

Which AWS CLI option redirects commands to Localstack instead of AWS?

* --backend-store-uri
* --profile
* --endpoint-url ✅
* --version

In [18]:
! touch docker-compose.yaml

Code `docker-compose.yaml`
<details>
<summary>Click to show/hide code</summary>

```YAML
# Single-service stack: a Localstack container used to mock S3 locally.
services:
  localstack:
    image: localstack/localstack
    container_name: localstack
    ports:
      # host:container - the commands in these notes target http://localhost:4566
      - "4566:4566"
    environment:
      # Passed to the container as environment variables.
      # SERVICES=s3 presumably limits Localstack to S3 - confirm against the Localstack docs.
      - SERVICES=s3
      - DEBUG=1
      # Placeholder credentials, not real secrets.
      - AWS_ACCESS_KEY_ID=dummyAccessKeyId
      - AWS_SECRET_ACCESS_KEY=dummySecretAccessKey
      - DEFAULT_REGION=us-east-1
    volumes:
      # Bind-mounts ./localstack on the host to /var/lib/localstack in the container.
      - "./localstack:/var/lib/localstack"
```
</details>

Start the Localstack container

`docker compose up -d --build`

Prepare dummy AWS credentials
```bash
mkdir -p ~/.aws
nano ~/.aws/credentials
```

Code for `~/.aws/credentials`
<details>
<summary>Click to show/hide code</summary>

```
[default]
aws_access_key_id = foo
aws_secret_access_key = bar
```
</details>

Set AWS Configuration
```bash
nano ~/.aws/config
```

Code for `~/.aws/config`
<details>
<summary>Click to show/hide code</summary>

```
[default]
region = us-east-1
```
</details>

Install AWS CLI

`pipenv install awscli`

In [7]:
! aws --version

aws-cli/1.33.22 Python/3.10.13 Linux/6.5.0-1022-azure botocore/1.34.131


In [8]:
! aws --endpoint-url=http://localhost:4566 s3 mb s3://nyc-duration

make_bucket: nyc-duration


In [9]:
import pandas as pd

# Period to copy into the mocked bucket.
year, month =  2023, 4

# Download the month's trip data from the public URL.
input_file = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet'
df = pd.read_parquet(input_file)


# Write the same data to the nyc-duration bucket through the local endpoint.
# Note: input_file is rebound here from the download URL to the S3 destination.
s3_endpoint_url="http://localhost:4566"
input_file=f"s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"

# client_kwargs/endpoint_url sends the S3 client to s3_endpoint_url instead of the default AWS endpoint.
storage_options = {'client_kwargs': {'endpoint_url': s3_endpoint_url}}
df.to_parquet(input_file, engine='pyarrow', index=False, storage_options=storage_options)


Code `batch.py` for Q4
<details>
<summary>Click to show/hide code</summary>

```python
#!/usr/bin/env python
# coding: utf-8

import warnings
import sys
import pickle
import pandas as pd
import click
import os

warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')


def get_input_path(year, month):
    """Return the input location for the given year and month.

    The pattern comes from the ``INPUT_FILE_PATTERN`` environment variable
    when it is set, otherwise the public trip-data URL is used. The pattern's
    ``{year}`` and ``{month}`` fields are filled in with ``str.format``.
    """
    pattern = os.environ.get(
        'INPUT_FILE_PATTERN',
        'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year:04d}-{month:02d}.parquet',
    )
    return pattern.format(year=year, month=month)

def get_output_path(year, month):
    """Return the output location for the given year and month.

    Uses the ``OUTPUT_FILE_PATTERN`` environment variable as the pattern when
    it is set and falls back to a file under ``./output`` otherwise. The
    pattern's ``{year}`` and ``{month}`` fields are filled with ``str.format``.
    """
    fallback = './output/yellow_tripdata_{year:04d}-{month:02d}.parquet'
    template = os.getenv('OUTPUT_FILE_PATTERN', default=fallback)
    return template.format(year=year, month=month)

def prepare_data(df, categorical):
    """Add trip durations, keep 1-60 minute trips and stringify categoricals.

    The caller's DataFrame is not modified; all changes are made on a copy of
    the rows that pass the duration filter.

    Args:
        df: Trips with ``tpep_pickup_datetime`` and ``tpep_dropoff_datetime``
            columns plus every column named in ``categorical``.
        categorical: Names of the columns to convert to strings.

    Returns:
        A ``(prepared, categorical)`` tuple. ``prepared`` has a ``duration``
        column in minutes, contains only trips with 1 <= duration <= 60, and
        its categorical columns are strings with missing values as ``'-1'``.
        ``categorical`` is returned unchanged.
    """
    minutes = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
    in_range = (minutes >= 1) & (minutes <= 60)

    # Filter first, then copy, so the new column never lands on the input frame.
    result = df[in_range].copy()
    result['duration'] = minutes[in_range]

    result[categorical] = result[categorical].fillna(-1).astype('int').astype('str')
    return result, categorical

def read_data(year, month, categorical):
    """Load one month of trip data and run it through ``prepare_data``.

    The file location comes from ``get_input_path``. When both the
    ``S3_ENDPOINT_URL`` and ``INPUT_FILE_PATTERN`` environment variables are
    set, the file is read with that endpoint passed to the S3 client;
    otherwise it is read with pandas' defaults.

    Args:
        year: Year of the trip data.
        month: Month of the trip data.
        categorical: Column names forwarded to ``prepare_data``.

    Returns:
        Whatever ``prepare_data`` returns for the loaded frame.
    """
    s3_endpoint_url = os.getenv('S3_ENDPOINT_URL')
    input_pattern = os.getenv('INPUT_FILE_PATTERN')
    input_file = get_input_path(year, month)

    if s3_endpoint_url and input_pattern:
        # Send the S3 client to the custom endpoint (e.g. Localstack).
        storage_options = {'client_kwargs': {'endpoint_url': s3_endpoint_url}}
        df = pd.read_parquet(input_file, storage_options=storage_options)
        print(f'Data loaded from S3, INPUT_FILE_PATTERN is {input_pattern}')
    else:
        df = pd.read_parquet(input_file)
        print(f'Data loaded from the internet, INPUT_FILE_PATTERN is {input_pattern}')
    return prepare_data(df, categorical)

def save_parquet_to_s3(output_file, df):
    """Write ``df`` to ``output_file`` as parquet, via a custom S3 endpoint if configured.

    Despite the name, the file is written with pandas' defaults (no custom
    endpoint) unless both the ``S3_ENDPOINT_URL`` and ``OUTPUT_FILE_PATTERN``
    environment variables are set.

    Args:
        output_file: Destination path or URL.
        df: DataFrame to write (the index is not stored).
    """
    endpoint = os.getenv('S3_ENDPOINT_URL')
    pattern = os.getenv('OUTPUT_FILE_PATTERN')

    if not (endpoint and pattern):
        df.to_parquet(output_file, engine='pyarrow', index=False)
        print(f'File saved locally, OUTPUT_FILE_PATTERN is {pattern}')
        return

    # Send the S3 client to the custom endpoint.
    s3_options = {'client_kwargs': {'endpoint_url': endpoint}}
    df.to_parquet(output_file, engine='pyarrow', index=False, storage_options=s3_options)
    print(f'File saved to S3, OUTPUT_FILE_PATTERN is {pattern}')


@click.command()
@click.option('--year', type=int, required=True, help='Year of the trip data')
@click.option('--month', type=int, required=True, help='Month of the trip data')
def main(year, month):
    """Predict trip durations for one month and save them.

    Unpickles ``(dv, lr)`` from ``./model.bin``, loads the prepared trips via
    ``read_data``, attaches a ``ride_id``, predicts, prints the mean
    prediction and hands ``ride_id`` / ``predicted_duration`` to
    ``save_parquet_to_s3`` with the path from ``get_output_path``.
    """
    with open('./model.bin', 'rb') as model_file:
        dv, lr = pickle.load(model_file)

    feature_columns = ['PULocationID', 'DOLocationID']
    df, categorical = read_data(year, month, categorical=feature_columns)

    # ride_id is "<year>/<month>_<row index>".
    id_prefix = f'{year:04d}/{month:02d}_'
    df['ride_id'] = id_prefix + df.index.astype('str')

    feature_dicts = df[categorical].to_dict(orient='records')
    y_pred = lr.predict(dv.transform(feature_dicts))

    print('predicted mean duration:', y_pred.mean())

    df_result = pd.DataFrame({'ride_id': df['ride_id']})
    df_result['predicted_duration'] = y_pred

    save_parquet_to_s3(get_output_path(year, month), df_result)


# Run the CLI only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()

```
</details>

In [10]:
! touch run_batch_py_local.sh

Code `run_batch_py_local.sh`
<details>
<summary>Click to show/hide code</summary>

```shell
#!/bin/bash
# Runs batch.py for April 2023 against the local S3 endpoint.

# Set environment variables
# The {year}/{month} placeholders are left as-is here; batch.py fills them in.
export INPUT_FILE_PATTERN="s3://nyc-duration/in/{year:04d}-{month:02d}.parquet"
export OUTPUT_FILE_PATTERN="s3://nyc-duration/out/{year:04d}-{month:02d}.parquet"
# batch.py passes this to pandas as the S3 client's endpoint_url.
export S3_ENDPOINT_URL="http://localhost:4566"

# Run the Python script with parameters
python batch.py --year 2023 --month 4
```
</details>

In [11]:
! chmod +x run_batch_py_local.sh

In [12]:
! ./run_batch_py_local.sh

Data loaded from S3, INPUT_FILE_PATTERN is s3://nyc-duration/in/{year:04d}-{month:02d}.parquet
predicted mean duration: 14.292282936862449
File saved to S3, OUTPUT_FILE_PATTERN is s3://nyc-duration/out/{year:04d}-{month:02d}.parquet
