# Collocates

> Functionality for collocation analysis.

In [None]:
#| default_exp collocates

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
from __future__ import annotations
import numpy as np
import time
import polars as pl
from fastcore.basics import patch
import math

In [None]:
#| export
from conc.corpus import Corpus
from conc.result import Result
from conc.core import logger, PAGE_SIZE, set_logger_state

In [None]:
#| hide
import os

In [None]:
#| hide
# Root all test-data paths at the user's home directory.
# os.path.expanduser('~') is used instead of os.environ.get("HOME") because the
# latter returns None when HOME is unset, which an f-string silently renders as
# the literal directory name 'None'.
home_dir = os.path.expanduser('~')
source_path = f'{home_dir}/data/'
save_path = f'{home_dir}/data/conc-test-corpora/'

# Locations of the individual test corpora under save_path.
path_to_toy_corpus = f'{save_path}toy.corpus'
path_to_brown_corpus = f'{save_path}brown.corpus'
path_to_reuters_corpus = f'{save_path}reuters.corpus'
path_to_gutenberg_corpus = f'{save_path}gutenberg.corpus'
path_to_gardenparty_corpus = f'{save_path}garden-party.corpus'
path_to_congress_corpus = f'{save_path}us-congressional-speeches-subset-100k.corpus'

In [None]:
#| export
class Collocates:
	""" Collocation analysis and reporting for a single corpus. """

	def __init__(self,
			corpus:Corpus, # Corpus instance the collocation reports are computed against
			) -> None:
		# Kept as given; the methods patched onto this class read it through self.corpus.
		self.corpus = corpus


In [None]:
#| exporti
@patch
def _shift_zeroes_to_end(self:Collocates,
						arr:np.ndarray # 2D Numpy array of token ids to process, zeroes are moved within each column
						) -> np.ndarray:
	""" Move 0 value positions (left by punctuation and space removal) to the end of each column, keeping the order of the non-zero values. Returns a new array. """
	# Sorting on the boolean "is zero" key places every non-zero row (key False) ahead of
	# every zero row (key True); a stable sort leaves the order within each group untouched.
	# This handles all columns in one call instead of a Python-level loop per column.
	row_order = np.argsort(arr == 0, axis=0, kind='stable')
	return np.take_along_axis(arr, row_order, axis=0)

In [None]:
#| exporti
@patch
def _zero_after_value(self:Collocates,
					  arr:np.ndarray, # 2D Numpy array of token ids to process, handled column by column
					  target: int # Target value to find in the array (e.g., an end-of-file token)
					  ) -> np.ndarray:
	""" Set values from first occurrence of target value to 0 in each column (for processing tokens outside text using eof token). Returns a copy, the input array is not modified. """
	# Running OR down each column: True from the first row equal to target through the last row.
	# This replaces a Python-level loop over columns with a single vectorised pass.
	from_first_match = np.logical_or.accumulate(arr == target, axis=0)
	result = arr.copy()
	result[from_first_match] = 0
	return result

Note for the learning journal: use this notebook to pick up the collocation dominoes.

In [None]:
#| exporti
@patch
def _get_collocates_in_context(self:Collocates,
							   token_positions:np.ndarray, # Token positions in the corpus, only token_positions[0] is used here
							   index:str, # Index to use - lower_index, orth_index
							   context_words:int = 5, # Number of context words to consider on each side of the token
							   position_offset:int = 1 # offset to start retrieving context words - -1 for left, positive for right (may be adjusted by sequence_len)
							   ) -> np.ndarray:
	""" Get collocates in context for a given token index, operates one side at a time.

	Returns an array of token ids with context_words rows and one column per position in
	token_positions[0]. Row 0 is the nearest context word to the starting offset. Punctuation
	and space tokens are skipped, and values from the first EOF token onwards in a column are 0. """

	start_time = time.time()

	node_positions = token_positions[0]
	if node_positions.size == 0:
		# No occurrences to collect context for. Without this guard np.min() below is called
		# on a zero-size array and raises ValueError.
		return np.zeros((context_words, 0), dtype = node_positions.dtype)

	# Walk away from the node: towards lower positions for left context, higher for right.
	position_offset_step = -1 if position_offset < 0 else 1

	# Loop-invariant lookups, fetched once rather than on every iteration.
	tokens = self.corpus.get_tokens_by_index(index)
	ignored_tokens = self.corpus.punct_tokens + self.corpus.space_tokens

	# NOTE(review): node_positions + position_offset is not bounds-checked here; this appears to
	# rely on the corpus token array having enough tokens on both sides of every position - confirm.
	context_tokens_arr = []
	while True:
		new_positions = np.array(node_positions + position_offset, dtype = node_positions.dtype)
		context_tokens_arr.append(tokens[new_positions])
		position_offset += position_offset_step
		if len(context_tokens_arr) < context_words:
			continue
		# cleaning spaces and punctuation and check if need more iterations
		context_tokens = np.array(context_tokens_arr, dtype = node_positions.dtype)
		context_tokens = np.where(np.isin(context_tokens, ignored_tokens), 0, context_tokens)
		# Stop once every column holds at least context_words tokens that were not blanked out.
		if np.min(np.count_nonzero(context_tokens, axis=0)) >= context_words:
			break

	# Compact each column so the kept tokens come first, then trim to the requested window.
	context_tokens = self._shift_zeroes_to_end(context_tokens)
	context_tokens = context_tokens[:context_words, :]

	# Tokens at or beyond an EOF token are outside the text, so blank them out.
	if self.corpus.EOF_TOKEN in context_tokens:
		context_tokens = self._zero_after_value(context_tokens, self.corpus.EOF_TOKEN)

	logger.info(f"Collocates retrieved in {time.time() - start_time:.2f} seconds.")

	return context_tokens

