In [0]:
%pip install --upgrade pip
# Environment setup: every package below is installed at an explicit version,
# except the `mlflow[pipelines]` line further down.
%pip install torch==2.0.1
%pip install transformers==4.29.2
%pip install scikit-learn==0.24.2
%pip install pyspark==3.4.0
%pip install pandas==1.3.4
%pip install accelerate==0.20.3
%pip install seqeval==1.2.2
%pip install datasets==2.12.0
%pip install tqdm==4.65.0
%pip install evaluate==0.4.0
%pip install mlflow==2.9.2
# NOTE(review): this line carries no version pin, unlike the mlflow==2.9.2 line
# above — presumably it could move mlflow to a different version; confirm and pin.
%pip install mlflow[pipelines]
%pip install torchvision==0.15.2
%pip install pytorch-lightning==2.0.8 

In [0]:
import mlflow
import numpy as np
import pandas as pd
import pyspark.sql.types as T
import pytorch_lightning as pl
import torch
from transformers import LukeTokenizer, LukeModel, LukeForEntityPairClassification, AdamW
from transformers import Pipeline, pipeline
from transformers.pipelines import PIPELINE_REGISTRY

In [0]:
# defines the model

class LUKE(pl.LightningModule):
    """Lightning module wrapping ``LukeForEntityPairClassification`` for fine-tuning.

    The wrapped model is loaded from the
    ``studio-ousia/luke-large-finetuned-tacred`` checkpoint. Training,
    validation and test steps share ``common_step``, which computes the
    cross-entropy loss and the batch accuracy.
    """

    def __init__(self):
        super().__init__()
        self.model = LukeForEntityPairClassification.from_pretrained("studio-ousia/luke-large-finetuned-tacred")
        # TODO(review): open question from the author — can this point at our own
        # logged model instead, so that previous fine-tuning is picked up?

    def forward(self, input_ids, entity_ids, entity_position_ids, attention_mask, entity_attention_mask):
        """Run the wrapped LUKE model and return its raw output object."""
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, entity_ids=entity_ids,
                             entity_attention_mask=entity_attention_mask, entity_position_ids=entity_position_ids)
        return outputs

    def common_step(self, batch, batch_idx):
        """Compute ``(loss, accuracy)`` for one batch.

        Args:
            batch: mapping holding the keyword arguments of ``forward`` plus a
                ``'label'`` entry with the target class ids.
            batch_idx: index of the batch (unused).

        Returns:
            Tuple of the cross-entropy loss tensor and the accuracy as a float.
        """
        labels = batch['label']
        # Build a label-free copy instead of `del batch['label']`: the batch
        # object is shared with the caller, so deleting from it in place would
        # change what anything else holding the same batch sees afterwards.
        inputs = {key: value for key, value in batch.items() if key != 'label'}
        outputs = self(**inputs)
        logits = outputs.logits

        criterion = torch.nn.CrossEntropyLoss()  # multi-class classification
        loss = criterion(logits, labels)
        predictions = logits.argmax(-1)
        correct = (predictions == labels).sum().item()
        accuracy = correct / inputs['input_ids'].shape[0]

        return loss, accuracy

    def training_step(self, batch, batch_idx):
        """Log training loss/accuracy and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        self.log("training_loss", loss)
        self.log("training_accuracy", accuracy)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log epoch-level validation loss/accuracy and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        self.log("validation_loss", loss, on_epoch=True)
        self.log("validation_accuracy", accuracy, on_epoch=True)

        return loss

    # Currently we don't have enough data to create a test set, just train and validation.
    def test_step(self, batch, batch_idx):
        """Log epoch-level test loss/accuracy and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        # Previously the metrics were computed and then dropped; log them the
        # same way validation_step does so a test run produces visible results.
        self.log("test_loss", loss, on_epoch=True)
        self.log("test_accuracy", accuracy, on_epoch=True)

        return loss

    def configure_optimizers(self):
        """Return an AdamW optimizer over all parameters (lr=5e-5)."""
        # torch.optim.AdamW replaces the deprecated transformers.AdamW.
        # eps and weight_decay are set explicitly to the values the transformers
        # implementation used by default, so the update rule is unchanged.
        optimizer = torch.optim.AdamW(self.parameters(), lr=5e-5, eps=1e-6, weight_decay=0.0)
        return optimizer

    # NOTE(review): the three loader hooks below return notebook-level globals
    # named train_dataloader / valid_dataloader / test_dataloader. None of them
    # is defined in the visible cells, so they must exist before fitting/testing.
    def train_dataloader(self):
        return train_dataloader

    def val_dataloader(self):
        return valid_dataloader

    def test_dataloader(self):
        return test_dataloader

