Skip to content

Commit

Permalink
hw3
Browse files Browse the repository at this point in the history
  • Loading branch information
vBykoff committed Dec 4, 2022
1 parent aa6fc91 commit ed62ef4
Show file tree
Hide file tree
Showing 27 changed files with 405 additions and 0 deletions.
9 changes: 9 additions & 0 deletions airflow_ml_dags/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# HOMEWORK 3

Run:
~~~
#insert your date to PATH_TO_MODEL
export PATH_TO_MODEL=./data/models/2022-12-04
export FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")
docker compose up --build
~~~
Binary file not shown.
Binary file not shown.
Binary file not shown.
31 changes: 31 additions & 0 deletions airflow_ml_dags/dags/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from airflow.providers.docker.operators.docker import DockerOperator
from airflow import DAG
from datetime import datetime
from docker.types import Mount



default_args = {
"owner": "vladimir",
"email": "v.bykov.02@bk.ru",
"retries": 1
}


with DAG(
"generate",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2022, 12, 1)
) as dag:
generate = DockerOperator(
image="airflow-generate",
command="--output_path ./data/raw/{{ ds }}",
task_id = "generate_synthetic_data",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]
)

generate
34 changes: 34 additions & 0 deletions airflow_ml_dags/dags/predict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
from docker.types import Mount
from airflow.models import Variable

PATH_TO_MODEL = Variable.get("path_to_model")

default_args = {
"owner": "vladimir",
"email": "v.bykov.02@bk.ru",
"retries": 1
}


with DAG(
"predict",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2022, 12, 1)
) as dag:

predict = DockerOperator(
image="airflow-predict",
command="--input-path ./data/processed/{{ ds }} --model-path " + PATH_TO_MODEL + " --output-path ./data/predictions/{{ ds }}",
task_id = "predict",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]
)

predict

62 changes: 62 additions & 0 deletions airflow_ml_dags/dags/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from airflow.providers.docker.operators.docker import DockerOperator
from airflow import DAG
from datetime import datetime
from docker.types import Mount

default_args = {
"owner": "vladimir",
"email": "v.bykov.02@bk.ru",
"retries": 1
}


with DAG(
"train",
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2022, 12, 1)
) as dag:

process = DockerOperator(
image="airflow-process",
command="--input-path ./data/raw/{{ ds }} --output-path ./data/processed/{{ ds }}",
task_id = "process",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]

)

split = DockerOperator(
image="airflow-split",
command="--input-path ./data/processed/{{ ds }} --output-path ./data/train-val/{{ ds }}",
task_id = "split",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]

)

train = DockerOperator(
image="airflow-train",
command="--input-path ./data/train-val/{{ ds }} --output-path ./data/models/{{ ds }}",
task_id = "train",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]
)

validate = DockerOperator(
image="airflow-validate",
command="--input-path ./data/train-val/{{ ds }} --model-path ./data/models/{{ ds }} --output-path ./data/metrics/{{ ds }}",
task_id = "validate",
do_xcom_push=False,
auto_remove=True,
network_mode="bridge",
mounts=[Mount(source="/home/vladimir/PycharmProjects/airflow_ml_dags/data", target='/data', type='bind')]
)

process >> split >> train >> validate
Empty file added airflow_ml_dags/data/.gitkeep
Empty file.
8 changes: 8 additions & 0 deletions airflow_ml_dags/images/airflow-docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ARG AIRFLOW_BASE_IMAGE="apache/airflow:2.3.0-python3.8"
FROM ${AIRFLOW_BASE_IMAGE}

RUN pip install --user --no-cache-dir \
apache-airflow-providers-docker==2.6.0

USER root
ENV PYTHONPATH=/home/airflow/.local/lib/python3.8/site-packages
8 changes: 8 additions & 0 deletions airflow_ml_dags/images/airflow-generate/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM python:3.10-slim-bullseye
COPY generate.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
WORKDIR ./
ENTRYPOINT ["python3", "generate.py"]


49 changes: 49 additions & 0 deletions airflow_ml_dags/images/airflow-generate/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pandas as pd
import numpy as np
import click
import os


@click.command("generate")
@click.option("--output_path", type=click.Path())
def generate_data(output_path: str, size: int = 20) -> pd.DataFrame:
columns = ['sex', 'cp', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal',
'age', 'trestbps', 'chol', 'thalach', 'oldpeak']

synthetic_data = pd.DataFrame(columns=columns)
synthetic_target = pd.DataFrame(columns=['condition'])

synthetic_data['sex'] = np.random.randint(0, 2, size)
synthetic_data['cp'] = np.random.randint(0, 5, size)
synthetic_data['fbs'] = np.random.randint(0, 2, size)

synthetic_data['restecg'] = np.random.randint(0, 3, size)
synthetic_data['exang'] = np.random.randint(0, 2, size)
synthetic_data['slope'] = np.random.randint(0, 3, size)

synthetic_data['ca'] = np.random.randint(0, 4, size)
synthetic_data['thal'] = np.random.randint(0, 3, size)
synthetic_data['age'] = np.array(np.random.normal(54, 9, size), dtype="int")

synthetic_data['trestbps'] = np.array(np.random.normal(132, 17, size), dtype="int")
synthetic_data['chol'] = np.array(np.random.normal(247, 51, size), dtype="int")
synthetic_data['thalach'] = np.array(np.random.normal(150, 20, size), dtype="int")

synthetic_data['oldpeak'] = np.random.exponential(1, size)
synthetic_target['condition'] = np.random.randint(0, 2, size)

os.makedirs(output_path, exist_ok=True)
synthetic_data.to_csv(f"{output_path}/data.csv", index=False)
synthetic_target.to_csv(f"{output_path}/target.csv", index=False)

