In [0]:
!pip install fuzzywuzzy

Collecting fuzzywuzzy
  Obtaining dependency information for fuzzywuzzy from https://files.pythonhosted.org/packages/43/ff/74f23998ad2f93b945c0309f825be92e04e0348e062026998b5eefef4c33/fuzzywuzzy-0.18.0-py2.py3-none-any.whl.metadata
  Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl.metadata (4.9 kB)
Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl (18 kB)
Installing collected packages: fuzzywuzzy
Successfully installed fuzzywuzzy-0.18.0
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


## Create DataFrames with sample data
#### DataFrame 1 (`df1`): sample Glue Data Catalog data, with the schema name, the table name, and the S3 path where the table's data is stored
#### DataFrame 2 (`df2`): sample S3 Inventory data, with the fully qualified S3 path of each object and its corresponding storage cost

In [0]:
# Sample catalog rows: one (schema, table, table_path) tuple per table.
# table3 deliberately has no objects in the inventory sample below.
df1 = spark.createDataFrame(
    [
        ("schema1", "table1", "s3://bucket/abc/def/table1"),
        ("schema1", "table2", "s3://bucket/abc/table2"),
        ("schema1", "table3", "s3://bucket/abc/def/ghi/table3"),
        ("schema1", "table4", "s3://bucket/abc/pqr/xyz/table4"),
        ("schema2", "table5", "s3://bucket/ijk/table5"),
    ],
    schema=["schema", "table", "table_path"],
)

# Sample inventory rows: one (s3_object, cost) tuple per stored object.
# The last two objects do not live under any table path in df1.
df2 = spark.createDataFrame(
    [
        ("s3://bucket/abc/def/table1/1.parquet", 100),
        ("s3://bucket/abc/def/table1/2.parquet", 40),
        ("s3://bucket/abc/table2/pt_cycle_id=20250121000000/1.parquet", 100),
        ("s3://bucket/abc/table2/pt_cycle_id=20250121000000/2.parquet", 110),
        ("s3://bucket/abc/table2/pt_cycle_id=20250121000000/3.csv", 160),
        ("s3://bucket/abc/pqr/xyz/table4/2.parquet", 260),
        ("s3://bucket/abc/pqr/xyz/table4/aaa/7.parquet", 580),
        ("s3://bucket/ijk/table5/0.parquet", 780),
        ("s3://bucket/1.parquet", 1000),
        ("s3://bucket/ppp/uss/1.parquet", 1780),
    ],
    schema=["s3_object", "cost"],
)


## Show data for dataframe 1

In [0]:
# Render df1 (schema, table, table_path) as a table in the notebook output.
display(df1)

schema,table,table_path
schema1,table1,s3://bucket/abc/def/table1
schema1,table2,s3://bucket/abc/table2
schema1,table3,s3://bucket/abc/def/ghi/table3
schema1,table4,s3://bucket/abc/pqr/xyz/table4
schema2,table5,s3://bucket/ijk/table5



## Show data for dataframe 2


In [0]:
# Render df2 (s3_object, cost) as a table in the notebook output.
display(df2)

s3_object,cost
s3://bucket/abc/def/table1/1.parquet,100
s3://bucket/abc/def/table1/2.parquet,40
s3://bucket/abc/table2/pt_cycle_id=20250121000000/1.parquet,100
s3://bucket/abc/table2/pt_cycle_id=20250121000000/2.parquet,110
s3://bucket/abc/table2/pt_cycle_id=20250121000000/3.csv,160
s3://bucket/abc/pqr/xyz/table4/2.parquet,260
s3://bucket/abc/pqr/xyz/table4/aaa/7.parquet,580
s3://bucket/ijk/table5/0.parquet,780
s3://bucket/1.parquet,1000
s3://bucket/ppp/uss/1.parquet,1780



## Create a SQL UDF with fuzzywuzzy to do a partial join on the closest string match. The partial ratio (`fuzz.partial_ratio`) is used because it suits cases where one string is a substring of the other, which is the case here: each table path is contained in the paths of the objects stored under it. fuzzywuzzy also offers other fuzzy match ratios.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from fuzzywuzzy import fuzz

def match(str1, str2):
  """Score how well two strings match using fuzzywuzzy's partial ratio.

  Args:
    str1: First string to compare.
    str2: Second string to compare.

  Returns:
    The value returned by ``fuzz.partial_ratio(str1, str2)``; callers in
    this notebook treat a score of 100 as a match.
  """
  return fuzz.partial_ratio(str1, str2)

# Wrap `match` as a PySpark UDF with an integer return type (usable in DataFrame expressions).
fuzzy_match = udf(match, IntegerType())



## Register the dataframes as temporary views in Spark

In [0]:
# Expose the catalog sample (df1) to SQL under the view name "tables".
df1.createOrReplaceTempView("tables")
# Expose the inventory sample (df2) to SQL under the view name "s3_inventory".
df2.createOrReplaceTempView("s3_inventory")


## Register the UDF so it can be called from Spark SQL

In [0]:
# Register `match` under the SQL function name "fuzzy_match" with an integer return type,
# so that SQL cells can call fuzzy_match(...). As the cell's last expression, the
# value returned by register() is echoed in the cell output.
spark.udf.register("fuzzy_match", match, IntegerType())

<function __main__.match(str1, str2)>

## Perform a cross join and keep only the records whose partial match score is 100.

In [0]:
%sql
-- Pair every catalog table with every inventory object, then keep only the
-- pairs for which the fuzzy_match UDF returns a score of exactly 100.
SELECT *
FROM tables AS t
CROSS JOIN s3_inventory AS s3
WHERE fuzzy_match(t.table_path, s3.s3_object) = 100

schema,table,table_path,s3_object,cost
schema1,table1,s3://bucket/abc/def/table1,s3://bucket/abc/def/table1/1.parquet,100
schema1,table1,s3://bucket/abc/def/table1,s3://bucket/abc/def/table1/2.parquet,40
schema1,table2,s3://bucket/abc/table2,s3://bucket/abc/table2/pt_cycle_id=20250121000000/1.parquet,100
schema1,table2,s3://bucket/abc/table2,s3://bucket/abc/table2/pt_cycle_id=20250121000000/2.parquet,110
schema1,table2,s3://bucket/abc/table2,s3://bucket/abc/table2/pt_cycle_id=20250121000000/3.csv,160
schema1,table4,s3://bucket/abc/pqr/xyz/table4,s3://bucket/abc/pqr/xyz/table4/2.parquet,260
schema1,table4,s3://bucket/abc/pqr/xyz/table4,s3://bucket/abc/pqr/xyz/table4/aaa/7.parquet,580
schema2,table5,s3://bucket/ijk/table5,s3://bucket/ijk/table5/0.parquet,780
