## Rank based on External Data (ABS and ATO)

This notebook ranks merchants using external data from the Australian Bureau of Statistics (ABS) and the Australian Taxation Office (ATO). Each merchant's rank is a weighted average of three consumer-level measures, averaged over the merchant's transactions:
  * Education and occupation score
    * Measures the educational and occupational characteristics of communities, including educational attainment, ongoing education, and occupation categories based on the Australian and New Zealand Standard Classification of Occupations (ANZSCO)
  * Relative socioeconomic disadvantage score
    * A general socio-economic index that summarises data on the economic and social circumstances of people and households in a specific area
  * Per capita income = total taxable income / number of taxpayers

In [1]:
# import libraries and constants
import sys
sys.path.append('../scripts/utils')
from constants import *

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [2]:
# create a spark session 
spark = (
    SparkSession.builder.appName("MAST30034 Project 2")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

23/10/19 14:32:55 WARN Utils: Your hostname, vanessas-MacBook-Pro-3.local resolves to a loopback address: 127.0.0.1; using 192.168.18.7 instead (on interface en0)
23/10/19 14:32:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/19 14:32:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/19 14:32:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# read data
tbl_merchants = pd.read_pickle(f"{CURATED_DATA}tbl_merchants.pkl")
tbl_consumers = pd.read_pickle(f"{CURATED_DATA}tbl_consumer.pkl")
transactions = spark.read.parquet(f"{TRANSACTION_DATA}")
consumer_abs = spark.createDataFrame(pd.read_pickle(f"{CURATED_DATA}consumer_abs_data.pkl"))
consumer_ato = spark.createDataFrame(pd.read_pickle(f"{CURATED_DATA}consumer_ato_data.pkl"))

# join to get transactions with merchants and consumers details
transactions_joined = transactions.join(spark.createDataFrame(tbl_consumers), on="user_id", how="inner") 
transactions_joined = transactions_joined.withColumnRenamed("name", "con_name")
transactions_joined = transactions_joined.join(spark.createDataFrame(tbl_merchants), on="merchant_abn", how="inner")
transactions_joined = transactions_joined.withColumnRenamed("name", "mer_name")


In [4]:
# add a new column called capita_income (= total_taxable_income / num_tax_payers)
consumer_ato = consumer_ato.withColumn("capita_income", (consumer_ato.total_taxable_income/consumer_ato.num_tax_payers))

In [5]:
# join transactions data with ABS and ATO data
transactions_joined = transactions_joined.join(consumer_abs, on="user_id", how="left")
transactions_joined = transactions_joined.join(consumer_ato, on="user_id", how="left")

In [6]:
# aggregate based on metrics

transactions_grouped = transactions_joined.groupBy("merchant_abn").agg(
    F.mean("relative_SE_dis_score").alias("avg_relative_SE_dis_score"),
    F.mean("education_occupation_score").alias("avg_education_occupation_score"),
    F.mean("capita_income").alias("avg_capita_income")
).toPandas()

23/10/19 14:35:11 WARN TaskSetManager: Stage 2 contains a task of very large size (3029 KiB). The maximum recommended task size is 1000 KiB.
23/10/19 14:35:14 WARN TaskSetManager: Stage 4 contains a task of very large size (13037 KiB). The maximum recommended task size is 1000 KiB.
23/10/19 14:35:16 WARN TaskSetManager: Stage 5 contains a task of very large size (5791 KiB). The maximum recommended task size is 1000 KiB.
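
The left joins and the per-merchant averages above can be illustrated on toy data. This is a minimal sketch that uses pandas instead of Spark so it runs without a cluster; the frames `toy_transactions` and `toy_abs` and their values are hypothetical and are not taken from the project data.

```python
import pandas as pd

# Hypothetical toy data; the real notebook performs these steps in Spark
toy_transactions = pd.DataFrame({
    "merchant_abn": [1, 1, 2, 2],
    "user_id": [10, 11, 10, 12],
})
toy_abs = pd.DataFrame({
    "user_id": [10, 11],
    "education_occupation_score": [1000.0, 900.0],
})

# Left join keeps every transaction; user 12 has no ABS row, so its score is NaN
toy_joined = toy_transactions.merge(toy_abs, on="user_id", how="left")

# Mean per merchant; pandas skips NaN here, much as Spark's F.mean skips nulls
toy_grouped = (
    toy_joined.groupby("merchant_abn", as_index=False)["education_occupation_score"]
    .mean()
    .rename(columns={"education_occupation_score": "avg_education_occupation_score"})
)
print(toy_grouped)
```

A merchant only ends up with a missing average when none of its transactions match an external record, which is the case the later `dropna()` call handles.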

### Weighted Average Ranking

The Relative Socioeconomic Disadvantage score is a general socio-economic index that already reflects education, occupation, and income. We therefore set `SES_WEIGHT` to 0.3 and give the two more specific measures, the education and occupation score and per capita income, a slightly larger weight of 0.35 each, so the three weights sum to 1. The education and occupation score and the Relative Socioeconomic Disadvantage score are related, but the relationship between them is non-linear, so each plays a distinct role in describing socio-economic outcomes and we keep both.
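
As a quick check of the weighting, the sketch below recomputes the metric for merchant 10023283211, the first row of the output table at the end of this notebook, from its rounded normalized scores. The helper name `weighted_metric` is hypothetical and introduced only for illustration; the weights are the ones defined in the next cell.

```python
# Weights from the notebook; they sum to 1, so the metric stays within [0, 1]
SES_WEIGHT = 0.3
EDU_OCC_WEIGHT = 0.35
INCOME_WEIGHT = 0.35


def weighted_metric(ses: float, edu_occ: float, income: float) -> float:
    """Combine three min-max normalized scores into one weighted metric."""
    return SES_WEIGHT * ses + EDU_OCC_WEIGHT * edu_occ + INCOME_WEIGHT * income


# Normalized scores for merchant 10023283211 as displayed (rounded) in the output table
example_metric = weighted_metric(0.576735, 0.492216, 0.325141)
print(round(example_metric, 4))  # 0.4591
```

The result agrees with the `metric` value of 0.459096 shown for that merchant, up to the rounding of the displayed inputs.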

In [7]:
# define weights

SES_WEIGHT = 0.3
EDU_OCC_WEIGHT = 0.35
INCOME_WEIGHT = 0.35

In [8]:
# drop merchants with a missing value in any of the three averages
ranking_df = transactions_grouped.dropna()

def normalize_column(column: pd.Series) -> pd.Series:
    '''
    Min-max normalize a series to the range [0, 1].
    Arguments:
      - column = series to be normalized
    Output:
      - the normalized series (all NaN if every value in the series is identical)
    '''

    min_value = column.min()
    max_value = column.max()
    return (column - min_value) / (max_value - min_value)

# normalize
normalized_columns = ranking_df.drop(columns=['merchant_abn']).apply(normalize_column)
ranking_df = pd.concat([ranking_df['merchant_abn'], normalized_columns], axis=1)

# weighted metric from the normalized averages
ranking_df["metric"] = (
    SES_WEIGHT * ranking_df["avg_relative_SE_dis_score"]
    + EDU_OCC_WEIGHT * ranking_df["avg_education_occupation_score"]
    + INCOME_WEIGHT * ranking_df["avg_capita_income"]
)

# sort by metric so that rank 1 is the highest metric, then restore merchant_abn order
ranking_df = ranking_df.sort_values(by='metric', ascending=False)
ranking_df['rank'] = range(1, len(ranking_df) + 1)
ranking_df = ranking_df.sort_values(by='merchant_abn')
ranking_df = ranking_df.reset_index(drop=True)

# save to analysis layer
ranking_df.to_pickle(f"{ANALYSIS_DATA}external_data_ranking.pkl")

ranking_df

| | merchant_abn | avg_relative_SE_dis_score | avg_education_occupation_score | avg_capita_income | metric | rank |
|---|---|---|---|---|---|---|
| 0 | 10023283211 | 0.576735 | 0.492216 | 0.325141 | 0.459096 | 3004 |
| 1 | 10142254217 | 0.575770 | 0.494277 | 0.341362 | 0.465204 | 2491 |
| 2 | 10165489824 | 0.695685 | 0.625098 | 0.392341 | 0.564809 | 76 |
| 3 | 10187291046 | 0.583622 | 0.476928 | 0.329665 | 0.457394 | 3112 |
| 4 | 10192359162 | 0.623367 | 0.497337 | 0.363297 | 0.488232 | 566 |
| ... | ... | ... | ... | ... | ... | ... |
| 4017 | 99938978285 | 0.583465 | 0.494607 | 0.342689 | 0.468093 | 2060 |
| 4018 | 99974311662 | 0.564760 | 0.453529 | 0.296808 | 0.432046 | 3685 |
| 4019 | 99976658299 | 0.585211 | 0.495118 | 0.340706 | 0.468101 | 2055 |
| 4020 | 99987905597 | 0.602002 | 0.524302 | 0.318035 | 0.475418 | 1140 |
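
The normalization and ranking steps can be checked by hand on a small example. The sketch below is illustrative only: the frame `toy` and its values are hypothetical, and it uses `Series.rank` as an alternative to the sort-then-enumerate approach in the cell above.

```python
import pandas as pd

# Hypothetical toy averages for three merchants (not taken from the real data)
toy = pd.DataFrame({
    "merchant_abn": [111, 222, 333],
    "avg_capita_income": [40000.0, 60000.0, 50000.0],
})

# Min-max normalization: the smallest value maps to 0 and the largest to 1
income = toy["avg_capita_income"]
toy["normalized_income"] = (income - income.min()) / (income.max() - income.min())

# Rank 1 goes to the highest value; method="first" breaks ties by row order
toy["rank"] = toy["normalized_income"].rank(ascending=False, method="first").astype(int)
print(toy)
```

Here the incomes normalize to 0.0, 1.0, and 0.5, so the merchants receive ranks 3, 1, and 2 without the frame needing to be re-sorted.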
