In [0]:
# reading file ANZ_synthesised_transaction_dataset into dataframe 
df_txn = spark.read.format("csv").option("header", "true")\
.option("inferSchema","true").load("dbfs:/FileStore/shared_uploads/anilsjcit@gmail.com/ANZ_synthesised_transaction_dataset.csv")

In [0]:
# reading file postal_code into dataframe 
df_pc = spark.read.format("csv").option("header", "true")\
.option("inferSchema","true").load("dbfs:/FileStore/shared_uploads/anilsjcit@gmail.com/postal_codes.csv")

In [0]:
# 
df_txn_auth = df_txn.filter((df_txn['status'] == "authorized") & (df_txn['card_present_flag'] == 0))

In [0]:
#show the results of Apply rule: Status = “authorized” and card_present_flag=0
df_txn_auth.show(truncate=False)

+----------+-----------------+----------------+--------------+--------+--------------+---------------+------------------------------------+-------------+-----------+---------+--------+------+---+----------------+--------------+-------------------+------+--------------------------------+---------+--------------+-----------------+--------+
|status    |card_present_flag|bpay_biller_code|account       |currency|long_lat      |txn_description|merchant_id                         |merchant_code|first_name |balance  |date    |gender|age|merchant_suburb |merchant_state|extraction         |amount|transaction_id                  |country  |customer_id   |merchant_long_lat|movement|
+----------+-----------------+----------------+--------------+--------+--------------+---------------+------------------------------------+-------------+-----------+---------+--------+------+---+----------------+--------------+-------------------+------+--------------------------------+---------+--------------+--------

In [0]:
#Showing results based on group by to verify filter is working as expected
df_txn_auth.groupBy("status").count().show()

+----------+-----+
|    status|count|
+----------+-----+
|authorized| 1523|
+----------+-----+



In [0]:
#Showing results based on group by to verify filter is working as expected
df_txn_auth.groupBy("status","card_present_flag").count().show()

+----------+-----------------+-----+
|    status|card_present_flag|count|
+----------+-----------------+-----+
|authorized|                0| 1523|
+----------+-----------------+-----+



In [0]:
#Split the long_lat and merchant_long_lat fields for qualified records into long, lat and merch_long, merch_lat fields.
#first lets select few columns from dataframe
df_longlat = df_txn.select("status","account","long_lat","merchant_id","merchant_long_lat")

In [0]:
#selecting first 15 records
df_longlat.show(15,truncate=False)

+----------+--------------+-------------+------------------------------------+-----------------+
|status    |account       |long_lat     |merchant_id                         |merchant_long_lat|
+----------+--------------+-------------+------------------------------------+-----------------+
|authorized|ACC-1598451071|153.41 -27.95|81c48296-73be-44a7-befa-d053f48ce7cd|153.38 -27.99    |
|authorized|ACC-1598451071|153.41 -27.95|830a451c-316e-4a6a-bf25-e37caedca49e|151.21 -33.87    |
|authorized|ACC-1222300524|151.23 -33.94|835c231d-8cdf-4e96-859d-e9d571760cf0|151.21 -33.87    |
|authorized|ACC-1037050564|153.10 -27.66|48514682-c78a-4a88-b0da-2d6302e64673|153.05 -26.68    |
|authorized|ACC-1598451071|153.41 -27.95|b4e02c10-0852-4273-b8fd-7b3395e32eb0|153.44 -28.06    |
|posted    |ACC-1608363396|151.22 -33.87|null                                |null             |
|authorized|ACC-2776252858|144.95 -37.76|3aa18033-a0a9-4190-a117-b7caaf4d07e3|144.95 -37.53    |
|authorized|ACC-2776252858|144

In [0]:
#splitting longitude and latitude details
from pyspark.sql.functions import split
df_split = df_longlat.withColumn('long', split(df_longlat['long_lat'], ' ').getItem(0)) \
                     .withColumn('lat', split(df_longlat['long_lat'], ' ').getItem(1)) \
                     .withColumn('merch_long', split(df_longlat['merchant_long_lat'], ' ').getItem(0)) \
                     .withColumn('merch_lat', split(df_longlat['merchant_long_lat'], ' ').getItem(1))
df_split.show()

