In [1]:
import config
import sklearn
import pandas as pd
import boto3
import psycopg2
import pickle
import sqlalchemy
import numpy as np

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import Normalizer
from sklearn.metrics import precision_score, f1_score, recall_score

from datetime import date
from io import StringIO
from typing import Any, Dict, Tuple

In [None]:
# Build the PostgreSQL engine from the settings in config.PG_CONFIG.
# The bare name PG_CONFIG is never defined in this notebook (only `import config`),
# so the settings are read through the module. URL.create escapes special
# characters in the credentials, which a hand-built f-string URL does not.
engine = sqlalchemy.create_engine(
    url=sqlalchemy.engine.URL.create(
        drivername="postgresql",
        username=config.PG_CONFIG["user"],
        password=config.PG_CONFIG["password"],
        host=config.PG_CONFIG["host"],
        port=int(config.PG_CONFIG["port"]),
        database=config.PG_CONFIG["database"],
    )
)

In [None]:
# S3 credentials are read from config.S3_CONFIG (the bare name S3_CONFIG is not
# defined in this notebook). No trailing comma after the first assignment:
# a trailing comma would turn S3_KEY into a 1-tuple instead of a string.
S3_KEY = config.S3_CONFIG["aws_access_key_id"]
S3_SECRET_KEY = config.S3_CONFIG["aws_secret_access_key"]

### S3

Данные: https://archive.ics.uci.edu/dataset/186/wine+quality

In [3]:
# Connect to S3 (Yandex Object Storage endpoint) with the keys defined above.
s3_resource = boto3.resource(
    "s3",
    endpoint_url="https://storage.yandexcloud.net",
    aws_access_key_id=S3_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
)

In [4]:
# Load only the first 10 rows of the semicolon-separated dataset as a small sample.
test_data = pd.read_csv("winequality-white.csv", sep=";", nrows=10)

In [5]:
# Preview the first five rows of the sample.
test_data.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45,170,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14,132,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30,97,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,6


In [6]:
# Save the file: serialize the sample to CSV in memory, then upload it to the bucket.
csv_buffer = StringIO()
test_data.to_csv(csv_buffer, index=False)
s3_resource.Object("test-bucket-lizvladi", "test_data.csv").put(
    Body=csv_buffer.getvalue()
)

{'ResponseMetadata': {'RequestId': '7d8456498d78a620',
  'HostId': '',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'nginx',
   'date': 'Sun, 10 Mar 2024 13:09:41 GMT',
   'content-type': 'application/octet-stream',
   'transfer-encoding': 'chunked',
   'connection': 'keep-alive',
   'keep-alive': 'timeout=60',
   'etag': '"c304a1bc153ff3f0e5a55e79e8d8cd6f"',
   'x-amz-request-id': '7d8456498d78a620'},
  'RetryAttempts': 0},
 'ETag': '"c304a1bc153ff3f0e5a55e79e8d8cd6f"'}

In [7]:
# Read the file: fetch the object from the bucket and parse its body as CSV.
obj = s3_resource.Object("test-bucket-lizvladi", "test_data.csv")
initial_df = pd.read_csv(filepath_or_buffer=obj.get()["Body"])

In [8]:
# Preview the frame read back from S3.
initial_df.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45,170,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14,132,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30,97,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47,186,0.9956,3.19,0.4,9.9,6


### SQL

In [9]:
# Load the full semicolon-separated dataset and preview it.
wine_data = pd.read_csv("winequality-white.csv", sep=";")
wine_data.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6


In [10]:
# Connect to the database (SQLAlchemy).
# NOTE(review): the user and password are hardcoded in the URL; consider reading
# them from configuration before sharing this notebook.
engine = sqlalchemy.create_engine(
    "postgresql://postgres:postgres@127.0.0.1:5432/postgres"
)

In [24]:
# Open a connection from the engine; the cells below pass it to pandas and use
# it to execute SQL statements.
connection = engine.connect()

In [23]:
# Close the SQLAlchemy connection.
# NOTE(review): this cell's execution count (23) is lower than the connect
# cell's (24). Run top-to-bottom, it closes the connection before the cells
# below use it — consider moving it to the end of the SQL section.
connection.close()

