## Q1 

In [1]:
# Import necessary libraries and set up Spark session
import pyspark
import os
import sys
from pyspark import SparkContext
# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_2').getOrCreate()

# Read data from CSV files
prev = spark.read.option("recursiveFileLookup", "true").csv("./donation")

# Show a sample of the data
prev.show(2)



+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
|  _c0|  _c1|         _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
|53719|60579|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
only showing top 2 rows



In [2]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, lower, regexp_replace

parsed = spark.read.option("header", "true").option("nullValue", "?").\
option("inferSchema", "true").csv("donation/block_1.csv")
# Assuming `parsed` is the DataFrame after loading data
def preprocess_data(df):
    # Convert text to lowercase
    df = df.withColumn("clean_text", lower(col("cmp_bd")))

    # Remove special characters and punctuation
    df = df.withColumn("clean_text", regexp_replace(col("clean_text"), "[^a-zA-Z0-9\\s]", ""))

    # Tokenization
    tokenizer = Tokenizer(inputCol="clean_text", outputCol="tokens")
    df = tokenizer.transform(df)

    # Remove stop words
    remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    df = remover.transform(df)

    return df

# Apply the preprocessing function to your data
preprocessed_data = preprocess_data(parsed)
preprocessed_data.show(truncate=False)


+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+----------+------+---------------+
|id_1 |id_2 |cmp_fname_c1     |cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|clean_text|tokens|filtered_tokens|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+----------+------+---------------+
|37291|53113|0.833333333333333|null        |1.0         |null        |1      |1     |1     |1     |0      |true    |1         |[1]   |[1]            |
|39086|47614|1.0              |null        |1.0         |null        |1      |1     |1     |1     |1      |true    |1         |[1]   |[1]            |
|70031|70237|1.0              |null        |1.0         |null        |1      |1     |1     |1     |1      |true    |1         |[1]   |[1]            |
|84795|97439|1.0              |null        |1.0         |null        |1      |1     |1     |1 

## Q2 

In [3]:
# Perform data analysis and compute similarity scores
from pyspark.sql.functions import col
parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()

parsed.createOrReplaceTempView("linkage")

spark.sql("""
    SELECT is_match, COUNT(*) cnt
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC
""").show()

summary = parsed.describe()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

matches = parsed.where("is_match = true")
match_summary = matches.describe()
misses = parsed.filter(col("is_match") == False)
miss_summary = misses.describe()

+--------+------+
|is_match| count|
+--------+------+
|   false|572820|
|    true|  2093|
+--------+------+

+--------+------+
|is_match|   cnt|
+--------+------+
|   false|572820|
|    true|  2093|
+--------+------+

+-------+------------------+------------------+
|summary|      cmp_fname_c1|      cmp_fname_c2|
+-------+------------------+------------------+
|  count|            574811|             10325|
|   mean|0.7127592938253411|0.8977586763518969|
| stddev|0.3889286452463531|0.2742577520430532|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+



## Q3 

In [6]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, expr
from pyspark.sql.types import DoubleType
import pandas 

# Assuming 'parsed' DataFrame is already defined

# Function for pivoting summary data
def pivot_summary(desc):
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    for c in descT.columns:
        if c == 'field':
            continue
        else:
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
sum_expression = " + ".join(good_features)

# Evaluate precision, recall, and F1-score
scored = parsed.fillna(0, subset=good_features).\
                withColumn('score', expr(sum_expression)).\
                select('score', 'is_match')

scored.show()

def crossTabs(scored: DataFrame, t: float) -> DataFrame:
    return scored.selectExpr(f"score >= {t} as above", "is_match").\
        groupBy("above").pivot("is_match", ("true", "false")).\
        count()

# Confusion matrix and evaluation metrics
confused = crossTabs(scored, 4.0)
confused.show()

confused2 = crossTabs(scored, 2.0)
confused2.show()

tp = confused.filter("above = true").select("true").collect()[0].true
fp = confused.filter("above = true").select("false").collect()[0].false
fn = confused.filter("above = false").select("true").fillna(0).collect()[0].true
tn = confused.filter("above = false").select("false").collect()[0].false

precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

precision, recall, f1

ImportError: Pandas >= 1.0.5 must be installed; however, your version was 0.23.4.