In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, DoubleType
import re
from rapidfuzz import fuzz
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT

In [None]:
!pip install rapidfuzz

In [None]:
def negative_check(value1, value2):
    """Report whether a minus sign is present and the two magnitudes nearly agree.

    Args:
        value1: First value; anything ``str()`` and ``float()`` accept (typically a string).
        value2: Second value, same expectations as ``value1``.

    Returns:
        bool: True when at least one input starts with '-' (after stripping
        whitespace) and ``abs(abs(value1) - abs(value2)) < 1``. False when
        neither input carries a minus sign, or when either input is missing
        or not numeric.
    """
    # Only pairs where a minus sign is actually present are of interest.
    if not (str(value1).strip().startswith('-') or str(value2).strip().startswith('-')):
        return False

    try:
        num1 = float(value1)
        num2 = float(value2)
    except (TypeError, ValueError):
        # ValueError: non-numeric text. TypeError: None, e.g. a null cell next to "-5".
        return False

    # Magnitudes are compared, so "-5" against "5" counts as a match.
    return abs(abs(num1) - abs(num2)) < 1

# Test cases (expected values follow the implementation: a minus sign on
# either side AND abs(abs(a) - abs(b)) < 1)
print(negative_check("-5", "-6.0"))  # False (magnitudes differ by exactly 1, which is not < 1)
print(negative_check("-10", "-10"))  # True
print(negative_check("-5", "5"))     # True (same magnitude, one side negative)
print(negative_check("5", "-5"))     # True (same magnitude, one side negative)
print(negative_check("-3.2", "-3.2"))# True
print(negative_check("-3.2", "abc")) # False (non-numeric input)
print(negative_check("5", "5"))      # False (no minus sign on either side)
print(negative_check("0", "-0"))     # True ("-0" starts with '-', magnitudes are equal)

In [None]:
def leading_zero_check(value1, value2):
    """Report whether two values are numerically equal and at least one has leading zeros.

    Args:
        value1: First value; converted with ``str()`` before inspection.
        value2: Second value; converted with ``str()`` before inspection.

    Returns:
        bool: True only when both values parse to the same float after their
        leading '0' characters are removed AND at least one of them actually
        had leading zeros (a string made only of zeros does not count).
        False when either input is None or not numeric.
    """
    # A missing value cannot be a leading-zero variant of anything.
    if value1 is None or value2 is None:
        return False

    text1, text2 = str(value1), str(value2)
    stripped1, stripped2 = text1.lstrip('0'), text2.lstrip('0')

    try:
        # An all-zero string strips down to '', which stands for the number 0.
        number1 = float(stripped1 or '0')
        number2 = float(stripped2 or '0')
    except ValueError:
        return False

    had_leading_zeros = (stripped1 != text1 and len(text1.strip('0')) > 0) or \
                        (stripped2 != text2 and len(text2.strip('0')) > 0)

    return number1 == number2 and had_leading_zeros

# Example usage: "0123" carries a leading zero and equals 123.0 numerically.
value1, value2 = "123.0", "0123"
are_equal = leading_zero_check(value1, value2)
print("Are equal after removing leading zeros:", are_equal)


In [None]:
import re

def is_scientific_notation(value1, value2):
    """
    Flag pairs where at least one side is written in scientific notation and
    the two magnitudes agree to within 1.

    Both inputs are converted with ``str()`` and stripped of surrounding
    whitespace before any check. Signs are ignored in the comparison.

    Returns:
        - True if either value matches the scientific-notation pattern and
          ``abs(abs(a) - abs(b)) < 1``.
        - False if neither value is in scientific notation, or if either
          value cannot be parsed as a float.
    """
    pattern = r"^-?\d+(\.\d+)?[eE][-+]?\d+$"
    left = str(value1).strip()
    right = str(value2).strip()

    # Nothing to report unless one side really uses the e/E form.
    if not any(re.match(pattern, text) for text in (left, right)):
        return False

    try:
        magnitude_gap = abs(abs(float(left)) - abs(float(right)))
    except ValueError:
        # The other side was not numeric at all.
        return False

    return magnitude_gap < 1

