In [1]:
import sys
import os
os.chdir("..")
os.getcwd()

'/Users/laurasisson/dream'

In [2]:
os.listdir()

['dreamdata.py',
 'aggregate.py',
 'cross.py',
 '.DS_Store',
 'embedding.py',
 'crossoverlap.py',
 'dictionary.pt',
 'olfactor.py',
 'mpnn.py',
 'encoder.py',
 'README.md',
 'tokenizer.py',
 'activation.py',
 '.gitignore',
 'utils.py',
 'Model',
 'Colab',
 '.git',
 'Data',
 'Notebooks',
 'pairdata.py',
 'data.py']

In [3]:
import json
with open("Data/full_large.json") as f:
    pair_dataset = json.load(f)

pair_dataset = [d for d in pair_dataset if d["mol1"] and d["mol2"]]

len(pair_dataset),pair_dataset[0]

(266758,
 {'mol1': 'CC1=CC2C(C2(C)C)CC1C(=O)C',
  'mol2': 'CCCC(CC)O',
  'blend_notes': ['herbal']})

In [4]:
import collections
all_notes_sets = collections.defaultdict(list)
all_mols = set()
for d in pair_dataset:
  all_notes_sets[frozenset(d["blend_notes"])].append({d["mol1"],d["mol2"]})
  all_mols.add(d["mol1"])
  all_mols.add(d["mol2"])

ex_notes, ex_blends = next(iter(all_notes_sets.items()))
all_mols = list(all_mols)
len(all_notes_sets), len(all_mols), ex_notes, len(ex_blends)

(6850, 3428, frozenset({'herbal'}), 4384)

In [5]:
all_recipes = list(all_notes_sets.values())
len(all_recipes), len(all_recipes[0]), all_recipes[0][0]

(6850, 4384, {'CC1=CC2C(C2(C)C)CC1C(=O)C', 'CCCC(CC)O'})

### Quick check to see which way we should precompute our indices
Precomputing by combination is # all recipes across all combinations.

Precompution by ingredient pairing is # ingredients<sup>2</sup>. *(which only works for pairs)*

In [6]:
total_recipe_count = sum([len(recipes) for recipes in all_recipes])
all_combination_count = len(all_mols)**2
f"Precompute by combination :{total_recipe_count:,.0f}, Precompute by ingredients: {all_combination_count:,.0f}"

'Precompute by combination :266,758, Precompute by ingredients: 11,751,184'

If this wasn't pairing, and recipes could be arbitrary number of ingredients, it would be is 2<sup># ingredients</sup>.

In [7]:
all_combination_count = 2**len(all_mols)
# f"Precompute by ingredients (log10): {np.log10(all_combination_count):,.0f}"
f"10^{len(str(all_combination_count))}"

'10^1032'

In [8]:
import collections

def precompute_ingredient_indices(sample_recipes):
    """Precompute which recipes each ingredient can cover."""
    ingredient_indices = nested_dict = collections.defaultdict(lambda: collections.defaultdict(set))
    current_recipe_idx = 0

    # The combination is the underlying set of descriptors.
    for combination_idx, recipes in enumerate(sample_recipes):
      # The recipe is one particular pairing that results in that combination.
      for recipe_idx, recipe in enumerate(recipes):
        for ingredient in recipe:
          ingredient_indices[ingredient][combination_idx].add(recipe_idx)

    return ingredient_indices

all_ingredient_indices = precompute_ingredient_indices(all_recipes)
next(iter(all_ingredient_indices.items()))

