In [6]:
import pandas as pd
import os
from pyspark.sql import SparkSession, functions as F



# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("ETL")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

In [7]:
output_relative_dir = '../../data/curated/'

if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)
    
# now, for each type of data set we will need, we will create the paths
for target_dir in ('consumer', 'merchant', 'transaction', 'sa2'): 
    if not os.path.exists(output_relative_dir + target_dir):
        os.makedirs(output_relative_dir + target_dir)
        

In [8]:
output_relative_dir = '../../data/nulls&missing_analysis/'

if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)
    
# now, for each type of data set we will need, we will create the paths
for target_dir in ('consumer', 'merchant', 'transaction', 'sa2'): 
    if not os.path.exists(output_relative_dir + target_dir):
        os.makedirs(output_relative_dir + target_dir)

In [9]:
from urllib.request import urlretrieve
import zipfile

# Define the output directory where you want to save the downloaded and extracted files
output_relative_dir = '../../data/'

if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)
    
if not os.path.exists(output_relative_dir + "externaldataset"):
    os.makedirs(output_relative_dir + "externaldataset", exist_ok=True)
        
print("Download Start")
    
# URL for the first dataset
url1 = "https://www.abs.gov.au/census/find-census-data/datapacks/download/2021_GCP_SA2_for_AUS_short-header.zip"
output_file1 = output_relative_dir + "/externaldataset/2021_GCP_SA2_for_AUS_short-header.zip"
urlretrieve(url1, output_file1)
    
# URL for the second dataset
url2 = "https://data.gov.au/data/dataset/6cd8989d-4aca-46b7-b93e-77befcffa0b6/resource/cb659d81-5bd2-41f5-a3d0-67257c9a5893/download/asgs2021codingindexs.zip"
output_file2 = output_relative_dir + "/externaldataset/asgs2021codingindexs.zip"
urlretrieve(url2, output_file2)
    
# URL for the third dataset
    
url3 = "https://www.abs.gov.au/statistics/people/population/regional-population/2021-22/32180DS0003_2001-22r.xlsx"
output_file3 = output_relative_dir + "/externaldataset/SA2_Populations_AUS.xlsx"
urlretrieve(url3, output_file3)
    
print("Download Complete")

# List of files to extract from the ZIP files
files_to_extract1 = [
    "2021Census_G02_AUST_SA2.csv",
]

# file from the second dataset
files_to_extract2 = [
    "2022 Locality to 2021 SA2 Coding Index.csv"    
]

# Path to the ZIP files
zip_file_path1 = output_file1
zip_file_path2 = output_file2

# Destination folder for extraction
destination_folder = "../../data/externaldataset"

# Extract files from the first ZIP file
with zipfile.ZipFile(zip_file_path1, 'r') as zip_ref:
    for file_name in files_to_extract1:
        try:
            zip_ref.extract(f"2021 Census GCP Statistical Area 2 for AUS/{file_name}", destination_folder)
            print(f"Extracted from ZIP 1: {file_name}")
        except KeyError:
            print(f"File not found in ZIP 1: {file_name}")

# Extract files from the second ZIP file
with zipfile.ZipFile(zip_file_path2, 'r') as zip_ref:
    for file_name in files_to_extract2:
        try:
            zip_ref.extract(file_name, destination_folder)
            print(f"Extracted from ZIP 2: {file_name}")
        except KeyError:
            print(f"File not found in ZIP 2: {file_name}")

Download Start
Download Complete
Extracted from ZIP 1: 2021Census_G02_AUST_SA2.csv
Extracted from ZIP 2: 2022 Locality to 2021 SA2 Coding Index.csv


## Load in datasets

### Transactions

In [10]:
# 'transactions.parquet' files
path = '../../data/tables/transactions_20210228_20210827_snapshot'
transactions_21_02_21_08 = spark.read.parquet(path)

transactions_21_02_21_08.printSchema()


path = '../../data/tables/transactions_20210828_20220227_snapshot'
transactions_21_08_22_02 = spark.read.parquet(path)

transactions_21_08_22_02.printSchema()

path = '../../data/tables/transactions_20220228_20220828_snapshot'
transactions_22_02_22_08 = spark.read.parquet(path)

transactions_22_02_22_08.printSchema()

transactions_21_02_21_08.limit(10)


                                                                                

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



                                                                                

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



user_id,merchant_abn,dollar_value,order_id,order_datetime
18478,62191208634,63.255848959735246,949a63c8-29f7-4ab...,2021-08-20
2,15549624934,130.3505283105634,6a84c3cf-612a-457...,2021-08-20
18479,64403598239,120.15860593212784,b10dcc33-e53f-425...,2021-08-20
3,60956456424,136.6785200286976,0f09c5a5-784e-447...,2021-08-20
18479,94493496784,72.96316578355305,f6c78c1a-4600-4c5...,2021-08-20
3,76819856970,448.529684285612,5ace6a24-cdf0-4aa...,2021-08-20
18479,67609108741,86.4040605836911,d0e180f0-cb06-42a...,2021-08-20
3,34096466752,301.5793450525113,6fb1ff48-24bb-4f9...,2021-08-20
18482,70501974849,68.75486276223054,8505fb33-b69a-412...,2021-08-20
4,49891706470,48.89796461900801,ed11e477-b09f-4ae...,2021-08-20


