In [2]:
from pyspark.sql import SparkSession
# Base directory under which Spark, the JVMs and Hive keep their scratch files.
spark_path = "/home/shsa3327"

# Build (or reuse) a Hive-enabled session. The chain is wrapped in parentheses so
# each option can sit on its own line without backslash continuations.
spark = (
    SparkSession.builder
    # Driver / executor sizing.
    .config('spark.driver.memory', '40g')
    .config('spark.executor.memory', '20g')
    .config('spark.executor.cores', '30')
    # Scratch space; the startup log warns that the cluster manager may override
    # spark.local.dir.
    .config('spark.local.dir', f'{spark_path}/tmp')
    .config('spark.driver.maxResultSize', '40g')
    .config("spark.driver.bindAddress", "0.0.0.0")
    # Parquet reader settings.
    .config("spark.sql.parquet.columnarReaderBatchSize", "1024")
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    # Point the driver and executor JVM temp directories at the same scratch area.
    .config('spark.driver.extraJavaOptions', f'-Djava.io.tmpdir={spark_path}/tmp')
    .config('spark.executor.extraJavaOptions', f'-Djava.io.tmpdir={spark_path}/tmp')
    .config('hive.exec.scratchdir', f'{spark_path}/tmp/hive')
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/29 20:49:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/29 20:49:56 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
# Path of the parquet file to load.
embeddings_file = 'data.parquet'
# Read the parquet file into a Spark DataFrame.
embeddings_file_read = spark.read.parquet(embeddings_file)
# Total number of rows (displayed as the cell output).
embeddings_file_read.count()

572227

In [29]:
# Print summary statistics for every column; a per-column count lower than the
# total row count indicates missing values in that column.
embeddings_file_read.describe().show()

24/04/29 20:56:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+------------------+-----------------+-----------------+------------------+------------------+------+------------------+
|summary|                   0|                  1|                   2|                  3|                  4|                  5|                   6|                 7|                8|                9|                10|                11|    12|                13|
+-------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+------------------+-----------------+-----------------+------------------+------------------+------+------------------+
|  count|              442512|             572227|              572227|             572227|             572227|             572227|              572227|            572227|           57

In [31]:
# List the column names of the loaded DataFrame.
embeddings_file_read.columns

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13']

In [34]:
from pyspark.sql.functions import col, count, when, isnull
# Report the number of nulls in every column using ONE aggregation job, rather
# than running a separate filter-and-count job per column.
# when(cond, 1) without otherwise() evaluates to NULL when the condition is false,
# and count() skips NULLs, so each aggregate counts exactly the rows where that
# column is null.
_null_count_exprs = [
    count(when(col(_col).isNull(), 1)).alias(_col)
    for _col in embeddings_file_read.columns
]
# A global aggregation always yields exactly one row, with one value per column.
_null_counts_row = embeddings_file_read.agg(*_null_count_exprs).first()

for _col, null_count in zip(embeddings_file_read.columns, _null_counts_row):
    print(f"Null count in column {_col} is {null_count}")

Null count in column 0 is 129715
Null count in column 1 is 0
Null count in column 2 is 0
Null count in column 3 is 0
Null count in column 4 is 0
Null count in column 5 is 0
Null count in column 6 is 0
Null count in column 7 is 0
Null count in column 8 is 0
Null count in column 9 is 0
Null count in column 10 is 0
Null count in column 11 is 0
Null count in column 12 is 0
Null count in column 13 is 0


In [35]:
# Drop the rows whose column "0" is null.
# TODO: this only works around the problem; find out why column "0" is null for
# these rows and fix it at the source.
embeddings_file_read_filtered = embeddings_file_read.filter(col("0").isNotNull())

In [36]:
# Number of rows remaining after the null filter (displayed as the cell output).
embeddings_file_read_filtered.count()

442512

In [37]:
from collections import defaultdict

# Bring every filtered row to the driver and bucket the rows by their first field.
rows = embeddings_file_read_filtered.collect()
embedded_groups = defaultdict(list)

for row in rows:
    # The first field is the group key; the remaining fields are stored as a plain list.
    group_key, *group_values = row
    embedded_groups[group_key].append(group_values)

In [38]:
# Partition the group keys into a test part and a train part.
# The leading `test_size` fraction of the keys (in dict insertion order) becomes the
# test set and the rest becomes the train set; no shuffling is applied.
test_size = 0.2
group_keys = list(embedded_groups)
test_groups_size = int(len(group_keys) * test_size)
train_groups_size = len(group_keys) - test_groups_size
test_groups, train_groups = (
    group_keys[:test_groups_size],
    group_keys[test_groups_size:],
)

In [39]:
from collections import Counter
# Flatten the test groups into two row-aligned lists:
#   test_data[i]    -> the values of one row belonging to a test group
#   test_queries[i] -> the key of the group that row came from
test_data = []
test_queries = []
for test_group in test_groups:
    group_rows = embedded_groups[test_group]
    test_data.extend(group_rows)
    test_queries.extend([test_group] * len(group_rows))

# The last value of every row is used as the label; everything before it is the
# feature vector.
X_test = [row_values[:-1] for row_values in test_data]
y_test = [row_values[-1] for row_values in test_data]

# Label distribution of the test split (displayed as the cell output).
Counter(y_test).items()

dict_items([(1, 27546), (3, 9332), (2, 17936), (0, 37320)])

In [40]:
# Flatten the train groups into two row-aligned lists:
#   train_data[i]    -> the values of one row belonging to a train group
#   train_queries[i] -> the key of the group that row came from
train_data = []
train_queries = []
for train_group in train_groups:
    group_rows = embedded_groups[train_group]
    train_data.extend(group_rows)
    train_queries.extend([train_group] * len(group_rows))

# The last value of every row is used as the label; everything before it is the
# feature vector.
X_train = [row_values[:-1] for row_values in train_data]
y_train = [row_values[-1] for row_values in train_data]

# Label distribution of the train split (displayed as the cell output).
Counter(y_train).items()

dict_items([(0, 141632), (3, 35414), (2, 67941), (1, 105391)])

In [8]:
# Peek at ten training labels (positions 130 through 139).
y_train[130:140]

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In [41]:
import numpy as np
# Rescale the relevance labels so that the largest label in either split becomes 1.
max_relevance = max(np.max(y_train), np.max(y_test))
print(max_relevance)

# The labels are built as plain Python lists. Convert them to float arrays
# explicitly instead of relying on `list /= numpy_scalar` being rescued by the
# NumPy scalar's reflected division.
y_train = np.asarray(y_train, dtype=float)
y_test = np.asarray(y_test, dtype=float)

# If every label is 0 there is nothing to scale, and dividing would turn all
# labels into NaN (0 / 0).
if max_relevance > 0:
    y_train = y_train / max_relevance
    y_test = y_test / max_relevance

3


In [10]:
# Preview the first five training feature vectors.
X_train[:5]

[[0.10751860588788986,
  0.08733455091714859,
  0.09396737068891525,
  0.058277860283851624,
  0.06814789772033691,
  0.09721718728542328,
  12,
  89,
  97,
  676,
  2658,
  0],
 [0.00875704362988472,
  0.00875704362988472,
  -0.05622100085020065,
  -0.05622100085020065,
  -0.011902314610779285,
  -0.011902314610779285,
  2021,
  89,
  0,
  676,
  0,
  0],
 [0.00875704362988472,
  0.00875704362988472,
  -0.05622100085020065,
  -0.05622100085020065,
  -0.06730138510465622,
  -0.06730138510465622,
  2021,
  89,
  0,
  676,
  0,
  0],
 [0.11539703607559204,
  0.07943225651979446,
  0.14079385995864868,
  0.1621164083480835,
  -0.07954061031341553,
  0.014380712062120438,
  3,
  89,
  144,
  676,
  1643,
  0],
 [0.0347573384642601,
  -0.05417788028717041,
  0.03712962195277214,
  0.016498995944857597,
  0.02730901539325714,
  0.002647528424859047,
  0,
  89,
  141,
  676,
  1066,
  0]]

In [24]:
from catboost import CatBoostRanker, Pool, MetricVisualizer
# Training pool built from only the first 165 rows, with their labels and group ids.
# NOTE(review): looks like a quick small-sample check; `train` is rebuilt from the
# full data in a later cell. The row-count slice may also cut the last group short
# — confirm this cell is still needed.
train = Pool(
    data=X_train[:165],
    label=y_train[:165],
    group_id=train_queries[:165]
)

In [23]:
# Evaluation pool built from only the first 15 test rows, with their labels and group ids.
# NOTE(review): looks like a quick small-sample check; `test` is rebuilt from the
# full data in a later cell — confirm this cell is still needed.
test = Pool(
    data=X_test[:15],
    label=y_test[:15],
    group_id=test_queries[:15]
)

In [42]:
from catboost import CatBoostRanker, Pool, MetricVisualizer
# Training pool over the full train split: feature vectors, labels, and the group
# key of each row as group_id.
train = Pool(
    data=X_train,
    label=y_train,
    group_id=train_queries
)

# Evaluation pool over the full test split, built the same way.
test = Pool(
    data=X_test,
    label=y_test,
    group_id=test_queries
)

In [43]:
# Baseline settings shared by every ranker trained in this notebook; individual
# runs copy this dict and add their own overrides.
default_parameters = dict(
    iterations=2000,
    custom_metric=['NDCG', 'PFound', 'AverageGain:top=10'],
    verbose=False,
    random_seed=0,
)

# Empty notebook-level parameter dict.
parameters = dict()

In [44]:
from copy import deepcopy
def fit_model(loss_function, additional_params=None, train_pool=None, test_pool=None):
    """Train a CatBoostRanker with the given loss function and return it.

    The model parameters start as a deep copy of ``default_parameters``;
    ``loss_function`` is set as both the loss and the default ``train_dir``,
    and ``additional_params`` (if given) is applied last, so it can override
    any of them.

    Args:
        loss_function: Name of the ranking loss, e.g. ``'YetiRank'``.
        additional_params: Optional dict of extra or overriding parameters.
        train_pool: Pool to train on. Defaults to the notebook-level ``train``
            as it exists when the function is CALLED.
        test_pool: Pool used as ``eval_set``. Defaults to the notebook-level
            ``test`` as it exists when the function is CALLED.

    Returns:
        The fitted ``CatBoostRanker``.
    """
    # Resolve the default pools at call time. Binding them in the signature
    # (`train_pool=train`) would freeze whatever objects existed when this cell
    # was run, so rebuilding `train`/`test` in another cell would silently keep
    # training on the stale pools.
    if train_pool is None:
        train_pool = train
    if test_pool is None:
        test_pool = test

    # Work on a copy so the shared defaults are never mutated; the local name
    # avoids shadowing the notebook-level `parameters`.
    params = deepcopy(default_parameters)
    params['loss_function'] = loss_function
    params['train_dir'] = loss_function

    if additional_params is not None:
        params.update(additional_params)

    model = CatBoostRanker(**params)
    model.fit(train_pool, eval_set=test_pool, plot=True)

    return model

In [45]:
# Train with the YetiRank loss, overriding the learning rate (0.3) and writing the
# training artefacts to a dedicated train_dir.
# NOTE(review): the recorded run stopped at iteration 1109 with a "degenerate
# solution" message that suggests increasing l2 regularization.
fit_model('YetiRank', {'train_dir': 'YetiRank-lr-0.3', 'learning_rate': 0.3})

MetricVisualizer(layout=Layout(align_self='stretch', height='500px'))

Training has stopped (degenerate solution on iteration 1109, probably too small l2-regularization, try to increase it)


<catboost.core.CatBoostRanker at 0x7f1a3f4ee050>

In [47]:
# Train with the YetiRankPairwise loss using the default parameters only.
fit_model('YetiRankPairwise')

MetricVisualizer(layout=Layout(align_self='stretch', height='500px'))

<catboost.core.CatBoostRanker at 0x7f1a3f4edc90>