# Example Tests: one result is printed per pair, in the order listed.
sci_demo_pairs = [
    ("1.23e3", "1230"),      # Expected: True (difference < 1)
    ("4.5E-3", "0.0045"),    # Expected: True (difference < 1)
    ("2E5", "200001"),       # Expected: False (difference is not < 1)
    ("12345", "1.2345e4"),   # Expected: True (difference < 1)
    ("1.23E4", "1.24E4"),    # Expected: False (difference > 1)
    ("-1.23e3", "1230"),     # Expected: True (ignores sign difference)
    ("-4.5E-3", "-0.0045"),  # Expected: True (ignores sign difference)
    ("1E2", "-100"),         # Expected: True (ignores sign, difference < 1)
    ("abc", "1.23e3"),       # Expected: False (invalid input)
]
for demo_left, demo_right in sci_demo_pairs:
    print(is_scientific_notation(demo_left, demo_right))


In [None]:
import re
from decimal import Decimal

def normalize_number_string(s):
    """Remove thousand-separator characters (commas and any whitespace) from a number string.

    Periods are left untouched, so a string such as "1.234.567" is returned
    unchanged and will not parse as a number afterwards.
    """
    return re.sub(r'[,\s]', '', s)


def thousand_separator_difference(value1, value2):
    """
    Determine if two number strings differ only by the presence of thousand separators.

    Args:
    value1: First number string, potentially with thousand separators.
    value2: Second number string, potentially with thousand separators.

    Returns:
    bool: True when the two inputs are different strings, at least one of them
    contains a separator (comma or whitespace), and both are numerically equal
    once separators are removed. False otherwise, including when either input
    is None or is not a valid number. Separator placement is not validated.
    """
    if value1 is None or value2 is None:
        return False

    text1, text2 = str(value1), str(value2)
    normalized1 = normalize_number_string(text1)
    normalized2 = normalize_number_string(text2)

    try:
        # Decimal keeps the comparison exact for long numbers.
        same_number = Decimal(normalized1) == Decimal(normalized2)
    except (ArithmeticError, ValueError):
        # decimal.InvalidOperation (an ArithmeticError) signals an unparseable number.
        return False

    # Without a separator on either side, any difference (e.g. "1234.0" vs
    # "1234") is a formatting difference of another kind, not a separator one.
    separators_present = normalized1 != text1 or normalized2 != text2

    return bool(same_number and separators_present and text1 != text2)

# Example usage:
print(thousand_separator_difference("1,234,567", "123 4567"))  # True
print(thousand_separator_difference("1 234 567", "12345 67"))  # True
print(thousand_separator_difference("1.234.567", "1234567"))  # False: periods are not stripped, so "1.234.567" is not a valid number
print(thousand_separator_difference("1,234,567", "1234,567")) # True: both normalize to 1234567 (separator placement is not validated)


In [None]:
def are_numbers_effectively_equal(num1, num2, tolerance=1.0):
    """Report whether two values are within ``tolerance`` of each other and neither has leading zeros.

    Args:
        num1: First value; converted with ``str()`` before inspection.
        num2: Second value; converted with ``str()`` before inspection.
        tolerance: Exclusive upper bound on ``abs(num1 - num2)``.

    Returns:
        bool: True when both values parse as floats, their absolute difference
        is below ``tolerance``, and neither value has leading zeros. False when
        either input is None or not numeric.
    """
    if num1 is None or num2 is None:
        return False

    text1, text2 = str(num1), str(num2)

    # Pairs with leading zeros are excluded here, whether or not the numbers agree.
    had_leading_zeros = (text1.lstrip('0') != text1 and len(text1.strip('0')) > 0) or \
                        (text2.lstrip('0') != text2 and len(text2.strip('0')) > 0)

    try:
        difference = abs(float(text1) - float(text2))
    except ValueError:
        # Non-numeric input.
        return False

    # The previous `are_equal != had_leading_zeros` was an XOR: it also returned
    # True for numbers that were NOT close whenever a leading zero was present.
    return difference < tolerance and not had_leading_zeros

# Example usage, including a non-numeric input:
result1, result2 = (
    are_numbers_effectively_equal("100", "100"),
    are_numbers_effectively_equal("abc", "100.5"),
)
print("Are the numbers effectively equal?", result1)  # Expected: True
print("Is the input valid?", result2)  # Expected: False, because 'abc' cannot be converted to float



In [None]:
import re

from decimal import Decimal

