In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, StringIndexer
import re
import numpy as np

In [2]:
spark = SparkSession.builder\
        .appName("Lab 3")\
        .getOrCreate()



## Q1

In [7]:
df = spark.read.csv('/home/lplab/Desktop/janav_220962049/customers-100.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: date (nullable = true)
 |-- Website: string (nullable = true)



In [8]:
def clean_text(text):
    if text:
        # Convert to lowercase
        text = text.lower()
        # Remove punctuation and special characters
        text = re.sub(r'[^\w\s]', '', text)
        # Replace multiple spaces with a single space
        text = re.sub(r'\s+', ' ', text)
    return text

In [9]:
clean_text_udf = udf(clean_text, StringType())

In [10]:
df_cleaned = df.withColumn("Customer Id Cleaned", clean_text_udf(col("Customer Id"))) \
                .withColumn("First Name Cleaned", clean_text_udf(col("First Name"))) \
                .withColumn("Last Name Cleaned", clean_text_udf(col("Last Name"))) \
                .withColumn("Company Cleaned", clean_text_udf(col("Company"))) \
                .withColumn("City Cleaned", clean_text_udf(col("City"))) \
                .withColumn("Country Cleaned", clean_text_udf(col("Country"))) \
                .withColumn("Phone 1 Cleaned", clean_text_udf(col("Phone 1"))) \
                .withColumn("Phone 2 Cleaned", clean_text_udf(col("Phone 2"))) \
                .withColumn("Email Cleaned", clean_text_udf(col("Email"))) \
                .withColumn("Website Cleaned", clean_text_udf(col("Website")))

df_cleaned.printSchema()

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: date (nullable = true)
 |-- Website: string (nullable = true)
 |-- Customer Id Cleaned: string (nullable = true)
 |-- First Name Cleaned: string (nullable = true)
 |-- Last Name Cleaned: string (nullable = true)
 |-- Company Cleaned: string (nullable = true)
 |-- City Cleaned: string (nullable = true)
 |-- Country Cleaned: string (nullable = true)
 |-- Phone 1 Cleaned: string (nullable = true)
 |-- Phone 2 Cleaned: string (nullable = true)
 |-- Email Cleaned: string (nullable = true)
 |-- Website Cleaned: string (nullable = true)



In [11]:
tokenizer = Tokenizer(inputCol="First Name Cleaned", outputCol="First Name Tokens")
df_tokens = tokenizer.transform(df_cleaned)
print(df_tokens)

DataFrame[Index: int, Customer Id: string, First Name: string, Last Name: string, Company: string, City: string, Country: string, Phone 1: string, Phone 2: string, Email: string, Subscription Date: date, Website: string, Customer Id Cleaned: string, First Name Cleaned: string, Last Name Cleaned: string, Company Cleaned: string, City Cleaned: string, Country Cleaned: string, Phone 1 Cleaned: string, Phone 2 Cleaned: string, Email Cleaned: string, Website Cleaned: string, First Name Tokens: array<string>]


## Q2

In [12]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType

In [13]:
def jaccard_similarity(set1, set2):
    set1 = set(set1)
    set2 = set(set2)
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    return intersection / union if union != 0 else 0.0

jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

# Convert phone numbers to sets of characters for comparison
def phone_to_set(phone):
    return list(phone) if phone else []

phone_to_set_udf = udf(phone_to_set, ArrayType(StringType()))

In [14]:
df_with_sets = df.withColumn("Phone 1 Set", phone_to_set_udf(col("Phone 1"))) \
                 .withColumn("Phone 2 Set", phone_to_set_udf(col("Phone 2")))
df_with_jaccard = df_with_sets.withColumn("Jaccard Similarity", jaccard_similarity_udf(col("Phone 1 Set"), col("Phone 2 Set")))

In [15]:
results = df_with_jaccard.collect()

for row in results:
    phone1 = row['Phone 1']
    phone2 = row['Phone 2']
    similarity = row['Jaccard Similarity']
    print(f"Phone 1: {phone1}, Phone 2: {phone2}, Jaccard Similarity: {similarity}")

Phone 1: 229.077.5154, Phone 2: 397.884.0519x718, Jaccard Similarity: 0.6363636255264282
Phone 1: 5153435776, Phone 2: 686-620-1820x944, Jaccard Similarity: 0.25
Phone 1: +1-539-402-0259, Phone 2: (496)978-3969x58947, Jaccard Similarity: 0.3333333432674408
Phone 1: 001-808-617-6467x12895, Phone 2: +1-813-324-8756, Jaccard Similarity: 0.6153846383094788
Phone 1: 001-234-203-0635x76146, Phone 2: 001-199-446-3860x3486, Jaccard Similarity: 0.5833333134651184
Phone 1: (283)437-3886x88321, Phone 2: 999-728-1637, Jaccard Similarity: 0.5833333134651184
Phone 1: (496)452-6181x3291, Phone 2: +1-247-266-0963x4995, Jaccard Similarity: 0.6000000238418579
Phone 1: 001-583-352-7197x297, Phone 2: 001-333-145-0369, Jaccard Similarity: 0.5
Phone 1: 854-138-4911x5772, Phone 2: +1-448-910-2276x729, Jaccard Similarity: 0.6153846383094788
Phone 1: 739.218.2516x459, Phone 2: 001-054-401-0347x617, Jaccard Similarity: 0.5384615659713745
Phone 1: 637-854-0256x825, Phone 2: 114.336.0784x788, Jaccard Similarity: 

## Q3

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("EntityResolutionEvaluation") \
    .getOrCreate()

# Sample data for true and predicted labels
data = [
    (0, 0),
    (0, 1),
    (1, 1),
    (1, 0),
    (1, 1),
    (0, 0),
    (1, 1),
    (0, 1)
]

# Create a DataFrame
df = spark.createDataFrame(data, ["true_label", "predicted_label"])

# Convert columns to DoubleType
df = df.withColumn("true_label", col("true_label").cast(DoubleType()))
df = df.withColumn("predicted_label", col("predicted_label").cast(DoubleType()))

# Initialize evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="true_label",
    predictionCol="predicted_label"
)

# Calculate Precision
precision = evaluator.setMetricName("precisionByLabel").evaluate(df)
print(f"Precision: {precision}")

# Calculate Recall
recall = evaluator.setMetricName("recallByLabel").evaluate(df)
print(f"Recall: {recall}")

# Calculate F1-Score
f1_score = evaluator.setMetricName("f1").evaluate(df)
print(f"F1-Score: {f1_score}")

# Stop SparkSession
spark.stop()


Precision: 0.6666666666666666
Recall: 0.5
F1-Score: 0.6190476190476191