In [None]:
#| export
@patch
def collocates(self:Collocates, 
				token_str:str, # Token to search for
				collocation_measure:str = 'logdice', # statistical measure to use for collocation calculation: logdice, mutual_information 
				context_words:int=5, # Window size
				min_frequency:int=5, # Minimum count of collocates
				page_size:int=PAGE_SIZE, # number of rows to return, if 0 returns all
				page_current:int=1, # current page, ignored if page_size is 0
				) -> Result:
	""" Report collocates for a given token string.

	Collocates are counted within context_words words either side of each occurrence of the
	token, filtered by min_frequency, scored with logdice and mutual_information, sorted by
	collocation_measure (descending) and returned one page at a time. """

	token_sequence, index_id = self.corpus.tokenize(token_str, simple_indexing=True)

	index = 'lower_index'

	start_time = time.time()
	sequence_len = len(token_sequence[0])

	token_positions = self.corpus.get_token_positions(token_sequence, index_id)
	node_frequency = token_positions[0].shape[0] # number of occurrences of the search token

	# getting context tokens - the right side starts after the last token of the search sequence
	left_tokens = self._get_collocates_in_context(token_positions=token_positions, index=index, context_words=context_words, position_offset=-1)
	right_tokens = self._get_collocates_in_context(token_positions=token_positions, index=index, context_words=context_words, position_offset=sequence_len)
	combined_tokens = np.concatenate([left_tokens.flatten(), right_tokens.flatten()])
	combined_tokens = combined_tokens[combined_tokens != 0] # removes punctuation and space placeholder

	# getting frequencies of collocates
	unique_token_ids, counts = np.unique(combined_tokens, return_counts=True)

	df = pl.DataFrame({
		'token_id': unique_token_ids,
		'collocation_frequency': counts
	})

	# applying min_frequency filter - done before the join so only surviving rows are looked up
	if min_frequency > 1:
		df = df.filter(pl.col('collocation_frequency') >= min_frequency)

	# adding frequency of collocates in corpus
	df = df.join(self.corpus.vocab.collect().select(['token_id', 'token', 'frequency_lower']), on='token_id', how='left', maintain_order='left')

	# calculating collocation measures
	collocation_frequency = pl.col('collocation_frequency')
	# frequency_lower's dtype is not visible here; working in Float64 means the
	# node_frequency * frequency_lower product cannot overflow a narrow integer column.
	collocate_frequency = pl.col('frequency_lower').cast(pl.Float64)
	df = df.with_columns(
		# logdice = 14 + log2((2 * collocation_frequency) / (node_frequency + collocate_frequency))
		(pl.lit(14) + ((2 * collocation_frequency) / (pl.lit(node_frequency) + collocate_frequency)).log(2))
		.alias('logdice'),
		# mi = log2((corpus_token_count * collocation_frequency) / (node_frequency * collocate_frequency))
		(pl.lit(self.corpus.word_token_count) * collocation_frequency / (pl.lit(node_frequency) * collocate_frequency)).log(2)
		.alias('mutual_information'),
	)

	# ranking, then slicing the requested page (page_size of 0 returns every row)
	df = df.sort(collocation_measure, descending = True)
	if page_size > 0:
		page_index = max(page_current, 1) - 1 # pages are 1-based, anything below 1 is treated as the first page
		df = df.slice(page_index * page_size, page_size)

	# prepare report information e.g. for filters etc - see frequencies or keyness as reference

	logger.info(f"Collocates calculated in {time.time() - start_time:.2f} seconds.")

	return Result(type = 'collocates', df = df, title='Collocates', description=f'Context words: {context_words}, Corpus: {self.corpus.name}', summary_data={}, formatted_data=[])
	