In [25]:
# Save the DataFrame to a table (SQLAlchemy).
# The cell directly above closes `connection`, so reopen it when the notebook
# is run top-to-bottom.
if connection.closed:
    connection = engine.connect()
# "replace" keeps this cell idempotent: re-running it reloads the table instead
# of appending a second copy of every row.
wine_data.to_sql(name="wine_data", con=connection, index=False, if_exists="replace")

898

In [14]:
# Execute a script (SQLAlchemy): drop the wine_data table.
# IF EXISTS keeps the cell from failing when the table is not there yet.
# NOTE(review): this cell sits after the cell that loads wine_data and before
# the cell that reads it back, so a top-to-bottom run drops the table the later
# cells query. Its execution count (14) is lower than the load cell's (25),
# which suggests it was used as a reset before loading — consider moving it
# above the load cell.
query = "DROP TABLE IF EXISTS wine_data"
connection.execute(statement=sqlalchemy.text(text=query))
connection.commit()

In [26]:
# Execute a script (SQLAlchemy): create the table that stores model metrics.
# IF NOT EXISTS makes the cell re-runnable. The table is named wine_results
# because that is the table save_metrics() reflects and inserts into; the
# previous name (wine_results_2) was never written to by this notebook.
query = """CREATE TABLE IF NOT EXISTS wine_results (
    model_name varchar(40) primary key,
    precision DECIMAL(5,2) NOT NULL,
    recall DECIMAL(5,2) NOT NULL,
    f1_score DECIMAL(5,2) default NULL,
    date timestamp NOT NULL DEFAULT NOW()
);
"""
connection.execute(statement=sqlalchemy.text(query))
connection.commit()

In [27]:
# Read the table back into a DataFrame (SQLAlchemy).
data = pd.read_sql_query("SELECT * FROM wine_data", connection)

In [28]:
# Preview the rows read from the database.
data.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6


In [29]:
# Dimensions (rows, columns) of the frame read back from wine_data.
data.shape

(4898, 12)

In [30]:
# Connect to the database (psycopg2).
# NOTE(review): the credentials are hardcoded here; consider reading them from
# configuration before sharing this notebook.
conn = psycopg2.connect(
    database="postgres",
    user="postgres",
    password="postgres",
    host="127.0.0.1",
    port="5432",
)

In [47]:
# Alternative: conn = psycopg2.connect(**config.PG_CONFIG)

### Pipeline

In [31]:
def download_data(
    db_url: str = "postgresql://postgres:postgres@127.0.0.1:5432/postgres",
) -> pd.DataFrame:
    """Read the whole ``wine_data`` table into a DataFrame.

    Args:
        db_url: SQLAlchemy database URL. The default keeps the connection
            string this function used before the URL became a parameter.
            NOTE(review): the default embeds credentials; prefer passing a URL
            built from configuration.

    Returns:
        The result of ``SELECT * FROM wine_data`` as a DataFrame.
    """
    engine = sqlalchemy.create_engine(url=db_url)
    try:
        # The context manager closes the connection even if the query fails.
        with engine.connect() as connection:
            return pd.read_sql_query("SELECT * FROM wine_data", connection)
    finally:
        # Dispose the engine on both the success and the error path.
        engine.dispose()

In [32]:
# Load the table through the pipeline loader and preview it.
data = download_data()
data.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6


In [34]:
def preprcocess_and_split_data(
        data: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, pd.Series, pd.Series]:
    """Split the dataset into train/test parts and normalize the features.

    The input frame is not modified: the ``quality`` column is separated with
    ``drop`` rather than deleted in place, so the function can be called again
    on the same frame.

    Args:
        data: Frame containing a ``quality`` column (the target); every other
            column is used as a feature.

    Returns:
        ``(X_train_norm, X_test_norm, y_train, y_test)`` — the feature parts
        after ``Normalizer``, and the matching target parts.

    Raises:
        KeyError: If ``data`` has no ``quality`` column.
    """
    target = data["quality"].copy()
    # drop() returns a new frame, leaving the caller's DataFrame intact.
    features = data.drop(columns="quality")

    # 80/20 split, stratified on the target, with a fixed seed.
    X_train, X_test, y_train, y_test = train_test_split(features,
                                                        target,
                                                        test_size=0.2,
                                                        random_state=42,
                                                        shuffle=True,
                                                        stratify=target)

    # Fit the normalizer on the training part only, then apply it to both parts.
    norm = Normalizer()
    X_train_norm = norm.fit_transform(X_train)
    X_test_norm = norm.transform(X_test)

    return X_train_norm, X_test_norm, y_train, y_test

