In [1]:
#@title Notebook & DagsHub pre-configurations  🏗🐶

#@markdown Enter the branch name:
WEEK = "week3" #@param {type:"string"}

#@markdown Enter the DAGsHub repository owner name:
DAGSHUB_REPO_OWNER= "wonhyeongseo" #@param {type:"string"} 

#@markdown Enter the DAGsHub repository name:
DAGSHUB_REPO_NAME= "mlops-zoomcamp" #@param {type:"string"}

#@markdown Enter the username of your DAGsHub account:
DAGSHUB_USER_NAME = "wonhyeongseo" #@param {type:"string"}

#@markdown Enter the email for your DAGsHub account:
DAGSHUB_EMAIL = "wonhseo@kakao.com" #@param {type:"string"}

#@markdown Clone the Git repo to the Colab runtime
CLONE = True #@param {type:"boolean"}

#@markdown Pull the changes from the Git server to Colab runtime
PULL_GIT = True #@param {type:"boolean"}

#@markdown Set DVC’s user configurations for DagsHub user (will be set locally - should only done **per runtime**)
SET_DVC_USER = True #@param {type:"boolean"}

#@markdown Pull the changes from the DagsHub storage to Colab runtime
PULL_DVC = True #@param {type:"boolean"}

#@markdown Configure MLflow remote tracking server
MLFLOW = True #@param {type:"boolean"}

# Additional information 💡

**DagsHub**

In [2]:
import getpass
DAGSHUB_TOKEN = getpass.getpass('Please enter your DAGsHub token or password: ')
NGROK_TOKEN = getpass.getpass('Please enter your Ngrok authtoken: ')

Please enter your DAGsHub token or password: ··········


# Help Functions 🚁

In [3]:
# Imports
import requests
import datetime
import os
from pathlib import Path

In [4]:
def git_push():
  "Push files to remote Git server on DAGsHub or GitHub"
  !git push https://{DAGSHUB_USER_NAME}:{DAGSHUB_TOKEN}@dagshub.com/{DAGSHUB_REPO_OWNER}/{DAGSHUB_REPO_NAME}.git 

# Black Magic 🪄 

**Configure Git**

In [5]:
!git config --global user.email {DAGSHUB_EMAIL}
!git config --global user.name {DAGSHUB_USER_NAME}

**Clone the Repository**

In [6]:
if CLONE:
  !git clone https://{DAGSHUB_USER_NAME}:{DAGSHUB_TOKEN}@dagshub.com/{DAGSHUB_REPO_OWNER}/{DAGSHUB_REPO_NAME}.git
  %cd {DAGSHUB_REPO_NAME}
if PULL_GIT:
  !git pull

fatal: destination path 'mlops-zoomcamp' already exists and is not an empty directory.
/content/mlops-zoomcamp
There is no tracking information for the current branch.
Please specify which branch you want to merge with.
See git-pull(1) for details.

    git pull <remote> <branch>

If you wish to set tracking information for this branch you can do so with:

    git branch --set-upstream-to=origin/<branch> week3



In [7]:
!git switch -C {WEEK}

Reset branch 'week3'


**Install Requirements**

In [9]:
from pathlib import Path

!pip install --upgrade pip --quiet

req_path = Path("requirements.txt")
if req_path.is_file():
  !pip install -qr requirements.txt
  !pip install -qU prefect pyngrok

**Configure MLflow**

In [10]:
if MLFLOW:
  
  mlflow_installed = !pip list -v | grep mlflow
  if not mlflow_installed:
    print("Installing MLflow")
    !pip install mlflow --quiet

  import mlflow

  os.environ['MLFLOW_TRACKING_USERNAME'] = DAGSHUB_USER_NAME
  os.environ['MLFLOW_TRACKING_PASSWORD'] = DAGSHUB_TOKEN
  os.environ['MLFLOW_TRACKING_URI'] = f'https://dagshub.com/{DAGSHUB_REPO_OWNER}/{DAGSHUB_REPO_NAME}.mlflow'
  mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
  os.environ['MLFLOW_EXPERIMENT_NAME'] = 'default'

In [11]:
!mkdir -p {WEEK}
%cd {WEEK}
!mkdir -p {data,models}

/content/mlops-zoomcamp/week3


# 2. Orchestration with Prefect

Check Prefect version:

In [12]:
!prefect --version

2.10.12


Download and preprocess data:

In [30]:
!wget -P data/raw/ https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-0{1..3}.parquet

--2023-06-05 10:16:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.249.90.174, 13.249.90.176, 13.249.90.209, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.249.90.174|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1427002 (1.4M) [binary/octet-stream]
Saving to: ‘data/raw/green_tripdata_2023-01.parquet’


