Skip to content

Python UDA return type is inconsistent with doc #571

@yokofly

Description

@yokofly

Company or project name

Timeplus

Describe what's wrong

Please refer to the UDA test below. With version 3.0.8, the documentation says the return value of `finalize` should be a list, but in my test the engine seems to expect the value to be returned directly as a string, so something seems wrong there.

-- Install the Python packages imported by the Python functions in this script
-- (scikit-learn and pandas).
SYSTEM INSTALL PYTHON PACKAGE 'scikit-learn';
SYSTEM INSTALL PYTHON PACKAGE 'pandas';

-- Recreate the iris stream from scratch, then load the iris CSV into it.
DROP STREAM IF EXISTS iris;

-- Four float64 measurements plus the string class label (`variety`).
CREATE STREAM IF NOT EXISTS iris
(
  `sepal.length` float64,
  `sepal.width` float64,
  `petal.length` float64,
  `petal.width` float64,
  `variety` string
);

-- Load every row of the remote CSV (read with the 'CSVWithNames' format)
-- into the five columns listed here.
insert into iris (
  `sepal.length`,
  `sepal.width`,
  `petal.length`,
  `petal.width`,
  `variety`
)
SELECT * 
FROM url('https://tp-solutions.s3.us-west-2.amazonaws.com/iris.csv', 'CSVWithNames');

-- create a Python UDA (user-defined aggregate function) that trains the iris classifier
CREATE OR REPLACE AGGREGATE FUNCTION train_sklearn_classifier(features array(float64), label string, name string) RETURNS string LANGUAGE PYTHON AS 
$$

import numpy as np
import pandas as pd
import joblib
import pickle
import json
from sklearn.linear_model import LogisticRegression

class train_sklearn_classifier:
    """Aggregate state that collects rows and trains a LogisticRegression model.

    ``process`` and ``merge`` only accumulate rows; the model is fitted once,
    in ``finalize``, and written to ``/tmp/<name>.pkl`` with joblib.
    """

    def __init__(self):
        # Parallel lists: entry i of each list belongs to the same input row.
        self.all_features = []
        self.all_labels = []
        self.all_names = []

    def serialize(self):
        """Return the accumulated rows as pickled bytes."""
        data = {
            "all_features": self.all_features,
            "all_labels": self.all_labels,
            "all_names": self.all_names
        }
        return pickle.dumps(data)

    def deserialize(self, data):
        """Restore the accumulated rows from bytes produced by ``serialize``."""
        # NOTE(review): pickle.loads executes arbitrary code on crafted input;
        # this assumes ``data`` only ever comes from serialize() -- confirm.
        data = pickle.loads(data)
        self.all_features = data["all_features"]
        self.all_labels = data["all_labels"]
        self.all_names = data["all_names"]

    def merge(self, other):
        """Append the rows accumulated by ``other`` to this state."""
        self.all_features.extend(other.all_features)
        self.all_labels.extend(other.all_labels)
        self.all_names.extend(other.all_names)

    def process(self, features, labels, names):
        """Accumulate one batch of rows; no training happens here.

        Each argument is an iterable with one entry per input row.
        """
        self.all_features.extend(features)
        self.all_labels.extend(labels)
        self.all_names.extend(names)

    def finalize(self):
        """Train on all accumulated rows and return a description string.

        Every path returns a plain ``str`` (the function is declared
        ``RETURNS string``):

        * ``'No data to train'`` when no rows were accumulated,
        * a JSON object with ``model_type``, ``path``, ``n_features`` and
          ``n_samples`` after a successful fit,
        * ``'Error: <message>'`` if anything raises.
        """
        try:
            if not self.all_features:
                # Previously returned a one-element list here while the other
                # paths returned str; keep the return type uniform.
                return 'No data to train'

            # Column names f0..f{k-1}, sized from the first feature vector.
            feature_names = [f'f{n}' for n in range(len(self.all_features[0]))]
            rows = [
                list(feature) + [label]
                for feature, label in zip(self.all_features, self.all_labels)
            ]

            df = pd.DataFrame(rows, columns=feature_names + ['label'])
            X = df[feature_names].values
            y = df['label'].values

            # Fit directly on the original string labels (no label encoding).
            classifier = LogisticRegression(max_iter=200)
            classifier.fit(X, y)

            # The model file is named after the first accumulated name.
            model_path = f'/tmp/{self.all_names[0]}.pkl'
            joblib.dump(classifier, model_path)

            model_info = {
                'model_type': classifier.__class__.__name__,
                'path': model_path,
                'n_features': len(feature_names),
                'n_samples': len(X)
            }
            return json.dumps(model_info)

        except Exception as e:
            # Best effort: report the failure as the aggregate's result
            # instead of raising.
            return f'Error: {e}'

$$;

CREATE OR REPLACE FUNCTION predict_sklearn_classifier(features array(float64), name string) RETURNS string LANGUAGE PYTHON AS 
$$
import traceback 
import joblib
import numpy as np

def predict_sklearn_classifier(features, name):
    """Predict a label for every input row with a saved classifier.

    Args:
        features: iterable of feature vectors, one per input row.
        name: iterable of model names, one per input row; the model for a
            row is loaded from ``/tmp/<name>.pkl``.

    Returns:
        A list with exactly one string per input row: the predicted label,
        or the formatted traceback if loading or predicting failed for
        that row.
    """
    results = []
    # Model name -> loaded classifier, so each file is read once per batch.
    loaded_models = {}
    for row_features, model_name in zip(features, name):
        try:
            classifier = loaded_models.get(model_name)
            if classifier is None:
                classifier = joblib.load(f'/tmp/{model_name}.pkl')
                loaded_models[model_name] = classifier
            # predict() takes a 2-D array, so wrap the single row.
            prediction = classifier.predict(np.array([row_features]))
            # Append rather than rebind ``results``, so earlier rows' outputs
            # are kept and the output length matches the input batch.
            results.append(str(prediction[0]))
        except Exception:
            # Best effort: surface the failure as this row's result.
            results.append(traceback.format_exc())

    return results

$$;


-- Train the model on the historical rows of iris; the aggregate's result
-- is a string describing the trained model (or an error message).
SELECT
  train_sklearn_classifier([sepal.length, sepal.width, petal.length, petal.width], variety, 'test_sklearn_classifier')
FROM
  table(iris);


-- Predict the variety for one feature vector using the model saved as
-- 'test_sklearn_classifier'.
SELECT
  predict_sklearn_classifier([5.1, 3.5, 1.4, 0.2], 'test_sklearn_classifier')

Does it reproduce on the most recent release?

Yes

How to reproduce

refer to the code

Expected behavior

No response

Error message and/or stacktrace

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions