# The following code is for external data processing

In [2]:
import re
import numpy as np
import pandas as pd
import re
import geopandas as gpd
import os

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Session settings for this notebook, applied to the builder in the order listed.
spark_options = {
    "spark.driver.memory": '4g',
    "spark.executor.memory": '8g',
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.sql.parquet.enableVectorizedReader": "false",
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.parquet.writeLegacyFormat": 'true',
}

spark_builder = SparkSession.builder.appName("MAST30034 Project 2 Preprocessing")
for option_name, option_value in spark_options.items():
    spark_builder = spark_builder.config(option_name, option_value)

# Session object used by every Spark read and write further down the notebook.
spark = spark_builder.getOrCreate()

22/09/19 14:42:34 WARN Utils: Your hostname, DESKTOP-3NQ3PQI resolves to a loopback address: 127.0.1.1; using 172.31.183.205 instead (on interface eth0)
22/09/19 14:42:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/19 14:42:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Mapping Postcode to ABS Postal Areas

In [40]:
def postcode_to_str(col):
    """Return `col` converted to strings and left-padded with zeros to width 4.

    `col` must support `.astype(str)` and the `.str` accessor (for example a
    pandas Series). Values already longer than 4 characters are left as they are.
    """
    as_text = col.astype(str)
    return as_text.str.zfill(4)

In [58]:
# Load the ABS postal area boundaries, the consumer table and the postcode list
# (the postcode list is reduced to one row per postcode)
postal_areas_gdf = gpd.read_file('../data/raw/postcodes/abs_postal_areas.zip')
consumer_details_df = pd.read_csv('../data/tables/tbl_consumer.csv', delimiter="|")
postcode_df = pd.read_csv('../data/raw/postcodes/postcodes.csv')
postcode_df = postcode_df.drop_duplicates('postcode')

# Store postcodes as zero-padded 4 character strings in both tables
consumer_details_df['postcode'] = postcode_to_str(consumer_details_df['postcode'])
postcode_df['postcode'] = postcode_to_str(postcode_df['postcode'])

# Build a point geometry from each postcode's long/lat and wrap the table in a geodataframe
postcode_points = gpd.points_from_xy(postcode_df['long'], postcode_df['lat'])
postcode_gdf = gpd.GeoDataFrame(postcode_df, geometry=postcode_points)
# The points are labelled with the postal areas' CRS (an assignment, not a reprojection)
postcode_gdf.crs = postal_areas_gdf.crs

# Consumer postcodes that are not themselves ABS postal area codes
consumer_postcodes = consumer_details_df['postcode'].astype(str).str.zfill(4)
is_poa_code = consumer_postcodes.isin(postal_areas_gdf['POA_CODE21'])
unmapped = consumer_details_df[~is_poa_code]['postcode'].unique()
postcodes_gdf = postcode_gdf[postcode_gdf['postcode'].isin(unmapped)]

# Spatially join the unmapped postcode points to the postal area polygons
postcode_poa_gdf = postcodes_gdf.sjoin(postal_areas_gdf, how = 'inner')
postcode_poa_df = postcode_poa_gdf[['postcode', 'POA_CODE21']].rename(
    columns = {'POA_CODE21' : 'poa'}
)

# Every postal area code also maps to itself; append those rows to the lookup
poa_identity_df = postal_areas_gdf[['POA_CODE21', 'POA_CODE21']].set_axis(
    ['postcode', 'poa'], axis = 1
)
postcode_poa_df = pd.concat(
    [postcode_poa_df, poa_identity_df], ignore_index = True
).reset_index(drop = True)

In [63]:
# Directory that receives the postcode -> postal area lookup table
output_dir = '../data/curated/census/'
# exist_ok=True creates the directory (and any missing parents) only when needed,
# replacing a separate existence check that could be stale by the time makedirs runs
os.makedirs(output_dir, exist_ok = True)

