# LAB 4: Entity Resolution Application using PySpark

In [2]:
import pyspark
import pandas as pd

1. Cleaning and preprocessing data (normalization and tokenization)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, RegexTokenizer
import re

# Build (or reuse) the Spark session used by the cleaning / preprocessing cell
spark = (
    SparkSession.builder
    .appName("DataCleaningPreprocessing")
    .getOrCreate()
)

# Column names for the sample records, in tuple order
columns = ["id", "name", "address"]

# Hard-coded sample records (replace this with actual data loading)
data = [
    (1, "John Smith", "1234 Elm St., Apt 4, New York, NY"),
    (2, "jane doe", "5678 Oak St., Apt 5, Los Angeles, CA"),
    (3, "John Smith", "1234 elm street, Apt 4, new york, ny"),
]

df = spark.createDataFrame(data, schema=columns)

# Define a UDF for normalization (lowercasing and removing special characters)
def normalize_text(text):
    """Normalize free text so that equivalent values compare and tokenize alike.

    The text is lowercased, every character other than ``a-z``, ``0-9`` and
    whitespace is removed, and then runs of whitespace are collapsed to a
    single space with leading/trailing whitespace trimmed.

    Args:
        text: The string to normalize, or ``None``.

    Returns:
        The normalized string, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    text = text.lower()  # Convert to lowercase
    text = re.sub(r'[^a-z0-9\s]', '', text)  # Remove special characters
    # Dropping punctuation can leave adjacent or edge whitespace
    # ("a - b" -> "a  b"); a tokenizer that splits on each whitespace
    # character would then emit empty tokens, so squeeze and trim here.
    return ' '.join(text.split())

# Wrap the Python normalizer so it can be applied to DataFrame columns
normalize_udf = udf(normalize_text, StringType())

# One row per text field:
# (raw column, normalized column, token column, stop-word-filtered column)
text_stages = [
    ("name", "normalized_name", "name_tokens", "filtered_name_tokens"),
    ("address", "normalized_address", "address_tokens", "filtered_address_tokens"),
]

# Stage 1: add the normalized copy of every text field
for raw_col, normalized_col, _, _ in text_stages:
    df = df.withColumn(normalized_col, normalize_udf(col(raw_col)))

# Stage 2: tokenize every normalized field
for _, normalized_col, tokens_col, _ in text_stages:
    tokenizer = Tokenizer(inputCol=normalized_col, outputCol=tokens_col)
    df = tokenizer.transform(df)

# Stage 3: remove stop words from the tokens (assuming English stop words)
for _, _, tokens_col, filtered_col in text_stages:
    stopwords_remover = StopWordsRemover(inputCol=tokens_col, outputCol=filtered_col)
    df = stopwords_remover.transform(df)

# Display the raw, normalized and filtered-token views side by side, untruncated
display_columns = [
    "id",
    "name",
    "normalized_name",
    "filtered_name_tokens",
    "address",
    "normalized_address",
    "filtered_address_tokens",
]
df.select(*display_columns).show(truncate=False)

# Shut the Spark session down at the end of the cell
spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 16:08:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+----------+---------------+--------------------+------------------------------------+---------------------------------+------------------------------------------+
|id |name      |normalized_name|filtered_name_tokens|address                             |normalized_address               |filtered_address_tokens                   |
+---+----------+---------------+--------------------+------------------------------------+---------------------------------+------------------------------------------+
|1  |John Smith|john smith     |[john, smith]       |1234 Elm St., Apt 4, New York, NY   |1234 elm st apt 4 new york ny    |[1234, elm, st, apt, 4, new, york, ny]    |
|2  |jane doe  |jane doe       |[jane, doe]         |5678 Oak St., Apt 5, Los Angeles, CA|5678 oak st apt 5 los angeles ca |[5678, oak, st, apt, 5, los, angeles, ca] |
|3  |John Smith|john smith     |[john, smith]       |1234 elm street, Apt 4, new york, ny|1234 elm street apt 4 new york ny|[1234, elm, street, apt, 4, new, yor

2. Computing pairwise similarity scores between records using cosine similarity

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
import math

# Build (or reuse) the Spark session used by the similarity cell
spark = (
    SparkSession.builder
    .appName("SimilarityComputation")
    .getOrCreate()
)

# Sample records: a float id paired with a list of float features
data = [
    (1.0, [1.0, 2.0, 3.0]),
    (2.0, [4.0, 5.0, 6.0]),
    (3.0, [7.0, 8.0, 9.0]),
]

# Explicit schema for the records above; both columns are declared non-nullable
schema = StructType(
    [
        StructField("id", DoubleType(), False),
        StructField("features", ArrayType(DoubleType()), False),
    ]
)

# Materialize the sample records as a DataFrame with that schema
df = spark.createDataFrame(data, schema=schema)

# Define a UDF for cosine similarity using native Python
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        v1: Sequence of numbers, or ``None``.
        v2: Sequence of numbers, or ``None``.

    Returns:
        ``dot(v1, v2) / (|v1| * |v2|)`` as a float, ``0.0`` when either
        vector has zero magnitude, or ``None`` when either input is ``None``.

    Raises:
        ValueError: If the two vectors have different lengths.
    """
    # A missing vector has no defined similarity; propagate the missing value
    # instead of failing with a TypeError inside zip().
    if v1 is None or v2 is None:
        return None
    # zip() would silently truncate to the shorter vector while the norms
    # below use the full vectors, which yields a meaningless score.
    if len(v1) != len(v2):
        raise ValueError(
            f"cosine_similarity requires equal-length vectors, got {len(v1)} and {len(v2)}"
        )
    # math.fsum is used rather than the builtin sum(): a later cell in this
    # notebook imports pyspark.sql.functions.sum under the name `sum`, which
    # would otherwise be picked up here when cells share one namespace.
    dot_product = math.fsum(a * b for a, b in zip(v1, v2))
    norm_v1 = math.sqrt(math.fsum(a * a for a in v1))
    norm_v2 = math.sqrt(math.fsum(b * b for b in v2))
    if norm_v1 == 0 or norm_v2 == 0:
        return 0.0
    return dot_product / (norm_v1 * norm_v2)