### Consumer Data

In [11]:
# 'consumer_fraud_probability.csv' file
path = '../../data/tables/consumer_fraud_probability.csv'
cust_fp = spark.read.csv(path, header=True)

cust_fp.printSchema()
cust_fp.limit(10)
# cust_fp.dtypes
# cust_fp.count()

root
 |-- user_id: string (nullable = true)
 |-- order_datetime: string (nullable = true)
 |-- fraud_probability: string (nullable = true)



user_id,order_datetime,fraud_probability
6228,2021-12-19,97.6298077657765
21419,2021-12-10,99.24738020302328
5606,2021-10-17,84.05825045251777
3101,2021-04-17,91.42192091901347
22239,2021-10-19,94.70342477508036
16556,2022-02-20,89.65663294494827
10278,2021-09-28,83.59136689427714
15790,2021-12-30,71.77065889280253
5233,2021-08-29,85.87123303878818
230,2021-08-28,86.28328808934151


In [12]:
# 'consumer_user_details.parquet' file
path = '../../data/tables/consumer_user_details.parquet'
cust_user_det = spark.read.parquet(path)

cust_user_det.printSchema()
cust_user_det.limit(10)
# cust_user_det.count()

root
 |-- user_id: long (nullable = true)
 |-- consumer_id: long (nullable = true)



user_id,consumer_id
1,1195503
2,179208
3,1194530
4,154128
5,712975
6,407340
7,511685
8,448088
9,650435
10,1058499


In [13]:
# 'tbl_consumer.csv' consumer data file
path = '../../data/tables/tbl_consumer.csv'
cust_tbl = spark.read.csv(path, sep='|', header=True)

cust_tbl.printSchema()
cust_tbl.limit(10)
# cust_tbl.dtypes
# cust_tbl.count()

root
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- consumer_id: string (nullable = true)



name,address,state,postcode,gender,consumer_id
Yolanda Williams,413 Haney Gardens...,WA,6935,Female,1195503
Mary Smith,3764 Amber Oval,NSW,2782,Female,179208
Jill Jones MD,40693 Henry Greens,NT,862,Female,1194530
Lindsay Jimenez,00653 Davenport C...,NSW,2780,Female,154128
Rebecca Blanchard,9271 Michael Mano...,WA,6355,Female,712975
Karen Chapman,2706 Stewart Oval...,NSW,2033,Female,407340
Andrea Jones,122 Brandon Cliff,QLD,4606,Female,511685
Stephen Williams,6804 Wright Crest...,WA,6056,Male,448088
Stephanie Reyes,5813 Denise Land ...,NSW,2482,Female,650435
Jillian Gonzales,461 Ryan Common S...,VIC,3220,Female,1058499


### Merchant Data

In [14]:
# 'merchant_fraud_probability.csv' file
path = '../../data/tables/merchant_fraud_probability.csv'
merch_fp = spark.read.csv(path, header=True)

merch_fp.printSchema()
merch_fp.limit(10)
# merch_fp.dtypes
# merch_fp.count()

root
 |-- merchant_abn: string (nullable = true)
 |-- order_datetime: string (nullable = true)
 |-- fraud_probability: string (nullable = true)



merchant_abn,order_datetime,fraud_probability
19492220327,2021-11-28,44.40365864749536
31334588839,2021-10-02,42.75530083865367
19492220327,2021-12-22,38.867790051131095
82999039227,2021-12-19,94.1347004808891
90918180829,2021-09-02,43.32551731714902
31334588839,2021-12-26,38.36165958070444
23686790459,2021-12-10,79.4543441508535
14827550074,2021-11-26,46.45775596795885
31334588839,2021-11-26,36.20971272078342
19492220327,2021-12-18,33.819672154331755


In [15]:
# 'tbl_merchants.csv' merchants data file
path = '../../data/tables/tbl_merchants.parquet'
merch_tbl = spark.read.parquet(path)

merch_tbl.printSchema()
merch_tbl.limit(10)

root
 |-- name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- merchant_abn: long (nullable = true)



