# Chapter 2. Introduction to Data Analysis with PySpark

The dataset used in the code below comes from a record linkage study performed at a German hospital in 2010. It contains several million pairs of patient records that were matched according to several different criteria, such as the patient’s name (first and last), address, and birthday. Each matching field was assigned a numerical score from 0.0 to 1.0 based on how similar the strings were, and the data was then hand-labeled to identify which pairs represented the same person and which did not.

The first two fields are integer IDs that represent the patients who were matched in the record.

The match-score fields that follow are stored as integers when the only possible values are match (1) or no-match (0), and as doubles whenever partial matches are possible.

The last field is a boolean value (true or false) indicating whether or not the pair of patient records represented by the line was a match.
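
The record layout described above can be made concrete with a small plain-Python parser. This is a sketch under stated assumptions: the column order follows the schema that `printSchema()` prints later in this chapter, missing values are written as `?` (the `nullValue` used when the files are read), the label is accepted as `true`/`TRUE` in any case, and both the sample line and the `parse_record` helper are hypothetical.

```python
# Hypothetical helper: parse one CSV line of the linkage data into typed Python values.
INT_FIELDS = {"id_1", "id_2", "cmp_sex", "cmp_bd", "cmp_bm", "cmp_by", "cmp_plz"}
DOUBLE_FIELDS = {"cmp_fname_c1", "cmp_fname_c2", "cmp_lname_c1", "cmp_lname_c2"}
COLUMNS = [
    "id_1", "id_2",
    "cmp_fname_c1", "cmp_fname_c2", "cmp_lname_c1", "cmp_lname_c2",
    "cmp_sex", "cmp_bd", "cmp_bm", "cmp_by", "cmp_plz",
    "is_match",
]


def parse_record(line: str) -> dict:
    """Convert one comma-separated line into a dict of typed values ("?" -> None)."""
    values = line.strip().split(",")
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(values)}")
    record = {}
    for name, raw in zip(COLUMNS, values):
        if raw == "?":
            record[name] = None                   # missing comparison score
        elif name in INT_FIELDS:
            record[name] = int(raw)               # IDs and 0/1 exact-match flags
        elif name in DOUBLE_FIELDS:
            record[name] = float(raw)             # partial-match similarity in [0.0, 1.0]
        else:
            record[name] = raw.upper() == "TRUE"  # is_match label
    return record


# Hypothetical sample line, not taken from the dataset.
sample = "1001,2002,0.8333,?,1.0,?,1,1,1,1,0,TRUE"
print(parse_record(sample))
```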

The goal is to come up with a simple classifier that predicts whether a pair of patient records is a match based on the values of its match scores.

In [3]:
import pyspark

from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.config("spark.driver.memory", "4g").appName("intro_to_pyspark").getOrCreate()



By default, when Spark reads a CSV file without schema inference, it treats every column as a `string` type.

To infer the schema, Spark must make two passes over the dataset: one pass to figure out the type of each column, and a second pass to do the actual parsing.
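
The two-pass idea can be illustrated with a plain-Python sketch. It uses simplified, assumed rules: it recognizes only int, double, and string, and widens a column to the most general type it has seen. Spark's actual inference handles more types (for example, the boolean `is_match` column), and the function names here are hypothetical.

```python
import csv
import io

# Simplified illustration of two-pass schema inference (not Spark's actual rules).
_WIDTH = {"int": 0, "double": 1, "string": 2}  # wider types can hold narrower ones


def _kind(value: str) -> str:
    """Classify a single raw string as int, double, or string."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


def infer_types(rows, null_value="?"):
    """Pass 1: scan every row and keep the widest type seen per column."""
    types = None
    for row in rows:
        if types is None:
            types = ["int"] * len(row)
        for i, value in enumerate(row):
            if value == null_value:
                continue  # nulls do not constrain the type
            kind = _kind(value)
            if _WIDTH[kind] > _WIDTH[types[i]]:
                types[i] = kind
    return types or []


def parse_rows(rows, types, null_value="?"):
    """Pass 2: convert every value using the inferred column types."""
    casts = {"int": int, "double": float, "string": str}
    return [
        [None if v == null_value else casts[t](v) for v, t in zip(row, types)]
        for row in rows
    ]


data = "1,0.5,a\n2,?,b\n3,1,c\n"
rows = list(csv.reader(io.StringIO(data)))
types = infer_types(rows)         # first pass over the data
parsed = parse_rows(rows, types)  # second pass over the data
print(types, parsed)
```

Here the rows sit in a list, so the second pass is cheap; for files on disk, each pass means reading the data again, which is the cost the explicit schema below avoids.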

In [7]:
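parsed_df = (
    spark.read.option("header", "true")  # first line holds the column names
    .option("inferSchema", "true")       # extra pass to detect column types
    .option("nullValue", "?")            # "?" marks a missing value
    .csv("./datasets/donation/block_*.csv")
)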

## Analyzing Data

In [9]:
parsed_df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



If you know the schema that you want to use for a CSV file ahead of time, you can create an instance of the `pyspark.sql.types.StructType` class and pass it to the reader via the `DataFrameReader.schema` method. **This can have a significant performance benefit when the dataset is very large, since Spark will not need to perform an extra pass over the data to figure out the data type of each column.**

For example,

In [None]:
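from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# A partial schema covering the first three columns.
schema = StructType(
    [
        StructField("id_1", IntegerType(), False),
        StructField("id_2", IntegerType(), False),
        # Scores can be missing ("?"), so this column must be nullable.
        StructField("cmp_fname_c1", DoubleType(), True),
    ]
)

# Or use a DDL (Data Definition Language) string instead:
# schema = "id_1 INT, id_2 INT, cmp_fname_c1 DOUBLE"

spark.read.schema(schema).option("header", "true").option("nullValue", "?").csv("...")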
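
The `StructType` example above defines only three of the 12 columns. A complete schema can be written as a DDL string built from the column names and types that `printSchema()` reports for `parsed_df`. The sketch below assembles it from a plain Python list so it can be checked without a Spark session; the `LINKAGE_COLUMNS` name is hypothetical.

```python
# Column names and Spark SQL types, copied from the printSchema() output of parsed_df.
LINKAGE_COLUMNS = [
    ("id_1", "INT"), ("id_2", "INT"),
    ("cmp_fname_c1", "DOUBLE"), ("cmp_fname_c2", "DOUBLE"),
    ("cmp_lname_c1", "DOUBLE"), ("cmp_lname_c2", "DOUBLE"),
    ("cmp_sex", "INT"), ("cmp_bd", "INT"), ("cmp_bm", "INT"),
    ("cmp_by", "INT"), ("cmp_plz", "INT"),
    ("is_match", "BOOLEAN"),
]

# Join "name TYPE" pairs into a single DDL schema string.
ddl_schema = ", ".join(f"{name} {sql_type}" for name, sql_type in LINKAGE_COLUMNS)
print(ddl_schema)

# With a SparkSession available, the string can replace schema inference, e.g.:
# spark.read.schema(ddl_schema).option("header", "true").option("nullValue", "?").csv("./datasets/donation/block_*.csv")
```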

In [11]:
parsed_df.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: double, cmp_fname_c2: double, cmp_lname_c1: double, cmp_lname_c2: double, cmp_sex: int, cmp_bd: int, cmp_bm: int, cmp_by: int, cmp_plz: int, is_match: boolean]

In [13]:
parsed_df.count()

5749132

In [37]:
from pyspark.sql.functions import col

In [None]:
parsed_df.groupBy("is_match").count().orderBy(col("count").desc()).show()
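
The `groupBy`/`count`/`orderBy` call above, like the equivalent SQL query further on, counts the rows in each `is_match` group and lists the largest group first. For a small in-memory list, the same computation is `collections.Counter.most_common`; the labels below are hypothetical.

```python
from collections import Counter

# Hypothetical is_match labels standing in for the DataFrame column.
labels = [False, False, True, False, False, False, True, False]

# Count each label, most common first (like ORDER BY count DESC).
counts = Counter(labels).most_common()
print(counts)
```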

In [11]:
from pyspark.sql.functions import avg, stddev

In [None]:
# stddev computes the sample standard deviation (it is an alias for stddev_samp); use stddev_pop for the population standard deviation.
parsed_df.agg(avg("cmp_sex"), stddev("cmp_sex")).show()
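
The difference between the sample and population standard deviation is only the divisor: `n - 1` for the sample estimate, `n` for the population one. A plain-Python sketch with the standard `statistics` module, using hypothetical 0/1 values in place of `cmp_sex`:

```python
import math
import statistics

# Hypothetical 0/1 values standing in for a cmp_sex column.
values = [1, 1, 1, 0, 1, 1, 0, 1]

sample_sd = statistics.stdev(values)       # divides by n - 1, like Spark's stddev
population_sd = statistics.pstdev(values)  # divides by n, like Spark's stddev_pop

n = len(values)
# The two estimates differ only by the factor sqrt((n - 1) / n).
assert math.isclose(population_sd, sample_sd * math.sqrt((n - 1) / n))
print(sample_sd, population_sd)
```

With the roughly 5.7 million rows in this dataset, that factor is so close to 1 that the two functions give nearly identical results.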

In [17]:
# Register the DataFrame as a temporary view so that it can be queried with Spark SQL.
parsed_df.createOrReplaceTempView("linkage")

In [None]:
spark.sql("""
SELECT is_match, COUNT(*) cnt
FROM linkage
GROUP BY is_match
ORDER BY cnt DESC
""").show()

## Fast Summary Statistics for DataFrames

In [15]:
summary_df = parsed_df.describe()
summary_df.show()


+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|summary|              id_1|              id_2|      cmp_fname_c1|      cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|           cmp_sex|             cmp_bd|             cmp_bm|             cmp_by|            cmp_plz|
+-------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|  count|           5749132|           5749132|           5748125|            103698|           5749132|               2464|           5749132|            5748337|            5748337|            5748337|            5736289|
|   mean| 33324.48559643438| 66587.43558331935|0.7129024704436274|0.9000176718903216|0.3156278193084133|

In [39]:
matches_df = parsed_df.where("is_match = true")  # SQL-style syntax
match_summary_df = matches_df.describe()

misses_df = parsed_df.filter(col("is_match") == False)  # DataFrame API syntax
miss_summary_df = misses_df.describe()

## Pivoting DataFrames

PySpark can convert between Spark and pandas DataFrames. When Arrow-based transfer is enabled with the `spark.sql.execution.arrow.pyspark.enabled` setting, the conversion uses Apache Arrow, which allows efficient data transfer between the JVM and Python processes.

The PyArrow library is installed as a dependency of PySpark's Spark SQL extra, for example with `pip install pyspark[sql]`.

In [17]:
summary_p_df = summary_df.toPandas()

In [19]:
summary_p_df.head()

|   | summary | id_1 | id_2 | cmp_fname_c1 | cmp_fname_c2 | cmp_lname_c1 | cmp_lname_c2 | cmp_sex | cmp_bd | cmp_bm | cmp_by | cmp_plz |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 5749132.0 | 5749132.0 | 5748125.0 | 103698.0 | 5749132.0 | 2464.0 | 5749132.0 | 5748337.0 | 5748337.0 | 5748337.0 | 5736289.0 |
| 1 | mean | 33324.48559643438 | 66587.43558331935 | 0.7129024704436274 | 0.9000176718903216 | 0.3156278193084133 | 0.3184128315317437 | 0.955001381078048 | 0.2244652670850717 | 0.488855298497635 | 0.2227485966810923 | 0.0055286614743434 |
| 2 | stddev | 23659.859374488213 | 23620.487613269885 | 0.3887583596162788 | 0.2713176105782331 | 0.3342336339615816 | 0.3685670662006653 | 0.2073011111689795 | 0.4172297223846255 | 0.4998758236779038 | 0.4160909629831734 | 0.0741491492542006 |
| 3 | min | 1.0 | 6.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | max | 99980.0 | 100000.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |


In [19]:
summary_p_df.shape

(5, 12)

In [21]:
summary_p_df = (
    summary_p_df.set_index("summary")
    .transpose()
    .reset_index()
    .rename(columns={"index": "field"})
    .rename_axis(None, axis=1)
)
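
The pandas chain above turns the statistics (`count`, `mean`, and so on) from rows into columns, and the field names from columns into rows. Because `describe()` returns its statistics as strings (the `printSchema()` output later in this section shows every column as `string`), the values also need casting. A plain-Python sketch of both steps, using hypothetical data and a hypothetical `transpose_summary` helper:

```python
# Hypothetical describe()-style rows: one row per statistic, values as strings.
summary_rows = [
    {"summary": "count", "cmp_sex": "8", "cmp_plz": "7"},
    {"summary": "mean", "cmp_sex": "0.75", "cmp_plz": "0.4"},
]


def transpose_summary(rows):
    """Turn one-row-per-statistic into one-row-per-field, casting values to float."""
    fields = [key for key in rows[0] if key != "summary"]
    return [
        {"field": field, **{row["summary"]: float(row[field]) for row in rows}}
        for field in fields
    ]


for record in transpose_summary(summary_rows):
    print(record)
```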

In [23]:
summary_p_df

|   | field | count | mean | stddev | min | max |
|---|---|---|---|---|---|---|
| 0 | id_1 | 5749132 | 33324.48559643438 | 23659.859374488213 | 1.0 | 99980.0 |
| 1 | id_2 | 5749132 | 66587.43558331935 | 23620.487613269885 | 6.0 | 100000.0 |
| 2 | cmp_fname_c1 | 5748125 | 0.7129024704436274 | 0.3887583596162788 | 0.0 | 1.0 |
| 3 | cmp_fname_c2 | 103698 | 0.9000176718903216 | 0.2713176105782331 | 0.0 | 1.0 |
| 4 | cmp_lname_c1 | 5749132 | 0.3156278193084133 | 0.3342336339615816 | 0.0 | 1.0 |
| 5 | cmp_lname_c2 | 2464 | 0.3184128315317437 | 0.3685670662006653 | 0.0 | 1.0 |
| 6 | cmp_sex | 5749132 | 0.955001381078048 | 0.2073011111689795 | 0.0 | 1.0 |
| 7 | cmp_bd | 5748337 | 0.2244652670850717 | 0.4172297223846255 | 0.0 | 1.0 |
| 8 | cmp_bm | 5748337 | 0.488855298497635 | 0.4998758236779038 | 0.0 | 1.0 |
| 9 | cmp_by | 5748337 | 0.2227485966810923 | 0.4160909629831734 | 0.0 | 1.0 |


In [25]:
summary_t_df = spark.createDataFrame(summary_p_df)

In [27]:
summary_t_df.show()

+------------+-------+-------------------+-------------------+---+------+
|       field|  count|               mean|             stddev|min|   max|
+------------+-------+-------------------+-------------------+---+------+
|        id_1|5749132|  33324.48559643438| 23659.859374488213|  1| 99980|
|        id_2|5749132|  66587.43558331935| 23620.487613269885|  6|100000|
|cmp_fname_c1|5748125| 0.7129024704436274| 0.3887583596162788|0.0|   1.0|
|cmp_fname_c2| 103698| 0.9000176718903216| 0.2713176105782331|0.0|   1.0|
|cmp_lname_c1|5749132| 0.3156278193084133| 0.3342336339615816|0.0|   1.0|
|cmp_lname_c2|   2464|0.31841283153174377|0.36856706620066537|0.0|   1.0|
|     cmp_sex|5749132|  0.955001381078048| 0.2073011111689795|  0|     1|
|      cmp_bd|5748337|0.22446526708507172| 0.4172297223846255|  0|     1|
|      cmp_bm|5748337|0.48885529849763504| 0.4998758236779038|  0|     1|
|      cmp_by|5748337| 0.2227485966810923|0.41609096298317344|  0|     1|
|     cmp_plz|5736289|0.00552866147434

In [29]:
summary_t_df.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)



In [31]:
from pyspark.sql.types import DoubleType

In [33]:
# Cast every statistics column (all columns except "field") to double.
for c in summary_t_df.columns:
    if c == "field":
        continue
    summary_t_df = summary_t_df.withColumn(c, summary_t_df[c].cast(DoubleType()))

summary_t_df.printSchema()

root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)



In [41]:
# Extract the pivoting logic into a reusable function.
def pivot_summary(df):
    """Transpose the output of DataFrame.describe() so that each row is a field
    and each column is a statistic, then cast the statistics to double."""
    df_p = df.toPandas()
    df_p = df_p.set_index("summary").transpose().reset_index()
    df_p = df_p.rename(columns={"index": "field"}).rename_axis(None, axis=1)

    df_t = spark.createDataFrame(df_p)

    for c in df_t.columns:
        if c == "field":
            continue
        df_t = df_t.withColumn(c, df_t[c].cast(DoubleType()))

    return df_t

In [43]:
match_summary_t_df = pivot_summary(match_summary_df)
miss_summary_t_df = pivot_summary(miss_summary_df)

## Joining DataFrames and Selecting Features

In [48]:
match_summary_t_df.createOrReplaceTempView("match_summary")
miss_summary_t_df.createOrReplaceTempView("miss_summary")

spark.sql("""
SELECT a.field, a.count + b.count total, a.mean - b.mean delta
FROM match_summary a INNER JOIN miss_summary b ON a.field = b.field
WHERE a.field NOT IN ("id_1", "id_2")
ORDER BY delta DESC, total DESC
""").show()



+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926264|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482590526|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057460786|
|cmp_fname_c2| 103698.0| 0.09104268062280008|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+
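
The join above lines up the match and miss summaries by `field` and ranks features by `delta`, the difference in mean between matches and misses. A large delta suggests that a feature takes clearly different values in the two groups, which plausibly motivates the `good_features` chosen in the next section (note that `cmp_lname_c2`, despite its high delta, has only 2464 non-null values). The plain-Python sketch below reproduces the inner join and the ordering with hypothetical means:

```python
# Hypothetical per-field means for matches and misses (not the real dataset values).
match_summary = {"id_1": 500.0, "cmp_plz": 0.9, "cmp_sex": 0.98, "cmp_bm": 0.95}
miss_summary = {"id_1": 480.0, "cmp_plz": 0.01, "cmp_sex": 0.95, "cmp_bm": 0.45}

# Inner join on the field name, skip the ID columns, and compute the mean difference.
deltas = [
    (field, match_summary[field] - miss_summary[field])
    for field in match_summary
    if field in miss_summary and field not in ("id_1", "id_2")
]

# Largest separation between matches and misses first, like ORDER BY delta DESC.
deltas.sort(key=lambda item: item[1], reverse=True)
for field, delta in deltas:
    print(f"{field:>8} {delta:.3f}")
```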



                                                                                

## Scoring and Model Evaluation

In [51]:
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]

sum_expr = " + ".join(good_features)

sum_expr

'cmp_lname_c1 + cmp_plz + cmp_by + cmp_bd + cmp_bm'

In [53]:
from pyspark.sql.functions import expr

In [55]:
scored_df = (
    parsed_df.fillna(0, subset=good_features)
    .withColumn("score", expr(sum_expr))
    .select("score", "is_match")
)

scored_df.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows
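
The `fillna(0, ...)` step matters because, in Spark SQL, adding `null` to any number yields `null`: without it, a single missing feature would make the whole score `null`. A plain-Python sketch of the same scoring rule, with hypothetical rows and a hypothetical `score` helper:

```python
# Hypothetical rows of the five selected features; None marks a missing value.
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
rows = [
    {"cmp_lname_c1": 1.0, "cmp_plz": 1, "cmp_by": 1, "cmp_bd": 1, "cmp_bm": 1},
    {"cmp_lname_c1": 0.5, "cmp_plz": None, "cmp_by": 1, "cmp_bd": 0, "cmp_bm": None},
]


def score(row):
    """Sum the selected features, treating missing values as 0 (like fillna(0))."""
    return sum(row[name] if row[name] is not None else 0 for name in good_features)


print([score(row) for row in rows])
```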



In [59]:
from pyspark.sql import DataFrame

# Build a contingency table (also called a cross tabulation, or crosstab) that
# counts the records whose score is above/below the threshold, split by whether
# each record was or was not a match.
def cross_tabs(scored_df: DataFrame, threshold: float) -> DataFrame:
    return (
        scored_df.selectExpr(f"score >= {threshold} as is_above", "is_match")
        .groupBy("is_above")
        .pivot("is_match", ("true", "false"))
        .count()
    )
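
What `cross_tabs` computes can be sketched in plain Python: classify each record as above or below the threshold, then count each (is_above, is_match) combination. The pairs and the `cross_tab` helper below are hypothetical.

```python
from collections import Counter

# Hypothetical (score, is_match) pairs.
scored = [(5.0, True), (4.0, True), (2.0, True), (4.5, False), (1.0, False), (0.0, False)]


def cross_tab(pairs, threshold):
    """Count records by (score >= threshold, is_match) as a nested dict."""
    counts = Counter((score >= threshold, is_match) for score, is_match in pairs)
    # Outer key: is_above; inner key: is_match.
    return {
        above: {match: counts[(above, match)] for match in (True, False)}
        for above in (True, False)
    }


print(cross_tab(scored, 4.0))
```

One difference: `Counter` returns 0 for a combination that never occurs, whereas Spark's `pivot` followed by `count` leaves such a cell `null`.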

In [61]:
cross_tabs(scored_df, 4.0).show()



+--------+-----+-------+
|is_above| true|  false|
+--------+-----+-------+
|    true|20871|    637|
|   false|   60|5727564|
+--------+-----+-------+
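
Reading `is_above` as the classifier's prediction, the four cells above are the true positives (20871), false positives (637), false negatives (60), and true negatives (5727564). Precision and recall are a standard way to summarize such a table; the sketch below computes them from those counts in plain Python, with a hypothetical `precision_recall` helper.

```python
def precision_recall(true_pos, false_pos, false_neg):
    """Precision: share of predicted matches that are real matches.
    Recall: share of real matches that the threshold catches."""
    precision = true_pos / (true_pos + false_pos)
    recall = true_pos / (true_pos + false_neg)
    return precision, recall


# Counts read from the cross_tabs(scored_df, 4.0) output above.
tp, fp, fn, tn = 20871, 637, 60, 5727564

# The four cells together account for every record counted earlier.
assert tp + fp + fn + tn == 5749132

precision, recall = precision_recall(tp, fp, fn)
print(f"precision={precision:.4f} recall={recall:.4f}")
```

With a threshold of 4.0 this gives a precision of about 0.970 and a recall of about 0.997: very few true matches fall below the threshold, and about 3% of the records above it are not matches.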



                                                                                