# Wrap the Python similarity function as a UDF that returns a double
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# Pair every record with every record through an aliased self cross join
df_cross = df.alias("df1").crossJoin(df.alias("df2"))

# Column expression scoring a pair from the two sides' feature arrays
pair_similarity = cosine_similarity_udf(col("df1.features"), col("df2.features"))

# Keeps a pair only when the left id is smaller than the right id, which
# drops self-pairs and the mirrored copy of each pair
left_id_is_smaller = col("df1.id") < col("df2.id")

# Attach the score, then apply the pair filter
df_similarity = df_cross.withColumn("similarity", pair_similarity)
df_similarity = df_similarity.filter(left_id_is_smaller)

df_similarity.show()

# Shut the Spark session down at the end of the cell
spark.stop()


                                                                                

+---+---------------+---+---------------+------------------+
| id|       features| id|       features|        similarity|
+---+---------------+---+---------------+------------------+
|1.0|[1.0, 2.0, 3.0]|2.0|[4.0, 5.0, 6.0]|0.9746318461970762|
|1.0|[1.0, 2.0, 3.0]|3.0|[7.0, 8.0, 9.0]|0.9594119455666703|
|2.0|[4.0, 5.0, 6.0]|3.0|[7.0, 8.0, 9.0]|0.9981908926857269|
+---+---------------+---+---------------+------------------+



3. Evaluating the precision, recall, and F1-score of an entity resolution model

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build (or reuse) the Spark session used by the evaluation cell
spark = (
    SparkSession.builder
    .appName("EntityResolutionEvaluation")
    .getOrCreate()
)

# Sample records: (id, true label, predicted label)
data = [
    (1, "A", "A"),
    (2, "B", "A"),
    (3, "A", "B"),
    (4, "B", "B"),
    (5, "A", "A"),
    (6, "B", "B"),
    (7, "A", "B"),
    (8, "B", "A"),
]

# Explicit schema for the records above; all three columns are declared non-nullable
schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("true_label", StringType(), False),
        StructField("predicted_label", StringType(), False),
    ]
)

# Materialize the labelled sample as a DataFrame
df = spark.createDataFrame(data, schema=schema)

# Define metrics calculation
def calculate_metrics(df, positive_label="A"):
    """Compute precision, recall and F1-score for one positive class.

    True positives, false positives and false negatives are counted in a
    single aggregation over ``df`` rather than one ``count()`` job per value.

    Args:
        df: DataFrame with ``true_label`` and ``predicted_label`` columns.
        positive_label: Label value treated as the positive class.
            Defaults to ``"A"``.

    Returns:
        Tuple ``(precision, recall, f1_score)``. Each value is ``0.0`` when
        its denominator is zero.
    """
    labels_match = col("true_label") == col("predicted_label")
    labels_differ = col("true_label") != col("predicted_label")
    actual_positive = col("true_label") == positive_label
    predicted_positive = col("predicted_label") == positive_label

    # when() without otherwise() yields null for non-matching rows and
    # count() skips nulls, so each aggregate counts only its matching rows.
    counts = df.agg(
        count(when(labels_match & actual_positive, 1)).alias("tp"),
        count(when(labels_differ & predicted_positive, 1)).alias("fp"),
        count(when(labels_differ & actual_positive, 1)).alias("fn"),
    ).first()
    tp, fp, fn = counts["tp"], counts["fp"], counts["fn"]

    # Compute precision, recall, and F1-score, guarding every division by zero
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0.0
    f1_score = (
        2 * (precision * recall) / (precision + recall)
        if (precision + recall) != 0
        else 0.0
    )

    return precision, recall, f1_score

# Evaluate the labelled sample and unpack the three scores
metrics = calculate_metrics(df)
precision, recall, f1_score = metrics

# Report each score on its own line, to four decimal places
print(
    f"Precision: {precision:.4f}",
    f"Recall: {recall:.4f}",
    f"F1-Score: {f1_score:.4f}",
    sep="\n",
)

# Shut the Spark session down at the end of the cell
spark.stop()


Precision: 0.5000
Recall: 0.5000
F1-Score: 0.5000
