# CSP Application on OGBN-Papers100M Dataset  

## Introduction  
This notebook demonstrates the application of the CSP algorithm on the OGBN-Papers100M dataset. We start by understanding the structure of the data using a toy dataset with 5 nodes and 4 hyperedges. Precomputed small tables are utilized for the demonstration.  

### Precomputed Tables  
1. **`df_graph`**: Represents the hypergraph structure as a Polars DataFrame.  
2. **`label_df`**: Includes details such as split (train/test) and label.  
3. **`node_degree` and `edge_degree`**: Calculated from the hypergraph, these are used to compute mean scores, as explained later.

In [None]:
import polars as pl

df_graph = pl.DataFrame({ 
    'nodeId': [0, 0, 1, 1, 2, 2, 3, 3, 4],
    'edgeId': [0, 2, 0, 1, 1, 3, 2, 3, 3]
}, schema={'nodeId': pl.UInt32, 'edgeId': pl.UInt32})  

label_df = pl.DataFrame({
    'nodeId': [0, 1, 2, 3, 4],
    'label': [0, 1, 0, 0, 1],
    'split': ['train', 'train', 'test', 'test', 'test']
}, schema={'nodeId': pl.UInt32, 'label': pl.Float32, 'split': pl.String})

edge_degree = df_graph.group_by('edgeId').agg(pl.len().alias('edge_degree'))
node_degree = df_graph.group_by('nodeId').agg(pl.len().alias('node_degree'))
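
As a quick sanity check of the two degree tables, the same counts can be reproduced in plain Python. This is only an illustrative sketch using the standard library; the names `pairs`, `edge_deg`, and `node_deg` are introduced here and are not part of the notebook.

```python
from collections import Counter

# (nodeId, edgeId) incidence pairs of the toy hypergraph, copied from df_graph
node_ids = [0, 0, 1, 1, 2, 2, 3, 3, 4]
edge_ids = [0, 2, 0, 1, 1, 3, 2, 3, 3]
pairs = list(zip(node_ids, edge_ids))

# Edge degree: number of nodes contained in each hyperedge
edge_deg = Counter(edge for _, edge in pairs)
# Node degree: number of hyperedges each node belongs to
node_deg = Counter(node for node, _ in pairs)

print(dict(edge_deg))
print(dict(node_deg))
```

Hyperedge 3 is the only one with three nodes, and node 4 is the only node that belongs to a single hyperedge.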


The CSP implementation consists of the following steps, which propagate the labels from the training set through the hypergraph and then compute the accuracy on every data split.

In [8]:
train_label = label_df.filter(pl.col('split') == 'train').select('nodeId', pl.lit(1).cast(pl.Float32).alias('score'), pl.col('label').alias('target'))
stage1_cnt = train_label.join(df_graph, on = 'nodeId').group_by('edgeId', 'target').agg(pl.sum('score'))
stage1_output = stage1_cnt.join(edge_degree, on='edgeId').select('edgeId', 'target', (pl.col('score') / pl.col('edge_degree')).alias('edge_score'))
stage2_cnt = stage1_output.join(df_graph, on='edgeId').group_by('nodeId', 'target').agg(pl.sum('edge_score'))
stage2_output = node_degree.join(stage2_cnt, on='nodeId').select('nodeId', 'target', (pl.col('edge_score') / pl.col('node_degree')).alias('node_score'))
predictions = stage2_output.select('nodeId', 'node_score', pl.col('node_score').rank(descending=True, method='random').over('nodeId').alias('rank'), 'target')
result = label_df.join(predictions.filter(pl.col('rank') == 1), on='nodeId', how='left')
accuracy = result.select('split', (pl.col('target') == pl.col('label')).cast(pl.Float32).alias('hit')).fill_null(0) \
                 .group_by('split').agg(pl.mean('hit').alias("accuracy"))
display(accuracy)

split,accuracy
str,f32
"""train""",1.0
"""test""",0.333333


Let us break it down into individual steps.

In the first step, we isolate the training nodes and assign each of them a score of 1; restricting to the training split prevents validation/test labels from leaking.
The score is the value that is propagated through the hypergraph by the CSP algorithm. In effect, one CSP run is carried out in parallel for each target label.


In [10]:
train_label = label_df.filter(pl.col("split") == "train").select(
    "nodeId", pl.lit(1).cast(pl.Float32).alias("score"), pl.col("label").alias("target")
)
display(train_label)

nodeId,score,target
u32,f32,f32
0,1.0,0.0
1,1.0,1.0
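
With every training score fixed to 1, the two propagation stages that follow can be summarized per target label $c$ as below. This is a reading of the code in this notebook rather than a formula quoted from the submission; the symbols $T$ (set of training nodes), $y_u$ (label of node $u$), $|e|$ (edge degree), and $d(v)$ (node degree) are notation introduced here for illustration.

```latex
s_e^{(c)} = \frac{1}{|e|} \sum_{u \in e \cap T} \mathbb{1}[y_u = c],
\qquad
s_v^{(c)} = \frac{1}{d(v)} \sum_{e \ni v} s_e^{(c)},
\qquad
\hat{y}_v = \arg\max_{c} \, s_v^{(c)}
```

A node $v$ for which every $s_v^{(c)}$ is zero receives no prediction.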


In contrast with the SQL query in the Appendix of the submission, we aggregate the sum of all positive scores. The advantage is that we avoid propagating zero labels, which carry no information; this saves a lot of computational resources.

In [11]:
stage1_cnt = train_label.join(df_graph, on = 'nodeId').group_by('edgeId', 'target').agg(pl.sum('score'))
display(stage1_cnt)

edgeId,target,score
u32,f32,f32
1,1.0,1.0
2,0.0,1.0
0,1.0,1.0
0,0.0,1.0
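
To illustrate why summing only positive scores keeps the intermediate table small, here is a plain-Python sketch of the same aggregation. It is an assumption-laden cross-check, not the notebook's Polars code; the dictionary layout and variable names are chosen here for illustration.

```python
from collections import defaultdict

# Toy hypergraph as (nodeId, edgeId) pairs, copied from df_graph
pairs = [(0, 0), (0, 2), (1, 0), (1, 1), (2, 1), (2, 3), (3, 2), (3, 3), (4, 3)]
# Training nodes only: nodeId -> target, each carrying a score of 1
train_label = {0: 0.0, 1: 1.0}

# Sum the scores per (edgeId, target). Nodes outside the training split are
# skipped, so no zero-valued rows are ever created.
stage1_cnt = defaultdict(float)
for node, edge in pairs:
    if node in train_label:
        stage1_cnt[(edge, train_label[node])] += 1.0

print(dict(stage1_cnt))
```

On the toy data this produces 4 entries, whereas a dense table over all 4 hyperedges and 2 labels would hold 8.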


As stated before, the average is calculated as the ratio of the summed score to the edge degree. Therefore, no information from the other nodes of the hyperedge is needed here.

In [14]:
stage1_output = stage1_cnt.join(edge_degree, on='edgeId').select('edgeId', 'target', (pl.col('score') / pl.col('edge_degree')).alias('edge_score'))
display(stage1_output)

edgeId,target,edge_score
u32,f32,f64
2,0.0,0.5
0,1.0,0.5
0,0.0,0.5
1,1.0,0.5


A similar procedure is applied to nodes, where we again sum the scores from the incident hyperedges according to the CSP definition.

In [16]:
stage2_cnt = stage1_output.join(df_graph, on='edgeId').group_by('nodeId', 'target').agg(pl.sum('edge_score'))
display(stage2_cnt)

nodeId,target,edge_score
u32,f32,f64
0,0.0,1.0
3,0.0,0.5
2,1.0,0.5
1,0.0,0.5
1,1.0,1.0
0,1.0,0.5


As with edges, the node degree is used to calculate the ratio.

In [17]:
stage2_output = node_degree.join(stage2_cnt, on='nodeId').select('nodeId', 'target', (pl.col('edge_score') / pl.col('node_degree')).alias('node_score'))
display(stage2_output)

nodeId,target,node_score
u32,f32,f64
0,0.0,0.5
3,0.0,0.25
2,1.0,0.25
1,0.0,0.25
1,1.0,0.5
0,1.0,0.25
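
The node scores above can be cross-checked with a short plain-Python version of both stages. This is a minimal sketch under the assumption that the toy tables are exactly those defined at the top of the notebook; it is not meant to replace the Polars pipeline, and all names are illustrative.

```python
from collections import Counter, defaultdict

# Toy hypergraph as (nodeId, edgeId) pairs and training labels (nodeId -> target)
pairs = [(0, 0), (0, 2), (1, 0), (1, 1), (2, 1), (2, 3), (3, 2), (3, 3), (4, 3)]
train_label = {0: 0.0, 1: 1.0}

edge_deg = Counter(edge for _, edge in pairs)
node_deg = Counter(node for node, _ in pairs)

# Stage 1: mean of the training scores per (edge, target)
edge_score = defaultdict(float)
for node, edge in pairs:
    if node in train_label:
        edge_score[(edge, train_label[node])] += 1.0 / edge_deg[edge]

# Stage 2: mean of the incident edge scores per (node, target)
node_score = defaultdict(float)
for (edge, target), score in edge_score.items():
    for node, e in pairs:
        if e == edge:
            node_score[(node, target)] += score / node_deg[node]

print(dict(node_score))
```

Node 4 does not appear in the result because its only hyperedge (edge 3) contains no training node.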


For each node, labels are ranked by score. Nodes without propagated signals remain unranked.

In [18]:
predictions = stage2_output.select('nodeId', 'node_score', pl.col('node_score').rank(descending=True, method='random').over('nodeId').alias('rank'), 'target')
display(predictions)

nodeId,node_score,rank,target
u32,f64,u32,f32
0,0.5,1,0.0
3,0.25,1,0.0
2,0.25,1,1.0
1,0.25,2,0.0
1,0.5,1,1.0
0,0.25,2,1.0


We now join the predictions table with the labels. The predicted class is the one whose score has rank 1. Note that ties are broken randomly.

In [20]:
result = label_df.join(predictions.filter(pl.col('rank') == 1), on='nodeId', how='left')
display(result)

nodeId,label,split,node_score,rank,target
u32,f32,str,f64,u32,f32
0,0.0,"""train""",0.5,1.0,0.0
1,1.0,"""train""",0.5,1.0,1.0
2,0.0,"""test""",0.25,1.0,1.0
3,0.0,"""test""",0.25,1.0,0.0
4,1.0,"""test""",,,
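
The rank-1 selection with random tie-breaking can be sketched in plain Python as follows. The helper `top_target` and the seeded `random.Random` are assumptions made for this illustration; the notebook itself relies on Polars' `rank(method='random')`, which is not seeded there.

```python
import random
from collections import defaultdict

# (nodeId, target) -> node_score, copied from stage2_output
node_score = {
    (0, 0.0): 0.5, (0, 1.0): 0.25,
    (1, 0.0): 0.25, (1, 1.0): 0.5,
    (2, 1.0): 0.25, (3, 0.0): 0.25,
}


def top_target(scores, rng):
    """For every node, pick the target with the highest score; ties are broken at random."""
    by_node = defaultdict(list)
    for (node, target), score in scores.items():
        by_node[node].append((score, target))
    prediction = {}
    for node, candidates in by_node.items():
        best = max(score for score, _ in candidates)
        tied = [target for score, target in candidates if score == best]
        prediction[node] = rng.choice(tied)
    return prediction


prediction = top_target(node_score, random.Random(0))
print(prediction)
```

The toy data contains no ties, so the result does not depend on the seed. Node 4 has no score at all and therefore gets no prediction, which matches the empty row in the joined table.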


Accuracy is computed as the ratio of correct predictions (`hit`) to the total size of each split.  

In [None]:
accuracy = result.select('split', (pl.col('target') == pl.col('label')).cast(pl.Float32).alias('hit')).fill_null(0) \
                 .group_by('split').agg(pl.mean('hit').alias("accuracy"))
display(accuracy)
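
For reference, the same accuracy computation can be written in plain Python. This sketch hard-codes the toy labels and the rank-1 predictions from the tables above; the variable names are illustrative only.

```python
from collections import defaultdict

# Ground truth and splits, copied from label_df
labels = {0: 0.0, 1: 1.0, 2: 0.0, 3: 0.0, 4: 1.0}
splits = {0: "train", 1: "train", 2: "test", 3: "test", 4: "test"}
# Rank-1 predictions; node 4 received no propagated signal
prediction = {0: 0.0, 1: 1.0, 2: 1.0, 3: 0.0}

hits = defaultdict(list)
for node, label in labels.items():
    # A missing prediction (None) never equals the label, so it counts as a miss
    hits[splits[node]].append(1.0 if prediction.get(node) == label else 0.0)

accuracy = {split: sum(h) / len(h) for split, h in hits.items()}
print(accuracy)
```

Counting unpredicted nodes as misses corresponds to the `fill_null(0)` call in the Polars cell: the denominator is the full size of each split.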

## Notes  

- The method described calculates CSP for all 171 labels simultaneously, which may not fit into standard laptop memory. To overcome this, CSP computations can be batched.  
- Alternatively, the dataset can be directly loaded and processed without batching if memory allows.  
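
Because the scores for different target labels never interact, batching over labels gives the same result as a single pass. The following plain-Python sketch shows the idea on the toy data; the helpers `csp_node_scores` and `batched` and the batch size are hypothetical and are not the batching code used for the full dataset.

```python
from collections import Counter, defaultdict

# Toy hypergraph as (nodeId, edgeId) pairs and training labels (nodeId -> target)
pairs = [(0, 0), (0, 2), (1, 0), (1, 1), (2, 1), (2, 3), (3, 2), (3, 3), (4, 3)]
train_label = {0: 0.0, 1: 1.0}


def csp_node_scores(pairs, train_label, targets):
    """Two-stage CSP restricted to the given target labels (hypothetical helper)."""
    edge_deg = Counter(edge for _, edge in pairs)
    node_deg = Counter(node for node, _ in pairs)
    nodes_of_edge = defaultdict(list)
    for node, edge in pairs:
        nodes_of_edge[edge].append(node)

    # Stage 1: mean training score per (edge, target), only for targets in this batch
    edge_score = defaultdict(float)
    for node, edge in pairs:
        target = train_label.get(node)
        if target in targets:
            edge_score[(edge, target)] += 1.0 / edge_deg[edge]

    # Stage 2: mean of the incident edge scores per (node, target)
    node_score = defaultdict(float)
    for (edge, target), score in edge_score.items():
        for node in nodes_of_edge[edge]:
            node_score[(node, target)] += score / node_deg[node]
    return dict(node_score)


def batched(items, size):
    """Yield consecutive chunks of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


all_targets = sorted(set(train_label.values()))
scores = {}
for batch in batched(all_targets, 1):  # batch size 1 is an arbitrary choice for the toy data
    scores.update(csp_node_scores(pairs, train_label, set(batch)))

print(scores)
```

Only the rows of one batch are held in memory at a time; the per-batch results can then be merged before ranking.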


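In [None]:
import os

# DATASET_PATH is assumed to be defined beforehand and to point to the dataset root directory.
# df_graph is scanned lazily (LazyFrame); the smaller tables are read eagerly.
df_graph = pl.scan_parquet(os.path.join(DATASET_PATH, "papers100M-bin/processed/graph.parquet"))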
label_df = pl.read_parquet(os.path.join(DATASET_PATH, "papers100M-bin/processed/labels.parquet"))
edge_degree = pl.read_parquet(os.path.join(DATASET_PATH, "papers100M-bin/processed/edge_hist.parquet"))
node_degree = pl.read_parquet(os.path.join(DATASET_PATH, "papers100M-bin/processed/node_hist.parquet"))



## Scaling with Spark  

For large-scale computations, Spark offers a more efficient solution by enabling distributed processing. The SQL implementation of CSP allows scaling across a computational cluster. With the `df_graph` and `label_df` tables as before, the SQL query corresponding to the script above can look like this:

``` sql
with edge_degree as (
  select edgeId, count(*) as edge_degree
  from df_graph
  group by edgeId
),
node_degree as (
  select nodeId, count(*) as node_degree
  from df_graph
  group by nodeId
),
train_label as (
  select distinct nodeId, label
  from label_df
  where split = 'train'
),
-- Stage 1: count training labels per (edge, target) and divide by the edge degree
stage1_cnt as (
  select graph.edgeId, train_label.label as target, count(*) as cnt
  from train_label
  inner join df_graph as graph on graph.nodeId = train_label.nodeId
  group by graph.edgeId, train_label.label
),
stage1_full as (
  select stage1_cnt.edgeId, stage1_cnt.target,
         stage1_cnt.cnt / edge_degree.edge_degree as edge_score
  from stage1_cnt
  inner join edge_degree on stage1_cnt.edgeId = edge_degree.edgeId
),
-- Stage 2: sum edge scores per (node, target) and divide by the node degree
node_cnt as (
  select graph.nodeId, stage1_full.target,
         sum(stage1_full.edge_score) as node_score_sum
  from stage1_full
  inner join df_graph as graph on stage1_full.edgeId = graph.edgeId
  group by graph.nodeId, stage1_full.target
),
csp_output as (
  select node_cnt.nodeId, node_cnt.target,
         node_cnt.node_score_sum / node_degree.node_degree as node_score
  from node_cnt
  inner join node_degree on node_cnt.nodeId = node_degree.nodeId
),
-- Rank the targets of every node by score
sorted_csp_output as (
  select nodeId, target, node_score,
         row_number() over (partition by nodeId order by node_score desc) as rank
  from csp_output
)
-- Accuracy per split; nodes without a prediction count as misses
select all_labels.split,
       mean(if(isnull(sorted_csp_output.target), 0, float(all_labels.label == sorted_csp_output.target))) as accuracy,
       count(distinct all_labels.nodeId) as num_nodes
from label_df as all_labels
left join sorted_csp_output on sorted_csp_output.nodeId = all_labels.nodeId
where isnull(sorted_csp_output.rank) or sorted_csp_output.rank = 1
group by all_labels.split
```