name,tags,merchant_abn
Felis Limited,"((furniture, home...",10023283211
Arcu Ac Orci Corp...,"([cable, satellit...",10142254217
Nunc Sed Company,"([jewelry, watch,...",10165489824
Ultricies Digniss...,"([wAtch, clock, a...",10187291046
Enim Condimentum PC,([music shops - m...,10192359162
Fusce Company,"[(gift, card, nov...",10206519221
Aliquam Enim Inco...,"[(computers, comP...",10255988167
Ipsum Primis Ltd,"[[watch, clock, a...",10264435225
Pede Ultrices Ind...,([computer progra...,10279061213
Nunc Inc.,"[(furniture, home...",10323485998


### External Data

In [16]:
# 2021 census data CSVs

path = '../../data/externaldataset/2021 Census GCP Statistical Area 2 for AUS/2021Census_G02_AUST_SA2.csv'
sa2_census = spark.read.csv(path, header=True)

sa2_census.limit(10)


SA2_CODE_2021,Median_age_persons,Median_mortgage_repay_monthly,Median_tot_prsnl_inc_weekly,Median_rent_weekly,Median_tot_fam_inc_weekly,Average_num_psns_per_bedroom,Median_tot_hhd_inc_weekly,Average_household_size
101021007,51,1732,760,330,1886,0.8,1429,2.2
101021008,38,1950,975,350,2334,0.8,1989,2.6
101021009,37,1700,996,330,2233,0.9,1703,2.1
101021010,36,1700,1104,310,2412,0.9,1796,2.1
101021012,37,2300,1357,430,3332,0.8,3014,2.9
101021610,31,2600,1629,531,3444,0.8,3363,3.1
101021611,42,2417,1243,465,3117,0.8,2926,2.9
101031013,49,1083,704,200,1759,0.7,1253,2.2
101031014,44,1300,771,290,1911,0.7,1377,2.2
101031015,50,1627,794,290,2068,0.7,1596,2.4


In [17]:
###############################################################################################################################################################################################

In [18]:
# SA2 population data

path = '../../data/externaldataset/SA2_Populations_AUS.xlsx'

# Have to manually fix the dataframe as the formatting is messed up

sa2_pops = pd.read_excel(path, sheet_name=1)

# remove empty / useless rows at top
sa2_pops = sa2_pops.iloc[5:,:]

sa2_pops.head()

# drop unnecessary columns
sa2_pops = sa2_pops.drop(sa2_pops.iloc[:, 10:30],axis = 1)

sa2_pops = sa2_pops.drop(sa2_pops.columns[[0,1,2,3,4,6,11]], axis=1)

# fix header
sa2_pops.columns = sa2_pops.iloc[1]

sa2_pops = sa2_pops[3:]

# drop empty rows at bottom of table
sa2_pops = sa2_pops.dropna()

sa2_pops = sa2_pops.rename(columns={'no.': 'population_2021'})

sa2_pops.head()

6,SA4 name,SA3 name,SA2 code,SA2 name,population_2021
8,Capital Region,Queanbeyan,101021007,Braidwood,4332
9,Capital Region,Queanbeyan,101021008,Karabar,8548
10,Capital Region,Queanbeyan,101021009,Queanbeyan,11375
11,Capital Region,Queanbeyan,101021010,Queanbeyan - East,5097
12,Capital Region,Queanbeyan,101021012,Queanbeyan West - Jerrabomberra,12748


In [19]:
###############################################################################################################################################################################################

In [20]:
# Locality to SA2 coding index CSV

path = '../../data/externaldataset/2022 Locality to 2021 SA2 Coding Index.csv'

sa2_to_postcode = spark.read.csv(path, header=True)

sa2_to_postcode.limit(10)

LOCALITY_ID,LOCALITY_NAME,LOCALITY_TYPE,POSTCODE,STATE,SA2_MAINCODE_2021,SA2_NAME_2021
ABS195,ACTON PARK,EXTRA LOCALITY,2600,ACT,801051123,Black Mountain
loc6f02f098e82d,ACTON,GAZETTED LOCALITY,2601,ACT,801051049,Acton
loc6f02f098e82d,ANU,ALIAS LOCALITY,2601,ACT,801051049,Acton
loc6f02f098e82d,AUSTRALIAN NATION...,ALIAS LOCALITY,2601,ACT,801051049,Acton
loc6f02f098e82d,SPINNAKER ISLAND,ALIAS LOCALITY,2601,ACT,801051049,Acton
loc6f02f098e82d,SPRINGBANK ISLAND,ALIAS LOCALITY,2601,ACT,801051049,Acton
loc8af26acb532d,AINSLIE,GAZETTED LOCALITY,2602,ACT,801051050,Ainslie
locf0cacaa1119e,AMAROO,GAZETTED LOCALITY,2914,ACT,801041034,Amaroo
locf94248966b6c,ARANDA,GAZETTED LOCALITY,2614,ACT,801011001,Aranda
ABS358,BELLVIEW,EXTRA LOCALITY,2620,ACT,801071089,Tuggeranong


## 1st Step: Remove useless columns, data type conversions, rename columns where necessary

### Transactions

In [21]:
#transactions_21_02_21_08 = transactions_21_02_21_08.drop('order_id')

#transactions_21_08_22_02 = transactions_21_08_22_02.drop('order_id')

#transactions_22_02_22_08 = transactions_22_02_22_08.drop('order_id')

transactions_21_02_21_08.printSchema()

transactions_21_08_22_02.printSchema()

transactions_22_02_22_08.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



### Consumer Data

In [22]:
cust_fp = \
cust_fp.withColumn(
    'user_id',
    F.col('user_id').cast('long')
).withColumn(
    'order_datetime',
    F.col('order_datetime').cast('date')
).withColumn(
    'consumer_fraud_probability_%',
    F.col('fraud_probability').cast('double')
).drop('fraud_probability')

cust_fp.printSchema()


root
 |-- user_id: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- consumer_fraud_probability_%: double (nullable = true)



In [23]:
cust_user_det.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- consumer_id: long (nullable = true)



In [24]:
cust_tbl = \
cust_tbl.withColumn(
    'postcode',
    F.col('postcode').cast('long')
).withColumnRenamed(
    'name',
    'consumer_name'   
).withColumnRenamed(
    'state',
    'consumer_state'   
).withColumnRenamed(
    'postcode',
    'consumer_postcode'   
).withColumnRenamed(
    'gender',
    'consumer_gender'   
).withColumn(
    'consumer_id',
    F.col('consumer_id').cast('long')
).drop('address')

cust_tbl.printSchema()

root
 |-- consumer_name: string (nullable = true)
 |-- consumer_state: string (nullable = true)
 |-- consumer_postcode: long (nullable = true)
 |-- consumer_gender: string (nullable = true)
 |-- consumer_id: long (nullable = true)



### Merchant Data

In [25]:
merch_fp = \
merch_fp.withColumn(
    'merchant_abn',
    F.col('merchant_abn').cast('long')
).withColumn(
    'order_datetime',
    F.col('order_datetime').cast('date')
).withColumn(
    'merchant_fraud_probability_%',
    F.col('fraud_probability').cast('double')
).drop('fraud_probability')

merch_fp.printSchema()

root
 |-- merchant_abn: long (nullable = true)
 |-- order_datetime: date (nullable = true)
 |-- merchant_fraud_probability_%: double (nullable = true)



In [26]:
merch_tbl = \
merch_tbl.withColumnRenamed(
    'name',
    'merchant_name'
)

merch_tbl.printSchema()

root
 |-- merchant_name: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- merchant_abn: long (nullable = true)



### External Data

In [27]:
# 2021 census data CSVs

sa2_census.printSchema()


root
 |-- SA2_CODE_2021: string (nullable = true)
 |-- Median_age_persons: string (nullable = true)
 |-- Median_mortgage_repay_monthly: string (nullable = true)
 |-- Median_tot_prsnl_inc_weekly: string (nullable = true)
 |-- Median_rent_weekly: string (nullable = true)
 |-- Median_tot_fam_inc_weekly: string (nullable = true)
 |-- Average_num_psns_per_bedroom: string (nullable = true)
 |-- Median_tot_hhd_inc_weekly: string (nullable = true)
 |-- Average_household_size: string (nullable = true)



In [28]:
sa2_census = sa2_census.withColumn(
    'sa2_code',
    F.col('SA2_CODE_2021').cast('long')
).withColumn(
    'sa2_median_age',
    F.col('Median_age_persons').cast('long')
).withColumn(
    'sa2_median_mortgage_repay_monthly',
    F.col('Median_mortgage_repay_monthly').cast('long')
).withColumn(
    'sa2_median_tot_prsnl_inc_weekly',
    F.col('Median_tot_prsnl_inc_weekly').cast('long')
).withColumn(
    'sa2_median_rent_weekly',
    F.col('Median_rent_weekly').cast('long')
).withColumn(
    'sa2_median_tot_fam_inc_weekly',
    F.col('Median_tot_fam_inc_weekly').cast('long')
).withColumn(
    'sa2_average_num_psns_per_bedroom',
    F.col('Average_num_psns_per_bedroom').cast('double')
).withColumn(
    'sa2_median_tot_hhd_inc_weekly',
    F.col('Median_tot_hhd_inc_weekly').cast('long')
).withColumn(
    'sa2_average_household_size',
    F.col('Average_household_size').cast('double')
)

sa2_census = sa2_census.select(sa2_census.columns[9:])

In [29]:
sa2_census.printSchema()

root
 |-- sa2_code: long (nullable = true)
 |-- sa2_median_age: long (nullable = true)
 |-- sa2_median_mortgage_repay_monthly: long (nullable = true)
 |-- sa2_median_tot_prsnl_inc_weekly: long (nullable = true)
 |-- sa2_median_rent_weekly: long (nullable = true)
 |-- sa2_median_tot_fam_inc_weekly: long (nullable = true)
 |-- sa2_average_num_psns_per_bedroom: double (nullable = true)
 |-- sa2_median_tot_hhd_inc_weekly: long (nullable = true)
 |-- sa2_average_household_size: double (nullable = true)



In [30]:
###############################################################################################################################################################################################

In [31]:
# SA2 population data
sa2_pops=spark.createDataFrame(sa2_pops)

In [32]:
sa2_pops.limit(10)

sa2_pops.printSchema()

root
 |-- SA4 name: string (nullable = true)
 |-- SA3 name: string (nullable = true)
 |-- SA2 code: long (nullable = true)
 |-- SA2 name: string (nullable = true)
 |-- population_2021: long (nullable = true)



In [33]:
# convert casings and rename columns where necessary
sa2_pops = \
sa2_pops.withColumnRenamed(
    'SA4 name',
    'sa4_name'
).withColumnRenamed(
    'SA3 name',
    'sa3_name'
).withColumnRenamed(
    'SA2 code',
    'sa2_code'
).withColumnRenamed(
    'SA2 name',
    'sa2_name'
)


In [34]:
###############################################################################################################################################################################################

In [35]:
# Locality to SA2 coding index CSV
sa2_to_postcode.limit(10)

sa2_to_postcode.printSchema()



root
 |-- LOCALITY_ID: string (nullable = true)
 |-- LOCALITY_NAME : string (nullable = true)
 |-- LOCALITY_TYPE: string (nullable = true)
 |-- POSTCODE: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- SA2_MAINCODE_2021: string (nullable = true)
 |-- SA2_NAME_2021: string (nullable = true)



In [36]:
sa2_to_postcode = sa2_to_postcode.select(['POSTCODE', 'SA2_MAINCODE_2021']).withColumn(
    'POSTCODE',
    F.col('POSTCODE').cast('long')
).withColumn(
    'SA2_MAINCODE_2021',
    F.col('SA2_MAINCODE_2021').cast('long')
).withColumnRenamed(
    'SA2_MAINCODE_2021',
    'sa2_code'
)

sa2_to_postcode = sa2_to_postcode.toDF(*[c.lower() for c in sa2_to_postcode.columns])

## 2nd Step: Cleaning, feature engineering, aggregation

### Transactions

In [37]:
transactions_all = transactions_21_02_21_08.union(transactions_21_08_22_02).union(transactions_22_02_22_08)

In [38]:
transactions_all.sort('order_datetime').head(3)

                                                                                

[Row(user_id=1, merchant_abn=28000487688, dollar_value=133.22689421562643, order_id='0c37b3f7-c7f1-48cb-bcc7-0a58e76608ea', order_datetime=datetime.date(2021, 2, 28)),
 Row(user_id=1, merchant_abn=83690644458, dollar_value=30.441348317517228, order_id='40a2ff69-ea34-4657-8429-df7ca957d6a1', order_datetime=datetime.date(2021, 2, 28)),
 Row(user_id=18485, merchant_abn=62191208634, dollar_value=79.13140006851712, order_id='9e18b913-0465-4fd4-92fd-66d15e65d93c', order_datetime=datetime.date(2021, 2, 28))]

In [39]:
transactions_all.sort('order_datetime').tail(3)

                                                                                

[Row(user_id=11136, merchant_abn=63685007785, dollar_value=48.12988669206644, order_id='a6194c61-ae7a-4037-a425-2dd7cda42fb7', order_datetime=datetime.date(2022, 10, 26)),
 Row(user_id=11137, merchant_abn=85502310765, dollar_value=4.768751754248673, order_id='72a3639e-25d5-4bcc-9e8b-26e737b8c0e4', order_datetime=datetime.date(2022, 10, 26)),
 Row(user_id=11138, merchant_abn=43186523025, dollar_value=35.12766841919401, order_id='3f3b1d40-e8ec-4d4b-ac08-5f273586523c', order_datetime=datetime.date(2022, 10, 26))]

In [40]:
from datetime import datetime

from pyspark.sql import functions as F
from datetime import datetime

df = transactions_all

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/transaction/transactions_all.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()
df = df.dropDuplicates(['order_id'])

# 3. Validate user_id and merchant_abn
df = df.filter((F.col('user_id') > 0) & (F.col('merchant_abn') > 0))

# 4. Validate dollar value
df = df.filter(F.col('dollar_value') > 0)

# 5. Validate order_datetime for the specified date range
start_date = datetime.strptime("20210228", "%Y%m%d").date()
end_date = datetime.strptime("20221026", "%Y%m%d").date()
df = df.filter(F.col('order_datetime').between(start_date, end_date))


transactions_all_clean = df.drop('order_id')

                                                                                

In [41]:
transactions_all_clean

                                                                                

user_id,merchant_abn,dollar_value,order_datetime
83,46804135891,40.88736687931136,2021-08-20
18574,40252040480,110.2065688165326,2021-08-20
18653,17324645993,6.067842686819487,2021-08-20
215,81219314324,66.32075609316571,2021-08-20
18860,77013874702,2.082645905098512,2021-08-20
18881,29616684420,91.12197214702582,2021-08-20
399,71674475255,34.45156578850835,2021-08-20
646,24852446429,44.287278647072334,2021-08-20
19179,95492562735,43.393091988896295,2021-08-20
19230,11437621158,259.7314287658183,2021-08-20


### Consumer Data

In [42]:
from datetime import datetime

from pyspark.sql import functions as F
from datetime import datetime

df = cust_user_det

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/consumer/consumer_user_details.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate consumer_id and user_id
df = df.filter((F.col('consumer_id') > 0) & (F.col('user_id') > 0))

cust_user_det_clean = df

In [43]:
from datetime import datetime

from pyspark.sql import functions as F
from datetime import datetime

df = cust_tbl

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/consumer/consumer_tbl.parquet")


# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate consumer_id
df = df.filter((F.col('consumer_id') > 0))

# 4. Validate consumer_name 
df = df.filter(F.col('consumer_name').rlike("[a-zA-Z][a-zA-Z ]+"))

# 5. Validate consumer_state 
df = df.filter(F.col('consumer_state').isin(['NT','ACT','SA','TAS','WA','QLD','VIC','NSW']))

# 6. Validate consumer_postcode
df = df.filter(F.col('consumer_postcode').between(200, 9729))

# 7. Validate consumer_gender
df = df.filter(F.col('consumer_gender').isin(['Undisclosed', 'Male', 'Female']))

cust_tbl_clean = df
               
            

                                                                                

In [44]:
from datetime import datetime

from pyspark.sql import functions as F
from datetime import datetime

df = cust_fp

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/consumer/consumer_fraud_probability.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate user_id
df = df.filter((F.col('user_id') > 0))

# 4. Validate order_datetime for the specified date range
start_date = datetime.strptime("20210228", "%Y%m%d").date()
end_date = datetime.strptime("20221026", "%Y%m%d").date()
df = df.filter(F.col('order_datetime').between(start_date, end_date))

# 5. Validate consumer fraud probability
df = df.filter(F.col('consumer_fraud_probability_%').between(0, 100))

cust_fp_clean = df

In [45]:
cust_fp_clean.sort('user_id')

user_id,order_datetime,consumer_fraud_probability_%
1,2022-02-20,9.80543113652096
2,2021-09-25,10.069850934775245
2,2021-08-30,9.599513915425788
3,2021-11-03,8.300636455314633
4,2021-10-09,9.63330241109042
5,2022-01-11,27.496186536467164
5,2022-02-08,9.02022421158597
5,2021-10-04,10.868364868449886
6,2021-12-12,10.459280127078758
9,2021-12-13,10.58055311139687


### Merchant Data

In [46]:
merch_tbl.count()

4026

In [47]:
# Extract different features from tags (words, revenue band, take rate)
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, ArrayType

# Define a UDF to extract the sections within square and normal brackets
def split_tags(input_tags):
    sep_tags = re.findall(r'[\[\(\{]([^\[\]\(\)\{\}]*)[\]\)\}]', input_tags)
    sep_tags_without_brackets = [re.sub(r'[\[\]\(\)\{\}]', '', tag) for tag in sep_tags]
        
    return sep_tags_without_brackets

# Define a UDF to extract "take rate" numbers
def extract_take_rate(input_rate):
    match = re.search(r'take rate: ([\d.]+)', input_rate)
    if match:
        return float(match.group(1))
    return None

# Register the UDFs
split_tags_udf = udf(split_tags, ArrayType(StringType()))

clean_take_rate_udf = udf(extract_take_rate, FloatType())

# Apply the UDF to the DataFrame
merch_tbl_clean = merch_tbl.withColumn("sep_tags", split_tags_udf(merch_tbl["tags"]))

# Create separate columns each segment of the tags
merch_tbl_clean = merch_tbl_clean.withColumn("words", merch_tbl_clean["sep_tags"].getItem(0))
merch_tbl_clean = merch_tbl_clean.withColumn("revenue_level", merch_tbl_clean["sep_tags"].getItem(1))
merch_tbl_clean = merch_tbl_clean.withColumn("take_rate_%", merch_tbl_clean["sep_tags"].getItem(2))

merch_tbl_clean = merch_tbl_clean.withColumn("take_rate_%", clean_take_rate_udf(merch_tbl_clean["take_rate_%"]))



In [48]:
merch_tbl_clean = merch_tbl_clean.drop('tags', 'sep_tags')

merch_tbl_clean.limit(10)

                                                                                

merchant_name,merchant_abn,words,revenue_level,take_rate_%
Felis Limited,10023283211,"furniture, home f...",e,0.18
Arcu Ac Orci Corp...,10142254217,"cable, satellite,...",b,4.22
Nunc Sed Company,10165489824,"jewelry, watch, c...",b,4.4
Ultricies Digniss...,10187291046,"wAtch, clock, and...",b,3.29
Enim Condimentum PC,10192359162,music shops - mus...,a,6.33
Fusce Company,10206519221,"gift, card, novel...",a,6.34
Aliquam Enim Inco...,10255988167,"computers, comPUt...",b,4.32
Ipsum Primis Ltd,10264435225,"watch, clock, and...",c,2.39
Pede Ultrices Ind...,10279061213,computer programm...,a,5.71
Nunc Inc.,10323485998,"furniture, home f...",a,6.61


In [49]:
merch_tbl_clean.count()

4026

In [50]:
df = merch_tbl_clean 

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/merchant/merchant_tbl.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate merchant_abn
df = df.filter((F.col('merchant_abn') > 0))

# 4. Validate merchant_name 
df = df.filter(F.col('merchant_name').rlike("[a-zA-Z][a-zA-Z ]+"))

# 5. Validate words (SPELL CHECK?) # https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html#sklearn.feature_extraction.text.TfidfVectorizer)
                                   # https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.CountVectorizer.html 
#df = df.filter(F.col('words') in ['NT','ACT','SA','TAS','WA','QLD','VIC','NSW'])

# 6. Validate take_rate
df = df.filter((F.col('take_rate_%').between(0, 100)))

# 7. Validate revenue_level
df = df.filter(F.col('revenue_level').isin(['a', 'b', 'c', 'd', 'e']))


merch_tbl_clean = df


In [51]:
df = merch_fp

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/merchant/merchant_fraud_probability.parquet")

# 1. Check for Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate merchant_abn
df = df.filter((F.col('merchant_abn') > 0))

# 4. Validate order_datetime for the specified date range
start_date = datetime.strptime("20210228", "%Y%m%d").date()
end_date = datetime.strptime("20221026", "%Y%m%d").date()
df = df.filter(F.col('order_datetime').between(start_date, end_date))

# 5. Validate merchant fraud probability
df = df.filter(F.col('merchant_fraud_probability_%').between(0, 100))

merch_fp_clean = df


### External Data

In [52]:
# 2021 census data CSVs

df = sa2_census

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/sa2/sa2_census.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate sa2_code
df = df.filter(F.col('sa2_code') > 0)

# 4. Validate median_age
df = df.filter(F.col('sa2_median_age') >= 0)

# 5. Validate median mortgage repay monthly
df = df.filter(F.col('sa2_median_mortgage_repay_monthly') >= 0)

# 6. Validate median total personal weekly income
df = df.filter(F.col('sa2_median_tot_prsnl_inc_weekly') >= 0)

# 7. Validate median weekly rent
df = df.filter(F.col('sa2_median_rent_weekly') >= 0)

# 8. Validate median total family weekly income
df = df.filter(F.col('sa2_median_tot_fam_inc_weekly') >= 0)

# 9. Validate average number of persons per bedroom
df = df.filter(F.col('sa2_average_num_psns_per_bedroom') >= 0)

# 10. Validate median total household weekly income
df = df.filter(F.col('sa2_median_tot_hhd_inc_weekly') >= 0)

# 11. Validate average household size
df = df.filter(F.col('sa2_average_household_size') >= 0)

sa2_census_clean = df

In [53]:
###############################################################################################################################################################################################

In [54]:
sa2_pops.limit(10)

sa2_pops.printSchema()

root
 |-- sa4_name: string (nullable = true)
 |-- sa3_name: string (nullable = true)
 |-- sa2_code: long (nullable = true)
 |-- sa2_name: string (nullable = true)
 |-- population_2021: long (nullable = true)



In [55]:
df = sa2_pops

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/sa2/sa2_pops.parquet")

# 1. Remove Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# 3. Validate population
df = df.filter(F.col('population_2021') >= 0)

sa2_pops_clean = df


                                                                                

In [56]:
###############################################################################################################################################################################################

In [57]:
df = sa2_to_postcode

df.write.mode('overwrite').parquet("../../data/nulls&missing_analysis/sa2/sa2_to_postcode.parquet")

# 1. Check for Null Values
df = df.na.drop()

# 2. Check for Duplicates
df = df.dropDuplicates()

# No need to validate postcode as inner join will be performed with validated postcodes

sa2_to_postcode_clean=df

sa2_to_postcode_clean.sort('postcode').limit(10)



postcode,sa2_code
800,701011002
810,701021029
810,701021026
810,701021027
810,701021016
810,701021023
810,701021018
810,701021010
810,701021024
810,701021013


### Aggregation

In [58]:
output_relative_dir = '../data/curated/'

# check if it exists as it makedir will raise an error if it does exist
if not os.path.exists(output_relative_dir):
    os.makedirs(output_relative_dir)
   

# now, for each type of data set we will need, we will create the paths
for target_dir in ('consumer', 'merchant'): # taxi_zones should already exist
    if not os.path.exists(output_relative_dir + target_dir):
        os.makedirs(output_relative_dir + target_dir)


In [59]:
#cons_combined = 

In [60]:
#cons_combined.limit(5)

In [61]:
#cons_combined.write.parquet("../data/curated/consumer/consumer_combined.parquet")

In [62]:
#merch_combined

In [63]:
#merch_combined.limit(5)

In [64]:
#merch_combined.write.parquet("../data/curated/merchant/merchant_combined.parquet")

In [65]:
orig_combined = cust_tbl_clean.join(cust_user_det_clean, on='consumer_id', how='inner') \
.join(transactions_all_clean, on='user_id', how ='inner') \
.join(cust_fp_clean, on=['user_id', 'order_datetime'], how='left').na.fill(0) \
.join(merch_tbl_clean, on='merchant_abn', how='inner') \
.join(merch_fp_clean, on=['merchant_abn', 'order_datetime'], how='left').na.fill(0)



In [66]:
new_column_name_list= ['sa2_code'] + ['consumer_' + col for col in sa2_census_clean.columns[1:]]

sa2_census_clean = sa2_census_clean.toDF(*new_column_name_list)

sa2_combined = sa2_to_postcode_clean.join(sa2_pops_clean, on='sa2_code', how='inner').withColumnRenamed('population_2021','sa2_population').join(sa2_census, on='sa2_code', how='inner')


In [67]:
all_combined = orig_combined.join(sa2_combined.withColumnRenamed('postcode','consumer_postcode'), on='consumer_postcode', how='inner')

In [68]:
all_combined = all_combined.withColumnRenamed(
    'sa2_code',
    'consumer_sa2_code'
).withColumnRenamed(
    'sa4_name',
    'consumer_sa4_name'
).withColumnRenamed(
    'sa3_name',
    'consumer_sa3_name'
).withColumnRenamed(
    'sa2_name',
    'consumer_sa2_name'
).withColumnRenamed(
    'sa2_population_2021',
    'consumer_sa2_pop_2021'
).withColumnRenamed(
    'words',
    'merchant_description'
).withColumnRenamed(
    'revenue_level',
    'merchant_revenue_level'
).withColumnRenamed(
    'take_rate_%',
    'merchant_take_rate_%'
)



In [69]:
all_combined.limit(5)

23/09/05 21:38:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

consumer_postcode,merchant_abn,order_datetime,user_id,consumer_id,consumer_name,consumer_state,consumer_gender,dollar_value,consumer_fraud_probability_%,merchant_name,merchant_description,merchant_revenue_level,merchant_take_rate_%,merchant_fraud_probability_%,consumer_sa2_code,consumer_sa4_name,consumer_sa3_name,consumer_sa2_name,sa2_population,sa2_median_age,sa2_median_mortgage_repay_monthly,sa2_median_tot_prsnl_inc_weekly,sa2_median_rent_weekly,sa2_median_tot_fam_inc_weekly,sa2_average_num_psns_per_bedroom,sa2_median_tot_hhd_inc_weekly,sa2_average_household_size
4606,49505931725,2022-10-13,7,511685,Andrea Jones,QLD,Female,80.56308158918402,0.0,Suspendisse Ac As...,digital goods: bo...,b,4.7,0.0,319021506,Wide Bay,Burnett,Kingaroy Surround...,9615,47,992,490,180,1190,0.8,950,2.4
4606,80518954462,2021-04-11,7,511685,Andrea Jones,QLD,Female,248.259198693175,0.0,Neque Sed Dictum ...,"computers, compUt...",b,3.49,0.0,319021506,Wide Bay,Burnett,Kingaroy Surround...,9615,47,992,490,180,1190,0.8,950,2.4
4606,20885454195,2021-06-29,7,511685,Andrea Jones,QLD,Female,106.09604494465586,0.0,Pharetra Ut Indus...,"cable, satellite,...",b,4.94,0.0,319021506,Wide Bay,Burnett,Kingaroy Surround...,9615,47,992,490,180,1190,0.8,950,2.4
4606,78327477486,2022-05-24,7,511685,Andrea Jones,QLD,Female,250.1700381157457,0.0,Auctor Inc.,lawn and garden s...,c,1.47,0.0,319021506,Wide Bay,Burnett,Kingaroy Surround...,9615,47,992,490,180,1190,0.8,950,2.4
4606,90173050473,2022-03-14,7,511685,Andrea Jones,QLD,Female,108.27880211271156,0.0,Lobortis Class In...,lawn and garden s...,c,2.49,0.0,319021506,Wide Bay,Burnett,Kingaroy Surround...,9615,47,992,490,180,1190,0.8,950,2.4


In [70]:
all_combined.write.mode('overwrite').parquet("../../data/curated/all_data_combined.parquet")

                                                                                

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 38604)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/home/jlafontaine/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 257, in accum_updates
    n