In [1]:
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyspark.sql.types as T

from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

import optuna

# from giskard import Dataset, Model, scan, testing
import pickle

import warnings

import clickhouse_connect
import psycopg2

from functools import reduce

import clickhouse_connect

import mlflow 
import mlflow.catboost
import mlflow.sklearn
import mlflow.data
from mlflow.models import infer_signature

from datetime import datetime

# Load env variables

In [2]:
from dotenv import load_dotenv
import os

load_dotenv()

SPARK_COMPAT_VERSION = os.getenv('SPARK_COMPAT_VERSION')
SCALA_COMPAT_VERSION = os.getenv('SCALA_COMPAT_VERSION')
CATBOOST_SPARK_VERSION = os.getenv('CATBOOST_SPARK_VERSION')
CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST')
CLICKHOUSE_PORT = os.getenv('CLICKHOUSE_PORT')
CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER')
CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD')

ACCESS_KEY=os.getenv('MINIO_ROOT_USER')
SECRET_KEY=os.getenv('MINIO_ROOT_PASSWORD')

# Get spark client, connect to clickhouse 

In [3]:
from pyspark.sql import SparkSession

packages = [
    "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0",
    "com.clickhouse:clickhouse-client:0.7.0",
    "com.clickhouse:clickhouse-http-client:0.7.0",
    "org.apache.httpcomponents.client5:httpclient5:5.2.1",
    f"ai.catboost:catboost-spark_{SPARK_COMPAT_VERSION}_{SCALA_COMPAT_VERSION}:{CATBOOST_SPARK_VERSION}"
]

spark = (SparkSession
         .builder
         .config("spark.jars.packages", ",".join(packages))
         .master("local[1]")
         .getOrCreate())

spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "konsin1988")
spark.conf.set("spark.sql.catalog.clickhouse.password", "r13l02c1988")
spark.conf.set("spark.sql.catalog.clickhouse.database", "credit")
spark.conf.set("spark.clickhouse.write.format", "json")

sc = spark.sparkContext
rdd = sc.parallelize(range(100 + 1))

### Import catboost_spark (only after spark packages loading)

In [4]:
import catboost_spark
from catboost_spark import CatBoostClassifier

# Set feature and target columns

In [5]:
COLUMN_TYPES = {
    'age': 'category',
    'sex': 'category',
    'job': 'category',
    'housing': 'category',
    'credit_amount': 'numeric',
    'duration': 'numeric'
}

TARGET_COLUMN_NAME = 'default'
FEATURE_COLUMNS = [i for i in COLUMN_TYPES.keys()]
FEATURE_TYPES = {i: COLUMN_TYPES[i] for i in COLUMN_TYPES if i != TARGET_COLUMN_NAME}

COLUMNS_TO_SCALE = [key for key in COLUMN_TYPES.keys() if COLUMN_TYPES[key] == "numeric"]
COLUMNS_TO_ENCODE = [key for key in COLUMN_TYPES.keys() if COLUMN_TYPES[key] == "category"]

# Get data

In [6]:
job_list = {
    0: 'unskilled and non-resident', 
    1: 'unskilled and resident', 
    2: 'skilled', 
    3: 'highly skilled'
}

In [None]:
X_query = f"SELECT {reduce(lambda a,b: a + ', ' + b, FEATURE_COLUMNS)} FROM clickhouse.credit.credit;"

str_job = F.udf(lambda x: job_list[x], T.StringType())
X = (
    spark
    .sql(X_query)
    .withColumn('job', str_job(col('job')))
)

y_query = f'SELECT {TARGET_COLUMN_NAME} FROM clickhouse.credit.credit;'

str_job = F.udf(lambda x: job_list[x], T.StringType())
y = (
    spark
    .sql(y_query)
)

# MLflow connection

In [21]:
warnings.filterwarnings('ignore')
mlflow.set_tracking_uri('http://localhost:5000/')
print("URI", mlflow.get_tracking_uri())

URI http://localhost:5000/


In [22]:
numeric_transformer = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])
categorical_transformer = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])
# 
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, COLUMNS_TO_SCALE),
        ("cat", categorical_transformer, COLUMNS_TO_ENCODE)
    ]
)
X_preproccessed = preprocessor.fit_transform(X)

ValueError: Expected 2D array, got scalar array instead:
array=DataFrame[age: bigint, sex: string, job: string, housing: string, credit_amount: bigint, duration: bigint].
Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.