In [0]:
# NOTE(review): the PyPI package named `dlt` is dltHub's "data load tool", not Databricks
# Delta Live Tables. The `dlt.table` / `dlt.read` API used later in this notebook is
# expected to come from the DLT pipeline runtime — confirm this install is really wanted,
# since a same-named PyPI package could shadow the runtime module.
! pip install dlt

In [0]:
%pip install tensorflow==2.11.0 numpy==1.21.5 transformers==4.29.2
# One %pip command so pip resolves all three pins together; separate installs let a
# later install re-resolve (and possibly change) a package pinned by an earlier one.
# %pip installs into the notebook kernel's own Python environment, whereas `!pip`
# runs whatever `pip` the shell finds first.

Python interpreter will be restarted.
Collecting tensorflow
  Using cached tensorflow-2.13.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (524.1 MB)
Collecting termcolor>=1.1.0
  Using cached termcolor-2.3.0-py3-none-any.whl (6.9 kB)
Collecting flatbuffers>=23.1.21
  Using cached flatbuffers-23.5.26-py2.py3-none-any.whl (26 kB)
Collecting astunparse>=1.6.0
  Using cached astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Collecting grpcio<2.0,>=1.24.3
  Using cached grpcio-1.56.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.2 MB)
Collecting google-pasta>=0.1.1
  Using cached google_pasta-0.2.0-py3-none-any.whl (57 kB)
Collecting protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.20.3
  Using cached protobuf-4.23.4-cp37-abi3-manylinux2014_x86_64.whl (304 kB)
Collecting tensorflow-estimator<2.14,>=2.13.0
  Using cached tensorflow_estimator-2.13.0-py2.py3-none-any.whl (440 kB)
Collecting opt-einsum>=2.3.2
  Using cached opt_einsum-3.3.0-py3-n

In [0]:
import dlt
import numpy as np
import pandas as pd
import tensorflow as tf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql.types import StringType
from transformers import TFBertForSequenceClassification, BertTokenizer, pipeline

# Fine-tuned ALBERT checkpoint on the Hugging Face Hub; the same repo id supplies both
# the weights and the tokenizer of the text-classification pipeline.
model_lib = "vineet1409/fine-tuned-AlBERT"
albert_classifier = pipeline(
    task="text-classification",
    model=model_lib,
    tokenizer=model_lib,
)

# Fine-tuned Bio_ClinicalBERT sequence classifier (TensorFlow weights), paired with the
# tokenizer loaded from the emilyalsentzer/Bio_ClinicalBERT repo.
clinicalbert_model = TFBertForSequenceClassification.from_pretrained(
    "vineet1409/fine-tuned-bioclinical-BERT"
)
clinicalbert_tokenizer = BertTokenizer.from_pretrained(
    "emilyalsentzer/Bio_ClinicalBERT"
)



Some layers from the model checkpoint at vineet1409/fine-tuned-AlBERT were not used when initializing TFAlbertForSequenceClassification: ['dropout_9']
- This IS expected if you are initializing TFAlbertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFAlbertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFAlbertForSequenceClassification were initialized from the model checkpoint at vineet1409/fine-tuned-AlBERT.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFAlbertForSequenceClassification for predictions without further training.
Some layers from the model checkpoint at vin

In [0]:

## silver Layer albert
import logging
from pyspark.sql.functions import array, lit, when, col
import pyspark.sql.functions as F

# Module-level logger; albert_silver_layer uses it to report when it reads its source.
logger = logging.getLogger(__name__)

def run_analysis(text, classifier=None):
    """Classify ``text`` as suicidal / non-suicidal with the ALBERT pipeline.

    Args:
        text: The input string to classify.
        classifier: Optional callable with the transformers text-classification
            pipeline interface, i.e. ``classifier(text)`` returns a list of
            ``{'label': ..., ...}`` dicts. Defaults to the module-level
            ``albert_classifier``; pass another callable to swap models or to test.

    Returns:
        ``'Suicidal'`` for ``LABEL_1``, ``'Non-suicidal'`` for ``LABEL_0``, and
        ``None`` when the classifier returns no predictions or a label outside
        that mapping (instead of failing on an unbound local).
    """
    if classifier is None:
        classifier = albert_classifier
    # Only the top prediction's label is used; the pipeline's raw labels are mapped
    # to human-readable class names.
    label_to_class = {'LABEL_1': 'Suicidal', 'LABEL_0': 'Non-suicidal'}
    predictions = classifier(text)
    if not predictions:
        return None
    return label_to_class.get(predictions[0]['label'])
    
    