In [None]:
# Notebook setup for the demo cells below.
np.set_printoptions(suppress=True) # print small floats in fixed-point instead of scientific notation
set_logger_state('verbose') # presumably enables the INFO timing lines seen in the cell outputs - confirm in conc.core
# Load the pre-built test corpora from the paths defined earlier in this notebook.
reuters = Corpus().load(path_to_reuters_corpus)
brown = Corpus().load(path_to_brown_corpus)
gardenparty = Corpus().load(path_to_gardenparty_corpus)
gutenberg = Corpus().load(path_to_gutenberg_corpus)
congress = Corpus().load(path_to_congress_corpus)

2025-06-06 11:56:42 - INFO - memory_usage - init, memory usage: 2042.06640625 MB
2025-06-06 11:56:42 - INFO - load - Load time: 0.201 seconds
2025-06-06 11:56:42 - INFO - memory_usage - init, memory usage: 2042.06640625 MB
2025-06-06 11:56:42 - INFO - load - Load time: 0.199 seconds
2025-06-06 11:56:42 - INFO - memory_usage - init, memory usage: 2042.06640625 MB
2025-06-06 11:56:43 - INFO - load - Load time: 0.199 seconds
2025-06-06 11:56:43 - INFO - memory_usage - init, memory usage: 2042.06640625 MB
2025-06-06 11:56:43 - INFO - load - Load time: 0.200 seconds
2025-06-06 11:56:43 - INFO - memory_usage - init, memory usage: 2042.06640625 MB
2025-06-06 11:56:43 - INFO - load - Load time: 0.198 seconds


In [None]:
# Collocation reporter bound to the Reuters corpus, used by the demo cell below.
collocates = Collocates(reuters)

In [None]:
# Run and display the logdice collocation report for each test query.
# %time is an IPython magic, so this cell only runs inside a notebook/IPython session.
for word in ["economy"]: # brown used 'i went in', 'any of us',  for testing "economy"
    %time collocates.collocates(word, collocation_measure='logdice', context_words=5).display()

2025-06-06 11:57:57 - INFO - tokenize - Tokenization time: 0.00011 seconds
2025-06-06 11:57:57 - INFO - get_token_positions - Token indexing (621) time: 0.00166 seconds
2025-06-06 11:57:57 - INFO - _get_collocates_in_context - Collocates retrieved in 0.00 seconds.
2025-06-06 11:57:57 - INFO - _get_collocates_in_context - Collocates retrieved in 0.00 seconds.
2025-06-06 11:57:57 - INFO - collocates - Collocates calculated in 0.02 seconds.


Collocates,Collocates,Collocates,Collocates,Collocates,Collocates
"Context words: 5, Corpus: Reuters Corpus","Context words: 5, Corpus: Reuters Corpus","Context words: 5, Corpus: Reuters Corpus","Context words: 5, Corpus: Reuters Corpus","Context words: 5, Corpus: Reuters Corpus","Context words: 5, Corpus: Reuters Corpus"
Token Id,Collocation Frequency,Token,Frequency Lower,Logdice,Mutual Information
71765,29,stimulate,85,10.39,9.59
39297,20,boost,222,9.6,7.66
36515,35,japanese,944,9.52,6.38
55412,27,domestic,700,9.39,6.44
21138,23,german,537,9.35,6.59
7135,35,world,1173,9.32,6.07
62950,12,grew,103,9.09,8.04
63903,10,sluggish,44,8.94,9.0
61316,18,economy,621,8.89,6.03
66052,13,measures,288,8.87,6.67


CPU times: user 47.2 ms, sys: 12.7 ms, total: 59.9 ms
Wall time: 27.4 ms


In [None]:
#| hide
## Collocation methods
# 
# # @patch
# def collocates(self: Corpus, word, nice_word, constrain_to = False, context_length = 5, limit = 50, cutoff=5, stat = 'ld', output = False):
#     elements = {}
#     nodes = []
#     edges = []

#     coll_token_sequence, coll_index_id = tokenize_string(corpus_name, word)
#     token_id = coll_token_sequence[0][0]
    
#     #print(token_id, nice_word)
    
#     if constrain_to == False:
#         nodes.append((token_id, {'label': nice_word, 'size': 1}))
    
