# Join External

Joining data is the bread and butter of data science work.

A single pipeline can consist of a number of merges. We might start with data from various sources that need to be combined. Along the way we might perform various calculations and then need to join the results.

More broadly, when it comes to joining data there are two things we care about:
* whether it can run with the available resources
* how fast it runs

`NVTabular` is very useful across both of these dimensions.

It splits the data into "chunks" for processing -- there is no need for all of our data to fit on the GPU at once.

It also runs on the GPU, which makes it considerably faster! And given the number of merges a typical pipeline performs, these time savings add up.
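The chunking idea can be illustrated with a minimal plain-Python sketch of a chunked left join. It is only an illustration under simple assumptions: a small lookup table held in a dict, and a stream of ids consumed `chunk_size` rows at a time. NVTabular's actual implementation is different (the `.to_ddf()` calls below hint that a `Dataset` is backed by a Dask DataFrame), and the helpers `iter_chunks` and `chunked_left_join` are hypothetical.

```python
from itertools import islice

def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most `chunk_size` items from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

def chunked_left_join(ids, lookup, chunk_size):
    """Join ids against a small lookup dict, one chunk at a time.

    Only the current chunk and the lookup table need to be in memory at once.
    Ids missing from the lookup are kept and paired with None (left join).
    """
    for chunk in iter_chunks(ids, chunk_size):
        yield [(i, lookup.get(i)) for i in chunk]

# Hypothetical data: ids 0-9 map to letters, ids 10 and 11 have no match.
lookup = {i: letter for i, letter in enumerate("abcdefghij")}
ids = (i % 12 for i in range(25))  # a generator, never materialised as a whole
results = [row for chunk in chunked_left_join(ids, lookup, chunk_size=10) for row in chunk]
```

The design choice is that the large side is streamed while the small side stays resident, which is the same shape as this notebook's joins: 100 million ids against a 10-row table.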

Joining is not a particularly compute-intensive operation, but let us nonetheless see what magnitude of speedup we can expect.

In [1]:
import cudf
import nvtabular as nvt
import numpy as np
import pandas as pd

In [2]:
import string

In [3]:
base_data = pd.DataFrame(data={'id': np.random.randint(10, size=100_000_000)})
to_merge = pd.DataFrame(data={'id': np.arange(10), 'rating': list(string.ascii_letters[:10])})

In [4]:
base_data.head()

|   | id |
|---|----|
| 0 | 5 |
| 1 | 4 |
| 2 | 7 |
| 3 | 1 |
| 4 | 3 |


In [5]:
to_merge.head()

|   | id | rating |
|---|----|--------|
| 0 | 0 | a |
| 1 | 1 | b |
| 2 | 2 | c |
| 3 | 3 | d |
| 4 | 4 | e |


In [6]:
%%time

merged_pd = base_data.merge(to_merge)

CPU times: user 3.79 s, sys: 10.4 s, total: 14.2 s
Wall time: 14.2 s


In [7]:
merged_pd.head()

|   | id | rating |
|---|----|--------|
| 0 | 5 | f |
| 1 | 5 | f |
| 2 | 5 | f |
| 3 | 5 | f |
| 4 | 5 | f |


In [8]:
nvt_dataset = nvt.Dataset(base_data)

nvt_merged = ['id'] >> nvt.ops.JoinExternal(to_merge, 'id')

workflow = nvt.Workflow(nvt_merged)

In [9]:
%%time

gdf = workflow.fit_transform(nvt_dataset).to_ddf().compute()

CPU times: user 301 ms, sys: 79 ms, total: 380 ms
Wall time: 379 ms


In [10]:
gdf.head()

|   | id | rating |
|---|----|--------|
| 0 | 5 | f |
| 1 | 4 | e |
| 2 | 7 | h |
| 3 | 1 | b |
| 4 | 3 | d |


Even on this trivial example, where there is not much computation involved, the `nvtabular` join is over `35x` faster (379 ms vs. 14.2 s of wall time)!

What does this mean for pipeline development?

You can experiment much more rapidly.

It makes sense to run on a sample of your data while you are piecing your pipeline together, but thanks to `nvtabular` you can also run larger experiments, and more of them, in the same amount of time!

The speedups carry over to the entire lifecycle of working on a solution (including full-scale training), and the extra iterations they allow can lead to substantially better outcomes.

Without further ado, let's put our newly met `nvtabular` operator into practice! We will use it in a workflow that ends with a submission to a Kaggle competition!

# Submitting to the Kaggle Outbrain competition using Join External

First of all, let's download and extract the `outbrain` Kaggle competition data to the `data` directory.

You should have the following files:

In [11]:
!cd data && ls *.csv

clicks_test.csv		  documents_meta.csv	 promoted_content.csv
clicks_train.csv	  events.csv		 sample_submission.csv
documents_categories.csv  page_views.csv	 submission.csv
documents_entities.csv	  page_views_sample.csv


For this example, we will only use the `clicks_train.csv` and `clicks_test.csv` files. They are much smaller than the largest files in the competition dataset, so you may want to download just these two.

As a side note, bulk downloading of the Outbrain competition files from Kaggle [doesn't seem to work at the moment](https://twitter.com/radekosmulski/status/1527929784222851072?s=20&t=AUTQcptQfryJ2SXQ-XsPgQ), so you might need to download the files individually anyway.

We will mirror the work done in this [Kaggle kernel](https://www.kaggle.com/code/xingobar/pandas-lb-0-63709). It is always a good idea to start with a baseline, which for recommender systems is often some form of popularity ranking. That is what we will build here.

We start by reading in the data.

In [12]:
%%time

train = cudf.read_csv('data/clicks_train.csv', usecols=['ad_id','clicked'])
test = cudf.read_csv('data/clicks_test.csv')

CPU times: user 503 ms, sys: 121 ms, total: 624 ms
Wall time: 624 ms


We will not personalize the recommendations in any way. We will not consider which user is likely to click which ad.

Instead, we will look only at the ads themselves: which ads get clicked most often, relative to how often they are shown?

For each `ad_id`:
* how many times was it served? (`count`)
* how many times was it clicked? (`sum`)
* and what is the ratio of clicks to number of times the ad was served? (`mean`)
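These three statistics are what the `groupby`/`agg` call below computes. To spell out the bookkeeping, here is a plain-Python sketch of the same aggregation on a handful of hypothetical `(ad_id, clicked)` rows; `ad_stats` is an illustrative helper, not part of cudf.

```python
from collections import defaultdict

def ad_stats(rows):
    """Return {ad_id: (count, sum, mean)} from (ad_id, clicked) pairs."""
    served = defaultdict(int)
    clicks = defaultdict(int)
    for ad_id, clicked in rows:
        served[ad_id] += 1        # `count`: times the ad was served
        clicks[ad_id] += clicked  # `sum`: times the ad was clicked (clicked is 0 or 1)
    # `mean`: fraction of impressions that resulted in a click
    return {ad: (served[ad], clicks[ad], clicks[ad] / served[ad]) for ad in served}

# Hypothetical training rows: (ad_id, clicked)
train_rows = [(1, 0), (1, 1), (2, 0), (1, 1), (3, 0), (2, 0)]
stats = ad_stats(train_rows)
```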

In [13]:
ad_likelihood = train.groupby('ad_id')['clicked'].agg(['count','sum','mean']).reset_index()

In [14]:
ad_likelihood.head()

|   | ad_id | count | sum | mean |
|---|-------|-------|-----|------|
| 0 | 247182 | 1 | 0 | 0.0 |
| 1 | 38499 | 1 | 0 | 0.0 |
| 2 | 338291 | 10 | 3 | 0.3 |
| 3 | 199061 | 6 | 0 | 0.0 |
| 4 | 38049 | 14 | 0 | 0.0 |


Globally, how often do ads get clicked on?

In [15]:
mean_clicked = train.clicked.mean()

In [16]:
mean_clicked

0.19364537296143453

Next, we calculate the `likelihood` of an ad being clicked, per `ad_id`.

In [17]:
ad_likelihood['likelihood'] = (ad_likelihood['sum'] + 10 * mean_clicked) / (ad_likelihood['count'] + 10)

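I am not sure why the kernel uses these particular constants, but the `+ 10 * mean_clicked` in the numerator and the `+ 10` in the denominator act as smoothing: each ad is treated as if it had been served 10 extra times and clicked at the global rate `mean_clicked`.

This pulls the estimate for `ad_id`s that were served only a handful of times toward the global mean, while ads served many times are barely affected.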
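A small worked example makes the effect of the smoothing visible. The sketch below applies the same formula as the cell above to three hypothetical ads; `smoothed_likelihood` is an illustrative helper, and `prior_weight=10` simply mirrors the constant used in the kernel.

```python
def smoothed_likelihood(clicks, served, prior_mean, prior_weight=10):
    """Click rate smoothed toward `prior_mean`, as if the ad had been served
    `prior_weight` extra times and clicked at the global rate."""
    return (clicks + prior_weight * prior_mean) / (served + prior_weight)

mean_clicked = 0.19364537296143453  # global click rate computed above

rare = smoothed_likelihood(clicks=1, served=1, prior_mean=mean_clicked)          # raw rate 1.0
popular = smoothed_likelihood(clicks=300, served=1000, prior_mean=mean_clicked)  # raw rate 0.3
unseen = smoothed_likelihood(clicks=0, served=0, prior_mean=mean_clicked)        # no data at all

print(f"{rare:.3f} {popular:.3f} {unseen:.3f}")  # prints: 0.267 0.299 0.194
```

With raw means, the ad clicked once out of one impression (1.0) would outrank the ad clicked 300 times out of 1000 (0.3); after smoothing the order flips. An ad with no training data at all gets the prior itself, `mean_clicked`.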

Now that we have calculated `ad_likelihood`, we need to join it onto the test data.

In [18]:
joined = ['display_id', 'ad_id'] >> nvt.ops.JoinExternal(ad_likelihood, 'ad_id')
workflow = nvt.Workflow(joined)

In [19]:
%%time

test_nvt_dataset = nvt.Dataset(test)
joined_gdf = workflow.fit_transform(test_nvt_dataset).to_ddf().compute()

CPU times: user 120 ms, sys: 88.6 ms, total: 209 ms
Wall time: 209 ms


That was fast!

The join leaves `nan` values in the joined columns (such as `mean` and `likelihood`) for `ad_id`s that didn't appear in the train set, so let's fill those in.
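The `nan`s appear because the join behaves like a left join here: every test row is kept, and rows whose `ad_id` has no match in `ad_likelihood` get missing values in the joined columns. Below is a minimal plain-Python sketch of that behaviour, with dicts standing in for DataFrames and `None` standing in for `nan`; the `left_join` helper and the toy rows are hypothetical. The fill in this sketch targets `likelihood`, since that is the column the ranking step sorts on.

```python
def left_join(rows, lookup, key, column):
    """Keep every row; add `column` from `lookup`, or None when row[key] is absent."""
    return [{**row, column: lookup.get(row[key])} for row in rows]

# Hypothetical data: ad 99 never appeared in the training clicks.
ad_likelihood = {10: 0.27, 11: 0.30}
test_rows = [
    {"display_id": 1, "ad_id": 10},
    {"display_id": 1, "ad_id": 99},
    {"display_id": 2, "ad_id": 11},
]
joined = left_join(test_rows, ad_likelihood, key="ad_id", column="likelihood")
n_missing = sum(row["likelihood"] is None for row in joined)  # counterpart of .isna().sum()

# Fill unseen ads with the global click rate.
mean_clicked = 0.19364537296143453
filled = [
    {**row, "likelihood": mean_clicked if row["likelihood"] is None else row["likelihood"]}
    for row in joined
]
```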

In [20]:
joined_gdf['mean'].isna().sum()

1108464

In [21]:
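joined_gdf['mean'] = joined_gdf['mean'].fillna(mean_clicked)
joined_gdf['likelihood'] = joined_gdf['likelihood'].fillna(mean_clicked)  # the ranking below sorts on this column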

And now for the grand finale!

Let's output our predictions in a format that the competition will accept.

The logic is as follows:
* take each set of ads displayed together (ads grouped by `display_id`)
* rank them by their global popularity (`likelihood`) and use that order as our prediction
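The two steps above can be sketched in plain Python to show exactly what ends up in the submission file: ads are grouped per `display_id`, sorted by descending `likelihood`, and joined into a space-separated string. The rows and `build_submission` are hypothetical; the notebook itself does this with `sort_values` followed by a pandas `groupby().apply()`.

```python
from collections import defaultdict

def build_submission(rows):
    """Map each display_id to its ad_ids as a space-separated string,
    ordered from highest to lowest likelihood."""
    by_display = defaultdict(list)
    for display_id, ad_id, likelihood in rows:
        by_display[display_id].append((likelihood, ad_id))
    submission = {}
    for display_id in sorted(by_display):
        ranked = sorted(by_display[display_id], key=lambda pair: pair[0], reverse=True)
        submission[display_id] = " ".join(str(ad_id) for _, ad_id in ranked)
    return submission

# Hypothetical (display_id, ad_id, likelihood) rows.
rows = [(1, 10, 0.27), (1, 11, 0.30), (1, 12, 0.19), (2, 20, 0.05), (2, 21, 0.40)]
submission = build_submission(rows)
csv_lines = ["display_id,ad_id"] + [f"{d},{ads}" for d, ads in submission.items()]
print("\n".join(csv_lines))
# display_id,ad_id
# 1,11 10 12
# 2,21 20
```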

In [22]:
joined_gdf = joined_gdf.sort_values(['display_id','likelihood'], ascending=False)

In [23]:
%%time

submission = joined_gdf.to_pandas().groupby('display_id')['ad_id'].apply(lambda x:' '.join(map(str,x))).reset_index()

CPU times: user 55.6 s, sys: 8.28 s, total: 1min 3s
Wall time: 1min 3s


In [24]:
submission.head()

|   | display_id | ad_id |
|---|------------|-------|
| 0 | 16874594 | 170392 172888 162754 150083 66758 180797 |
| 1 | 16874595 | 8846 143982 30609 |
| 2 | 16874596 | 289915 11430 289122 132820 57197 153260 173005... |
| 3 | 16874597 | 305790 285834 143981 182039 155945 180965 3088... |
| 4 | 16874598 | 145937 335632 67292 250082 |


In [25]:
submission.to_csv('data/submission.csv', index=False)

Now let's make a submission.

Let's download the file and submit it via the [web GUI](https://www.kaggle.com/competitions/outbrain-click-prediction/leaderboard) (this competition has ended, but you can still make a late submission to see how you are doing!)

In [26]:
from IPython.lib.display import FileLink

In [27]:
FileLink('data/submission.csv')

And the submission scores `0.63573`!
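For context on what this number measures: to my knowledge the Outbrain competition is scored with mean average precision at 12 (MAP@12) over displays. The sketch below implements the usual AP@k definition under that assumption; the helper names and the toy predictions are hypothetical. When a display has a single clicked ad, its AP@12 reduces to 1 / (rank of that ad), or 0 if the ad is not among the first 12 predictions.

```python
def average_precision_at_k(predicted, actual, k=12):
    """AP@k for one display: precision at each rank holding a relevant ad,
    averaged over min(len(actual), k)."""
    if not actual:
        return 0.0
    hits, score = 0, 0.0
    for rank, ad_id in enumerate(predicted[:k], start=1):
        if ad_id in actual and ad_id not in predicted[:rank - 1]:  # ignore duplicates
            hits += 1
            score += hits / rank
    return score / min(len(actual), k)

def map_at_k(predictions, actuals, k=12):
    """Mean of AP@k over all displays."""
    return sum(average_precision_at_k(p, a, k) for p, a in zip(predictions, actuals)) / len(predictions)

# Hypothetical ranked predictions and the clicked ad of each display.
predictions = [[11, 10, 12], [21, 20], [30, 31, 32]]
clicked = [{11}, {20}, {33}]
print(map_at_k(predictions, clicked))  # (1/1 + 1/2 + 0) / 3 = 0.5
```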

How good a result is that? The winning submission scored `0.70144`, so our result is about 9% lower ((0.70144 - 0.63573) / 0.70144 ≈ 0.094)!

Using global popularity goes a long way in solving recommender system problems: it often produces good results that are hard to improve upon.

Improving on such a baseline is not easy, but it is usually worth the effort: in a business context, even tiny improvements in performance can translate into significant gains to the bottom line!