# PySpark Notebook
1. Run PostgreSQL ddl script
2. Load CSV Data files
3. Write Data to PostgreSQL source db
4. Analyze Data with Spark SQL
5. Transform data into hash values
6. Write Data to PostgreSQL target db

_Prepared by: [Noam Marianne]

### Run PostgreSQL Script
Run the PostgreSQL sql script

In [1]:
# ! pip install psycopg2-binary --upgrade --quiet

In [1]:
%run -i 'scans_ddl.py'

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import hashlib
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [3]:
spark = SparkSession \
    .builder \
    .appName('pyspark_demo_app') \
    .config('spark.driver.extraClassPath',
            'postgresql-42.2.10.jar') \
    .master("local[*]") \
    .getOrCreate()

### declare PostgreSQL source prop
Load the PostgreSQL 'bakery_basket' table's contents into a DataFrame

In [4]:
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://postgres:5432/source',
    'target_url': 'jdbc:postgresql://postgres:5432/target',
    'user': 'postgres',
    'password': 'postgres1234',
    'dbtable': 'scans',
}

### Create schema
load users csv into a DataFrame

In [5]:
# File location and type
file_location = "input_files/scans_and_viz_results_de.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
scans_df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)

scans_df.printSchema()

root
 |-- study_uid: string (nullable = true)
 |-- viz_lvo: string (nullable = true)
 |-- first_acquired: string (nullable = true)
 |-- patient_first_acquired: string (nullable = true)
 |-- patient institution: string (nullable = true)
 |-- patient: string (nullable = true)



In [6]:
scans_df.show()
scans_df.count()

+----------------+-------+--------------------+----------------------+-------------------+----------+
|       study_uid|viz_lvo|      first_acquired|patient_first_acquired|patient institution|   patient|
+----------------+-------+--------------------+----------------------+-------------------+----------+
|950cc6a871cb0303|  FALSE|2020-01-13 23:37:...|  2020-01-13 23:37:...|               East|7149a1930c|
|9b2536e770e8fa5f|  FALSE|2020-01-10 01:02:...|  2020-01-10 00:52:...|               West|a2305aa197|
|a361cd8729754567|  FALSE|2020-03-07 07:51:...|  2020-03-07 07:51:...|              North|69f553c297|
|14cdfc53743ae0f9|  FALSE|2020-01-04 01:26:...|  2020-01-04 01:19:...|              North|090a9ae40f|
|6f7d2e16f4448806|  FALSE|2020-01-01 09:09:...|  2020-01-01 09:04:...|               West|ea66c29cee|
|b1d08eae7a8c3e95|  FALSE|2020-01-26 23:33:...|  2020-01-26 23:29:...|               West|aaf6927bbf|
|694b64f805439246|  FALSE|2020-02-29 19:39:...|  2020-02-29 19:32:...|            

840

In [None]:
### enrich df with cre_datetime column

In [7]:
scans_df = scans_df.withColumn("cre_datetime",F.current_timestamp())
scans_df = scans_df.withColumn("patient_institution",F.col("patient institution"))
scans_df = scans_df.drop(F.col("patient institution"))
scans_df.printSchema()

root
 |-- study_uid: string (nullable = true)
 |-- viz_lvo: string (nullable = true)
 |-- first_acquired: string (nullable = true)
 |-- patient_first_acquired: string (nullable = true)
 |-- patient: string (nullable = true)
 |-- cre_datetime: timestamp (nullable = false)
 |-- patient_institution: string (nullable = true)



In [8]:
scans_df.show()

+----------------+-------+--------------------+----------------------+----------+--------------------+-------------------+
|       study_uid|viz_lvo|      first_acquired|patient_first_acquired|   patient|        cre_datetime|patient_institution|
+----------------+-------+--------------------+----------------------+----------+--------------------+-------------------+
|950cc6a871cb0303|  FALSE|2020-01-13 23:37:...|  2020-01-13 23:37:...|7149a1930c|2022-03-12 19:34:...|               East|
|9b2536e770e8fa5f|  FALSE|2020-01-10 01:02:...|  2020-01-10 00:52:...|a2305aa197|2022-03-12 19:34:...|               West|
|a361cd8729754567|  FALSE|2020-03-07 07:51:...|  2020-03-07 07:51:...|69f553c297|2022-03-12 19:34:...|              North|
|14cdfc53743ae0f9|  FALSE|2020-01-04 01:26:...|  2020-01-04 01:19:...|090a9ae40f|2022-03-12 19:34:...|              North|
|6f7d2e16f4448806|  FALSE|2020-01-01 09:09:...|  2020-01-01 09:04:...|ea66c29cee|2022-03-12 19:34:...|               West|
|b1d08eae7a8c3e9

### Write to PostgreSQL source db Table

In [9]:
scans_df.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

In [None]:
### read data from PostgreSQL source db Table

In [10]:
scans_df_from_db = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

scans_df_from_db.printSchema()

root
 |-- study_uid: string (nullable = true)
 |-- viz_lvo: string (nullable = true)
 |-- first_acquired: string (nullable = true)
 |-- patient_first_acquired: string (nullable = true)
 |-- patient_institution: string (nullable = true)
 |-- patient: string (nullable = true)
 |-- cre_datetime: timestamp (nullable = true)



In [11]:
scans_df_from_db.show(10)
scans_df_from_db.count()