In [0]:
# Defines a custom pipeline that is compatible with the LUKE model and returns the outputs I want (predicted label and confidence score)

def softmax(outputs):
    """Return the softmax of ``outputs`` taken along the last axis.

    The per-row maximum is subtracted before exponentiating so that large
    logits do not overflow; the result is mathematically unchanged.
    """
    stabilised = outputs - np.max(outputs, axis=-1, keepdims=True)
    exponentials = np.exp(stabilised)
    normaliser = exponentials.sum(axis=-1, keepdims=True)
    return exponentials / normaliser

class EntityPairClassificationPipeline(Pipeline):
    """Pipeline that classifies the relation between two entity spans in a text.

    Usage: ``pipe(text="...", entity_spans=[(start, end), (start, end)])``.
    The result is a dict with the keys ``"predicted label"`` and
    ``"confidence score"``.
    """

    def _sanitize_parameters(self, **kwargs):
        """Route ``entity_spans`` to ``preprocess``; forward/postprocess take no extra parameters."""
        preprocess_params = {}
        if "entity_spans" in kwargs:
            preprocess_params["entity_spans"] = kwargs["entity_spans"]
        return preprocess_params, {}, {}

    def __call__(self, text, **kwargs):
        """Pass-through to ``Pipeline.__call__`` whose first argument is named ``text``.

        Kept as an explicit override because the calls in this notebook pass
        the input by keyword (``classifier(text=..., entity_spans=...)``).
        """
        result = super().__call__(text, **kwargs)
        return result

    @staticmethod
    def _normalize_entity_spans(entity_spans):
        """Coerce spans to a list of ``(int, int)`` tuples; ``None`` is passed through."""
        if entity_spans is None:
            return None
        return [tuple(int(offset) for offset in span) for span in entity_spans]

    # Known issue noted by the original author: preprocess is NOT always
    # receiving entity_spans. TODO(review): confirm in which call path
    # (possibly when served through MLflow) the keyword is dropped.
    def preprocess(self, text, entity_spans=None):
        """Tokenize ``text`` with its entity spans into PyTorch tensors.

        Spans that arrive as lists or arrays (e.g. after a JSON, pandas or
        Spark round-trip) are converted to tuples of ints first, because
        LukeTokenizer expects character spans as a list of tuples
        (see the LukeTokenizer documentation).
        """
        model_inputs = self.tokenizer(text=text,
                                      entity_spans=self._normalize_entity_spans(entity_spans),
                                      return_tensors="pt"
                                      )
        return model_inputs

    def _forward(self, model_inputs):
        """Run the model on the tokenized inputs."""
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        """Turn the logits of the first (only) example into a label and a confidence score."""
        # detach/cpu/float make the conversion to numpy safe regardless of the
        # device, autograd state or floating-point precision of the logits.
        logits = model_outputs.logits[0].detach().cpu().float().numpy()
        probabilities = softmax(logits)
        # Plain int so the id2label lookup does not depend on numpy integer hashing.
        best_class = int(np.argmax(probabilities))
        label = self.model.config.id2label[best_class]
        score = probabilities[best_class].item()
        return {"predicted label": label, "confidence score": score}

# Register the custom pipeline class under its own task name so that
# transformers.pipeline("entity-pair-classification", ...) can resolve it.
PIPELINE_REGISTRY.register_pipeline(
    "entity-pair-classification",
    pipeline_class=EntityPairClassificationPipeline,
    pt_model=LukeForEntityPairClassification,
    type="text",
)

# Sanity check: the task should now be listed and resolvable.
for registry_view in (
    PIPELINE_REGISTRY.get_supported_tasks(),
    PIPELINE_REGISTRY.check_task('entity-pair-classification'),
):
    print(registry_view)