# Standard symbol -> every spelling recognised for that currency.
currency_mappings = {
    "$": ["USD", "US DOLLAR", "US DOLLARS", "DOLLAR", "DOLLARS", "$"],
    "€": ["EUR", "EURO", "EUROS", "€"],
    "£": ["GBP", "POUND STERLING", "POUND", "POUNDS", "£"],
    "¥": ["JPY", "YEN", "¥"],
    "₹": ["INR", "INDIAN RUPEE", "RUPEE", "RUPEES", "₹"],
    "₺": ["TRY", "TURKISH LIRA", "LIRA", "₺"],
    "₩": ["KRW", "SOUTH KOREAN WON", "KOREAN WON", "₩"],
    "₦": ["NGN", "NIGERIAN NAIRA", "NAIRA", "₦"],
    "₴": ["UAH", "UKRAINIAN HRYVNIA", "HRYVNIA", "₴"],
    "₽": ["RUB", "RUSSIAN RUBLE", "RUBLE", "₽"],
}

def extract_currency_and_value(value):
    """
    Extracts the currency type and numerical value from a given string.

    The input is upper-cased and stripped of spaces and commas before matching.
    Keywords are matched as plain substrings.
    NOTE(review): substring matching means unrelated text can match a code
    (e.g. "COUNTRY" contains "TRY"); tighten if free text is compared.

    Returns:
      - (currency, numeric_value) tuple if currency is found; numeric_value is
        the first number in the string (as text) or None if there is none.
      - (None, None) if no currency is found or the input is None.
    """
    if value is None:
        return None, None

    value_clean = str(value).upper().replace(" ", "").replace(",", "")  # Normalize case, remove spaces & commas

    # Extract numerical part (allowing negative numbers); keep the first match
    numeric_part = re.findall(r"[-+]?\d*\.?\d+", value_clean)
    numeric_value = numeric_part[0] if numeric_part else None

    # Identify currency type. Spaces were removed from the input, so they are
    # removed from each keyword too; otherwise multi-word names such as
    # "KOREAN WON" could never match.
    for currency_symbol, keywords in currency_mappings.items():
        if any(keyword.replace(" ", "") in value_clean for keyword in keywords):
            return currency_symbol, numeric_value

    return None, None

def detect_currency(value1, value2):
    """
    Compares two currency values:
      - Returns True if both values carry the same currency and the same amount.
      - Returns False if there's a mismatch in currency type or amount, if only
        one value carries a currency, or if neither value contains a currency
        reference.

    Amounts are compared numerically, so "100" and "100.00" are equal. Two
    values with the same currency and no amount on either side compare equal.
    """
    currency1, num1 = extract_currency_and_value(value1)
    currency2, num2 = extract_currency_and_value(value2)

    # No currency on one or both sides: nothing to match
    if currency1 is None or currency2 is None:
        return False

    # If currency types don't match, return False
    if currency1 != currency2:
        return False

    # Amount missing on at least one side: equal only when missing on both
    if num1 is None or num2 is None:
        return num1 is None and num2 is None

    # Same currency: the amounts decide
    return Decimal(num1) == Decimal(num2)

# Example Tests (detect_currency always returns a bool)
print("Match between -100 and 100:", detect_currency("-100", "100"))  # Expected: False (no currency on either side)
print("Match between € 50 and 50 EUR:", detect_currency("€ 50", "50 EUR"))  # Expected: True (Same currency, same value)
print("Match between 100 INR and 100:", detect_currency("100 INR", "100"))  # Expected: False (One has currency, one doesn't)
print("Match between 500 and 500:", detect_currency("500", "500"))  # Expected: False (no currency on either side)
print("Match between $100 and 100USD:", detect_currency("$ 100", "100 USD"))  # Expected: True (Same currency, same value)
print("Match between 100$ and 150$:", detect_currency("100$", "150$"))  # Expected: False (Same currency, different value)
print("Match between 200 GBP and 200 USD:", detect_currency("200 GBP", "200 USD"))  # Expected: False (Different currency)
print("Match between $3,487.14 and 3487.14 $:", detect_currency("$3,487.14", "3487.14 $"))  # Expected: True (Same currency, same value)

In [None]:
import re
from decimal import Decimal