+----------------+-------+--------------------+----------------------+-------------------+----------+--------------------+
|       study_uid|viz_lvo|      first_acquired|patient_first_acquired|patient_institution|   patient|        cre_datetime|
+----------------+-------+--------------------+----------------------+-------------------+----------+--------------------+
|950cc6a871cb0303|  FALSE|2020-01-13 23:37:...|  2020-01-13 23:37:...|               East|7149a1930c|2022-03-12 19:34:...|
|9b2536e770e8fa5f|  FALSE|2020-01-10 01:02:...|  2020-01-10 00:52:...|               West|a2305aa197|2022-03-12 19:34:...|
|a361cd8729754567|  FALSE|2020-03-07 07:51:...|  2020-03-07 07:51:...|              North|69f553c297|2022-03-12 19:34:...|
|14cdfc53743ae0f9|  FALSE|2020-01-04 01:26:...|  2020-01-04 01:19:...|              North|090a9ae40f|2022-03-12 19:34:...|
|6f7d2e16f4448806|  FALSE|2020-01-01 09:09:...|  2020-01-01 09:04:...|               West|ea66c29cee|2022-03-12 19:34:...|
|b1d08eae7a8c3e9

840

### Analyze Data with Spark SQL
Analyze the DataFrame's users data using Spark SQL

In [12]:
scans_df_from_db.createOrReplaceTempView("scans")
df_sql = spark.sql("SELECT sum(case when study_uid is null then 0 else 1 end) as cnt_study_with_id, " +
                "sum(case when study_uid is null then 1 else 0 end) as cnt_study_without_id FROM scans")
df_sql.show(10)

+-----------------+--------------------+
|cnt_study_with_id|cnt_study_without_id|
+-----------------+--------------------+
|              840|                   0|
+-----------------+--------------------+



In [None]:
### Transform Data

In [13]:
#   Define the UDF function
def algo(input_string):
    if (input_string):
        encoded_string = input_string.encode("utf-8")
    else:
        encoded_string = "none".encode("utf-8")
    return hashlib.sha256(encoded_string).hexdigest()

#   Register the UDF function.
algo_udf = spark.udf.register("algo", algo)

In [14]:
scans_df_from_db.createOrReplaceTempView("scans")
df_sql = spark.sql("SELECT algo(study_uid) as study_uid, viz_lvo, algo(first_acquired) as first_acquired, algo(patient_first_acquired) as patient_first_acquired, patient_institution, algo(patient) as patient, current_timestamp() as cre_datetime FROM scans")
df_sql.show(10)

+--------------------+-------+--------------------+----------------------+-------------------+--------------------+--------------------+
|           study_uid|viz_lvo|      first_acquired|patient_first_acquired|patient_institution|             patient|        cre_datetime|
+--------------------+-------+--------------------+----------------------+-------------------+--------------------+--------------------+
|4fdadb8aa01868052...|  FALSE|3d86dce3af7956487...|  3d86dce3af7956487...|               East|b151a1325d743ee09...|2022-03-12 19:34:...|
|e4a9f87fcd0d0b079...|  FALSE|3681431cb224882b1...|  d5f9771bb4956c027...|               West|460e50aa4fdb85420...|2022-03-12 19:34:...|
|ac6a8fbdc13d0fdfb...|  FALSE|8bb37f496c9857d29...|  8bb37f496c9857d29...|              North|e22e13e3e5c891ee8...|2022-03-12 19:34:...|
|cf5421cddb04d6dda...|  FALSE|bb04067f43524de9c...|  591ae5d0aa8d39748...|              North|682c3e7894ba8929b...|2022-03-12 19:34:...|
|c916ca091775d8ce2...|  FALSE|f2c182b59e5

In [8]:
### Write users_df to PostgreSQL target db Table

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- user_uid: string (nullable = true)
 |-- cre_datetime: timestamp (nullable = true)
 |-- name_hashed: string (nullable = true)
 |-- address_hashed: string (nullable = true)
 |-- user_uid_hashed: string (nullable = true)



In [15]:
df_sql.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['target_url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()

In [None]:
### read data from PostgreSQL target db Table

In [16]:
scans_df_from_target_db = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['target_url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .load()

scans_df_from_target_db.printSchema()

root
 |-- study_uid: string (nullable = true)
 |-- viz_lvo: string (nullable = true)
 |-- first_acquired: string (nullable = true)
 |-- patient_first_acquired: string (nullable = true)
 |-- patient_institution: string (nullable = true)
 |-- patient: string (nullable = true)
 |-- cre_datetime: timestamp (nullable = true)



In [17]:
scans_df_from_target_db.show(10)

+--------------------+-------+--------------------+----------------------+-------------------+--------------------+--------------------+
|           study_uid|viz_lvo|      first_acquired|patient_first_acquired|patient_institution|             patient|        cre_datetime|
+--------------------+-------+--------------------+----------------------+-------------------+--------------------+--------------------+
|4fdadb8aa01868052...|  FALSE|3d86dce3af7956487...|  3d86dce3af7956487...|               East|b151a1325d743ee09...|2022-03-12 19:34:...|
|e4a9f87fcd0d0b079...|  FALSE|3681431cb224882b1...|  d5f9771bb4956c027...|               West|460e50aa4fdb85420...|2022-03-12 19:34:...|
|ac6a8fbdc13d0fdfb...|  FALSE|8bb37f496c9857d29...|  8bb37f496c9857d29...|              North|e22e13e3e5c891ee8...|2022-03-12 19:34:...|
|cf5421cddb04d6dda...|  FALSE|bb04067f43524de9c...|  591ae5d0aa8d39748...|              North|682c3e7894ba8929b...|2022-03-12 19:34:...|
|c916ca091775d8ce2...|  FALSE|f2c182b59e5