return synthetic_data


def extract_target_for_test(path_to_synthetic_data: str, predict_data_path: str) -> None:
synthetic_data = pd.read_csv(path_to_synthetic_data)
synthetic_data = synthetic_data.drop('condition', axis=1)
synthetic_data.to_csv(predict_data_path)

if __name__ == "__main__":
generate_data()
#extract_target_for_test("tests/test_data/synthetic_data.csv", "tests/test_data/data_for_predict.csv")
4 changes: 4 additions & 0 deletions airflow_ml_dags/images/airflow-generate/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
numpy
pandas
click

6 changes: 6 additions & 0 deletions airflow_ml_dags/images/airflow-predict/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim-bullseye
COPY predict.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
ENTRYPOINT ["python3", "predict.py"]

29 changes: 29 additions & 0 deletions airflow_ml_dags/images/airflow-predict/predict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pickle
import os
import pandas as pd
import click
import json
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score


@click.command("validate")
@click.option("--input-path", type=click.Path())
@click.option("--model-path", type=click.Path())
@click.option("--output-path", type=click.Path())
def predict(input_path: str, model_path: str, output_path: str):

data = pd.read_csv(f"{input_path}/train_data.csv")

print(model_path)
with open(f"{model_path}/rf_clf.pkl", "rb") as f:
model = pickle.load(f)

y_pred = model.predict(data)
predictions = pd.DataFrame(y_pred)

os.makedirs(output_path, exist_ok=True)
predictions.to_csv(f"{output_path}/predictions.csv")


if __name__ == "__main__":
predict()
3 changes: 3 additions & 0 deletions airflow_ml_dags/images/airflow-predict/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pandas
click
scikit-learn
6 changes: 6 additions & 0 deletions airflow_ml_dags/images/airflow-process/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim-bullseye
COPY process.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
ENTRYPOINT ["python3", "process.py"]

31 changes: 31 additions & 0 deletions airflow_ml_dags/images/airflow-process/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
import click
import os


@click.command("process")
@click.option("--input-path", type=click.Path())
@click.option("--output-path", type=click.Path())
def process(input_path: str, output_path: str):

print(input_path)
data = pd.read_csv(f"{input_path}/data.csv")
target = pd.read_csv(f"{input_path}/target.csv")

print(data)
# process data
imputer = SimpleImputer(strategy="most_frequent")
processed = imputer.fit_transform(data)
processed_data = pd.DataFrame(processed, columns=data.columns)

os.makedirs(output_path, exist_ok=True)
processed_data.to_csv(f"{output_path}/train_data.csv", index=False)

# save target to new folder
target.to_csv(f"{output_path}/train_target.csv", index=False)


if __name__ == "__main__":
process()
4 changes: 4 additions & 0 deletions airflow_ml_dags/images/airflow-process/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
click
scikit-learn
pandas

6 changes: 6 additions & 0 deletions airflow_ml_dags/images/airflow-split/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim-bullseye
COPY split.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
ENTRYPOINT ["python3", "split.py"]

4 changes: 4 additions & 0 deletions airflow_ml_dags/images/airflow-split/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
click
scikit-learn
pandas

26 changes: 26 additions & 0 deletions airflow_ml_dags/images/airflow-split/split.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import click
import pandas as pd
from sklearn.model_selection import train_test_split
import os


@click.command("split")
@click.option("--input-path", type=click.Path())
@click.option("--output-path", type=click.Path())
def split(input_path: str, output_path: str, test_size=0.2):

data = pd.read_csv(f"{input_path}/train_data.csv")
target = pd.read_csv(f"{input_path}/train_target.csv")

x_train, x_val, y_train, y_val = train_test_split(data, target, test_size=test_size)


os.makedirs(output_path, exist_ok=True)
x_train.to_csv(f"{output_path}/x_train.csv", index=False)
x_val.to_csv(f"{output_path}/x_val.csv", index=False)
y_train.to_csv(f"{output_path}/y_train.csv", index=False)
y_val.to_csv(f"{output_path}/y_val.csv", index=False)


if __name__ == "__main__":
split()
6 changes: 6 additions & 0 deletions airflow_ml_dags/images/airflow-train/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim-bullseye
COPY train.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
ENTRYPOINT ["python3", "train.py"]

4 changes: 4 additions & 0 deletions airflow_ml_dags/images/airflow-train/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
click
scikit-learn
pandas

27 changes: 27 additions & 0 deletions airflow_ml_dags/images/airflow-train/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from sklearn.ensemble import RandomForestClassifier
import pickle
import os
import pandas as pd
import click

@click.command("train")
@click.option("--input-path", type=click.Path())
@click.option("--output-path", type=click.Path())
def train(input_path: str, output_path: str):

x_train = pd.read_csv(f"{input_path}/x_train.csv")
y_train = pd.read_csv(f"{input_path}/y_train.csv")

model = RandomForestClassifier(
n_estimators=20,
random_state=89
)
model.fit(x_train, y_train)

os.makedirs(output_path, exist_ok=True)
with open(f"{output_path}/rf_clf.pkl", "wb") as f:
pickle.dump(model, f)


if __name__ == "__main__":
train()
6 changes: 6 additions & 0 deletions airflow_ml_dags/images/airflow-validate/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM python:3.10-slim-bullseye
COPY validate.py requirements.txt ./
RUN python3 -m pip install --upgrade pip \
&& pip3 install -r requirements.txt
ENTRYPOINT ["python3", "validate.py"]

3 changes: 3 additions & 0 deletions airflow_ml_dags/images/airflow-validate/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pandas
click
scikit-learn

0 comments on commit ed62ef4

Please sign in to comment.