## Pyspark session setup with Minio, lineage data table creation, scoring and model evaluation

In [2]:
from pyspark.sql import SparkSession

# Initialize Spark with Iceberg catalog configurations
spark = SparkSession.builder \
    .appName("Iceberg-MinIO-Demo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "s3a://local-datalake/warehouse") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl.disable.cache", "true") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000") \
    .config("spark.hadoop.fs.s3a.attempts.maximum", "20") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .getOrCreate()

spark.conf.set("spark.hadoop.fs.s3a.access.key", "fakesecret")
spark.conf.set("spark.hadoop.fs.s3a.secret.key", "fakesecret")
spark.conf.set("spark.hadoop.fs.s3a.endpoint", "http://minio-service:9000")

# Use AnonymousAWSCredentialsProvider for read operations
spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

# Add ACL permissions
spark.conf.set("spark.hadoop.fs.s3a.acl.default", "PublicReadWrite")
spark.conf.set("spark.hadoop.fs.s3a.multipart.size", "5242880")

# Print Spark configurations to verify they are set correctly
print("Spark Iceberg Configurations:")
print(f"Catalog type: {spark.conf.get('spark.sql.catalog.local.type')}")
print(f"Warehouse location: {spark.conf.get('spark.sql.catalog.local.warehouse')}")
print(f"S3A endpoint: {spark.conf.get('spark.hadoop.fs.s3a.endpoint')}")

# Create namespace for our tables
print("\nCreating namespace...")
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.demo")
print("Namespace created: local.demo")

Spark Iceberg Configurations:
Catalog type: hadoop
Warehouse location: s3a://local-datalake/warehouse
S3A endpoint: http://minio-service:9000

Creating namespace...
Namespace created: local.demo


In [4]:
table_name = "local.demo.linkage"

parsed = spark.read.option("header", "true").option("nullValue", "?").\
option("inferSchema", "true").csv("s3a://local-datalake/raw/block*.csv")

parsed.writeTo(table_name) \
    .tableProperty("write.format.default", "parquet") \
    .tableProperty("write.parquet.compression-codec", "snappy") \
    .createOrReplace()

In [5]:
parsed = spark.table("local.demo.linkage")
parsed.show()
parsed.count()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        NULL|         1.0|        NULL|      1|     1|     1|     1|      0|    true|
|39086|47614|              1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|70031|70237|              1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|84795|97439|              1.0|        NULL|         1.0|        NULL|      1|     1|     1|     1|      1|    true|
|36950|42116|              1.0|        NULL|         1.0|         1.0|      1|     1|     1|     1|      1|    true|
|42413|48491|              1.0|        NULL|         1.0|       

5749132

In [7]:
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col
def pivot_summary(desc):
   # convert to pandas dataframe
   desc_p = desc.toPandas()
   # transpose
   desc_p = desc_p.set_index('summary').transpose().reset_index()
   desc_p = desc_p.rename(columns={'index':'field'})
   desc_p = desc_p.rename_axis(None, axis=1)
   # convert to Spark dataframe
   descT = spark.createDataFrame(desc_p)
   # convert metric columns to double from string
   for c in descT.columns:
    if c == 'field':
        continue
    else:
        descT = descT.withColumn(c, descT[c].cast(DoubleType()))
   return descT
    
parsed = spark.table("local.demo.linkage")
parsed.cache()
matches = parsed.where("is_match = true")
match_summary = matches.describe()
misses = parsed.filter(col("is_match") == False)
miss_summary = misses.describe()

match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)

In [8]:
match_summaryT.createOrReplaceTempView("match_desc")
miss_summaryT.createOrReplaceTempView("miss_desc")
spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926265|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|   0.683877248259059|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0| 0.28545290574607884|
|cmp_fname_c2| 103698.0| 0.09104268062279974|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



In [9]:
from pyspark.sql.functions import expr

good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
sum_expression = " + ".join(good_features)

scored = parsed.fillna(0, subset=good_features).\
            withColumn('score', expr(sum_expression)).\
            select('score', 'is_match')

scored.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows



In [10]:
def crossTabs(scored: DataFrame, t: DoubleType) -> DataFrame:
    return scored.selectExpr(f"score >= {t} as above", "is_match").\
            groupBy("above").pivot("is_match", ("true", "false")).\
            count()

crossTabs(scored, 4.0).show()
crossTabs(scored, 2.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| NULL|5131787|
+-----+-----+-------+