entity-pair-classification is already registered. Overwriting pipeline for task entity-pair-classification...
['audio-classification', 'automatic-speech-recognition', 'conversational', 'depth-estimation', 'document-question-answering', 'entity-pair-classification', 'feature-extraction', 'fill-mask', 'image-classification', 'image-segmentation', 'image-to-text', 'mask-generation', 'ner', 'object-detection', 'question-answering', 'sentiment-analysis', 'summarization', 'table-question-answering', 'text-classification', 'text-generation', 'text2text-generation', 'token-classification', 'translation', 'video-classification', 'visual-question-answering', 'vqa', 'zero-shot-audio-classification', 'zero-shot-classification', 'zero-shot-image-classification', 'zero-shot-object-detection']
('entity-pair-classification', {'impl': <class '__main__.EntityPairClassificationPipeline'>, 'pt': (<class 'transformers.models.luke.modeling_luke.LukeForEntityPairClassification'>,), 'tf': (), 'type': 'text'},

In [0]:
# Smoke test: build the custom pipeline in-process and run it on one example.

# Model weights and tokenizer both come from the same TACRED-finetuned checkpoint.
model = LukeForEntityPairClassification.from_pretrained("studio-ousia/luke-large-finetuned-tacred")
tokenizer = LukeTokenizer.from_pretrained("studio-ousia/luke-large-finetuned-tacred", max_mention_length=64)

# Hand the model and tokenizer to the custom pipeline registered earlier.
classifier = pipeline(
    "entity-pair-classification",
    model=model,
    tokenizer=tokenizer,
    framework='pt',
    config=model.config,
)

# Toy input: character spans of the two entities in the sentence below.
text1 = "Beyoncé lives in Los Angeles."
entity_spans_list = ((0, 7), (17, 28))
entity_spans_tuple = list(map(tuple, entity_spans_list))
print(entity_spans_tuple)

# Expected result:
# {'predicted label': 'per:cities_of_residence', 'confidence score': 0.9899731874465942}
classifier(text=text1, entity_spans=entity_spans_tuple)

Some weights of the model checkpoint at studio-ousia/luke-large-finetuned-tacred were not used when initializing LukeForEntityPairClassification: ['luke.embeddings.position_ids']
- This IS expected if you are initializing LukeForEntityPairClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing LukeForEntityPairClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
[(0, 7), (17, 28)]
Out[6]: {'predicted label': 'per:cities_of_residence',
 'confidence score': 0.9899731874465942}

In [0]:
# Log the custom pipeline to MLflow.

# Close any run left active by an earlier execution of this notebook.
mlflow.end_run()

# The context manager ends the run even if log_model raises, so a failed
# upload cannot leave a dangling active run behind.
with mlflow.start_run():
    model_info = mlflow.transformers.log_model(
        classifier,
        artifact_path="re_custom_pipeline",
        task="entity-pair-classification",
        # code_paths contains the .py with class EntityPairClassificationPipeline(Pipeline),
        # class LUKE(pl.LightningModule) and the PIPELINE_REGISTRY.register_pipeline() call.
        code_paths=["/Workspace/Users/FAKE/FAKE/FAKE/"],
    )

# URI of the model that was just logged ("runs:/<run_id>/re_custom_pipeline");
# use it when loading the model instead of copying the run id by hand.
model_info.model_uri


Entry Not Found for url: https://huggingface.co/studio-ousia/luke-large-finetuned-tacred/resolve/main/README.md.


Uploading artifacts:   0%|          | 0/19 [00:00<?, ?it/s]

2024/01/17 20:02:12 INFO mlflow.store.artifact.cloud_artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false


Uploading /tmp/repl_tmp_data/ReplId-3536d-aace2-81a75-e/tmppg4q_qdz/model/model/pytorch_model-00002-of-00006.b…

In [0]:
# Reload the logged pipeline from MLflow and repeat the local smoke test.
loaded_classifier = mlflow.transformers.load_model(
    'runs:/27c7190cf78d493ca4c3d779c73d6581/re_custom_pipeline',  # update with runid
    return_type="pipeline",
)

# Show what came back: task name, model architecture and tokenizer.
for component in (loaded_classifier.task, loaded_classifier.model, loaded_classifier.tokenizer):
    print(component)

# Same toy example as before, run through the reloaded pipeline.
tup1 = (0, 7)
tup2 = (17, 28)
my_tuple = [tup1, tup2]
text = "Beyoncé lives in Los Angeles."

# Expected result:
# {'predicted label': 'per:cities_of_residence', 'confidence score': 0.9899731874465942}
loaded_classifier(text=text, entity_spans=my_tuple)

Downloading artifacts:   0%|          | 0/19 [00:00<?, ?it/s]

2024/01/17 20:12:50 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false
2024/01/17 20:13:01 INFO mlflow.transformers: 'runs:/27c7190cf78d493ca4c3d779c73d6581/re_custom_pipeline' resolved as 'dbfs:/databricks/mlflow-tracking/1829326925589999/27c7190cf78d493ca4c3d779c73d6581/artifacts/re_custom_pipeline'


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

entity-pair-classification
LukeForEntityPairClassification(
  (luke): LukeModel(
    (embeddings): LukeEmbeddings(
      (word_embeddings): Embedding(50267, 1024, padding_idx=1)
      (position_embeddings): Embedding(514, 1024, padding_idx=1)
      (token_type_embeddings): Embedding(1, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (entity_embeddings): LukeEntityEmbeddings(
      (entity_embeddings): Embedding(500000, 256, padding_idx=0)
      (entity_embedding_dense): Linear(in_features=256, out_features=1024, bias=False)
      (position_embeddings): Embedding(514, 1024)
      (token_type_embeddings): Embedding(1, 1024)
      (LayerNorm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): LukeEncoder(
      (layer): ModuleList(
        (0-23): 24 x LukeLayer(
          (attention): LukeAttention(
            (self): LukeSelfA

In [0]:
# Build a one-row Spark DataFrame holding the example text and its entity spans.

# Each span is stored as a struct of two nullable integers.
span_struct = T.StructType([
    T.StructField("tuple_1", T.IntegerType(), True),
    T.StructField("tuple_2", T.IntegerType(), True),
])

schema = T.StructType([
    T.StructField("text", T.StringType(), True),
    T.StructField("entity_spans", T.ArrayType(span_struct)),
])

example_rows = [(text, my_tuple)]
spark_df = spark.createDataFrame(example_rows, schema=schema)

# Inspect the schema and contents.
spark_df.printSchema()
spark_df.show()
display(spark_df)

root
 |-- text: string (nullable = true)
 |-- entity_spans: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tuple_1: integer (nullable = true)
 |    |    |-- tuple_2: integer (nullable = true)

+--------------------+------------------+
|                text|      entity_spans|
+--------------------+------------------+
|Beyoncé lives in ...|[{0, 7}, {17, 28}]|
+--------------------+------------------+



text,entity_spans
Beyoncé lives in Los Angeles.,"List(List(0, 7), List(17, 28))"


In [0]:
import mlflow
from pyspark.sql.functions import struct, col

logged_model = 'runs:/27c7190cf78d493ca4c3d779c73d6581/re_custom_pipeline'

# Wrap the logged model as a Spark UDF. result_type is overridden to 'string'
# because the model does not return double values.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='string')

# Pack every column of the DataFrame into one struct and score it with the UDF.
model_input = struct(*[col(column_name) for column_name in spark_df.columns])
display(spark_df.withColumn('predictions', loaded_model(model_input)))

Downloading artifacts:   0%|          | 0/19 [00:00<?, ?it/s]

2024/01/17 20:16:41 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

2024/01/17 20:16:51 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'


Does the call from _validate_transformers_task_type() to get_supported_tasks() (https://github.com/mlflow/mlflow/blob/98aefdf1af299c4ddef898377860e06eda7969f3/mlflow/transformers/__init__.py#L1276) simply return the list defined in https://github.com/huggingface/transformers/blob/f4f57f9dfa68948a383c352a900d588f63f6290a/src/transformers/pipelines/__init__.py#L434? Is the code in the directory specified by code_paths run once the environment is set up? If so, could get_supported_tasks() call PIPELINE_REGISTRY.get_supported_tasks() after that prepended code has run, so that custom registered tasks are included?