postcode_poa_df.to_csv(os.path.join(output_dir, 'postcode_poa.csv'), index = False)

All but 2 postcodes could be mapped to ABS postal areas. Neither of these could be found on the Australia Post website. https://postcodes-australia.com/postcodes/6958 says 6958 is a Western Australian postcode reserved for non-standard use.

In [45]:
# Consumers whose postcode has no row in the postcode -> postal area lookup
has_poa = consumer_details_df['postcode'].astype(str).str.zfill(4).isin(
    postcode_poa_df['postcode']
)
removed_consumers = consumer_details_df.loc[~has_poa]
# Displayed: how many consumers are affected and which postcodes they have
len(removed_consumers), removed_consumers['postcode'].unique()

(317, array(['6958', '3989'], dtype=object))

# Age

Read in data

In [64]:
# Transaction snapshots, stacked with union in the order listed
snapshot_dirs = [
    '../data/tables/transactions_20210228_20210827_snapshot/',
    '../data/tables/transactions_20210828_20220227_snapshot/',
    '../data/tables/transactions_20220228_20220828_snapshot/',
]
transactions_sdf = spark.read.parquet(snapshot_dirs[0])
for snapshot_dir in snapshot_dirs[1:]:
    transactions_sdf = transactions_sdf.union(spark.read.parquet(snapshot_dir))

# Consumer/user details table (parquet)
ids_sdf = spark.read.parquet('../data/tables/consumer_user_details.parquet')

# Consumer table: pipe-delimited CSV with a header row
consumers_sdf = spark.read.options(header = True, delimiter = '|').csv(
    '../data/tables/tbl_consumer.csv'
)

# Curated census age table (CSV with a header row)
age_sdf = spark.read.options(header = True).csv(
    '../data/curated/census/age_data.csv'
)

# Postcode -> postal area lookup (CSV with a header row)
postcode_poa_sdf = spark.read.options(header = True).csv(
    '../data/curated/census/postcode_poa.csv'
)

                                                                                

Join on POA (ABS postal area)

In [68]:
# Attach the postal area to each consumer via its postcode.
# The joined frame is only displayed here; it is not assigned to a variable.
consumers_sdf.join(postcode_poa_sdf, on = 'postcode', how = 'inner')

postcode,name,address,state,gender,consumer_id,poa
6935,Yolanda Williams,413 Haney Gardens...,WA,Female,1195503,6053
2782,Mary Smith,3764 Amber Oval,NSW,Female,179208,2782
2780,Lindsay Jimenez,00653 Davenport C...,NSW,Female,154128,2780
6355,Rebecca Blanchard,9271 Michael Mano...,WA,Female,712975,6355
2033,Karen Chapman,2706 Stewart Oval...,NSW,Female,407340,2033
4606,Andrea Jones,122 Brandon Cliff,QLD,Female,511685,4606
6056,Stephen Williams,6804 Wright Crest...,WA,Male,448088,6056
2482,Stephanie Reyes,5813 Denise Land ...,NSW,Female,650435,2482
3220,Jillian Gonzales,461 Ryan Common S...,VIC,Female,1058499,3220
3063,Eugene Lucas,33983 Kevin Drive...,VIC,Undisclosed,428325,3063


In [None]:
# Save a 1% sample of the transactions for faster iteration.
# - seed: makes the sample repeatable for the same input instead of changing on every run
# - mode('overwrite'): lets the cell be re-run when the sample directory already exists
#   (the default write mode raises an error in that case)
# NOTE(review): run this while transactions_sdf still refers to the full data; Spark
# refuses to overwrite a path that the same dataframe is being read from.
transactions_sdf.sample(fraction = 0.01, seed = 42).write.mode('overwrite').parquet(
    '../data/raw/samples/transaction_sample.parquet'
)

In [47]:
# Rebinds transactions_sdf to the saved sample, so the cells below work on the
# sampled transactions instead of whatever transactions_sdf held before
transactions_sdf = spark.read.parquet(
    '../data/raw/samples/transaction_sample.parquet'
)

