# packages

In [0]:
!pip install lightgbm

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-38988a65-edd1-42ba-91ba-b0da5880e5ed/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
!pip install catboost

Collecting catboost
  Downloading catboost-1.1.1-cp39-none-manylinux1_x86_64.whl (76.6 MB)
[?25l[K     |                                | 10 kB 21.6 MB/s eta 0:00:04[K     |                                | 20 kB 16.0 MB/s eta 0:00:05[K     |                                | 30 kB 21.6 MB/s eta 0:00:04[K     |                                | 40 kB 15.5 MB/s eta 0:00:05[K     |                                | 51 kB 11.7 MB/s eta 0:00:07[K     |                                | 61 kB 13.6 MB/s eta 0:00:06[K     |                                | 71 kB 14.7 MB/s eta 0:00:06[K     |                                | 81 kB 13.9 MB/s eta 0:00:06[K     |                                | 92 kB 15.2 MB/s eta 0:00:06[K     |                                | 102 kB 13.3 MB/s eta 0:00:06[K     |                                | 112 kB 13.3 MB/s eta 0:00:06[K     |                                | 122 kB 13.3 MB/s eta 0:00:06[K     |                                | 13

In [0]:
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.impute import SimpleImputer
from catboost import CatBoostClassifier, Pool

# data

In [0]:
# Toy training data: two groups (`model` 1 and 2) of three rows each, with one
# missing numeric value and one missing categorical value.
df_panddas = pd.DataFrame(
    [
        (1, 1, 1.0, 'a', 1),
        (1, 2, None, 'b', 0),
        (1, 3, 1.0, 'a', 1),
        (2, 4, 1.0, None, 1),
        (2, 5, 2.0, 'b', 0),
        (2, 6, 1.0, 'a', 1),
    ],
    columns=['model', 'id', 'feature_num', 'featuer_cat', 'label'],
)

# Convert the pandas frame to a Spark DataFrame so it can be grouped by `model`.
# `spark` is not created anywhere in this notebook; presumably it is the session
# injected by the hosting runtime (confirm before running elsewhere).
df = spark.createDataFrame(df_panddas)

In [0]:
# Display the Spark DataFrame; the recorded output shows the two missing cells as null.
df.show()

+-----+---+-----------+-----------+-----+
|model| id|feature_num|featuer_cat|label|
+-----+---+-----------+-----------+-----+
|    1|  1|        1.0|          a|    1|
|    1|  2|       null|          b|    0|
|    1|  3|        1.0|          a|    1|
|    2|  4|        1.0|       null|    1|
|    2|  5|        2.0|          b|    0|
|    2|  6|        1.0|          a|    1|
+-----+---+-----------+-----------+-----+



# features

In [0]:
# Column roles read as globals by the per-group training functions:
# `categorical_feature` lists the columns to label-encode, `feature_name` lists
# every model input (numeric column first, then the categorical ones).
categorical_feature = ['featuer_cat']
feature_name = ['feature_num'] + categorical_feature

# LightGBM

In [0]:
def fit_model(
    df_panddas,
    ):
    """Fit a LightGBM binary classifier on one group's rows and score those rows.

    Written to be passed to ``groupBy(...).applyInPandas``. Reads the
    notebook-level lists ``categorical_feature`` and ``feature_name``.

    Parameters
    ----------
    df_panddas : pandas.DataFrame
        Rows of a single group. Must contain 'model', 'id', 'label' and
        every column named in ``feature_name``.

    Returns
    -------
    pandas.DataFrame
        Columns 'model', 'id' and 'prediction', where 'prediction' is the
        output of ``bst.predict`` on the same rows used for training
        (in-sample scores).
    """
    # Work on a copy: the steps below overwrite the categorical columns and
    # add a 'prediction' column, which must not leak into the caller's frame.
    df_panddas = df_panddas.copy()

    ## categorical feature encoding
    for c in categorical_feature:
        # The encoder is fitted per call, so the integer codes are only
        # meaningful within this group.
        le = preprocessing.LabelEncoder()
        le.fit(df_panddas[c])
        df_panddas[c] = le.transform(df_panddas[c])

    ## training set
    train_data = lgb.Dataset(
        data = df_panddas[feature_name],
        label = df_panddas['label'],
        categorical_feature = categorical_feature,
        feature_name = feature_name,
        )

    param = {'objective': 'binary'}

    # NOTE(review): the recorded output is a constant 0.666... for every row.
    # Presumably LightGBM's default minimum leaf/bin sizes prevent any split on
    # 3-row groups, leaving only the base rate -- confirm, and lower
    # min_data_in_leaf / min_data_in_bin if the demo is meant to learn.
    bst = lgb.train(
        param,
        train_data,
        num_boost_round = 512,
        )

    df_panddas['prediction'] = bst.predict(df_panddas[feature_name])

    return df_panddas[['model', 'id', 'prediction']]

In [0]:
# Run `fit_model` once per `model` group and display the per-row scores.
# The schema string must list exactly the columns `fit_model` returns.
(
    df
    .groupBy(['model'])
    .applyInPandas(
        fit_model,
        schema = """
    model int,
    id int, 
    prediction double
    """,
    )
    .show()
)

+-----+---+------------------+
|model| id|        prediction|
+-----+---+------------------+
|    1|  1|0.6666666666666666|
|    1|  3|0.6666666666666666|
|    1|  2|0.6666666666666666|
|    2|  5|0.6666666666666666|
|    2|  4|0.6666666666666666|
|    2|  6|0.6666666666666666|
+-----+---+------------------+



# CatBoost

In [0]:
def fit_model(
    df_panddas,
    ):
    """Fit a CatBoost classifier on one group's rows and score those rows.

    Written to be passed to ``groupBy(...).applyInPandas``. Reads the
    notebook-level lists ``categorical_feature`` and ``feature_name``.

    NOTE: this notebook defines ``fit_model`` twice; once this cell has run,
    the name refers to this CatBoost version.

    Parameters
    ----------
    df_panddas : pandas.DataFrame
        Rows of a single group. Must contain 'model', 'id', 'label' and
        every column named in ``feature_name``.

    Returns
    -------
    pandas.DataFrame
        Columns 'model', 'id', 'label' and 'prediction', where 'prediction'
        is the second column of ``predict_proba`` on the training rows.
    """
    # Work on a copy: the steps below overwrite the feature columns and add a
    # 'prediction' column, which must not leak into the caller's frame.
    df_panddas = df_panddas.copy()

    ## categorical feature encoding
    for c in categorical_feature:
        # The encoder is fitted per call, so the integer codes are only
        # meaningful within this group.
        le = preprocessing.LabelEncoder()
        le.fit(df_panddas[c])
        df_panddas[c] = le.transform(df_panddas[c])

    ## missing-value imputation: fill NaN in every feature column with that
    ## column's most frequent value within this group
    imp = SimpleImputer(
        missing_values= np.nan,
        strategy = 'most_frequent',
        )
    imp.fit(df_panddas[feature_name])
    df_panddas[feature_name] = imp.transform(df_panddas[feature_name])

    # The imputed values are written back from an array, so the categorical
    # codes are cast back to integers before being declared as cat_features.
    for c in categorical_feature:
        df_panddas[c] = df_panddas[c].astype('int')

    train_data = Pool(
        data = df_panddas[feature_name],
        label = df_panddas['label'],
        cat_features = categorical_feature,
        feature_names = feature_name,
        )

    # Deliberately tiny model (2 iterations, depth 2) for a 3-row demo group.
    model = CatBoostClassifier(
        iterations=2,
        depth=2,
        learning_rate=1,
        loss_function='Logloss',
        verbose = False)

    # train the model
    model.fit(train_data)

    # make the prediction using the resulting model
    preds_proba = model.predict_proba(df_panddas[feature_name])
    df_panddas['prediction'] = preds_proba[:,1]

    return df_panddas[['model', 'id', 'label', 'prediction']]

In [0]:
# Run `fit_model` once per `model` group and display labels next to the scores.
# `fit_model` is defined twice in this notebook, so this call uses whichever
# definition was executed most recently.
(
    df
    .groupBy(['model'])
    .applyInPandas(
        fit_model,
        schema = """
    model int,
    id int, 
    label int,
    prediction double
    """,
    )
    .show()
)

+-----+---+-----+-------------------+
|model| id|label|         prediction|
+-----+---+-----+-------------------+
|    1|  2|    0|0.42654888850053757|
|    1|  1|    1|  0.629855013297618|
|    1|  3|    1|  0.629855013297618|
|    2|  4|    1|  0.629855013297618|
|    2|  6|    1|  0.629855013297618|
|    2|  5|    0|0.42654888850053757|
+-----+---+-----+-------------------+



# transformers (FLAN-T5)

In [0]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# Load the tokenizer for the "google/flan-t5-small" checkpoint; the generation
# helper defined later reads `tokenizer` as a notebook-level global.
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")

# Load the matching seq2seq model; also read as a global by the generation helper.
# NOTE(review): this `model` is later referenced from a function run through
# applyInPandas, so it presumably has to be shipped to the Spark workers -- confirm
# that is acceptable for larger checkpoints.
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small")

Downloading tokenizer_config.json:   0%|          | 0.00/2.48k [00:00<?, ?B/s]

Downloading spiece.model:   0%|          | 0.00/773k [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/2.31M [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/2.15k [00:00<?, ?B/s]

Downloading config.json:   0%|          | 0.00/1.37k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/294M [00:00<?, ?B/s]

In [0]:
# Toy prompts: two groups (`model` 1 and 2) of three question-style inputs each.
df_pandas = pd.DataFrame(
    [
        (1, 1, "My name is Jimmy. Question: What is my name?"),
        (1, 2, "My name is Smith. Question: What is my name?"),
        (1, 3, "My name is Yanni. Question: What is my name?"),
        (2, 4, "My name is Dan. Question: What is my name?"),
        (2, 5, "My name is Jose. Question: What is my name?"),
        (2, 6, "My name is Wang. Question: What is my name?"),
    ],
    columns=['model', 'id', 'text'],
)

# Convert the prompts to a Spark DataFrame for per-group processing.
# NOTE: this rebinds `df`, replacing the Spark DataFrame built earlier in the notebook.
df = spark.createDataFrame(df_pandas)

In [0]:
def prompt_to_resoonse(
    prompt,
    max_length = 128,
    ):
    """Generate a reply for one prompt using the notebook-level tokenizer and model.

    Parameters
    ----------
    prompt
        Input passed straight to the global ``tokenizer``.
    max_length : int, default 128
        Forwarded to ``model.generate`` as ``max_length``.

    Returns
    -------
    The first element of ``tokenizer.batch_decode`` applied to the generated
    output, decoded with special tokens skipped.
    """
    # Tokenize into framework tensors ("pt") so they can be unpacked into generate().
    encoded = tokenizer(prompt, return_tensors="pt")
    generated = model.generate(**encoded, output_scores=True, max_length=max_length)
    decoded = tokenizer.batch_decode(generated, skip_special_tokens=True)
    # Only the first decoded entry is returned.
    return decoded[0]
    
def prediction_model(
    df_panddas,
    ):
    """Generate a response for every row's 'text' in one group.

    Written to be passed to ``groupBy(...).applyInPandas``. Calls the
    notebook-level ``prompt_to_resoonse`` once per row.

    Parameters
    ----------
    df_panddas : pandas.DataFrame
        Rows of a single group. Must contain 'model', 'id' and 'text'.

    Returns
    -------
    pandas.DataFrame
        Columns 'model', 'id', 'text' and 'response'.
    """
    # assign() builds a new frame, so the caller's DataFrame does not gain a
    # 'response' column as a side effect.
    scored = df_panddas.assign(
        response = df_panddas['text'].apply(prompt_to_resoonse),
    )
    return scored[['model', 'id', 'text', 'response']]

In [0]:
# Run `prediction_model` once per `model` group and display up to 100 rows
# without truncating the text columns.
(
    df
    .groupBy(['model'])
    .applyInPandas(
        prediction_model,
        schema = """
    model int,
    id int, 
    text string,
    response string
    """,
    )
    .show(100, False)
)

+-----+---+--------------------------------------------+----------+
|model|id |text                                        |response  |
+-----+---+--------------------------------------------+----------+
|1    |1  |My name is Jimmy. Question: What is my name?|Jimmy     |
|1    |3  |My name is Yanni. Question: What is my name?|Yanni     |
|1    |2  |My name is Smith. Question: What is my name?|john smith|
|2    |5  |My name is Jose. Question: What is my name? |Jose      |
|2    |4  |My name is Dan. Question: What is my name?  |Dan       |
|2    |6  |My name is Wang. Question: What is my name? |Wang      |
+-----+---+--------------------------------------------+----------+



# end