In [1]:
from datetime import date, timedelta
import pandas as pd
import numpy as np

In [35]:
np.diag(np.full(3, 1))


array([[1, 0, 0],
       [0, 1, 0],
       [0, 0, 1]])

In [2]:
class SubstitutionMatrix:
  """
  This class represents a substitution matrix for a given number of events.
  """

  def __init__(self, num_events):
    """
    Initializes the substitution matrix with 0
    """
    self.num_events = num_events + 2
    if not isinstance(num_events, int) or num_events <= 0:
      raise ValueError("Number of events must be a positive integer.")
    self.matrix = np.full((self.num_events, self.num_events), 0)
      
  def set_index(self, indexs):
      self.indexs = ["#"]+ indexs + ["U"] 
      
  def save_df(self):
    self.df_matrix = pd.DataFrame(data = self.matrix, index = self.indexs, columns = self.indexs)
    self.df_dis_matrix = -1*self.df_matrix
  def set_scores(self, match_score, unmatch_score):
    self.matrix += np.diag(np.full(self.num_events, match_score - unmatch_score))
    self.matrix += np.full((self.num_events, self.num_events), unmatch_score)
    
  def save_matrix_txt(self, path_out):
      np.savetxt(path_out, self.matrix, fmt = '%.1f')
  def save_df_matrix_txt(self, path_out):
      self.df_matrix.to_csv(path_out, sep='\t')
      
  def __str__(self):
    """
    Returns a string representation of the substitution matrix.
    """
    return self.df_matrix

In [176]:
def initate_submatrix(seq1, seq2):
    unique_index = sorted(list(set(seq1+seq2)))
    #print (unique_index)
    SubMatrix = SubstitutionMatrix(len(unique_index))
    SubMatrix.set_index(unique_index)
    ## score for set_scores(match, mismatch) 
    ## set_scores(0, -1) means Levenshtein Distance
    SubMatrix.set_scores(0, -1)
    SubMatrix.save_df()
    return SubMatrix.df_dis_matrix
    
def levenshtein_distance(seq1, seq2, df_sub_matrix):
    """
    Calculates the Levenshtein distance between two sequences.
    Args:
    seq1: The first sequence (string).
    seq2: The second sequence (string).
    Returns:
    The Levenshtein distance between the two sequences.
    """
    
    seq1 = "#"+seq1
    seq2 = "#"+seq2
    m = len(seq1)
    n = len(seq2)
    # Ref: https://medium.com/@ethannam/understanding-the-levenshtein-distance-equation-for-beginners-c4285a5604f0
    # Create a distance matrix
    dp = np.full((m, n), 0) 
    # Initialize the first row and column
    for i in range(1, m):
        dp[i][0] = i
    for j in range(1, n):
        dp[0][j] = j
    
    ## Fill the DP table
    for i in range(1, m):
        for j in range(1, n):
            insertion_cost = dp[i-1][j] + df_sub_matrix.loc[seq1[i-1], seq2[0]]  ## ( # -> letter)
            deletion_cost = dp[i][j-1] +  df_sub_matrix.loc[seq1[0], seq2[j-1]]  ## (letter -> # )
            cost = (0 if seq1[i] == seq2[j] else df_sub_matrix.loc[seq1[i], seq2[j]])
            substitution_cost = dp[i-1][j-1] + cost # ## only +1 when i j not same
            dp[i][j] = min(insertion_cost, deletion_cost, substitution_cost)
        #break
    print ("Dis: ", dp[m - 1][n - 1])
    return dp