In [35]:
# Produce the normalized train/test features and the matching targets.
(X_train, X_test, y_train, y_test) = preprcocess_and_split_data(data=data)

In [36]:
def train_model(X_train: np.array, X_test: np.array, y_train: pd.Series,
                y_test: pd.Series) -> Tuple[Any, Dict[str, Any]]:
    """Fit a logistic regression and score it on the test part.

    Returns:
        ``(model, metrics)`` where ``metrics`` holds the micro-averaged
        ``precision``, ``recall`` and ``f1_score`` plus a ``model_name`` of the
        form ``logreg_<DDMMYYYY>`` built from today's date.
    """
    # Train the model and predict on the held-out part.
    model = LogisticRegression(solver="liblinear").fit(X_train, y_train)
    predictions = model.predict(X_test)

    # Collect the metrics; every scorer uses the same micro averaging.
    scorers = {
        "precision": precision_score,
        "recall": recall_score,
        "f1_score": f1_score,
    }
    report = {
        metric: scorer(y_test, predictions, average="micro")
        for metric, scorer in scorers.items()
    }

    # Tag the metrics with a date-stamped model name.
    run_date = date.today().strftime("%d%m%Y")
    report["model_name"] = f"logreg_{run_date}"

    return model, report

In [37]:
# Train the classifier and collect its metrics.
clf, metrics = train_model(
    X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test
)

In [40]:
def save_metrics(
    metrics: Dict[str, Any],
    db_url: str = "postgresql://postgres:postgres@127.0.0.1:5432/postgres",
) -> None:
    """Insert one row of model metrics into the ``wine_results`` table.

    Args:
        metrics: Mapping of column name to value; it is passed as keyword
            arguments to the INSERT, so its keys must be columns of the table.
        db_url: SQLAlchemy database URL. The default keeps the connection
            string this function used before the URL became a parameter.
            NOTE(review): the default embeds credentials; prefer passing a URL
            built from configuration.

    NOTE(review): if ``model_name`` is a primary key in the target table,
    inserting the same model name twice will fail — confirm against the schema.
    """
    # Connect to the database (SQLAlchemy).
    engine = sqlalchemy.create_engine(url=db_url)
    try:
        # Reflect the table definition from the database.
        table = sqlalchemy.Table("wine_results",
                                 sqlalchemy.MetaData(),
                                 autoload_with=engine)
        # Prepare the insert statement.
        stmt = sqlalchemy.insert(table).values(**metrics)
        # Execute it; the context manager closes the connection even on error.
        with engine.connect() as connection:
            connection.execute(stmt)
            connection.commit()
    finally:
        # Dispose the engine on both the success and the error path.
        engine.dispose()

In [43]:
# Persist the metrics of the trained model.
save_metrics(metrics=metrics)

In [44]:
def save_results(clf, model_name):
    """Pickle ``clf`` and upload it to the bucket under the key ``model_name``.

    Uses the module-level ``S3_KEY`` / ``S3_SECRET_KEY`` credentials.
    """
    storage = boto3.resource(
        "s3",
        endpoint_url="https://storage.yandexcloud.net",
        aws_access_key_id=S3_KEY,
        aws_secret_access_key=S3_SECRET_KEY,
    )
    serialized_model = pickle.dumps(clf)
    target = storage.Object("test-bucket-lizvladi", model_name)
    target.put(Body=serialized_model)

In [45]:
# Upload the trained model under its date-stamped name.
save_results(clf=clf, model_name=metrics["model_name"])