@dlt.table(
    comment="Silver_Layer_albert: Custom LLM {ALBERT} Predictions on dataset."
)
def albert_silver_layer():
    """Silver table: up to 20 ``bronze_layer`` rows plus one ALBERT prediction per row.

    The sample is collected to the driver once and each row's ``text`` is classified
    with ``run_analysis``. The output DataFrame is rebuilt from those same collected
    rows, so each value in ``albert-predictions`` sits on the row it was computed for.

    Returns:
        DataFrame with every ``bronze_layer`` column plus a nullable string column
        ``albert-predictions`` (``None`` when ``text`` is null or when
        ``run_analysis`` returns ``None``).
    """
    # Local import: only the output schema below needs these types.
    from pyspark.sql.types import StructField, StructType

    logger.info("Reading source table")
    df = dlt.read('bronze_layer')
    df = df.limit(20)

    # Collect whole rows exactly once: `limit` without an ordering is not
    # deterministic, so evaluating `df` a second time could yield different rows
    # than the ones the predictions were computed from.
    # NOTE(review): Databricks advises against collect() inside DLT dataset
    # definitions; a pandas UDF that loads the model on executors would avoid it.
    rows = df.collect()
    predictions = [
        None if row["text"] is None else run_analysis(row["text"])
        for row in rows
    ]

    # Build a new schema list rather than mutating df.schema in place.
    out_schema = StructType(
        df.schema.fields + [StructField("albert-predictions", StringType(), True)]
    )
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(
        [tuple(row) + (prediction,) for row, prediction in zip(rows, predictions)],
        schema=out_schema,
    )



In [0]:
# Silver Layer: clinical-bert  
import logging
from pyspark.sql.functions import array, lit, when, col
import pyspark.sql.functions as F


@dlt.table(
    comment="Silver_Layer_clinicalbert: Custom LLM {clinicalbert} Predictions on dataset."
)
def clinicalbert_silver_layer():
    """Silver table: up to 20 ``bronze_layer`` rows plus one Bio_ClinicalBERT prediction per row.

    The sample is collected to the driver once; each row's ``text`` is tokenized
    (truncated to 128 tokens) and classified with ``clinicalbert_model``. The output
    DataFrame is rebuilt from those same collected rows, so each value in
    ``clinical-bert-predictions`` sits on the row it was computed for.

    Returns:
        DataFrame with every ``bronze_layer`` column plus a nullable string column
        ``clinical-bert-predictions`` (``"Non-suicidal"`` / ``"Suicidal"``, or
        ``None`` when ``text`` is null).
    """
    # Local import: only the output schema below needs these types.
    from pyspark.sql.types import StructField, StructType

    df = dlt.read('bronze_layer')
    df = df.limit(20)

    # Index i of the model's logits maps to class_names[i].
    class_names = ["Non-suicidal", "Suicidal"]

    def classify(text):
        """Return the predicted class name for a single text."""
        encoded_input = clinicalbert_tokenizer.batch_encode_plus(
            [text],
            padding=True,
            truncation=True,
            return_tensors="tf",
            max_length=128,
        )
        # Only input_ids are passed: with a single un-padded sequence the attention
        # mask would be all ones anyway.
        logits = clinicalbert_model.predict(encoded_input['input_ids']).logits
        return class_names[int(np.argmax(logits, axis=1)[0])]

    # Collect whole rows exactly once: `limit` without an ordering is not
    # deterministic, so evaluating `df` a second time could yield different rows.
    # NOTE(review): Databricks advises against collect() inside DLT dataset
    # definitions; a pandas UDF that loads the model on executors would avoid it.
    rows = df.collect()
    predictions = [
        None if row["text"] is None else classify(row["text"])
        for row in rows
    ]

    # Build a new schema list rather than mutating df.schema in place.
    out_schema = StructType(
        df.schema.fields + [StructField("clinical-bert-predictions", StringType(), True)]
    )
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(
        [tuple(row) + (prediction,) for row, prediction in zip(rows, predictions)],
        schema=out_schema,
    )
