In [1]:
import collections
import csv
import tempfile
import os
import random
from multiprocessing import cpu_count, set_start_method #, Pool
from pathos.multiprocessing import ProcessingPool as Pool
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import pandas as pd
import chromadb
from chromadb.config import Settings

## Function generator

In [2]:
class VirtualAggregator:
    def __init__(self):
        self.collection = None
        self.N = None
        self.k = None
        self.pn = None
        self.chroma_collection = None
        self.init_connection()

    def set_parameters(self, N, k, pn):
        self.N = N
        self.k = k
        self.pn = pn

    def init_connection(self):
        collection_status = False
        max_retries = 5
        retries = 0

        while not collection_status and retries < max_retries:
            try:
                chroma_client = chromadb.HttpClient(host="localhost", port=8000, settings=Settings(allow_reset=True, anonymized_telemetry=False))
                self.chroma_collection = chroma_client.get_or_create_collection(name="articles_with_score")
                collection_status = True
            except Exception as e:
                print(e)
                retries += 1

        if not collection_status:
            raise Exception("Failed to connect to the collection after 5 attempts")

    def get_similar_articles(self, query, k):
        collection_status = False
        max_retries = 5
        retries = 0

        while not collection_status and retries < max_retries:
            try:
                return self.chroma_collection.query(query_texts=[query], n_results=2*k)
            except Exception as e:
                print(e)
                retries += 1

        if not collection_status:
            raise Exception("Failed to connect to the collection after 5 attempts")

    def distribution_function(self, page_count):
        pages_distribution = np.exp(-np.arange(1, page_count + 1))
        pages_distribution /= pages_distribution.sum()
        return pages_distribution

    def distribution_generator(self, collection_dict):
        scaler = MinMaxScaler()
        values_to_scale = np.array([
            collection_dict['year'],
            collection_dict['n_citation'],
            collection_dict['gov_score']
        ]).T

        scaled_values = scaler.fit_transform(values_to_scale)
        collection_dict['year_normalized'] = scaled_values[:, 0].tolist()
        collection_dict['citations_normalized'] = scaled_values[:, 1].tolist()
        collection_dict['points_normalized'] = scaled_values[:, 2].tolist()

        collection_dict['score'] = [
            self.pn[0] * collection_dict['similarity'][i] +
            self.pn[1] * collection_dict['year_normalized'][i] +
            self.pn[2] * collection_dict['citations_normalized'][i] +
            self.pn[3] * collection_dict['points_normalized'][i]
            for i in range(len(collection_dict['id']))
        ]

        sorted_collection = sorted(
            [
                {
                    'id': collection_dict['id'][i],
                    'title': collection_dict['title'][i],
                    'similarity': collection_dict['similarity'][i],
                    'year': collection_dict['year'][i],
                    'n_citation': collection_dict['n_citation'][i],
                    'gov_score': collection_dict['gov_score'][i],
                    'year_normalized': collection_dict['year_normalized'][i],
                    'citations_normalized': collection_dict['citations_normalized'][i],
                    'points_normalized': collection_dict['points_normalized'][i],
                    'score': collection_dict['score'][i]
                }
                for i in range(len(collection_dict['id']))
            ],
            key=lambda x: x['score'],
            reverse=True
        )

        ranked_indices = [entry['id'] for entry in sorted_collection]
        pages = [ranked_indices[i:i + self.N] for i in range(0, len(ranked_indices), self.N)]
        pages_distribution = self.distribution_function(len(pages))

        np.random.seed(42)
        selected_papers = []
        for _ in range(self.k):
            selected_page_index = np.random.choice(len(pages), p=pages_distribution)
            selected_page = pages[selected_page_index]
            selected_paper_index = np.random.choice(selected_page)
            selected_papers.append(selected_paper_index)

            pages[selected_page_index] = [x for x in selected_page if x != selected_paper_index]

        selected_paper_counts = collections.Counter(selected_papers)
        return selected_paper_counts

#def process_query(query, settings, file_path):
def process_query(args):
    query, settings, file_path = args
    virtual_aggregator = VirtualAggregator()
    max_k = max(settings, key=lambda x: x['k'])['k']

    result_dict = {
        'title': [],
        'settings': [],
        'distribution': [],
    }
    print('process_query')
    similar_articles = virtual_aggregator.get_similar_articles(query, max_k)

    for sample in settings:
        virtual_aggregator.set_parameters(sample['N'], sample['k'], sample['pn'])
        distribution = step(query, similar_articles, virtual_aggregator)

        # Save result
        result_dict['title'].append(query)
        result_dict['settings'].append(sample)
        result_dict['distribution'].append(dict(distribution))

    save_results(result_dict, file_path)

def step(query, similar_articles, virtual_aggregator):
    collection_dict = {
        'id': similar_articles['ids'][0],
        'title': similar_articles['documents'][0],
        'similarity': similar_articles['distances'][0],
        'year': [metadata['year'] for metadata in similar_articles['metadatas'][0]],
        'n_citation': [metadata['n_citation'] for metadata in similar_articles['metadatas'][0]],
        'gov_score': [metadata['gov_score'] for metadata in similar_articles['metadatas'][0]]
    }

    return virtual_aggregator.distribution_generator(collection_dict)