In [None]:

# Number of transactions per (merchant, consumer postcode) pair
postcode_counts_sdf = (
    transactions_sdf
    .join(ids_sdf, on = 'user_id')
    .join(consumers_sdf, on = 'consumer_id')
    .groupBy('merchant_abn', 'postcode')
    .count()
)

# Total number of transactions per merchant
merchant_sizes_sdf = (
    transactions_sdf
    .groupby('merchant_abn')
    .count()
    .withColumnRenamed('count', 'size')
)

# Creates dataframe grouped by merchant and postcode with propn of customers for each corresponding postcode
merchants_sdf = (
    postcode_counts_sdf
    .join(merchant_sizes_sdf, on = 'merchant_abn')
    .withColumn('propn', F.col('count') / F.col('size'))
    .drop('count', 'size')
)

# Joins merchant and postcode data with abs data for population by age
merchants_sdf = merchants_sdf.join(
    age_sdf,
    on = 'postcode'
)

# Creates scaled version of each population metric by age.
# All scaled columns are built in a single select rather than one withColumn call per
# column: every withColumn adds a projection to the query plan, and doing that in a loop
# over many columns produces very large plans. Selecting only merchant_abn and the
# scaled columns also removes the non scaled columns in the same step.
age_cols = [name for name in age_sdf.columns if name != 'postcode']
scaled_cols = [
    (F.col(name) * F.col('propn')).alias(name + '_scaled')
    for name in age_cols
]

# Calculates weighted sum of each population metric by propn of customers from that postcode
merchants_sdf = merchants_sdf.select(
    'merchant_abn',
    *scaled_cols
).groupBy(
    'merchant_abn'
).sum()


def _strip_sum_wrapper(name):
    # Turns Spark's 'sum(<column>)' aggregate name back into '<column>'
    if name.startswith('sum(') and name.endswith(')'):
        return name[4:-1]
    return name


# Renames columns and sets index.
# The rename is applied to the columns that are actually left after the drop and
# set_index, instead of a mapping built from the column list before those steps.
merchants_df = (
    merchants_sdf.toPandas()
    # sum() with no arguments may also aggregate the grouping key; drop it if present
    .drop(columns = 'sum(merchant_abn)', errors = 'ignore')
    .set_index('merchant_abn')
    .rename(columns = _strip_sum_wrapper)
)

In [None]:
# Calculates median of each row in dataframe where each row corresponds to a count of the given column value
def get_median_col(df):
    """Return, for each row of `df`, the label of the column holding the row's median.

    Each row is treated as counts laid out across the columns in their existing
    order. The label returned for a row is the first column (left to right) at
    which the running total of the row strictly exceeds half of the row total.
    The result is indexed like `df`.
    """
    half_totals = df.sum(axis = 1) / 2
    running_totals = df.cumsum(axis = 1)
    # Compare each row's running totals against that row's own half total
    past_half = running_totals.gt(half_totals, axis = 0)
    # idxmax on booleans picks the first column that is True in each row
    return past_half.idxmax(axis = 1)

# Executes get_median_col function for males, females and persons
for person_type in ['m', 'f', 'p']:
    # Columns for this person type: single-number ages (\d+) plus the 80_84, 85_89,
    # 90_94, 95_99 and 100_yr_over groups. The pattern is a raw f-string so that \d
    # reaches the regex engine as written instead of being an invalid string escape.
    age_counts = merchants_df.filter(
        regex = rf'age_yr_(\d+|(80_84)|(85_89)|(90_94)|(95_99)|(100_yr_over))_{person_type}_scaled',
        axis = 1
    )
    # The median age is the first number in the selected column's name (kept as a string)
    merchants_df[f'median_age_{person_type}'] = get_median_col(
        age_counts
    ).apply(
        lambda x : re.findall(r'\d+', x)[0]
    )
