In [1]:
# Install packages
import sys
import os
import findspark

!{sys.executable} -m pip install thefuzz

# Import java packages
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /home/marcelo/libs/mysql-connector-java-8.0.19.jar --jars /home/marcelo/libs/mysql-connector-java-8.0.19.jar pyspark-shell'
findspark.add_packages('mysql:mysql-connector-java:8.0.19')



In [2]:
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat_ws, explode, size, to_timestamp, udf, upper
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType

from itertools import combinations
import hashlib
from thefuzz import fuzz
from thefuzz import process



In [3]:
spark = SparkSession \
    .builder \
    .appName('POC ETL') \
    .master("local[*]") \
    .getOrCreate()

### Reading Data From JDBC

In [4]:
address_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/persondb") \
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "address") \
    .option("user", "root").option("password", "123456").load()

address_df.printSchema()
address_df.show(10)

root
 |-- id: string (nullable = true)
 |-- person_id: string (nullable = true)
 |-- line1: string (nullable = true)
 |-- line2: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

+---+---------+-----+-----+-------+----+-------+
| id|person_id|line1|line2|zipcode|city|country|
+---+---------+-----+-----+-------+----+-------+
+---+---------+-----+-----+-------+----+-------+



### ETL

In [5]:
def md5_str(val):
    result = hashlib.md5(val.encode())
    return str(result.hexdigest())

@udf(returnType=StringType())
def md5_udf_func(val):
    return md5_str(val)


@udf(returnType=ArrayType(StructType([
    StructField("ratio", IntegerType(), False),
    StructField("md5_pair_0", StringType(), False),
    StructField("md5_pair_1", StringType(), False)
    ]))
)
def fuzz_udf_func(arr):
    return [[fuzz.ratio(comb[0], comb[1]), md5_str(comb[0]), md5_str(comb[1])] for comb in list(combinations(arr, 2))
]

In [6]:
address_df = address_df.withColumn("complete_address", concat_ws(",","line1",'line2','zipcode','city','country'))
address_df = address_df.withColumn("complete_address", upper('complete_address')).withColumn("md5", md5_udf_func('complete_address'))
address_df.printSchema()
address_df.show(10)

root
 |-- id: string (nullable = true)
 |-- person_id: string (nullable = true)
 |-- line1: string (nullable = true)
 |-- line2: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- complete_address: string (nullable = false)
 |-- md5: string (nullable = true)

+---+---------+-----+-----+-------+----+-------+----------------+---+
| id|person_id|line1|line2|zipcode|city|country|complete_address|md5|
+---+---------+-----+-----+-------+----+-------+----------------+---+
+---+---------+-----+-----+-------+----+-------+----------------+---+



### Grouping

In [7]:
addresses_grouped_df = address_df.groupBy('person_id').agg(F.collect_list("complete_address").alias('addresses'))
addresses_grouped_df = addresses_grouped_df.withColumn('addresses_size', size("addresses"))
addresses_grouped_df = addresses_grouped_df.where(col("addresses_size") > 1)
addresses_grouped_df = addresses_grouped_df.withColumn("fuzz_info", fuzz_udf_func('addresses'))
addresses_grouped_df.printSchema()
addresses_grouped_df.show(10)
addresses_grouped_df.select("person_id", "fuzz_info.ratio", "fuzz_info.md5_pair_0", "fuzz_info.md5_pair_1").show(10)

root
 |-- person_id: string (nullable = true)
 |-- addresses: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- addresses_size: integer (nullable = false)
 |-- fuzz_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ratio: integer (nullable = false)
 |    |    |-- md5_pair_0: string (nullable = false)
 |    |    |-- md5_pair_1: string (nullable = false)

+---------+---------+--------------+---------+
|person_id|addresses|addresses_size|fuzz_info|
+---------+---------+--------------+---------+
+---------+---------+--------------+---------+

+---------+-----+----------+----------+
|person_id|ratio|md5_pair_0|md5_pair_1|
+---------+-----+----------+----------+
+---------+-----+----------+----------+



### Normalize

In [8]:
addresses_grouped_norm_df = addresses_grouped_df.select("person_id", explode("fuzz_info").alias('fuzz_info'))
addresses_grouped_norm_df.printSchema()
addresses_grouped_norm_df.show(10)

root
 |-- person_id: string (nullable = true)
 |-- fuzz_info: struct (nullable = true)
 |    |-- ratio: integer (nullable = false)
 |    |-- md5_pair_0: string (nullable = false)
 |    |-- md5_pair_1: string (nullable = false)

+---------+---------+
|person_id|fuzz_info|
+---------+---------+
+---------+---------+



### Joining dataframes

In [9]:
conditions = [addresses_grouped_norm_df.person_id == address_df.person_id, (addresses_grouped_norm_df.fuzz_info.md5_pair_0 == address_df.md5) | (addresses_grouped_norm_df.fuzz_info.md5_pair_1 == address_df.md5)]
#addresses_joined_df = addresses_grouped_norm_df.join(address_df, (addresses_grouped_norm_df.person_id == address_df.person_id) & ((addresses_grouped_norm_df.fuzz_info.md5_pair_0 == address_df.md5) | (addresses_grouped_norm_df.fuzz_info.md5_pair_1 == address_df.md5)), "left")
#print(addresses_grouped_norm_df.join(address_df, (addresses_grouped_norm_df.person_id == address_df.person_id) & ((addresses_grouped_norm_df.fuzz_info.md5_pair_0 == address_df.md5) | (addresses_grouped_norm_df.fuzz_info.md5_pair_1 == address_df.md5)), "left").count())
addresses_joined_df = addresses_grouped_norm_df.join(address_df, conditions, "left").drop(address_df.person_id)
addresses_joined_df.printSchema()
addresses_joined_df.show(10)

root
 |-- fuzz_info: struct (nullable = true)
 |    |-- ratio: integer (nullable = false)
 |    |-- md5_pair_0: string (nullable = false)
 |    |-- md5_pair_1: string (nullable = false)
 |-- id: string (nullable = true)
 |-- person_id: string (nullable = true)
 |-- line1: string (nullable = true)
 |-- line2: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- complete_address: string (nullable = true)
 |-- md5: string (nullable = true)

+---------+---+---------+-----+-----+-------+----+-------+----------------+---+
|fuzz_info| id|person_id|line1|line2|zipcode|city|country|complete_address|md5|
+---------+---+---------+-----+-----+-------+----+-------+----------------+---+
+---------+---+---------+-----+-----+-------+----+-------+----------------+---+



### Saving results

In [10]:
addresses_joined_df.write.mode('overwrite').parquet("addresses_filtered")