def save_results(result_dict, file_path):
    file_exists = os.path.isfile(file_path)
    keys = result_dict.keys()
    with open(file_path, 'a', newline='') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        if not file_exists:
            dict_writer.writeheader()
        dict_writer.writerows([dict(zip(keys, row)) for row in zip(*result_dict.values())])


In [3]:
def run_process_query(args):
    print('run_process_query')
    query, settings, file_path = args
    process_query(query, settings, file_path)


In [None]:
# prev `../data/queries_df.csv` was generated in 3. but was replased with .pkl
class Experiment:
    def __init__(self, settings):
        self.virtual_aggregator = VirtualAggregator()
        self.settings = settings
        self.queries = None
         
    def generate_arguments_with_progress(self, queries, settings, file_path):
        def generate_for_query(query, settings, file_path):
            return (query, settings, file_path)

        results = []
        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(generate_for_query, query, settings, file_path) for query in tqdm(queries, total=len(queries), desc="Generating futures")]
            display(len(futures))
            for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Generating arguments"):
                results.append(future.result())
    
        return results

    def run_experiment(self):
        self.load_queries()
        print(f"Loaded: {len(self.queries)} queries")
    
        file_path = '../data/results.csv'
        pool_args = self.generate_arguments_with_progress(self.queries, self.settings, file_path)

        with Pool(int(cpu_count()/2)) as pool:
            for _ in tqdm(pool.uimap(process_query, pool_args), total=len(pool_args), desc="Queries", unit="query"):
                pass

    def load_queries(self):
        df_query = pd.read_csv('../data/queries_df.csv')
        self.queries = df_query['query'].tolist()


# Test

## 1. Health test

In [5]:
# Parametry wirtualnego agregatora
if __name__ == '__main__':
    settings = [
        {
            'N': 20,
            'k': 10,
            'pn': [0.5, 0.3, 0.1, 0.1],
        },
        {
            'N': 20,
            'k': 15,
            'pn': [0.5, 0.2, 0.2, 0.1],
        },
    ]

    #set_start_method('spawn')
    experiment = Experiment(settings)
    experiment.run_experiment()

Loaded: 850000 queries


Generating futures: 100%|███████████████████████████████████████████████████| 850000/850000 [00:32<00:00, 25919.99it/s]


850000

Generating arguments: 100%|████████████████████████████████████████████████| 850000/850000 [00:05<00:00, 148814.53it/s]
Queries:   0%|                                                                           | 0/850000 [00:00<?, ?query/s]


NameError: name 'VirtualAggregator' is not defined

## 2. Fill experiment

In [7]:
def generate_examples_with_fixed_pn(num_examples):
    examples = []
    for _ in range(num_examples):
        N = np.random.randint(10, 30)
        k = np.random.randint(5, N)
        pn = np.random.dirichlet(np.ones(4), size=1)[0]
        pn = np.round(pn, 2).tolist()
        examples.append({
            'N': N,
            'k': k,
            'pn': pn,
        })
    return examples

settings = generate_examples_with_fixed_pn(500)
#display(settings)

In [None]:
experiment = Experiment(settings)
experiment.run_experiment()

Loaded: 850000 queries


Generating futures: 100%|███████████████████████████████████████████████████| 850000/850000 [00:34<00:00, 24485.54it/s]
Generating arguments: 100%|████████████████████████████████████████████████| 850000/850000 [00:04<00:00, 170876.25it/s]


20

Queries:   0%|                                                                           | 0/850000 [00:00<?, ?query/s]

# Read result

In [20]:
df_results = pd.read_csv('../data/results.csv')
display(df_results.head()) 

Unnamed: 0,title,settings,distribution
0,Is Proxy Record Customizable Manager?,"{'N': 11, 'k': 8, 'pn': [0.32, 0.0, 0.56, 0.11]}","{'645696': 2, '132239': 1, '700503': 2, '47508..."
1,Is Proxy Record Customizable Manager?,"{'N': 21, 'k': 13, 'pn': [0.19, 0.53, 0.08, 0....","{'158554': 2, '39779': 1, '565416': 1, '461731..."
2,Is Proxy Record Customizable Manager?,"{'N': 25, 'k': 19, 'pn': [0.2, 0.5, 0.07, 0.23]}","{'46166': 2, '733536': 1, '461731': 2, '158554..."
3,Is Proxy Record Customizable Manager?,"{'N': 12, 'k': 9, 'pn': [0.22, 0.04, 0.02, 0.71]}","{'658043': 2, '132239': 1, '551126': 2, '47508..."
4,Is Proxy Record Customizable Manager?,"{'N': 23, 'k': 22, 'pn': [0.47, 0.02, 0.25, 0....","{'603747': 2, '780573': 1, '210987': 1, '15855..."
