In [1]:
# One-time setup: build a relocatable venv that is shipped to the YARN
# executors (see `spark.yarn.dist.archives` below, alias `#environment`).
# Each `!` line runs in its own shell, so `source pyspark_venv/bin/activate`
# would not carry over to the next line (and `source` is not available in
# /bin/sh). Call the venv's own pip / venv-pack instead, with an explicit prefix.
# The UDF sends the driver's CatBoost model to the executors, so these pins
# presumably should match the driver kernel's packages — TODO confirm.
# !python -m venv pyspark_venv
# !pyspark_venv/bin/pip install --upgrade pip
# !pyspark_venv/bin/pip install venv-pack pyarrow==14.0.0 pandas==2.0.0 numpy==1.24.2 catboost==1.2.2
# !pyspark_venv/bin/venv-pack -p pyspark_venv -o pyspark_venv.tar.gz

In [2]:
# One-time: create the HDFS home directory that DEFAULT_OUTPUT_DATA_PATH lives under.
# !hdfs dfs -mkdir /user/ubuntu

In [3]:
import os

import findspark

# Third-party packages that do not depend on Spark being on sys.path.
from catboost import CatBoostClassifier
import numpy as np
import pandas as pd

# findspark.init() makes the SPARK_HOME installation of pyspark importable,
# so it must run before any of the pyspark imports below.
findspark.init()

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, pandas_udf
from pyspark.sql.types import IntegerType

In [4]:
# Set before the SparkSession below is created. Executors run the interpreter
# from the packed venv: `./environment` matches the `#environment` alias that
# pyspark_venv.tar.gz gets in `spark.yarn.dist.archives`.
os.environ.update(
    PYSPARK_DRIVER_PYTHON="python",
    PYSPARK_PYTHON="./environment/bin/python",
)

In [5]:
# YARN session for batch inference. Settings are passed as (key, value) pairs.
conf = (
    SparkConf()
    .setMaster("yarn")
    .setAppName("inference_dummy_model")
    .setAll([
        ("spark.executor.memory", "2g"),
        ("spark.driver.memory", "4g"),
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        # Packed venv, extracted in each executor's working dir as ./environment.
        ("spark.yarn.dist.archives", "pyspark_venv.tar.gz#environment"),
    ])
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
!hdfs dfs mkdir /user/ubuntu

In [6]:
# Raw feature CSV, read through the s3a connector.
DEFAULT_INPUT_DATA_PATH = "s3a://automl-otus-practice/data.csv"
# Destination directory for the scored rows (parquet, overwritten on every run).
DEFAULT_OUTPUT_DATA_PATH = "/user/ubuntu/outcome"

In [7]:
# Model inputs. Order matters: `array(*features)` packs these columns into one
# vector per row in exactly this order, and the UDF stacks those vectors into
# the matrix passed to model.predict. Names (including the `lenght` spelling)
# must match the CSV header verbatim.
features = [
    'has_more_payment_types',
    'has_sequential',
    'has_installments',
    'avg_payment_value',
    'mean_days_purchase_to_approved',
    'mean_days_approved_to_carrier',
    'mean_days_limit_to_carrier',
    'buy_has_work_day',
    'itens',
    'sum_price',
    'sum_freight',
    'sum_same_city',
    'sum_same_state',
    'mean_distance_km',
    'mean_p_name_lenght',
    'mean_p_photos_qty',
    'mean_p_weight_g',
    'mean_volume',
    'mean_length_width_ratio',
]

# Column names used on the Spark DataFrame.
features_array = "features"          # packed feature vector (added below)
order_id = "order_id"                # row key carried into the output
real_score = "score"                 # observed score column from the CSV
predicted_score = "predicted_score"  # model prediction (added below)

In [8]:
# Read the raw CSV, drop every row that has a null in any column, and pack the
# feature columns into one array column for the UDF.
sdf = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true', delimiter=",")
    .load(DEFAULT_INPUT_DATA_PATH)
    .dropna()
    .withColumn(features_array, array(*features))
)

# Model file is read from the driver's working directory; the inference UDF
# references this global.
model = CatBoostClassifier()
model.load_model("model.cbm")

In [9]:
@pandas_udf(IntegerType())
def inference_udf(input_data: pd.Series) -> pd.Series:
    """Score a batch of packed feature vectors with the global CatBoost ``model``.

    Each element of ``input_data`` is one row's feature array (the
    ``features_array`` column built from ``features``). Stacking the elements
    gives a 2-D matrix with one row per record, in ``features`` column order.

    Args:
        input_data: Series whose elements are equal-length 1-D feature arrays.

    Returns:
        Series with one predicted label per input row, cast to int32 to match
        the declared ``IntegerType``. Assumes the model was trained on integer
        class labels (the sample output shows scores like 4 and 5). TODO confirm.
    """
    # np.vstack raises ValueError on an empty sequence, so short-circuit empty batches.
    if input_data.empty:
        return pd.Series([], dtype="int32")

    objects = np.vstack(input_data.to_numpy())
    # predict() may return a column vector; flatten to one label per row.
    predictions = model.predict(objects).ravel()
    return pd.Series(predictions).astype("int32")

In [10]:
# Score every row. Passing the column name is equivalent to col(features_array).
sdf = sdf.withColumn(predicted_score, inference_udf(features_array))

In [11]:
# Spot-check the observed vs predicted score on the first two rows.
sdf.select(order_id, features_array, real_score, predicted_score).show(n=2)

+--------------------+--------------------+-----+---------------+
|            order_id|            features|score|predicted_score|
+--------------------+--------------------+-----+---------------+
|000229ec398224ef6...|[0.0, 0.0, 1.0, 2...|    5|              5|
|00024acbcdf0a6daa...|[0.0, 0.0, 1.0, 2...|    4|              5|
+--------------------+--------------------+-----+---------------+
only showing top 2 rows



In [12]:
# Write the scored rows as parquet (4 partitions -> up to 4 part files),
# replacing any previous run. The repartition is applied to the projected frame
# instead of being rebound into `sdf`, so re-running this cell does not stack
# extra shuffles onto `sdf`.
(
    sdf.select(order_id, features_array, real_score, predicted_score)
    .repartition(4)
    .write.mode("overwrite")
    .parquet(DEFAULT_OUTPUT_DATA_PATH)
)

In [13]:
!hdfs dfs -ls -R /user/ubuntu/ 

drwxr-xr-x   - ubuntu hadoop          0 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome
-rw-r--r--   1 ubuntu hadoop          0 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome/_SUCCESS
-rw-r--r--   1 ubuntu hadoop     974112 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome/part-00000-fbf7fcb8-7b75-4a2c-a544-97f4c55c2e05-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop     975681 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome/part-00001-fbf7fcb8-7b75-4a2c-a544-97f4c55c2e05-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop     973214 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome/part-00002-fbf7fcb8-7b75-4a2c-a544-97f4c55c2e05-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop     974091 2023-11-06 09:33 /user/ubuntu/mlops_ol/outcome/part-00003-fbf7fcb8-7b75-4a2c-a544-97f4c55c2e05-c000.snappy.parquet


In [14]:
# Shut down the session and its SparkContext; with master "yarn" this ends the
# YARN application and releases its executors.
spark.stop()