def scientific_notation_with_tolerance(value1, value2, tolerance=1):
    """
    Compares two values after checking if either is in scientific notation. If true,
    compares the difference of their magnitudes to a specified tolerance.

    Args:
    value1, value2: Values representing the numbers, where one might be in
        scientific notation. Converted with ``str()`` and stripped of
        surrounding whitespace before any check.
    tolerance (float): Exclusive upper bound on ``abs(abs(a) - abs(b))``.

    Returns:
    bool: True if at least one value is in scientific notation and the
    magnitudes differ by less than ``tolerance``. False otherwise, including
    when either value is None or cannot be parsed as a number.
    """
    if value1 is None or value2 is None:
        return False

    # Regular expression to detect scientific notation
    sci_notation_regex = r'^-?\d+(\.\d+)?[eE][-+]?\d+$'

    # Strip once, so the pattern check and the conversion see the same text
    text1, text2 = str(value1).strip(), str(value2).strip()

    if not (re.match(sci_notation_regex, text1) or re.match(sci_notation_regex, text2)):
        return False

    try:
        # Decimal keeps large magnitudes exact
        difference = abs(abs(Decimal(text1)) - abs(Decimal(text2)))
        # Going through str() avoids the binary expansion of a float tolerance such as 0.1
        return bool(difference < Decimal(str(tolerance)))
    except (ArithmeticError, ValueError):
        # decimal.InvalidOperation (an ArithmeticError) signals an unparseable number
        return False

# Example usage:
print(scientific_notation_with_tolerance("-5.6145711E+07", "56145711.73", tolerance=1))  # Expected: True (56145711 vs 56145711.73, difference 0.73)
print(scientific_notation_with_tolerance("2.98E+07", "29847591.85", tolerance=1))  # Expected: False (29800000 vs 29847591.85, difference 47591.85)
print(scientific_notation_with_tolerance("9.50E+07", "95006862.32", tolerance=1))  # Expected: False (95000000 vs 95006862.32, difference 6862.32)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, lit
from pyspark.sql.types import StringType, BooleanType, IntegerType

# Start (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName("Data Classification").getOrCreate()

# Read the comparison pairs
df = spark.read.csv("/content/Final_ML.csv", header=True, inferSchema=True)

# Wrap each rule-based check as a Boolean-returning UDF
is_scientific_notation_udf = udf(scientific_notation_with_tolerance, BooleanType())
thousand_separator_difference_udf = udf(thousand_separator_difference, BooleanType())
leading_zero_check_udf = udf(leading_zero_check, BooleanType())
are_numbers_effectively_equal_udf = udf(are_numbers_effectively_equal, BooleanType())
detect_currency_udf = udf(detect_currency, BooleanType())
negative_check_udf = udf(negative_check, BooleanType())

# (flag column, UDF, discrepancy label). List order is both the order in which
# the flag columns are added and the precedence used for Discrepancy_Type.
rule_checks = [
    ("Scientific_Notation", is_scientific_notation_udf, "Scientific Notation Difference"),
    ("Thousand_Separator", thousand_separator_difference_udf, "Thousand Separator Difference"),
    ("Rounded_Off", are_numbers_effectively_equal_udf, "Rounded Off"),
    ("Leading_Zero", leading_zero_check_udf, "Leading Zero Issue"),
    ("Currency_Diff", detect_currency_udf, "Currency Symbol Difference"),
    ("Negative_Check", negative_check_udf, "Numbers are the same but one is negative"),
]

# One 0/1 integer column per rule (1 if the UDF returned True, else 0)
for flag_name, rule_udf, _ in rule_checks:
    df = df.withColumn(
        flag_name,
        when(rule_udf(col("Source"), col("Destination")) == True, 1).otherwise(0),
    )

# First rule that fired wins; rows where none fired get "No discrepancy"
discrepancy_type = None
for flag_name, _, description in rule_checks:
    rule_fired = col(flag_name) == 1
    if discrepancy_type is None:
        discrepancy_type = when(rule_fired, description)
    else:
        discrepancy_type = discrepancy_type.when(rule_fired, description)
df = df.withColumn("Discrepancy_Type", discrepancy_type.otherwise("No discrepancy"))

# Show results (saving to CSV is left disabled)
df.show()
# df.write.csv("/path/to/save/classified_data.csv", header=True)

# Stop Spark session
# spark.stop()


In [None]:
# Add the fuzzy-score columns as integer zeros, in a fixed order
for placeholder_name in (
    "Case_Sensitive_Score",
    "Case_Insensitive_Score",
    "Case_Sensitivity_Diff",
    "Special_Character_Score",
    "Special_Character_Diff",
    "Space_diff",
    "space_score",
):
    df = df.withColumn(placeholder_name, lit(0).cast('integer'))

In [None]:
# Preview the first five rows of df
df.show(5)