def levenshtein_distance_with_transposition_date(seq1, seq2, dates1, dates2, df_sub_matrix, max_transposition_date):
  """
  Calculates the Levenshtein distance between two sequences, considering transpositions.
  Args:
    seq1: The first sequence.
    seq2: The second sequence.
    df_sub_matrix: Pandas DataFrame containing substitution costs.
  Returns:
    The Levenshtein distance between the two sequences.
  """
  seq1 = "#"+''.join(seq1)
  seq2 = "#"+''.join(seq2)
  dates1 = np.insert(dates1, 0,  "None")
  dates2 = np.insert(dates2, 0,  "None")  # Adding None to align dates with the prefixed '#'
  m = len(seq1)
  n = len(seq2)
  max_transposition_date +=1
  # Create a distance matrix
  dp = np.full((m, n), 0)  # Initialize with infinity to handle transpositions

  # Initialize the first row and column
  for i in range(1, m):
    dp[i][0] = i
  for j in range(1, n):
    dp[0][j] = j

  # Fill the DP table
  for i in range(1, m):
    for j in range(1, n):
      # Standard costs
      insertion_cost = dp[i-1][j] + df_sub_matrix.loc[seq1[i-1], seq2[0]]  # (letter -> # )
      deletion_cost = dp[i][j-1] + df_sub_matrix.loc[seq1[0], seq2[j-1]]  # ( # -> letter)
      # Substitution cost
      cost = (0 if seq1[i] == seq2[j] else df_sub_matrix.loc[seq1[i], seq2[j]])
      substitution_cost = dp[i-1][j-1] + cost
      dp[i][j] = min(insertion_cost, deletion_cost, substitution_cost)
      
      # Handle transpositions with date constraint
      if ((i >= 1) & (j >= 1)):
          for k in range(m-1, 0, -1):
              date_diff1 = abs((dates1[i] - dates2[k]).days)
              #print (date_diff1)
              if (date_diff1 < max_transposition_date):
                  transposition_cost = dp[i-1][j-1] + df_sub_matrix.loc[seq1[i], seq2[k]]
                  #print ("k: ", df_sub_matrix.loc[seq1[i], seq2[k]])
                  dp[i][j] = min(dp[i][j], transposition_cost)
          for l in range(n-1, 0, -1):
              date_diff2 = abs((dates1[j] - dates2[l]).days)
              if (date_diff2 < max_transposition_date):
                  transposition_cost = dp[i-1][j-1] + df_sub_matrix.loc[seq1[l], seq2[j]]
                  #print ("L: ",df_sub_matrix.loc[seq1[l], seq2[j]])
                  dp[i][j] = min(dp[i][j], transposition_cost)
      #print (dp)
      #break
    #break
  # Return the distance and matrix (optional)
  return dp[m - 1][n - 1], dp



# Example usage
seq1 = "sitting"
seq2 = "kitten"
df_sub_matrix = initate_submatrix(seq1, seq2)
distance = levenshtein_distance(seq1, seq2, df_sub_matrix)
#distance = levenshtein_distance_with_transposition(seq1, seq2, df_sub_matrix)
distance

Dis:  3


array([[0, 1, 2, 3, 4, 5, 6],
       [1, 1, 2, 3, 4, 5, 6],
       [2, 2, 1, 2, 3, 4, 5],
       [3, 3, 2, 1, 2, 3, 4],
       [4, 4, 3, 2, 1, 2, 3],
       [5, 5, 4, 3, 2, 2, 3],
       [6, 6, 5, 4, 3, 3, 2],
       [7, 7, 6, 5, 4, 4, 3]])

In [177]:
seq1 = [('s', date(2024, 1, 1)),('i', date(2024, 1, 2)), ('t', date(2024, 1, 4)), ('t', date(2024, 1, 4)),
        ('i', date(2024, 1, 4)), ('n', date(2024, 1, 4)), ('g', date(2024, 1, 4))]

seq2 = [('k', date(2024, 1, 1)),('i', date(2024, 1, 2)), ('t', date(2024, 1, 2)), ('t', date(2024, 1, 4)),
        ('e', date(2024, 1, 4)), ('n', date(2024, 1, 4))]

seq1 = [('A', date(2024, 1, 1)),('B', date(2024, 1, 1)), ('C', date(2024, 1, 4)), ('D', date(2024, 1, 5))]

seq2 = [('C', date(2024, 1, 1)),('D', date(2024, 1, 1)), ('A', date(2024, 1, 2)), ('B', date(2024, 1, 3))]

df_seq1 = pd.DataFrame(data = seq1, columns= ["dx", "date"]).sort_values('date')
df_seq2 = pd.DataFrame(data = seq2, columns= ["dx", "date"]).sort_values('date')
seq1 = ''.join(df_seq1.dx)
date1 = df_seq1.date.values
seq2 = ''.join(df_seq2.dx)
date2 = df_seq2.date.values
#first_items = 
#seq1 = ''.join([item[0] for item in seq1])
df_sub_matrix = initate_submatrix(seq1, seq2)
distance = levenshtein_distance(seq1, seq2, df_sub_matrix)
distance = levenshtein_distance_with_transposition_date(seq1, seq2, date1, date2, df_sub_matrix, 4)
distance


Dis:  4


(np.int64(0),
 array([[0, 1, 2, 3, 4],
        [1, 0, 1, 2, 3],
        [2, 1, 0, 1, 2],
        [3, 2, 1, 0, 1],
        [4, 3, 2, 1, 0]]))

In [178]:
df_sub_matrix

Unnamed: 0,#,A,B,C,D,U
#,0,1,1,1,1,1
A,1,0,1,1,1,1
B,1,1,0,1,1,1
C,1,1,1,0,1,1
D,1,1,1,1,0,1
U,1,1,1,1,1,0
