Commit

upd
webmakaka committed Jul 3, 2024
1 parent d8bbf54 commit 55f2bf7
Showing 2 changed files with 127 additions and 26 deletions.
6 changes: 6 additions & 0 deletions website/00-index.md
@@ -20,6 +20,12 @@ permalink: /

The plan is to collect notes related to automating machine learning processes: kubernetes, kubeflow, mlflow, airflow, etc.

<br/>

### I suggest digging into [[Stepic] MLOps. Начало [RUS, 2023]](/courses/stepik-mlops-beginning/)

And replacing Amazon / Yandex S3 with its local analog, MinIO. Suggestions for improvements and reports of mistakes are welcome.
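
A minimal sketch of standing up that local MinIO with Docker (the `minio/minio` image is the official one; the credentials and ports below are MinIO's defaults and are illustrative, not taken from the course):

```
$ docker run -d --name minio \
    -p 9000:9000 -p 9001:9001 \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server /data --console-address ":9001"
```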

<br/>
<br/>

147 changes: 121 additions & 26 deletions website/03-courses/01-stepic-mlops-beginning.md
@@ -28,6 +28,20 @@ https://www.youtube.com/watch?v=skTh3tGksIQ&list=PLmA-1xX7IuzAixCe10sFhyTcyunSc5

<br/>

```
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 22.04.4 LTS
Release: 22.04
Codename: jammy
$ python --version
Python 3.10.12
```

<br/>

### [Postgres](//gitops.ru/tools/containers/docker/db/postgresql/)

### [Airflow](/tools/airflow/)
@@ -40,6 +54,11 @@ https://www.youtube.com/watch?v=skTh3tGksIQ&list=PLmA-1xX7IuzAixCe10sFhyTcyunSc5

<br/>

Working through it:
2024.07.03

<br/>

### 01. DAG Hello World

<br/>

@@ -79,7 +98,7 @@

```python
def init() -> NoReturn:
    print("Hello, World")


task_init = PythonOperator(task_id = "init", python_callable = init, dag = dag)

task_init
```
@@ -97,6 +116,8 @@ $ airflow dags test mlops_dag_1
<br/>

```
# pandas 2.1.0
# sqlalchemy==1.4.36
$ pip install numpy==1.26.4 pandas==2.1.4 scikit-learn==1.5.0 sqlalchemy==0.28.2 psycopg2-binary==2.9.9
```

@@ -157,6 +178,33 @@ $ PGPASSWORD=pA55w0rd123 psql --host=localhost --username=admin1 --port=5432 --d

<br>
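
The train DAG below reads a `california_housing` table from this Postgres instance. The loading step itself isn't shown in this commit; here is a minimal sketch of one way to do it, assuming `sklearn.datasets.fetch_california_housing` and reusing the connection details from the psql command and connection settings above (everything else is an assumption, not a step from the course):

```python
# Sketch: load the California housing dataset into Postgres for the train DAG.
from sklearn.datasets import fetch_california_housing
from sqlalchemy import create_engine

# The full frame contains the eight feature columns plus the MedHouseVal target.
df = fetch_california_housing(as_frame=True).frame

# Host, port, user, password, and database match the settings above; adjust as needed.
engine = create_engine("postgresql+psycopg2://admin1:pA55w0rd123@localhost:5432/postgresdb")
df.to_sql("california_housing", engine, index=False, if_exists="replace")
```

<br>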

```
Airflow -> Admin -> Connections
Connection id: pg_connection
Connection Type: Postgres
Host: localhost
Schema: postgresdb
Login: admin1
Password: pA55w0rd123
Port: 5432
```

<br>
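
The train DAG further down also uses an S3 connection with id `s3_connection` (via `S3Hook("s3_connection")`). That step isn't shown in this commit; a sketch of what it might look like when pointed at the local MinIO from the note on the index page (endpoint and credentials here are assumptions):

```
Airflow -> Admin -> Connections
Connection id: s3_connection
Connection Type: Amazon Web Services
Login: minioadmin
Password: minioadmin
Extra: {"endpoint_url": "http://localhost:9000"}
```

<br>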

```
$ pip install apache-airflow-providers-postgres
$ pip install apache-airflow-providers-amazon
```

<br/>
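
A quick way to confirm both providers are registered with Airflow (standard `airflow providers list` CLI; the grep filter is just for convenience):

```
$ airflow providers list | grep -E "postgres|amazon"
```

<br/>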

```
$ vi dags/train_dag.py
```

<br>

```python
import io
import json
# (the remaining imports are collapsed in the diff view; these are the ones the code below uses)
import logging
import pickle
from datetime import datetime, timedelta

import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, median_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


DEFAULT_ARGS = {
    "owner": "Ryabukhin Alex",
    "email": "santolyich@gmail.com",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
}

dag = DAG(
dag_id = "train",
schedule_interval = "0 1 * * *",
start_date = days_ago(2),
catchup = False,
tags = ["mlops"],
default_args = DEFAULT_ARGS
)

_LOG = logging.getLogger()
_LOG.addHandler(logging.StreamHandler())

BUCKET = "mlops-bucket"
DATA_PATH = "datasets/california_housing.pkl"
FEATURES = ["MedInc", "HouseAge", "AveRooms", "AveBedrms",
"Population", "AveOccup", "Latitude", "Longitude"]
TARGET = "MedHouseVal"


def init() -> None:
    _LOG.info("Train pipeline started!")

def get_data_from_postgres() -> None:
    # Use the Postgres connection created earlier
    pg_hook = PostgresHook("pg_connection")
    conn = pg_hook.get_conn()

    # Read all rows from the california_housing table
    data = pd.read_sql_query("SELECT * FROM california_housing", conn)

    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")

    # Save the data to S3 as a pickle file
    pickle_byte_obj = pickle.dumps(data)
    resource.Object(BUCKET, DATA_PATH).put(Body=pickle_byte_obj)

    _LOG.info("Data download finished!")


def prepare_data() -> None:
    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")

    # Download the raw dataset and do the preprocessing
    file = s3_hook.download_file(key=DATA_PATH, bucket_name=BUCKET)
    data = pd.read_pickle(file)

    # Split into features and target
    X, y = data[FEATURES], data[TARGET]

    # Split the data into train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Fit the scaler on train only, then transform both splits
    scaler = StandardScaler()
    X_train_fitted = scaler.fit_transform(X_train)
    X_test_fitted = scaler.transform(X_test)

    # Save the prepared data to S3
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")
    for name, dataset in zip(["X_train", "X_test", "y_train", "y_test"],
                             [X_train_fitted, X_test_fitted, y_train, y_test]):
        pickle_byte_obj = pickle.dumps(dataset)
        resource.Object(BUCKET, f"dataset/{name}.pkl").put(Body=pickle_byte_obj)

    _LOG.info("Data preparation finished!")



def train_model() -> None:
    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")

    # Load the prepared data from S3
    data = {}
    for name in ["X_train", "X_test", "y_train", "y_test"]:
        file = s3_hook.download_file(key=f"dataset/{name}.pkl", bucket_name=BUCKET)
        data[name] = pd.read_pickle(file)

    # Train the model
    model = RandomForestRegressor()
    model.fit(data["X_train"], data["y_train"])
    prediction = model.predict(data["X_test"])

    # Compute the metrics
    result = {}
    result["r2_score"] = r2_score(data["y_test"], prediction)
    result["rmse"] = mean_squared_error(data["y_test"], prediction) ** 0.5
    result["median_ae"] = median_absolute_error(data["y_test"], prediction)

    # Save the result to S3
    date = datetime.now().strftime("%Y_%m_%d_%H")
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")
    json_byte_object = json.dumps(result)
    resource.Object(BUCKET, f"results/{date}.json").put(Body=json_byte_object)

    _LOG.info("Model training finished!")


def save_results() -> None:
    _LOG.info("Success!")


task_init = PythonOperator(task_id="init", python_callable=init, dag=dag)
task_get_data = PythonOperator(task_id="get_data", python_callable=get_data_from_postgres, dag=dag)
task_prepare_data = PythonOperator(task_id="prepare_data", python_callable=prepare_data, dag=dag)
task_train_model = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
task_save_results = PythonOperator(task_id="save_results", python_callable=save_results, dag=dag)
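
# DAG architecture: the original commit stops at the operator definitions; a
# linear task order like the one below is an assumption, added here as a sketch.
task_init >> task_get_data >> task_prepare_data >> task_train_model >> task_save_results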
```
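
With the file saved as dags/train_dag.py, the pipeline can be smoke-tested the same way as the Hello World DAG earlier (the dag_id `train` comes from the code above):

```
$ airflow dags test train
```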