In [None]:
# Define fuzzy matching functions
def get_fuzzy_scores(str1, str2):
    """Compute various fuzzy matching scores between two strings.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        tuple: Eight numbers, in this order:
        case-sensitive ratio, case-insensitive ratio, their absolute difference,
        ratio with punctuation removed, its absolute difference from the
        case-sensitive ratio, the absolute difference for the
        whitespace-normalized ratio, the whitespace-normalized ratio itself,
        and the absolute difference for the token-set ratio.
        All eight are 0.0 when either input is None.
    """
    # A null on either side cannot be lower-cased or split; report "no similarity".
    if str1 is None or str2 is None:
        return (0.0,) * 8

    case_sensitive_score = fuzz.ratio(str1, str2)
    case_insensitive_score = fuzz.ratio(str1.lower(), str2.lower())
    case_sensitivity_diff = abs(case_sensitive_score - case_insensitive_score)

    # Drop everything that is neither a word character nor whitespace
    str1_clean, str2_clean = re.sub(r'[^\w\s]', '', str1), re.sub(r'[^\w\s]', '', str2)
    special_char_score = fuzz.ratio(str1_clean, str2_clean)
    special_char_diff = abs(case_sensitive_score - special_char_score)

    # Collapse runs of whitespace to single spaces and trim the ends
    str1_space_norm, str2_space_norm = ' '.join(str1.split()), ' '.join(str2.split())
    space_score = fuzz.ratio(str1_space_norm, str2_space_norm)
    space_diff = abs(case_sensitive_score - space_score)

    token_score = fuzz.token_set_ratio(str1, str2)
    abbreviation_diff = abs(case_sensitive_score - token_score)

    # Note the order: space_diff comes before space_score
    return (case_sensitive_score, case_insensitive_score, case_sensitivity_diff,
            special_char_score,special_char_diff, space_diff,space_score, abbreviation_diff)

# Register the scorer as a Spark UDF that returns an array of doubles
fuzzy_udf = udf(lambda str1, str2: get_fuzzy_scores(str1, str2),"array<double>")

# Score every Source/Destination pair into a temporary array column
df = df.withColumn("Fuzzy Scores", fuzzy_udf(col("Source"), col("Destination")))

# Unpack array positions 0-6 into named columns, then drop the array.
# Position 7 (the token-set difference) is not used.
fuzzy_score_names = [
    "Case_Sensitive_Score",
    "Case_Insensitive_Score",
    "Case_Sensitivity_Diff",
    "Special_Character_Score",
    "Special_Character_Diff",
    "Space_diff",
    "space_score",
]
for position, score_name in enumerate(fuzzy_score_names):
    df = df.withColumn(score_name, col("Fuzzy Scores")[position])
df = df.drop("Fuzzy Scores")

In [None]:
# Convert the Spark DataFrame to pandas and write it to a CSV file without the index column
df.toPandas().to_csv('FuzzyMatching_Final.csv', index=False)

In [None]:
# Show a single row as a quick check of the columns
df.show(1)

In [None]:
# Drop the Discrepancy_Type column
df = df.drop("Discrepancy_Type")

In [None]:
# List the distinct values of the Label column without truncating them
df.select("Label").distinct().show(truncate=False)

In [None]:
# Fit a StringIndexer on Label and add the numeric Label_Index column;
# the result is kept in df_pyspark, df itself is left unchanged
indexer = StringIndexer(inputCol="Label", outputCol="Label_Index")
model_indexer = indexer.fit(df)
df_pyspark = model_indexer.transform(df)

In [None]:
# Labels known to the fitted indexer; a label's position in this list is its index
labels = model_indexer.labels
# Print out the index to label mapping
print("Index to Label Mapping:")
for idx, label in enumerate(labels):
    print(f"Index {idx} corresponds to label '{label}'")

In [None]:
# Display the column names of df
df.columns