('CC1=CC2C(C2(C)C)CC1C(=O)C',
 defaultdict(set,
             {0: {0,
               55,
               134,
               258,
               357,
               583,
               601,
               614,
               712,
               811,
               955,
               1114,
               1168,
               1188,
               1248,
               1451,
               1516,
               1586,
               1612,
               1652,
               1666,
               1681,
               1709,
               1762,
               2244,
               2556,
               2648,
               2873,
               2886,
               3001,
               3015,
               3026,
               3027,
               3028,
               3029,
               3030,
               3031,
               3032,
               3033,
               3034,
               3035,
               3141,
               3257,
               3302,
               3310,
               352

In [9]:
import collections
from copy import deepcopy

class Solution:
    def __init__(self, ingredients=None, covered_combinations=None):
        self._ingredients = ingredients if ingredients is not None else frozenset()
        self._covered_combinations = (
            deepcopy(covered_combinations) if covered_combinations is not None else collections.defaultdict(set)
        )

    def add(self, ingredient):
        assert ingredient not in self._ingredients
        new_ingredients = self._ingredients | frozenset([ingredient])
        new_covered_combinations = deepcopy(self._covered_combinations)

        # Find the precomputed index for the particular ingredient.
        ingredient_index = all_ingredient_indices[ingredient]

        # Check through every combination across each recipe.
        for combination_idx, recipe_indices in ingredient_index.items():
            # Check every recipe for this particular combination and see if we can satisfy it.
            for recipe_idx in recipe_indices:
                recipe_ingredients = all_recipes[combination_idx][recipe_idx]
                if recipe_ingredients <= new_ingredients:  # Check if all ingredients are covered
                    new_covered_combinations[combination_idx].add(recipe_idx)

        return Solution(new_ingredients, new_covered_combinations)

    def remove(self, ingredient):
        assert ingredient in self._ingredients
        new_ingredients = self._ingredients - frozenset([ingredient])
        new_covered_combinations = deepcopy(self._covered_combinations)

        # Find the precomputed index for the particular ingredient.
        ingredient_index = all_ingredient_indices[ingredient]

        # Check through every combination across each recipe.
        for combination_idx, recipe_indices in ingredient_index.items():
            for recipe_idx in recipe_indices:
                # If removing the ingredient causes the recipe to be unsatisfied, remove it from coverage
                recipe_ingredients = all_recipes[combination_idx][recipe_idx]
                if not recipe_ingredients <= new_ingredients:
                    new_covered_combinations[combination_idx].discard(recipe_idx)

        return Solution(new_ingredients, new_covered_combinations)

    def __contains__(self, ingredient):
        return ingredient in self._ingredients

    def coverage(self):
        return len([rs for c, rs in self._covered_combinations.items() if len(rs) > 0])

    def __repr__(self):
        return f"Solution of size {len(self._ingredients)} w/ coverage {self.coverage()}"

# Example usage:
current = Solution()
print("Empty solution")
display(current)

mol1, mol2 = all_recipes[0][0]

print("\nAdding", mol1)
current = current.add(mol1)
display(current)

print("\nAdding", mol2)
current = current.add(mol2)
display(current)
print(f"Contains {mol2} == {mol2 in current}")

print("\nRemoving", mol2)
current = current.remove(mol2)
display(current)
print(f"Contains {mol2} == {mol2 in current}")

Empty solution


Solution of size 0 w/ coverage 0


Adding CC1=CC2C(C2(C)C)CC1C(=O)C


Solution of size 1 w/ coverage 0


Adding CCCC(CC)O


Solution of size 2 w/ coverage 1

Contains CCCC(CC)O == True

Removing CCCC(CC)O


Solution of size 1 w/ coverage 0

Contains CCCC(CC)O == False


In [10]:
import random

def generate_random_solution(sample_mols, max_ingredients=10):
    """Generate a random set of ingredients."""
    solution = Solution()

    selected_mols = set(random.sample(sample_mols, max_ingredients))
    for m in selected_mols:
      solution = solution.add(m)

    return solution

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator, ScalarFormatter
from tqdm.notebook import tqdm

x = np.logspace(0, 6, 250)  # Exponential range from 10^0 to 10^4 (1 to 1e4)
x_int = np.unique(np.round(x).astype(int))  # Round and convert to integer, then keep unique values
x_int = x_int[x_int < len(all_mols)]
x_int = np.append(x_int,len(all_mols))

random_passes = 3

random_fraction_at_x = []
pbar = tqdm(total=len(x_int), desc="Random Solution", leave=True,smoothing=1)
for max_ingredients in x_int:
  coverage = []
  for _ in range(random_passes):
    random_solution = generate_random_solution(list(all_mols), max_ingredients)
    random_coverage = random_solution.coverage()
    coverage.append(random_coverage)

  fraction = np.mean(coverage) / len(all_recipes)
  pbar.set_postfix({'Blend Size':max_ingredients,'Fraction Covered': f'{fraction:.4f}'})
  pbar.update(1)
  random_fraction_at_x.append(fraction)


def make_chart(fraction_at_x,title):
  # Plot the data points on the primary axis
  fig, ax1 = plt.subplots(figsize=(8, 6))
  ax1.plot(x_int[:len(fraction_at_x)], fraction_at_x, marker=".", linestyle="")
  ax1.set_xscale('log')  # Set x-axis to log scale

  # Set major y-ticks at 0.1 intervals
  ax1.yaxis.set_major_locator(MultipleLocator(0.1))
  ax1.minorticks_on()  # Turn on minor ticks
  ax1.yaxis.set_minor_locator(MultipleLocator(0.01))  # Minor ticks with 0.01 spacing

  # Format the x-axis log scale to show numbers without scientific notation
  ax1.xaxis.set_major_formatter(ScalarFormatter())
  ax1.grid()

  # Primary axis labels and title
  ax1.set_title(title)
  ax1.set_xlabel('# of Molecules')
  ax1.set_ylabel('Covered Fraction')

  # Add twin y-axis, scaled version without plotting points again
  ax2 = ax1.twinx()
  scale_factor = len(all_notes_sets)
  ax2.set_ylim(ax1.get_ylim()[0] * scale_factor, ax1.get_ylim()[1] * scale_factor)
  ax2.set_ylabel('# of Odor Combinations')

  # Show the plot
  plt.show()

make_chart(random_fraction_at_x,"Coverage by Primary Set Size (Random Molecule Sampling)")

Random Solution:   0%|          | 0/113 [00:00<?, ?it/s]

In [None]:
greedy_cache = dict()

# Greedy solution with caching (breaks if we want to subsample from the original set)
def greedy_coverage(max_ingredients, use_cache=True, disable_tqdm=False):
  global greedy_cache

  current_solution = Solution()

  total_recipes = len(all_recipes)

  # Initialize progress bar
  pbar = tqdm(total=max_ingredients, desc="Greedy Solution", leave=True, disable=disable_tqdm)

  for size in range(max_ingredients):
    if size in greedy_cache and use_cache:
      solution = greedy_cache[size]
      # Update the progress bar and show the fraction of recipes covered
      pbar.set_postfix({'Fraction Covered': f'{current_solution.coverage() / total_recipes:.4f}'})
      pbar.update(1)
      continue

    # Find the best current_coverage at each size
    best_solution = current_solution

    for mol in all_mols:
      # Check every ingredient that we don't have already
      if mol in current_solution:
        continue


      # Check the improved current_coverage
      greedy_solution = current_solution.add(mol)
      if greedy_solution.coverage() > best_solution.coverage():
        best_solution = greedy_solution

    # Update our results
    current_solution = best_solution
    # Update the cache
    greedy_cache[size] = current_solution

    # Update the progress bar and show the fraction of recipes covered
    pbar.set_postfix({'Fraction Covered': f'{current_solution.coverage() / total_recipes:.4f}'})
    pbar.update(1)

  return current_solution


# Doing this just to check caching works.
_ = greedy_coverage(max_ingredients=50)

# Attempt large greedy solution.
best_solution = greedy_coverage(max_ingredients=100)
print(f"Random Start Solution: {best_solution}")
print(f"Max Recipes Covered: {best_solution.coverage()}")
print(f"Fraction of Recipes Covered: {best_solution.coverage()/len(all_recipes):.4f}")


In [None]:
pbar = tqdm(total=len(x_int), desc="Greedy Solution", leave=True,smoothing=1)

greedy_fraction_at_x = []
for max_ingredients in x_int:
  best_solution = greedy_coverage(max_ingredients=max_ingredients, use_cache=True, disable_tqdm=True)
  pbar.set_postfix({'Blend Size':max_ingredients,'Fraction Covered': f'{best_solution.coverage() / len(all_recipes):.4f}'})
  pbar.update(1)
  greedy_fraction_at_x.append(best_solution.coverage() / len(all_recipes))

make_chart(greedy_fraction_at_x,"Coverage by Primary Set Size (Greedy Solution)")

In [None]:
def simulated_annealing_max_coverage(max_ingredients, start_random=True, initial_temp=100, cooling_rate=0.99, num_iterations=10000, disable_tqdm=False):
    """Simulated annealing to find the best set of ingredients that maximizes recipe coverage."""

    total_recipes = len(all_recipes)

    if start_random:
      current_solution = generate_random_solution(all_mols, max_ingredients)
    else:
      current_solution = greedy_coverage(max_ingredients=max_ingredients, use_cache=True, disable_tqdm=True)

    # Track the best solution
    best_solution = current_solution

    temp = initial_temp

    # Initialize progress bar
    pbar = tqdm(total=num_iterations, desc="Simulated Annealing", leave=True, disable=disable_tqdm)

    for i in range(num_iterations):
        # Generate a new solution by swapping one ingredient
        new_solution = current_solution

        # If we're at the max ingredients, ensure we don't exceed it
        if len(new_solution) >= max_ingredients:
            # Randomly remove an ingredient before adding a new one
            ingredient_to_remove = random.choice(list(new_solution._ingredients))
            new_solution = new_solution.remove(ingredient_to_remove)

        # Add a new ingredient from the pool of unused ingredients
        available_ingredients = [i for i in all_mols if i not in new_solution]
        if available_ingredients:
            ingredient_to_add = random.choice(available_ingredients)
            new_solution = new_solution.add(ingredient_to_add)


        # Calculate acceptance probability
        if new_solution.coverage() > current_solution.coverage():
            accept = True  # Always accept a better solution
        else:
            delta = current_solution.coverage() - new_solution.coverage()
            accept_probability = math.exp(-delta / temp)
            accept = random.random() < accept_probability  # Accept worse solution with some probability

        # Accept the new solution
        if accept:
            current_solution = new_solution

            # Update the best solution if the new solution is better
            if current_solution.coverage() > best_solution.coverage():
                best_solution = current_solution

        # Cool down the temperature
        temp *= cooling_rate

        # Update the progress bar and show the fraction of recipes covered
        pbar.set_postfix({'Fraction Covered': f'{current_solution.coverage() / total_recipes:.4f}', 'Temperature': f'{temp:.2f}'})
        pbar.update(1)

    pbar.close()

    return best_solution, best_coverage, best_coverage / total_recipes

best_solution, best_coverage, best_fraction = simulated_annealing_max_coverage(max_ingredients=100, initial_temp=1e4)
print(f"Random Start Solution: {best_solution}")
print(f"Max Recipes Covered: {best_coverage}")
print(f"Fraction of Recipes Covered: {best_fraction:.4f}")


In [None]:
pbar = tqdm(total=len(x_int), desc="Simulated Annealing", leave=True,smoothing=1)

annealed_fraction_at_x = []
for max_ingredients in x_int:
  best_solution, best_coverage, best_fraction = simulated_annealing_max_coverage(max_ingredients=max_ingredients, num_iterations=int(1e4), initial_temp=1e4, disable_tqdm=True)
  pbar.set_postfix({'Blend Size':max_ingredients,'Fraction Covered': f'{best_fraction:.4f}'})
  pbar.update(1)
  annealed_fraction_at_x.append(best_fraction)

make_chart(annealed_fraction_at_x,"Coverage by Primary Set Size (Simulated Annealing)")

In [None]:
pbar = tqdm(total=len(x_int), desc="Simulated Annealing", leave=True,smoothing=1)

greedy_annealed_fraction_at_x = []
for max_ingredients in x_int:
  best_solution, best_coverage, best_fraction = simulated_annealing_max_coverage(max_ingredients=max_ingredients, start_random=False, num_iterations=int(1e4), initial_temp=1e4, disable_tqdm=True)
  pbar.set_postfix({'Blend Size':max_ingredients,'Fraction Covered': f'{best_fraction:.4f}'})
  pbar.update(1)
  greedy_annealed_fraction_at_x.append(best_fraction)

make_chart(greedy_annealed_fraction_at_x,"Coverage by Primary Set Size (Simulated Annealing)")

In [None]:
import random
import math
from tqdm.notebook import tqdm  # Use tqdm for Jupyter Notebook progress bar


def generate_random_solution(sample_mols, max_ingredients=10):
    """Generate a random set of ingredients."""
    return set(random.sample(sample_mols, max_ingredients))

def get_coverage(selected_ingredients):
    """Calculate how many combinations can be covered by the selected ingredients using precomputed data."""
    covered_combinations = set()

    # For every ingredient, we will check across every combination and recipe
    # to see if we can make it.
    for ingredient in selected_ingredients:
      # Find the precomputed index for the particular ingredient.
      ingredient_index = all_ingredient_indices[ingredient]

      # Check through every combination across each recipe.
      for combination_idx, recipe_indices in ingredient_index.items():
        # We can skip if we already cover this combination.
        if combination_idx in covered_combinations:
          continue

        # Otherwise, we have to check every recipe for this particular
        # combination and see if we can satisfy it.
        for recipe_idx in recipe_indices:
          recipe_ingredients = all_recipes[combination_idx][recipe_idx]

          # It is satisifed if we have every ingredient selected.
          if not recipe_ingredients.difference(selected_ingredients):
            covered_combinations.add(combination_idx)

    return len(covered_combinations)

In [None]:
# Ok, instead of doing this
# We should have a solution object
# And it holds the number of completed recipes across each combination.
# It has two functions. Add and remove.

Add in a greedy chart!