#     #print(coll_token_sequence, coll_index_id)
#     coll_token_index = profile_get_token_index(corpus_name, coll_token_sequence, coll_index_id) # get_token_positions
#     #print(coll_token_index)
#     positional_columns, concordance = profile_get_concordance(corpus_name, coll_token_sequence, coll_token_index, context_words = context_length, index_id = LOWER)
#     #print(concordance)
#     collocates = []
#     for row in concordance:
#         if eof_token in row:
#             indexes = np.where(np.array(row) == eof_token)[0]
#             #print(indexes)
#             slice_min = -1
#             slice_max = context_length * 2 + 1
#             for i in indexes:
#                 if i < context_length and i > slice_min:
#                     slice_min = i
#                 elif i > context_length and i < slice_max:
#                     #print('***')
#                     slice_max = i
#             slice_min += 1
#             #slice_max -= 1
#             #print(slice_min,slice_max)
#             #print(row)
#             #print(row[slice_min:slice_max])
#             collocates.append(row[slice_min:slice_max])
#         else:
#             collocates.append(row)

#     node_frequency = len(collocates)
            
#     if len(collocates) < 1:
#         print('no collocates')
#     else:
#         #print(collocates)
#         collocates = np.concatenate(collocates)
#         collocates = np.unique(collocates, axis=0, return_counts=True)
#         #collocates = collocates[collocates[:,1].argsort()]
        
#         #print(collocates)
#         logdices = []

#         for row in range(len(collocates[0])):
#             collocate = collocates[0][row]
#             collocate_count = collocates[1][row]
#             if constrain_to == False:
#                 pass
#             else:
#                 if loaded_corpora[corpus_name]['vocab'][collocate] in constrain_to:
#                     pass
#                     #print(corpus['vocab'][collocate],' in constrain_to', constrain_to)
#                 else:
#                     continue
#             if collocate in coll_token_sequence:
#                 #print('match china')
#                 pass
#             elif collocate_count > 1:
#                 #14 + log2D=14+log2(2fxy/(fx+fy))

#                 if re.search(_RE_PUNCT,loaded_corpora[corpus_name]['vocab'][collocate]) is None: # FIX - HAVE OPION TO REMOVE PUNC
#                     if collocate_count >= cutoff:
#                         logdice = 14 + math.log2((2 * collocate_count) / (node_frequency + loaded_corpora[corpus_name]['frequency_lookup'][collocate]))
#                         mi = math.log2((loaded_corpora[corpus_name]['token_count'] * collocate_count) / (node_frequency * loaded_corpora[corpus_name]['frequency_lookup'][collocate]))
#                         logdices.append([logdice, mi, collocate, loaded_corpora[corpus_name]['vocab'][collocate], collocate_count, loaded_corpora[corpus_name]['frequency_lookup'][collocate]])
#                         #if logdice > 8:
#                         #print(collocate, node_frequency, loaded_corpora[corpus_name]['vocab'][collocate], collocates[1][row],logdice, mi)
#                         matches = np.where(collocates == collocate)[0]
#                         collocate_count = len(matches)
#         #        if (collocate_count > 5):
    
#     top_collocates = []
#     if stat == 'mi':
#         sorted_collocates = sorted(logdices, reverse=True, key=lambda x: x[1])[0:limit]
#         for row in sorted_collocates:
#             nodes.append((row[2], {'label': row[3], 'size': 1}))
#             edges.append((token_id, row[2], {'weight': 1}))
#             #print(row)
#             top_collocates.append(row[3])
#     else:
#         sorted_collocates = sorted(logdices, reverse=True, key=lambda x: x[0])[0:limit]
#         for row in sorted_collocates:
#             nodes.append((row[2], {'label': row[3], 'size': 1}))
#             edges.append((token_id, row[2], {'weight': row[0]}))
#             top_collocates.append(row[3])
    
#     if constrain_to == False:
#         for top_collocate in top_collocates:
#             top_collocate_elements, top_collocate_sorted, df = profile_prepare_collocates(corpus_name, top_collocate, top_collocate, constrain_to = top_collocates, context_length=context_length, limit = limit, stat=stat)
#             for edge in top_collocate_elements['edges']:
#                 edges.append(edge) 
    
    
#     #display(sorted(logdices, reverse=True, key=lambda x: x[0])[0:limit])
    
#     elements['nodes'] = nodes
#     elements['edges'] = edges
#     df = pd.DataFrame(sorted_collocates, columns = ['logdice', 'mi', 'collocate_token_id', 'collocate', 'collocate_count', 'collocate_token_frequency'])
#     # if output != False:
#     #     if output == 'file':
#     #         with open(output_dir + corpus_name + '/collocates-' + stat + '-' + nice_word + '.html', 'w', encoding='utf8') as f:
#     #             f.write(df.to_html(classes='table table-stripped'))
#     return elements, sorted_collocates, df


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()