In [None]:
# Columns that make up the model's feature vector, in vector order:
# six rule flags followed by seven fuzzy scores
feature_columns = [
    'Scientific_Notation', 'Thousand_Separator', 'Rounded_Off',
    'Leading_Zero', 'Currency_Diff', 'Negative_Check',
    'Case_Sensitive_Score', 'Case_Insensitive_Score', 'Case_Sensitivity_Diff',
    'Special_Character_Score', 'Special_Character_Diff',
    'Space_diff', 'space_score',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
# Columns to cast: the six rule flags and the seven fuzzy scores
columns_to_cast = [
    'Scientific_Notation', 'Thousand_Separator', 'Rounded_Off',
    'Leading_Zero', 'Currency_Diff', 'Negative_Check',
    'Case_Sensitive_Score', 'Case_Insensitive_Score', 'Case_Sensitivity_Diff',
    'Special_Character_Score', 'Special_Character_Diff',
    'Space_diff', 'space_score',
]

# Replace each listed column with its integer cast
for col_name in columns_to_cast:
    casted = col(col_name).cast("int")
    df_pyspark = df_pyspark.withColumn(col_name, casted)

In [None]:
# Build the "features" vector column. Any "features" column left by an earlier
# run of this cell is dropped first, so the cell can be re-run safely
# (dropping a column that does not exist is a no-op).
df_pyspark = assembler.transform(df_pyspark.drop("features"))

In [None]:
# Preview the first five rows of the assembled DataFrame
df_pyspark.show(5)

In [None]:
# Split dataset into training (80%) and testing (20%) sets;
# the fixed seed keeps the split repeatable
train_data, test_data = df_pyspark.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Initialize and train the classifier (Random Forest).
# A fixed seed makes the fitted trees, and so the reported accuracy, repeatable.
rf_classifier = RandomForestClassifier(labelCol="Label_Index", featuresCol="features", numTrees=50, seed=42)
model = rf_classifier.fit(train_data)

# Make predictions on the test set
predictions = model.transform(test_data)

# Evaluate the model using accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="Label_Index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"✅ Model Accuracy: {accuracy * 100:.2f}%")

# Save trained model, replacing the one left by a previous run.
# A plain save() fails when the target path already exists.
model.write().overwrite().save("FuzzyMatching_RF_Model_3")
print("✅ Model saved successfully!")

In [None]:
# Select relevant features (list order defines the layout of the feature vector)
feature_columns = ['Scientific_Notation',
 'Thousand_Separator',
 'Rounded_Off',
 'Leading_Zero',
 'Currency_Diff',
 'Negative_Check',
 'Case_Sensitive_Score',
 'Case_Insensitive_Score',
 'Case_Sensitivity_Diff',
 'Special_Character_Score',
 'Special_Character_Diff',
 'Space_diff',
 'space_score',]

# List of columns to cast: every feature column
columns_to_cast = list(feature_columns)

# Cast each column to Integer
for col_name in columns_to_cast:
    df_pyspark = df_pyspark.withColumn(col_name, col(col_name).cast("int"))

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# df_pyspark may already carry a "features" column from the earlier assemble
# step; the assembler rejects an existing output column, so drop it first.
df_pyspark = assembler.transform(df_pyspark.drop("features"))

In [None]:
# Register the fuzzy scorer as a UDF returning an array of doubles
# (same definition as the registration made where get_fuzzy_scores is defined)
fuzzy_udf = udf(lambda str1, str2: get_fuzzy_scores(str1, str2),"array<double>")

In [None]:
# Example input words.
# Assigned here so this cell runs on a fresh kernel; previously they were only
# assigned in the cell that follows, which left them undefined at this point.
word1 = "srinivas"
word2 = "sr in iva"

# Convert input into DataFrame for prediction
df_input = spark.createDataFrame([(word1, word2)], ["Source", "Destination"])
df_input = df_input.withColumn("Fuzzy Scores", fuzzy_udf(col("Source"), col("Destination")))
# Extract individual scores (array positions 0-6; position 7 is not used)
df_input = df_input.withColumn("Case_Sensitive_Score", col("Fuzzy Scores")[0]) \
                       .withColumn("Case_Insensitive_Score", col("Fuzzy Scores")[1]) \
                       .withColumn("Case_Sensitivity_Diff", col("Fuzzy Scores")[2]) \
                       .withColumn("Special_Character_Score", col("Fuzzy Scores")[3]) \
                       .withColumn("Special_Character_Diff", col("Fuzzy Scores")[4]) \
                       .withColumn("Space_diff", col("Fuzzy Scores")[5]) \
                       .withColumn("space_score", col("Fuzzy Scores")[6]) \
                       .drop("Fuzzy Scores")
# Apply other UDFs
df_input = df_input.withColumn("Scientific_Notation", is_scientific_notation_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Thousand_Separator", thousand_separator_difference_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Leading_Zero", leading_zero_check_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Rounded_Off", are_numbers_effectively_equal_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Currency_Diff", detect_currency_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Negative_Check", negative_check_udf(col("Source"), col("Destination")))
from pyspark.sql.functions import col, when

# Convert Boolean columns to integer (1 if True, otherwise 0)
columns_to_convert = ['Scientific_Notation',
 'Thousand_Separator',
 'Rounded_Off',
 'Leading_Zero',
 'Currency_Diff',
 'Negative_Check',]
for column in columns_to_convert:
    df_input = df_input.withColumn(column, when(col(column), 1).otherwise(0))

# Select relevant features: the six rule flags followed by the seven fuzzy scores
feature_columns = columns_to_convert + ['Case_Sensitive_Score',
 'Case_Insensitive_Score',
 'Case_Sensitivity_Diff',
 'Special_Character_Score',
 'Special_Character_Diff',
 'Space_diff',
 'space_score',]

# List of columns to cast: every feature column
columns_to_cast = list(feature_columns)

# Cast each column to Integer
for col_name in columns_to_cast:
    df_input = df_input.withColumn(col_name, col(col_name).cast("int"))
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_input = assembler.transform(df_input)

In [None]:
# Example pair to classify
word1 = "srinivas"
word2 = "sr in iva"

In [None]:
# Make prediction: run the model on df_input and take the "prediction"
# value of the first row
prediction = model.transform(df_input).select("prediction").collect()[0][0]
prediction


In [None]:
# Index-to-label mapping recorded from an earlier run (kept as comments so the cell is valid Python):
# Index 0 corresponds to label 'Case Sensitivity'
# Index 1 corresponds to label 'Extra Space Issues'
# Index 2 corresponds to label 'Special Character Differences'
# Index 3 corresponds to label 'Rounded Off Numbers'
# Index 4 corresponds to label 'Currency Symbol Difference'
# Index 5 corresponds to label 'Leading Zero Issue'
# Index 6 corresponds to label 'Negative vs Positive'
# Index 7 corresponds to label 'Scientific Notation Difference'
# Index 8 corresponds to label 'Thousands Separator Difference'
# Index 9 corresponds to label 'No Match'

In [None]:
def prepare_features(word1, word2):
    """Turn one (source, destination) pair into a single-row DataFrame with a ``features`` vector.

    The row receives the seven fuzzy-score columns, the six rule flags
    converted to 0/1, an integer cast of every feature column, and finally the
    assembled ``features`` column. Uses the notebook-level ``spark`` session
    and the registered UDFs.

    Args:
        word1: Source value.
        word2: Destination value.

    Returns:
        The single-row DataFrame including the ``features`` column.
    """
    from pyspark.sql.functions import col, when

    source, destination = col("Source"), col("Destination")

    # Array positions 0-6 of the fuzzy UDF result, in order
    score_names = [
        "Case_Sensitive_Score",
        "Case_Insensitive_Score",
        "Case_Sensitivity_Diff",
        "Special_Character_Score",
        "Special_Character_Diff",
        "Space_diff",
        "space_score",
    ]
    # Rule flags in the order their columns are added
    rule_udfs = [
        ("Scientific_Notation", is_scientific_notation_udf),
        ("Thousand_Separator", thousand_separator_difference_udf),
        ("Leading_Zero", leading_zero_check_udf),
        ("Rounded_Off", are_numbers_effectively_equal_udf),
        ("Currency_Diff", detect_currency_udf),
        ("Negative_Check", negative_check_udf),
    ]
    # Feature-vector layout
    feature_columns = [
        'Scientific_Notation', 'Thousand_Separator', 'Rounded_Off',
        'Leading_Zero', 'Currency_Diff', 'Negative_Check',
    ] + score_names

    # Convert input into DataFrame for prediction
    df_input = spark.createDataFrame([(word1, word2)], ["Source", "Destination"])

    # Fuzzy scores: compute the array, unpack it, drop it
    df_input = df_input.withColumn("Fuzzy Scores", fuzzy_udf(source, destination))
    for position, score_name in enumerate(score_names):
        df_input = df_input.withColumn(score_name, col("Fuzzy Scores")[position])
    df_input = df_input.drop("Fuzzy Scores")

    # Rule flags: Boolean UDF result first, then 1/0
    for flag_name, rule_udf in rule_udfs:
        df_input = df_input.withColumn(flag_name, rule_udf(source, destination))
    for flag_name, _ in rule_udfs:
        df_input = df_input.withColumn(flag_name, when(col(flag_name), 1).otherwise(0))

    # Cast each feature column to Integer
    for feature_name in feature_columns:
        df_input = df_input.withColumn(feature_name, col(feature_name).cast("int"))

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    return assembler.transform(df_input)

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Text boxes for the two strings to compare, pre-filled with an example pair
text_source = widgets.Text(
    value='srinivas',
    description='Source:',
    placeholder='Type something',
)
text_destination = widgets.Text(
    value='sr in iva',
    description='Destination:',
    placeholder='Type something',
)

# Clicking this button runs the prediction
button_predict = widgets.Button(description='Predict')

# Area in which the prediction is printed
output = widgets.Output()

# Function to handle button click event
def on_button_clicked(b):
    """Predict the discrepancy label for the current widget inputs and print it.

    Reads the two text widgets, builds the feature row with
    ``prepare_features``, runs ``model`` on it and prints the label into the
    ``output`` widget, replacing whatever was shown before.

    Args:
        b: The clicked button; not used by the handler.
    """
    with output:
        output.clear_output()
        word1 = text_source.value
        word2 = text_destination.value
        # Prepare features and predict
        df_input = prepare_features(word1, word2)
        prediction = model.transform(df_input).select("prediction").collect()[0][0]
        # Resolve the index through the fitted StringIndexer instead of a
        # hard-coded table, so the label stays correct if the indexer is refit.
        label_names = model_indexer.labels
        print(f'Prediction: {label_names[int(prediction)]}')

# Attach the button click event
button_predict.on_click(on_button_clicked)

# Display widgets stacked vertically: inputs, button, then the output area
widgets.VBox([text_source, text_destination, button_predict, output])


In [None]:
# Index-to-label mapping recorded from an earlier run (kept as comments so the cell is valid Python):
# Index 0 corresponds to label 'Case Sensitivity'
# Index 1 corresponds to label 'Extra Space Issues'
# Index 2 corresponds to label 'Special Character Differences'
# Index 3 corresponds to label 'Rounded Off Numbers'
# Index 4 corresponds to label 'Currency Symbol Difference'
# Index 5 corresponds to label 'Leading Zero Issue'
# Index 6 corresponds to label 'Negative vs Positive'
# Index 7 corresponds to label 'Scientific Notation Difference'
# Index 8 corresponds to label 'Thousands Separator Difference'
# Index 9 corresponds to label 'No Match'

In [None]:
# Vector-valued variant of the fuzzy scorer. get_fuzzy_scores returns a plain
# tuple, which a VectorUDT column cannot hold, so the scores are wrapped in a
# dense vector first.
# NOTE(review): this rebinds the global fuzzy_udf. prepare_features() looks the
# name up at call time and indexes the result as an array, so it should not be
# called after this cell has run.
fuzzy_udf = udf(lambda str1, str2: Vectors.dense(get_fuzzy_scores(str1, str2)), VectorUDT())
is_scientific_notation_udf = udf(scientific_notation_with_tolerance, BooleanType())
thousand_separator_difference_udf = udf(thousand_separator_difference, BooleanType())
leading_zero_check_udf = udf(leading_zero_check, BooleanType())
are_numbers_effectively_equal_udf = udf(are_numbers_effectively_equal, BooleanType())
detect_currency_udf = udf(detect_currency, BooleanType())
negative_check_udf = udf(negative_check, BooleanType())

In [None]:
# Example input words
word1 = "srinvas"
word2 = "sr invas"

# Convert input into DataFrame for prediction; here the fuzzy UDF output is
# stored directly in a column named "features"
df_input = spark.createDataFrame([(word1, word2)], ["Source", "Destination"])
df_input = df_input.withColumn("features", fuzzy_udf(col("Source"), col("Destination")))

In [None]:

# Add the six rule flags (Boolean) to the single-row input.
# The currency column is named "Currency_Diff", the name used for this feature
# everywhere else in the notebook.
df_input = df_input.withColumn("Scientific_Notation", is_scientific_notation_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Thousand_Separator", thousand_separator_difference_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Leading_Zero", leading_zero_check_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Rounded_Off", are_numbers_effectively_equal_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Currency_Diff", detect_currency_udf(col("Source"), col("Destination")))
df_input = df_input.withColumn("Negative_Check", negative_check_udf(col("Source"), col("Destination")))
