# Test pyspark with AWS S3

Start pyspark as shown below. Pick the hadoop-aws library version that is compatible with your Spark version and Hadoop client version, together with the aws-java-sdk-bundle library version it depends on, and pass both as packages:

pyspark --packages com.amazonaws:aws-java-sdk-bundle:1.11.819,org.apache.hadoop:hadoop-aws:3.2.1

In [7]:
import sys

# Report the runtime environment: Spark version (from the shell-provided
# SparkContext `sc`), Python version, and the module search path.
print(f'Spark version={sc.version}')
print(f'Python version={sys.version}')
print(sys.path)

# imports
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, count
from pyspark.sql.functions import current_timestamp, current_date
from pyspark.sql.functions import date_format, to_timestamp, to_date

# constants
DATETIMESTAMP_PATTERN:str = 'yyyy-MM-dd HH:mm'

Spark version=2.4.5
Python version=3.7.7 (v3.7.7:d7c567b08f, Mar 10 2020, 02:56:16) 
[Clang 6.0 (clang-600.0.57)]
['/private/var/folders/8y/7n0k3c351b31f9g43spw4jqd3rym28/T/spark-89bf1f5d-6286-47b7-918b-cba4bdf05277/userFiles-8c4aaaf8-2d3e-4b35-9d5d-f7cc148704cc/org.apache.hadoop_hadoop-aws-3.2.1.jar', '/private/var/folders/8y/7n0k3c351b31f9g43spw4jqd3rym28/T/spark-89bf1f5d-6286-47b7-918b-cba4bdf05277/userFiles-8c4aaaf8-2d3e-4b35-9d5d-f7cc148704cc/com.amazonaws_aws-java-sdk-bundle-1.11.819.jar', '/private/var/folders/8y/7n0k3c351b31f9g43spw4jqd3rym28/T/spark-89bf1f5d-6286-47b7-918b-cba4bdf05277/userFiles-8c4aaaf8-2d3e-4b35-9d5d-f7cc148704cc', '/Users/chawl001/Dev/Python/Spark', '/opt/spark/spark-2.4.5-bin-without-hadoop-scala-2.12/python/lib/py4j-0.10.7-src.zip', '/opt/spark/spark-2.4.5-bin-without-hadoop-scala-2.12/python', '/Users/chawl001/Dev/Python/Spark', '/Library/Frameworks/Python.framework/Versions/3.7/lib/python37.zip', '/Library/Frameworks/Python.framework/Versions/3.7/lib/py

In [8]:
# Left-side sample rows: (name, id, dept_name, new_flag); None marks a missing value.
test_data = [
    ("A", 1, "Data Science", "Y"),
    ("BB", 2, None, None),
    ("CcC", 3, "Biotech", "N"),
    ("dddd", 4, "Astrophysics", None),
    ("NaMeE", 5, "Nuclear Medicine", "Y"),
    ("666", 6, "Phycology", "N"),
]

# Explicit schema built incrementally: name/id are declared non-nullable,
# dept_name/new_flag nullable.
test_schema = (
    StructType()
    .add("name", StringType(), nullable=False)
    .add("id", IntegerType(), nullable=False)
    .add("dept_name", StringType(), nullable=True)
    .add("new_flag", StringType(), nullable=True)
)

# verifySchema=True asks Spark to check each row against test_schema.
test_df = spark.createDataFrame(data=test_data, schema=test_schema, verifySchema=True)
# Stamp every row with the current time formatted via DATETIMESTAMP_PATTERN.
test_df = test_df.withColumn(
    'create_dt',
    date_format(current_timestamp(), DATETIMESTAMP_PATTERN),
)

print('Left-side Schema:')
test_df.printSchema()
test_df.show(truncate=False)

Left-side Schema:
root
 |-- name: string (nullable = false)
 |-- id: integer (nullable = false)
 |-- dept_name: string (nullable = true)
 |-- new_flag: string (nullable = true)
 |-- create_dt: string (nullable = false)

+-----+---+----------------+--------+----------------+
|name |id |dept_name       |new_flag|create_dt       |
+-----+---+----------------+--------+----------------+
|A    |1  |Data Science    |Y       |2020-10-16 19:26|
|BB   |2  |null            |null    |2020-10-16 19:26|
|CcC  |3  |Biotech         |N       |2020-10-16 19:26|
|dddd |4  |Astrophysics    |null    |2020-10-16 19:26|
|NaMeE|5  |Nuclear Medicine|Y       |2020-10-16 19:26|
|666  |6  |Phycology       |N       |2020-10-16 19:26|
+-----+---+----------------+--------+----------------+



In [9]:
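# Reference: the CDC audit columns that persist_test_data() appends, as originally
# expressed in SQL (the Spark version uses lit(...) for the constants and
# date_format(current_timestamp(), ...) for the two date columns):
# ${db.table.fullname}.*, 
# '0' AS SRC_KEY_VAL,
# 'NONE' AS SRC_CDC_OPER_NM,
# TO_CHAR(CURRENT_TIMESTAMP,'YYYY-MM-DD HH24:MI') AS SRC_COMMIT_DT_UTC,
# TO_CHAR(CURRENT_TIMESTAMP,'YYYY-MM-DD HH24:MI') AS TRG_CRT_DT_PART_UTC,
# 'N/A' AS SRC_SCHEMA_NM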

def persist_test_data(s3_save_location: str, df_to_persist: DataFrame, verifyPersist: bool = False, *,
                      kms_key_arn: str = 'arn:aws:kms:us-east-1:550060283415:key/2bb9d33c-8b5b-4f67-bccc-6d9f603d7609',
                      s3_endpoint: str = 's3.us-east-1.amazonaws.com') -> DataFrame:
    """Add CDC audit columns to a DataFrame and write it as Parquet to a KMS-encrypted S3 location.

    Side effect: sets s3a credential/endpoint/SSE-KMS options on the active SparkContext's
    Hadoop configuration, which is shared by the whole session. Later s3a reads and writes
    therefore see these settings too.

    Args:
        s3_save_location: s3a:// directory to write to. Existing data there is overwritten.
        df_to_persist: DataFrame to persist.
        verifyPersist: when True, show two enriched rows before writing. After writing, read
            the data back, print its schema, and check that the row count matches.
        kms_key_arn: ARN of the KMS key used for server-side encryption (keyword-only).
        s3_endpoint: S3 endpoint for the s3a connector (keyword-only).

    Returns:
        The DataFrame that was written, i.e. df_to_persist plus the CDC columns.

    Raises:
        RuntimeError: if verifyPersist is True and the read-back row count differs from
            the number of rows in df_to_persist.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()

    # Resolve credentials through the AWS SDK default provider chain instead of embedding keys.
    hadoop_conf.set('fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.DefaultAWSCredentialsProviderChain')

    # s3a specific parameters using KMS
    hadoop_conf.set('fs.s3a.endpoint', s3_endpoint)
    hadoop_conf.set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
    # Avoid reusing a cached S3AFileSystem instance that was created before these settings.
    hadoop_conf.set('fs.s3a.impl.disable.cache', 'true')
    hadoop_conf.set('fs.s3a.server-side-encryption.key', kms_key_arn)

    print('Ready to write to KMS encrypted S3 bucket...')

    # CDC audit columns: constant placeholders plus the current time rendered with
    # DATETIMESTAMP_PATTERN. The same Column expression is reused for both date columns.
    # NOTE(review): date_format() renders in the Spark session time zone, so the *_UTC
    # columns are only UTC if the session time zone is UTC — confirm intended semantics.
    now_str = date_format(current_timestamp(), DATETIMESTAMP_PATTERN)
    df_with_CDC_fields = (
        df_to_persist
        .withColumn('SRC_KEY_VAL', lit('0'))
        .withColumn('SRC_CDC_OPER_NM', lit('NONE'))
        .withColumn('SRC_COMMIT_DT_UTC', now_str)
        .withColumn('TRG_CRT_DT_PART_UTC', now_str)
        .withColumn('SRC_SCHEMA_NM', lit('N/A'))
    )

    if verifyPersist:
        df_with_CDC_fields.show(2, truncate=False)

    # repartition(1) -> a single partition, hence a single Parquet part file.
    df_with_CDC_fields.repartition(1) \
        .write \
        .mode('overwrite') \
        .parquet(s3_save_location)

    print(f'Successfully persisted test data file at {s3_save_location}')

    if verifyPersist:
        # Strip any trailing '/' so the glob is '<dir>/*' rather than '<dir>//*'.
        s3_read_loc: str = f"{s3_save_location.rstrip('/')}/*"
        test_read_df = spark.read.parquet(s3_read_loc)
        print('Verifying schema of persisted dataframe:')
        test_read_df.printSchema()
        # Actually verify something before reporting success: the row count must round-trip.
        expected_rows = df_to_persist.count()
        actual_rows = test_read_df.count()
        if actual_rows != expected_rows:
            raise RuntimeError(
                f'Verification failed for {s3_read_loc}: '
                f'expected {expected_rows} rows, read back {actual_rows}'
            )
        print(f'Successfully verified test data file from {s3_read_loc}')

    return df_with_CDC_fields
        

        
# Prerequisites: the cell defining persist_test_data() has been run, and the session's
# AWS credentials can access both the target S3 bucket and the KMS key.
s3_test_file_loc = 's3a://lineardp-conformed-proposal-dev/qubole-generated-test-data/test_Schema/test_Table/'
persist_test_data(
    s3_save_location=s3_test_file_loc,
    df_to_persist=test_df,
    verifyPersist=True,
)

Ready to write to KMS encrypted S3 bucket...
+----+---+------------+--------+----------------+-----------+---------------+-----------------+-------------------+-------------+
|name|id |dept_name   |new_flag|create_dt       |SRC_KEY_VAL|SRC_CDC_OPER_NM|SRC_COMMIT_DT_UTC|TRG_CRT_DT_PART_UTC|SRC_SCHEMA_NM|
+----+---+------------+--------+----------------+-----------+---------------+-----------------+-------------------+-------------+
|A   |1  |Data Science|Y       |2020-10-16 19:27|0          |NONE           |2020-10-16 19:27 |2020-10-16 19:27   |N/A          |
|BB  |2  |null        |null    |2020-10-16 19:27|0          |NONE           |2020-10-16 19:27 |2020-10-16 19:27   |N/A          |
+----+---+------------+--------+----------------+-----------+---------------+-----------------+-------------------+-------------+
only showing top 2 rows

Successfully persisted test data file at s3a://lineardp-conformed-proposal-dev/qubole-generated-test-data/test_Schema/test_Table/
Verifying schema of 

In [10]:
# Right-side lookup rows: (id, location); id 6 has no location.
right_side_data = [
    (1, 'Seattle'),
    (3, 'Denver'),
    (5, 'Austin'),
    (6, None),
]

# short-cut way to define schema: a DDL-formatted string instead of a StructType
right_side_schema = 'id INTEGER, location STRING'

right_side_df = spark.createDataFrame(data=right_side_data, schema=right_side_schema)

print('Right-side Schema:')
right_side_df.printSchema()
right_side_df.show(5)

# Keep only rows that actually carry a location (Column-API form of "location is not null").
print('Filtered RHS dataset (with non-null location):')
filtered_rhs_df = right_side_df.where(col('location').isNotNull())
filtered_rhs_df.show()

Right-side Schema:
root
 |-- id: integer (nullable = true)
 |-- location: string (nullable = true)

+---+--------+
| id|location|
+---+--------+
|  1| Seattle|
|  3|  Denver|
|  5|  Austin|
|  6|    null|
+---+--------+

Filtered RHS dataset (with non-null location):
+---+--------+
| id|location|
+---+--------+
|  1| Seattle|
|  3|  Denver|
|  5|  Austin|
+---+--------+



In [11]:
print('\n========= Left Outer Join dataset =========\n')

# Variant 1: join on a Column expression -> both "id" columns are kept in the result.
# (test_df.id == right_side_df.id is an equivalent spelling of the condition.)
print('Result ===')
print('   Note: Duplicate "id" column')
left_outer_join_df = test_df.join(right_side_df, test_df['id'] == right_side_df['id'], 'left_outer')
left_outer_join_df.show(truncate=False)

# Variant 2: join on a list of column names -> a single "id" column, moved to the front.
print('Result ===')
print('   Note: Duplicate column resolved, if both sides of datasets have same name.')
print('         However, the column used for equality has been shifted as first column.')
left_outer_join_df = test_df.join(right_side_df, on=['id'], how='left_outer')
left_outer_join_df.show(truncate=False)



Result ===
   Note: Duplicate "id" column
+-----+---+----------------+--------+----------------+----+--------+
|name |id |dept_name       |new_flag|create_dt       |id  |location|
+-----+---+----------------+--------+----------------+----+--------+
|A    |1  |Data Science    |Y       |2020-10-16 19:27|1   |Seattle |
|666  |6  |Phycology       |N       |2020-10-16 19:27|6   |null    |
|CcC  |3  |Biotech         |N       |2020-10-16 19:27|3   |Denver  |
|NaMeE|5  |Nuclear Medicine|Y       |2020-10-16 19:27|5   |Austin  |
|dddd |4  |Astrophysics    |null    |2020-10-16 19:27|null|null    |
|BB   |2  |null            |null    |2020-10-16 19:27|null|null    |
+-----+---+----------------+--------+----------------+----+--------+

Result ===
   Note: Duplicate column resolved, if both sides of datasets have same name.
         However, the column used for equality has been shifted as first column.
+---+-----+----------------+--------+----------------+--------+
|id |name |dept_name       |new

In [14]:
# Compare several ways to keep only the LHS rows whose "id" also appears in the RHS
# (a semi-join), plus the complementary NOT EXISTS (anti-join) query.
print('\n========= Left Anti Join dataset =========\n')

print('LHS dataset:')
test_df.show()
print('RHS dataset:')
filtered_rhs_df.show()
print('Problem Statement: Find all LHS records with "id" matching with that from RHS')

print('\n---\n')

print('OPTION #1 : Use Equi Join to effectively filter:')
print('RESULT #1 : filtered Left dataset with matching "id" from Right dataset using "Equi Join"')
# An inner join emits one row per matching RHS row. De-duplicate the RHS keys first, so an
# "id" repeated in the RHS cannot duplicate the corresponding LHS record.
result_df_option_1 = test_df.join(filtered_rhs_df.select('id').distinct(), ['id'])
result_df_option_1.show()
result_df_option_1.explain()

print('\n---\n')

print('OPTION #2 : Use Left Anti Join to effectively filter:')
left_anti_join_df = test_df.join(filtered_rhs_df, test_df['id'] == filtered_rhs_df['id'], 'left_anti')
print('Intermediate dataset (left_anti join) i.e. all LHS records which are not in RHS ')
left_anti_join_df.show(truncate=False)
print('RESULT #2 : filtered Left dataset with matching "id" from Right dataset using "intermediate Left_Anti Join"')
# left_anti_join_df is derived from test_df, so test_df.id and left_anti_join_df.id refer to
# the same column (a self-join). Comparing them directly relies on Spark's self-join
# auto-resolution. Aliasing both sides makes the intended pairing explicit.
result_df_option_2 = test_df.alias('lhs_all').join(
    left_anti_join_df.alias('lhs_unmatched'),
    col('lhs_all.id') == col('lhs_unmatched.id'),
    'left_anti',
)
result_df_option_2.show()
result_df_option_2.explain()

print('\n---\n')

print('OPTION #3 : Use Spark SQL to effectively filter:')
test_df.createOrReplaceTempView('lhs')
print('Temp table "lhs" is available for SparkSQL')
filtered_rhs_df.createOrReplaceTempView('rhs')
print('Temp table "rhs" is available for SparkSQL')

sparkSql = '''
SELECT lhs.* 
FROM lhs 
WHERE
 EXISTS 
 (SELECT 1 FROM rhs WHERE lhs.id = rhs.id)
'''
print(f'Spark SQL : {sparkSql}')

result_df_option_3_1 = spark.sql(sparkSql)
print('RESULT 3.1: All LHS records with "id" matching that of RHS:')
result_df_option_3_1.show()
result_df_option_3_1.explain()
print()

sparkSql = '''
SELECT lhs.* 
FROM lhs 
WHERE
 NOT EXISTS 
 (SELECT 1 FROM rhs WHERE lhs.id = rhs.id)
'''
print(f'Spark SQL : {sparkSql}')
result_df_option_3_2 = spark.sql(sparkSql)
print('RESULT 3.2: All LHS records with "id" NOT matching that of RHS:')
result_df_option_3_2.show()
result_df_option_3_2.explain()

print('\n---\n')

print('OPTION #4 : Use Left Semi Join to directly filter:')
print('RESULT #4 : filtered Left dataset with matching "id" from Right dataset using "Left_Semi Join"')
# left_semi keeps only LHS columns and returns each matching LHS row once, which is exactly
# the problem statement above. It is the DataFrame counterpart of the EXISTS query in 3.1.
result_df_option_4 = test_df.join(filtered_rhs_df, test_df['id'] == filtered_rhs_df['id'], 'left_semi')
result_df_option_4.show()
result_df_option_4.explain()



LHS dataset:
+-----+---+----------------+--------+----------------+
| name| id|       dept_name|new_flag|       create_dt|
+-----+---+----------------+--------+----------------+
|    A|  1|    Data Science|       Y|2020-10-16 19:34|
|   BB|  2|            null|    null|2020-10-16 19:34|
|  CcC|  3|         Biotech|       N|2020-10-16 19:34|
| dddd|  4|    Astrophysics|    null|2020-10-16 19:34|
|NaMeE|  5|Nuclear Medicine|       Y|2020-10-16 19:34|
|  666|  6|       Phycology|       N|2020-10-16 19:34|
+-----+---+----------------+--------+----------------+

RHS dataset:
+---+--------+
| id|location|
+---+--------+
|  1| Seattle|
|  3|  Denver|
|  5|  Austin|
+---+--------+

Problem Statement: Find all LHS records with "id" matching with that from RHS

---

OPTION #1 : Use Equi Join to effectively filter:
RESULT #1 : filtered Left dataset with matching "id" from Right dataset using "Equi Join"
+---+-----+----------------+--------+----------------+
| id| name|       dept_name|new_flag|