# PySpark performance data manipulation example

This notebook shows how PySpark's parallel reads and in-memory computation can speed up the manipulation of data stored in PostgreSQL.

### Optimizations used

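1. Parallel Reading: The `numPartitions` parameter lets Spark read the table with several concurrent queries, one per partition. With `spark.read.jdbc` this only happens when `numPartitions` is combined with `column`, `lowerBound` and `upperBound`, or when explicit `predicates` are passed; on its own, as in the read below, it is ignored and the table is read in a single partition.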

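2. Transformations: An `age` column derived from `birth_date` and a conditional flag column (`retirement_flag`) demonstrate data manipulation. The computation runs in Spark rather than in the database; because transformations are lazy, every action (`show`, `count`, the final write) re-reads the table over JDBC unless the DataFrame is cached with `df.cache()`.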

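3. Batch Writing: The `batchsize` option sets how many rows are sent to PostgreSQL per JDBC batch insert (Spark's default is 1000), which reduces the number of network round trips. Each partition is still written in its own transaction.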

4. Configuration Tuning: The number of shuffle partitions is lowered from Spark's default of 200 to 8, and the executor memory is set explicitly.

### 1. Connect to the database and read the table

In [7]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, year, lit

# Initialize the Spark session
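spark = (SparkSession.builder
         .appName("PostgreSQL Connection with PySpark")
         .config("spark.jars", "postgresql-42.7.4.jar")  # Path to the PostgreSQL JDBC driver jar
         .config("spark.sql.shuffle.partitions", "8")  # Spark's default is 200
         .config("spark.executor.memory", "2g")  # Mainly relevant on a cluster; in local mode tasks run inside the driver JVM
         .getOrCreate())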

# JDBC properties
url = "jdbc:postgresql://127.0.0.1:5432/moneyhouse_germany_dev"
properties = {
    "user": "mhde",
    "password": "",
    "driver": "org.postgresql.Driver"
}

table_name = "person"

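# Read the table
# Note: spark.read.jdbc only splits the read into parallel queries when numPartitions is
# combined with column, lowerBound and upperBound (or when predicates are passed).
# Passed on its own, as here, numPartitions is ignored and the read uses a single partition.
num_partitions = 4  # Adjust based on the cluster resources
df = spark.read.jdbc(
    url=url,
    table=table_name,
    properties=properties,
    numPartitions=num_partitions
)
df.show(5)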

*(Output truncated in this export: the first 5 rows of `person`, a wide table whose columns include `id`, `family_name`, `gender`, `birth_date`, `deceased`, `occupation`, `city`, `first_name` and `deceased_date`.)*
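Because the read above runs in a single partition (`df.rdd.getNumPartitions()` reports how many partitions a read produced), here is a hedged sketch of how range partitioning can be expressed as explicit `predicates`: the list of WHERE clauses that `spark.read.jdbc(..., predicates=...)` accepts, one partition per clause. It cuts `[lower_bound, upper_bound)` into equal strides, similar in spirit to what Spark does internally for `column`/`lowerBound`/`upperBound`. The helper name and the numeric column `numeric_col` are assumptions; the `id` column of `person` may not be numeric.

```python
def jdbc_range_predicates(column, lower_bound, upper_bound, num_partitions):
    """Split a numeric column's range into WHERE clauses, one per read partition.

    [lower_bound, upper_bound) is cut into equal strides; the first clause also
    takes values below the range and NULLs, and the last clause takes everything
    from its start upward, so every row falls into exactly one clause.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if num_partitions == 1:
        return ["1 = 1"]  # a single partition needs no range condition
    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        end = start + stride
        if i == 0:
            predicates.append(f"{column} < {end} OR {column} IS NULL")
        elif i == num_partitions - 1:
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


print(jdbc_range_predicates("numeric_col", 0, 100, 4))
# ['numeric_col < 25 OR numeric_col IS NULL', 'numeric_col >= 25 AND numeric_col < 50',
#  'numeric_col >= 50 AND numeric_col < 75', 'numeric_col >= 75']

# Hypothetical use in the notebook (assumes person has a numeric column "numeric_col"):
# df = spark.read.jdbc(url=url, table=table_name, properties=properties,
#                      predicates=jdbc_range_predicates("numeric_col", 0, 100000, num_partitions))
```

Explicit predicates also allow splits that are not plain numeric ranges, at the cost of having to keep the clauses non-overlapping yourself.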

### 2. Do some data manipulation

In [2]:
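from pyspark.sql.functions import col, current_date, year

# Filter and transform the data - real-life example: identifying potential retirees
# Assumes `deceased` is NULL for living persons; if it is a boolean flag, filter on its value instead
filtered_df = df.filter(col("birth_date").isNotNull() & col("deceased").isNull())
transformed_df = filtered_df.withColumn(
    "age",
    # Difference in calendar years, using the current year instead of a hard-coded 2024;
    # it ignores whether this year's birthday has already passed
    year(current_date()) - year(col("birth_date"))
)
transformed_df.show(5)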

*(Output truncated in this export: the first 5 rows of `transformed_df`, i.e. the `person` columns with the computed `age` column.)*
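Subtracting birth years can overstate a person's age by one until their birthday, which matters right at the 65 threshold used for `retirement_flag` below. A small pure-Python sketch of the difference (both helper names are hypothetical). In Spark, an expression such as `floor(months_between(current_date(), col("birth_date")) / 12)` from `pyspark.sql.functions` should give completed years instead, though it is worth checking against a few known dates.

```python
from datetime import date


def calendar_year_difference(birth_date, today):
    """What `year(current_date()) - year(birth_date)` computes."""
    return today.year - birth_date.year


def age_in_years(birth_date, today):
    """Completed years: one less than the calendar difference before the birthday."""
    years = today.year - birth_date.year
    if (today.month, today.day) < (birth_date.month, birth_date.day):
        years -= 1  # this year's birthday has not happened yet
    return years


born, today = date(1959, 12, 1), date(2024, 6, 1)
print(calendar_year_difference(born, today))  # 65 -> retirement_flag would be True
print(age_in_years(born, today))              # 64 -> not yet 65
```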

In [13]:
# df.count() counts every row read from person (before filtering)
print("How many rows were efficiently processed in memory using PySpark?\n{0}".format(df.count()))

How many rows were efficiently processed in memory using PySpark?
24564


In [3]:
transformed_df = transformed_df.withColumn(
    "retirement_flag",
    when(col("age") >= 65, True).otherwise(False)
)

# Preview the transformed data (show() prints the first 20 rows by default)
print("Data description:")
transformed_df.show()

# Show first few rows
transformed_df.select("id", "first_name", "family_name", "age", "retirement_flag").show(5)

Data description:
*(Output truncated in this export: rows of `transformed_df` with the computed `age` and `retirement_flag` columns.)*

### 3. Save data using batch inserts

In [4]:
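# Identify modified records: compare the recomputed values with those stored in person
# (the UPDATE below shows that person already has age and retirement_flag columns)
existing_df = df.select(
    "id",
    col("age").alias("old_age"),
    col("retirement_flag").alias("old_retirement_flag")
)
updates_df = (
    transformed_df.join(existing_df, on="id", how="inner")
    # eqNullSafe treats NULL = NULL as equal, so rows with unchanged values are dropped
    .filter(~col("age").eqNullSafe(col("old_age"))
            | ~col("retirement_flag").eqNullSafe(col("old_retirement_flag")))
    .select("id", "age", "retirement_flag")
)

# Write the modified records to a staging table (a regular table, dropped after the update)
(updates_df.write.mode("overwrite")  # Replace leftovers of a previous run instead of appending to them
 .option("batchsize", 1000)  # Rows per JDBC batch insert (1000 is also Spark's default)
 .format("jdbc")
 .option("driver", "org.postgresql.Driver")
 .option("url", url)
 .option("dbtable", "person_updates_temp")
 .option("user", properties["user"])
 .option("password", properties["password"])
 .save())

# Stop the Spark session
spark.stop()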
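The effect of `batchsize` can be estimated with simple arithmetic: Spark writes each partition in its own task and flushes one batch per `batchsize` rows (plus a final partial batch), so the number of round trips per partition is the ceiling of rows / batchsize. A minimal sketch with a hypothetical helper, not Spark code; the row counts are illustrative, starting from the 24,564 rows counted earlier.

```python
def jdbc_batches_per_partition(rows_per_partition, batch_size=1000):
    """Estimate batched INSERT round trips per partition for a Spark JDBC write.

    Each partition is written by its own task, so batches never span partitions;
    a partition with n rows needs ceil(n / batch_size) batches.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    # Integer ceiling division avoids floating-point rounding
    return [(rows + batch_size - 1) // batch_size for rows in rows_per_partition]


# 24,564 rows in one partition, at the default batch size and without batching
print(jdbc_batches_per_partition([24564]))                # [25]
print(jdbc_batches_per_partition([24564], batch_size=1))  # [24564]
# The same rows spread over 4 partitions of 6,141 rows each
print(sum(jdbc_batches_per_partition([6141] * 4)))        # 28
```

More partitions add a few partial batches, but they also let the inserts run concurrently.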


### Challenges:
1. Handling foreign keys
2. Handling interdependent data: when records depend on other records, all of them need to be loaded in the same batch and replaced together afterwards.
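One way to approach the second challenge is to treat records and their dependencies as a graph and load each connected group as one batch. A minimal union-find sketch, assuming the dependencies are available as hypothetical `(record_id, depends_on_id)` pairs; how those pairs are derived (foreign keys, business rules) is left open.

```python
def dependency_batches(record_ids, dependencies):
    """Group records so that records linked by a dependency share one batch.

    record_ids: iterable of hashable, sortable ids
    dependencies: iterable of (record_id, depends_on_id) pairs
    Returns one sorted list of ids per connected group (union-find).
    """
    parent = {rid: rid for rid in record_ids}
    dependencies = list(dependencies)
    for child, dep in dependencies:
        # A dependency outside the given records is pulled into the batch as well
        parent.setdefault(child, child)
        parent.setdefault(dep, dep)

    def find(x):
        # Walk up to the group's root, shortening the path along the way
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for child, dep in dependencies:
        root_child, root_dep = find(child), find(dep)
        if root_child != root_dep:
            parent[root_dep] = root_child  # merge the two groups

    groups = {}
    for rid in list(parent):
        groups.setdefault(find(rid), []).append(rid)
    return sorted(sorted(group) for group in groups.values())


print(dependency_batches([1, 2, 3, 4, 5], [(2, 1), (3, 2), (5, 4)]))  # [[1, 2, 3], [4, 5]]
```

Each returned group can then be written to the staging table and applied in a single transaction.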

### 4. Apply the updates to the `person` table

In [5]:
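import psycopg2

# Connect to PostgreSQL (same database and user as the Spark JDBC connection)
conn = psycopg2.connect(
    host="127.0.0.1",
    database="moneyhouse_germany_dev",
    user="mhde",
    password=""
)

# Copy the new values from the staging table into person.
# p.id is cast to text, presumably because person.id is not a text column (e.g. uuid)
# while Spark wrote id to the staging table as a string column.
update_query = """
    UPDATE person p
    SET age = u.age,
        retirement_flag = u.retirement_flag
    FROM person_updates_temp u
    WHERE p.id::text = u.id;
"""

try:
    # `with conn` runs both statements in one transaction: it commits on success and
    # rolls back on error, but does not close the connection
    with conn, conn.cursor() as cursor:
        cursor.execute(update_query)
        # Drop the staging table in the same transaction
        cursor.execute("DROP TABLE person_updates_temp;")
finally:
    # Close the connection even if the update failed
    conn.close()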
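To try the stage-then-update step without a PostgreSQL server, here is a hedged stand-in using Python's built-in `sqlite3` with an in-memory database. It swaps PostgreSQL's `UPDATE ... FROM` for correlated subqueries, which older SQLite versions also accept; the table contents and the helper names `make_demo_db` and `apply_staged_updates` are hypothetical.

```python
import sqlite3


def make_demo_db():
    """In-memory stand-in for person and the staging table (hypothetical data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE person (id TEXT PRIMARY KEY, age INTEGER, retirement_flag INTEGER)")
    conn.executemany("INSERT INTO person VALUES (?, ?, ?)", [("a", None, None), ("b", 40, 0)])
    conn.execute("CREATE TABLE person_updates_temp (id TEXT, age INTEGER, retirement_flag INTEGER)")
    conn.execute("INSERT INTO person_updates_temp VALUES ('a', 70, 1)")
    conn.commit()
    return conn


def apply_staged_updates(conn):
    """Copy staged values into person and drop the staging table in one transaction."""
    with conn:  # commits on success, rolls back if either statement fails
        cur = conn.execute(
            """
            UPDATE person
            SET age = (SELECT u.age FROM person_updates_temp u WHERE u.id = person.id),
                retirement_flag = (SELECT u.retirement_flag FROM person_updates_temp u
                                   WHERE u.id = person.id)
            WHERE id IN (SELECT id FROM person_updates_temp)
            """
        )
        updated = cur.rowcount  # rows changed by the UPDATE
        conn.execute("DROP TABLE person_updates_temp")
    return updated


conn = make_demo_db()
print(apply_staged_updates(conn))  # 1
print(conn.execute("SELECT * FROM person ORDER BY id").fetchall())
# [('a', 70, 1), ('b', 40, 0)]
```

Rows without a staged counterpart (`'b'` here) are left untouched because of the `WHERE id IN (...)` filter.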