2023-06-05 10:16:59 (16.1 MB/s) - ‘data/raw/green_tripdata_2023-01.parquet’ saved [1427002/1427002]

--2023-06-05 10:16:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2023-02.parquet
Reusing existing connection to d37ci6vzurychx.cloudfront.net:443.
HTTP request sent, awaiting response... 200 OK
Length: 1533740 (1.5M) [binary/octet-stream]
Saving to: ‘data/raw/green_tripdata_2023-02.parquet’


2023-06-05 10:16:59 (21.6 MB/s) - ‘data/raw/green_tripdata_2023-02.parquet’ saved [1533740/1533740

**Start prefect server & tunnel through ngrok**

In [16]:
!ngrok config add-authtoken {NGROK_TOKEN}

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


In [35]:
get_ipython().system_raw('prefect server start &')
get_ipython().system_raw('ngrok http 4200 &')
! curl -s http://localhost:4040/api/tunnels

In [41]:
! curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","ID":"28e12611ae35d21787bc4db818b07329","uri":"/api/tunnels/command_line","public_url":"https://cfd3-35-224-253-210.ngrok-free.app","proto":"https","config":{"addr":"http://localhost:4200","inspect":true},"metrics":{"conns":{"count":4,"gauge":0,"rate1":0.01759348888981656,"rate5":0.010212850646699224,"rate15":0.004066453045582061,"p50":3385156398.5,"p90":6911187015,"p95":6911187015,"p99":6911187015},"http":{"count":9,"rate1":0.03759988446556868,"rate5":0.022745578210574796,"rate15":0.009118487539342334,"p50":4493390,"p90":948406303,"p95":948406303,"p99":948406303}}}],"uri":"/api/tunnels"}


In [49]:
!python src/orchestrate.py data/raw/green_tripdata_2023-01.parquet data/raw/green_tripdata_2023-02.parquet

12:08:45.495 | [36mINFO[0m    | prefect.engine - Created flow run[35m 'utopian-donkey'[0m for flow[1;35m 'main-flow'[0m
12:08:45.868 | [36mINFO[0m    | Flow run[35m 'utopian-donkey'[0m - Created task run 'Read taxi data-0' for task 'Read taxi data'
12:08:45.869 | [36mINFO[0m    | Flow run[35m 'utopian-donkey'[0m - Executing 'Read taxi data-0' immediately...
12:08:46.381 | [36mINFO[0m    | Task run 'Read taxi data-0' - Finished in state [32mCompleted[0m()
12:08:46.412 | [36mINFO[0m    | Flow run[35m 'utopian-donkey'[0m - Created task run 'Read taxi data-1' for task 'Read taxi data'
12:08:46.413 | [36mINFO[0m    | Flow run[35m 'utopian-donkey'[0m - Executing 'Read taxi data-1' immediately...
12:08:46.692 | [36mINFO[0m    | Task run 'Read taxi data-1' - Finished in state [32mCompleted[0m()
12:08:46.730 | [36mINFO[0m    | Flow run[35m 'utopian-donkey'[0m - Created task run 'add_features-0' for task 'add_features'
12:08:46.731 | [36mINFO[0m    | Flow run

**Push the files to the remotes** 🏁

In [None]:
!git add .
!git commit -m 'homework: week3'
git_push()

[week2-wandb cf05914] remove: week2-wandb DAG
 10 files changed, 12 deletions(-)
 delete mode 100644 week2-wandb/data/.gitignore
 delete mode 100644 week2-wandb/data/processed.dvc
 create mode 100644 week2-wandb/data/processed/dv.pkl
 create mode 100644 week2-wandb/data/processed/test.pkl
 create mode 100644 week2-wandb/data/processed/train.pkl
 create mode 100644 week2-wandb/data/processed/val.pkl
 delete mode 100644 week2-wandb/data/raw.dvc
 create mode 100644 week2-wandb/data/raw/green_tripdata_2022-01.parquet
 create mode 100644 week2-wandb/data/raw/green_tripdata_2022-02.parquet
 create mode 100644 week2-wandb/data/raw/green_tripdata_2022-03.parquet
Enumerating objects: 11, done.
Counting objects: 100% (11/11), done.
Delta compression using up to 2 threads
Compressing objects: 100% (8/8), done.
Writing objects: 100% (8/8), 4.06 MiB | 11.97 MiB/s, done.
Total 8 (delta 2), reused 0 (delta 0)
To https://dagshub.com/wonhyeongseo/mlops-zoomcamp.git
   036f4c0..cf05914  week2-wandb -> w