+----------+--------------+-------------+--------------------+-----------------+------+------+----------+---------+
|    status|       account|     long_lat|         merchant_id|merchant_long_lat|  long|   lat|merch_long|merch_lat|
+----------+--------------+-------------+--------------------+-----------------+------+------+----------+---------+
|authorized|ACC-1598451071|153.41 -27.95|81c48296-73be-44a...|    153.38 -27.99|153.41|-27.95|    153.38|   -27.99|
|authorized|ACC-1598451071|153.41 -27.95|830a451c-316e-4a6...|    151.21 -33.87|153.41|-27.95|    151.21|   -33.87|
|authorized|ACC-1222300524|151.23 -33.94|835c231d-8cdf-4e9...|    151.21 -33.87|151.23|-33.94|    151.21|   -33.87|
|authorized|ACC-1037050564|153.10 -27.66|48514682-c78a-4a8...|    153.05 -26.68|153.10|-27.66|    153.05|   -26.68|
|authorized|ACC-1598451071|153.41 -27.95|b4e02c10-0852-427...|    153.44 -28.06|153.41|-27.95|    153.44|   -28.06|
|    posted|ACC-1608363396|151.22 -33.87|                null|          

In [0]:
#Join postal reference data to pull the postal code to the output record
#Check few records from postal code csv file
df_pc.show(15,truncate=False)

+----------------+-----+-----------+
|suburb          |state|postal_code|
+----------------+-----+-----------+
|Ashmore         |QLD  |700001     |
|Sydney          |NSW  |700002     |
|Buderim         |QLD  |700003     |
|Mermaid Beach   |QLD  |700004     |
|Kalkallo        |VIC  |700005     |
|Melbourne       |VIC  |700006     |
|Yokine          |WA   |700007     |
|Cockburn Central|WA   |700008     |
|Mount Ommaney   |QLD  |700009     |
|Brunswick       |VIC  |700010     |
|Byron Bay       |NSW  |700011     |
|Lismore         |NSW  |700012     |
|Fremantle       |WA   |700013     |
|Mordialloc      |VIC  |700014     |
|Chatswood       |NSW  |700015     |
+----------------+-----+-----------+
only showing top 15 rows



In [0]:
#Do the inner join with field merchant_suburb from df_txn and field suburb from df_pc
df_join = df_txn.join(df_pc,df_txn.merchant_suburb ==  df_pc.suburb,"inner")
df_join.show(truncate=False)

+----------+-----------------+----------------+--------------+--------+-------------+---------------+------------------------------------+-------------+----------+--------+--------+------+---+----------------+--------------+-------------------+------+--------------------------------+---------+--------------+-----------------+--------+----------------+-----+-----------+
|status    |card_present_flag|bpay_biller_code|account       |currency|long_lat     |txn_description|merchant_id                         |merchant_code|first_name|balance |date    |gender|age|merchant_suburb |merchant_state|extraction         |amount|transaction_id                  |country  |customer_id   |merchant_long_lat|movement|suburb          |state|postal_code|
+----------+-----------------+----------------+--------------+--------+-------------+---------------+------------------------------------+-------------+----------+--------+--------+------+---+----------------+--------------+-------------------+------+-----

In [0]:
#select few records to verify join is working fine or not
df_join.select("status","account","long_lat","merchant_id","merchant_long_lat","merchant_suburb","state","postal_code").show(15,truncate=False)


+----------+--------------+-------------+------------------------------------+-----------------+----------------+-----+-----------+
|status    |account       |long_lat     |merchant_id                         |merchant_long_lat|merchant_suburb |state|postal_code|
+----------+--------------+-------------+------------------------------------+-----------------+----------------+-----+-----------+
|authorized|ACC-1598451071|153.41 -27.95|81c48296-73be-44a7-befa-d053f48ce7cd|153.38 -27.99    |Ashmore         |QLD  |700001     |
|authorized|ACC-1598451071|153.41 -27.95|830a451c-316e-4a6a-bf25-e37caedca49e|151.21 -33.87    |Sydney          |NSW  |700002     |
|authorized|ACC-1222300524|151.23 -33.94|835c231d-8cdf-4e96-859d-e9d571760cf0|151.21 -33.87    |Sydney          |NSW  |700002     |
|authorized|ACC-1037050564|153.10 -27.66|48514682-c78a-4a88-b0da-2d6302e64673|153.05 -26.68    |Buderim         |QLD  |700003     |
|authorized|ACC-1598451071|153.41 -27.95|b4e02c10-0852-4273-b8fd-7b3395e32eb