In [1]:
import os
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from pyspark.sql import Row

spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/04 17:32:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/04 17:32:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# from countryguess import guess_country

In [3]:
input_mhdb = "Datasets/MH Database - Combined Countries.csv"
output_atlas_countries_facts = "Datasets/temp_atlas_outputs/atlas_countries_facts"

### List of countries in Mental Health Atlas

In [4]:
countries = spark.read.option("header", True).option("multiline", "true").csv(input_mhdb)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [5]:
countries.count()

                                                                                

164

In [6]:
list_of_countries = countries.select(["country"]).distinct().collect()

                                                                                

In [7]:
atlas_countries_list = [list_of_countries[i]["country"] for i in range(len(list_of_countries))]
print(len(atlas_countries_list))

164


### Create a country_dict_df of country_name, official_name, iso3, un_region to avoid using countryguess

In [8]:
# total_countries = set(atlas_countries_list)

In [9]:
# country_dict_schema = types.StructType([
#     types.StructField("country", types.StringType()),
# ])
# rows = [Row(row) for row in total_countries]
# country_dict_df = spark.createDataFrame(rows, schema=country_dict_schema)
# country_dict_df = country_dict_df.withColumn("country_name", get_country_name(F.col("country"))).withColumn("country_code", get_country_code(F.col("country"))).withColumn("un_region", get_country_region(F.col("country")))
# country_dict_df.show()
# country_dict_df.write.csv("atlas_country_dict", mode="overwrite")

### Create fact table of countries

In [10]:
# # Need to change these functions to make use of a country_name, official_name, iso3, un_region dictionary
# # That way I won't have to install a separate package on EMR
# @F.udf(returnType=types.StringType())
# def get_country_code(country):
#     return guess_country(country)["iso3"]

# @F.udf(returnType=types.StringType())
# def get_country_name(country):
#     return guess_country(country)["name_official"]

# @F.udf(returnType=types.StringType())
# def get_country_region(country):
#     return guess_country(country)["unregion"]

### Read atlas_country_dict :D

In [11]:
countries_dict_schema = types.StructType([
    types.StructField("country_dict_country", types.StringType()),
    types.StructField("country_dict_country_name", types.StringType()),
    types.StructField("country_dict_country_code", types.StringType()),
    types.StructField("country_dict_un_region", types.StringType()),
])

countries_dict_df = spark.read.csv("atlas_country_dict", schema=countries_dict_schema)

In [12]:
countries_dict_df.count() # 164

                                                                                

164

In [13]:
# countries_dict_df.show()

In [14]:
# countries_1 = countries.join(countries_dict_df.hint("broadcast"), (countries["country"]==countries_dict_df["country"]))

In [15]:
# countries_1.select(["country_code"]).distinct().count()

In [16]:
countries = countries.join(countries_dict_df.hint("broadcast"), (countries["country"]==countries_dict_df["country_dict_country"]), how='left')

In [17]:
countries.count()

                                                                                

164

In [18]:
countries = countries.drop("country").withColumnsRenamed({
    "country_dict_country": "country",
    "country_dict_country_name": "country_name",
    "country_dict_country_code": "country_code",
    "country_dict_un_region": "un_region"
})

In [19]:
atlas_countries_df = countries.select([
    "country_name",
    "country_code",
    "un_region"
])

In [20]:
atlas_countries_df.select(["country_code"]).distinct().count() # 164, checks out fine

164

In [106]:
atlas_countries_df.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- un_region: string (nullable = true)



In [21]:
atlas_countries_df.write.csv(output_atlas_countries_facts, mode='overwrite')

### Create dimension table of basic population, income_group, expenditure (CAD) and who_region

In [22]:
input_currency_rates = "Datasets/rates.csv"
output_atlas_countries_basic_info_dims = "Datasets/temp_atlas_outputs/atlas_countries_basic_info_dims"

In [23]:
atlas_countries_basic_info_df = countries.select([
    "country_code",
    "population",
    "income_group",
    "expenditure($)",
    "who_region"
])

In [24]:
@F.udf(returnType=types.IntegerType())
def convert_population(population):
    if population == "-":
        return None
    if isinstance(population, type(None)):
        return
    if isinstance(population, int):
        return population
    population = population.replace(" ", "").replace(",", "").replace(", ", "")
    return int(population)

In [25]:
@F.udf(returnType=types.StringType())
def convert_who_region(region):
    if region == "-":
        return None
    if isinstance(region, type(None)):
        return
    return region.lower()

In [26]:
@F.udf(returnType=types.StringType())
def convert_income_group(val):
    if val == "-":
        return None
    if isinstance(val, type(None)):
        return
    return val.lower()

In [27]:
atlas_countries_basic_info_df = atlas_countries_basic_info_df.withColumn("population", convert_population(F.col("population")))

In [28]:
atlas_countries_basic_info_df = atlas_countries_basic_info_df.withColumn("who_region", convert_who_region(F.col("who_region")))

In [29]:
atlas_countries_basic_info_df = atlas_countries_basic_info_df.withColumn("income_group", convert_income_group(F.col("income_group")))

In [30]:
import re

code_pattern = r'[A-Z]{3}'

@F.udf(returnType=types.StringType())
def extract_code(value):
    if value == "-":
        return None
    if isinstance(value, type(None)):
        return
    value_str = str(value).upper()
    if re.search(code_pattern, value_str):
        match = re.search(code_pattern, value_str)
        if match:
            start, end = match.span()
            return value_str[start:end] 
        else:
            None

@F.udf(returnType=types.FloatType())
def extract_exp(value):
    if value == "-":
        return None
    if isinstance(value, type(None)):
        return
    value_str = str(value).upper()
    if re.search(code_pattern, value_str):
        match = re.search(code_pattern, value_str)
        if match:
            start, end = match.span()
            return float(value_str[0:start-1].replace(" ", "").replace(",", "").replace(", ", "")) 
        else:
            None

In [31]:
atlas_countries_basic_info_df = atlas_countries_basic_info_df.withColumn("expenditure_amount", extract_exp(F.col("expenditure($)")))

In [32]:
atlas_countries_basic_info_df = atlas_countries_basic_info_df.withColumn("expenditure_currency", extract_code(F.col("expenditure($)")))

In [33]:
rates_schema = types.StructType([
    types.StructField('currency', types.StringType()),
    types.StructField('rate', types.StringType()),
])

rates_df = spark.read.option("header", True).csv(input_currency_rates, schema=rates_schema)

In [34]:
atlas_basic_info_df_currency = atlas_countries_basic_info_df.join(rates_df.hint("broadcast"), (atlas_countries_basic_info_df["expenditure_currency"]==rates_df["currency"]), how="left")

In [35]:
rate = atlas_basic_info_df_currency.filter(atlas_basic_info_df_currency["expenditure_currency"]=="CAD").collect()[0]["rate"]

23/12/04 17:33:02 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: country, rate
 Schema: currency, rate
Expected: currency but found: country
CSV file: file:///Users/fayad/Documents/Canada/SFU/Term%20Fall%2023/CMPT%20732%20G1%20-%20Programming%20For%20Big%20Data/Project/Datasets/rates.csv
                                                                                

In [36]:
atlas_basic_info_df_currency = atlas_basic_info_df_currency.withColumn("expenditure_cad", F.col("expenditure_amount")/F.col("rate")*rate)

In [37]:
atlas_basic_info_final_df = atlas_basic_info_df_currency.select([
    "country_code",
    "population",
    "income_group",
    "who_region",
    "expenditure_cad"
])

In [107]:
atlas_basic_info_final_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- income_group: string (nullable = true)
 |-- who_region: string (nullable = true)
 |-- expenditure_cad: double (nullable = true)



In [39]:
atlas_basic_info_final_df.write.csv(output_atlas_countries_basic_info_dims, mode='overwrite')

23/12/04 17:33:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: country, rate
 Schema: currency, rate
Expected: currency but found: country
CSV file: file:///Users/fayad/Documents/Canada/SFU/Term%20Fall%2023/CMPT%20732%20G1%20-%20Programming%20For%20Big%20Data/Project/Datasets/rates.csv


In [40]:
atlas_basic_info_final_df.filter(atlas_basic_info_final_df["country_code"]=="AFG").show()

23/12/04 17:33:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: country, rate
 Schema: currency, rate
Expected: currency but found: country
CSV file: file:///Users/fayad/Documents/Canada/SFU/Term%20Fall%2023/CMPT%20732%20G1%20-%20Programming%20For%20Big%20Data/Project/Datasets/rates.csv


+------------+----------+------------+----------+---------------+
|country_code|population|income_group|who_region|expenditure_cad|
+------------+----------+------------+----------+---------------+
|         AFG|      null|        null|      null|           null|
+------------+----------+------------+----------+---------------+



### Create dimension table for SMR rates from 2013-2019

In [41]:
output_atlas_smr_info_dims = "Datasets/temp_atlas_outputs/atlas_smr_info_dims"

In [42]:
smr_info_df = countries.select([
    "country_code",
    "smr_2013",
    "smr_2016",
    "smr_2019"
])

In [43]:
@F.udf(returnType=types.FloatType())
def convert_smr(value):
    if value == "-":
        return None
    if isinstance(value, type(None)):
        return
    else:
        return float(value)

In [44]:
smr_info_df = smr_info_df.withColumn("suicide_mortality_rate_2013", convert_smr(F.col("smr_2013"))).withColumn("suicide_mortality_rate_2016", convert_smr(F.col("smr_2016"))).withColumn("suicide_mortality_rate_2019", convert_smr(F.col("smr_2019"))).drop("smr_2013").drop("smr_2016").drop("smr_2019")

In [45]:
smr_info_df.count()

164

In [46]:
smr_info_df.write.csv(output_atlas_smr_info_dims, mode='overwrite')

In [47]:
smr_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- suicide_mortality_rate_2013: float (nullable = true)
 |-- suicide_mortality_rate_2016: float (nullable = true)
 |-- suicide_mortality_rate_2019: float (nullable = true)



### Create dimension table for policies

In [48]:
output_atlas_policies_info_dims = "Datasets/temp_atlas_outputs/atlas_policies_info_dims"

In [49]:
policies_info_df = countries.select([
    "country_code",
    "policy",
    "year_policy",
    "hr_resources_plan",
    "finance_resources_plan",
    "mh_law",
    "law_year",
    "dedcated_body",
    "mh_child_policy",
    "year_child_policy",
    "mh_adolescent_policy",
    "year_adolescent_policy",
    "mh_suicide_prevention_policy",
    "year_suicide_prevention_policy"
])

In [50]:
policies_info_df = policies_info_df.withColumnsRenamed({
    "policy": "mental_health_policy",
    "year_policy": "mental_health_policy_year",
    "hr_resources_plan": "human_resources_for_mental_health_policy",
    "finance_resources_plan": "finance_resources_for_mental_health_policy",
    "mh_law": "mental_health_law",
    "law_year": "mental_health_law_year",
    "dedcated_body": "dedicated_authority",
    "mh_child_policy": "mental_health_child_policy",
    "year_child_policy": "mental_health_child_policy_year",
    "mh_adolescent_policy": "mental_health_adolescent_policy",
    "year_adolescent_policy": "mental_health_adolescent_policy_year",
    "mh_suicide_prevention_policy": "suicide_prevention_policy",
    "year_suicide_prevention_policy": "suicide_prevention_policy_year"
})

In [51]:
@F.udf(returnType=types.StringType())
def convert_dash_to_null_string(value):
    if value == "-":
        return None
    elif isinstance(value, type(None)):
        return
    else:
        return str(value)

In [52]:
for col in policies_info_df.columns:
    policies_info_df = policies_info_df.withColumn(col, convert_dash_to_null_string(F.col(col)))

In [53]:
replacement_pattern = "[\\n\\r]"

policies_info_df = policies_info_df.withColumn("dedicated_authority", F.regexp_replace("dedicated_authority", replacement_pattern, " "))

In [54]:
# N - does not exist, EW - exists and functions well, EP - exists and functions poorly
@F.udf(returnType=types.StringType())
def convert_authority(value):
    transformation_dictionary = {
        'A dedicated body authority does not exist': "N",
        'A dedicated authority body does  not exist': "N",
        'Exist and provides regular inspections of facilities and reports at least annually': "EW",
        'A dedicated body authority undertakes regular inspections, responds to complaints, and reports its findings at least once a year': "EW",
        'A dedicated authority body does not exist': "N",
        'A dedicated authority undertakes regular inspections,responds to complaints and reports its findings at least once a year': "EW",
        'A dedicated body exists but it is not functioning well': "EP",
        'exists but not fully functioning': "EP",
        'A dedicated authority undertakes  irregular inspections of mental  health services and irregularly  responds to complaints of human  rights violations': "EP",
        'A dedicated body does not exist': "N",
        'A dedicated body undertakes irregular inspections of mental health services and irregularly responds to complaints of human rights violations': "EP",
        'A dedicated authority undertakes irregular inspections of mental health services and irregularly responds to complaints of human rights violations': "EP",
        'A dedicated authority body exists  but it is not functioning well': "EP",
        'A dedicated authority undertakes regular inspections, responds to complaints, and reports its finding at least once a year': "EW",
        'A dedicated authority undertakes regular inspections, responds to complaints, and reports its findings at least once a year': "EW",
        'A dedicated authority body exists but it is not functioning well': "EP",
        'A dedicated authority undertakes regular inspections, responds to complaints and reports its findings at least once a year': "EW",
        'A dedicated authority undertakes  regular inspections, responds to  complaints, and reports its findings  at least once a year': "EW"
    }
    if value == "-":
        return None
    if isinstance(value, type(None)) or value in ["N", "EW", "EP"]:
        return value
    else:
        return transformation_dictionary[value]

In [55]:
policies_info_df = policies_info_df.withColumn("dedicated_authority", convert_authority(F.col("dedicated_authority")))

In [56]:
policies_info_df.count()

164

In [57]:
policies_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- mental_health_policy: string (nullable = true)
 |-- mental_health_policy_year: string (nullable = true)
 |-- human_resources_for_mental_health_policy: string (nullable = true)
 |-- finance_resources_for_mental_health_policy: string (nullable = true)
 |-- mental_health_law: string (nullable = true)
 |-- mental_health_law_year: string (nullable = true)
 |-- dedicated_authority: string (nullable = true)
 |-- mental_health_child_policy: string (nullable = true)
 |-- mental_health_child_policy_year: string (nullable = true)
 |-- mental_health_adolescent_policy: string (nullable = true)
 |-- mental_health_adolescent_policy_year: string (nullable = true)
 |-- suicide_prevention_policy: string (nullable = true)
 |-- suicide_prevention_policy_year: string (nullable = true)



In [58]:
policies_info_df.write.csv(output_atlas_policies_info_dims, mode='overwrite')

### Create dimension table for expenditure_and_pay_methods

In [59]:
output_atlas_exp_and_pay_info_dims = "Datasets/temp_atlas_outputs/atlas_exp_and_pay_info_dims"

In [60]:
exp_and_pay_info_df = countries.select([
    "country_code",
    "mh_expenditure",
    "mh_expenditure_hospital",
    "service_pay_method",
    "service_pay_medication",
    "mh_inclusive_schemes"
])

In [61]:
@F.udf(returnType=types.FloatType())
def convert_dash_to_null_float(value):
    if value == "-" or value == "--":
        return None
    elif isinstance(value, type(None)):
        return
    elif isinstance(value, float):
        return value
    else:
        return float(value.replace(" ", "").replace(",", "").replace(", ", ""))

In [62]:
exp_and_pay_info_df = exp_and_pay_info_df.withColumn("mh_expenditure", convert_dash_to_null_float(F.col("mh_expenditure"))).withColumn("mh_expenditure_hospital", convert_dash_to_null_float(F.col("mh_expenditure_hospital"))).withColumn("mh_inclusive_schemes", convert_dash_to_null_string(F.col("mh_inclusive_schemes")))

In [63]:
replacement_pattern = "[\\n\\r]"

exp_and_pay_info_df = exp_and_pay_info_df.withColumn("service_pay_method", F.regexp_replace("service_pay_method", replacement_pattern, "")).withColumn("service_pay_medication", F.regexp_replace("service_pay_medication", replacement_pattern, ""))

In [64]:
# FI - fully insured, 20 - at least 20% paid by individual, M - mostly paid by individual
@F.udf(returnType=types.StringType())
def convert_payment(value):
    transformation_dictionary = {
        'Persons pay nothing at the point of service use (fully insured)': "FI",
        'Persons pay mostly or entirely out of pocket for services': "M",
        'Persons pay mostly or entirely out of pocket medicines': "M",
        'Persons pay mostly or entirely out of pocket for medicines': "M",
        'Persons pay at least 20% towards the cost of services': "20",
        'Persons pay at least 20% towards the cost of medicines': "20",
        'Persons pay atleast 20% towards the cost of services': "20",
        'nothing, fully insured': "FI",
        'at least 20% paid by individual': "20",
        'Persons pay nothing at the point of service use(fully insured)': "FI",
        'Personds pay nothing at the point of service use(fully insured)': "FI",
        'persons pay nothing at the point of service use(fully insured)': "FI",
        'Personds pay nothing at the point of service use (fully insured)': "FI",
        'mostly paid by individuals': "M"
    }
    if value == "-":
        return None
    if isinstance(value, type(None)) or value in ["FI", "20", "M"]:
        return value
    else:
        return transformation_dictionary[value]

In [65]:
exp_and_pay_info_df = exp_and_pay_info_df.withColumn("service_pay_medication", convert_payment(F.col("service_pay_medication"))).withColumn("service_pay_method", convert_payment(F.col("service_pay_method")))

In [66]:
exp_and_pay_info_df = exp_and_pay_info_df.withColumnsRenamed({
    "mh_expenditure": "govt_exp_mental_health_%_budget",
    "mh_hospital": "govt_exp_mental_health_%_hospital",
    "service_pay_method": "pay_for_services",
    "service_pay_medication": "pay_for_medication",
    "mh_inclusive_schemes": "insurance_and_reimbursement_includes_mental_health"
})

In [67]:
exp_and_pay_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- govt_exp_mental_health_%_budget: float (nullable = true)
 |-- mh_expenditure_hospital: float (nullable = true)
 |-- pay_for_services: string (nullable = true)
 |-- pay_for_medication: string (nullable = true)
 |-- insurance_and_reimbursement_includes_mental_health: string (nullable = true)



In [68]:
exp_and_pay_info_df.count()

164

In [69]:
exp_and_pay_info_df.write.csv(output_atlas_exp_and_pay_info_dims, mode='overwrite')

### Create dimension table for mental_health_workers from 2014-2020

In [70]:
output_atlas_mental_health_workers_info_dims = "Datasets/temp_atlas_outputs/atlas_mental_health_workers_info_dims"

In [71]:
mental_health_workers_info_df = countries.select([
    "country_code",
    "no_psychiatrists",
    "no_total_health_profs",
    "no_psychologists",
    "no_social_wrks",
    "no_other_workers",
    "no_total_workers",
    "mh_workers_2014",
    "mh_workers_2017",
    "mh_workers_2020",
    "mh_workers_psych",
    "mh_workers_all"
])

In [72]:
for col in mental_health_workers_info_df.columns:
    if col == "country_code":
        continue
    mental_health_workers_info_df = mental_health_workers_info_df.withColumn(col, convert_dash_to_null_float(F.col(col)))

In [73]:
# mental_health_workers_info_df = mental_health_workers_info_df.withColumn("no_psychiatrists", convert_dash_to_null_float(F.col("no_psychiatrists"))).withColumn("no_total_health_profs", convert_dash_to_null_float(F.col("no_total_health_profs"))).withColumn("no_psychologists", convert_dash_to_null_float(F.col("no_psychologists"))).withColumn("no_social_wrks", convert_dash_to_null_float(F.col("no_social_wrks"))).withColumn("no_other_workers", convert_dash_to_null_float(F.col("no_other_workers"))).withColumn("no_total_workers", convert_dash_to_null_float(F.col("no_total_workers"))).withColumn("mh_workers_psych", convert_dash_to_null_float(F.col("mh_workers_psych"))).withColumn("mh_workers_all", convert_dash_to_null_float(F.col("mh_workers_all"))).withColumn("mh_workers_2014", convert_dash_to_null_float(F.col("mh_workers_2014"))).withColumn("mh_workers_2017", convert_dash_to_null_float(F.col("mh_workers_2017"))).withColumn("mh_workers_2020", convert_dash_to_null_float(F.col("mh_workers_2020")))

In [74]:
mental_health_workers_info_df = mental_health_workers_info_df.withColumnsRenamed({
    "no_psychiatrists": "num_psychiatrists",
    "no_total_health_profs": "num_nurses",
    "no_psychologists": "num_psychologists",
    "no_social_wrks": "num_social_workers",
    "no_other_workers": "num_other_specialized_workers",
    "no_total_workers": "num_total_mental_health_workers",
    "mh_workers_2014": "num_total_mental_health_workers_2014",
    "mh_workers_2017": "num_total_mental_health_workers_2017",
    "mh_workers_2020": "num_total_mental_health_workers_2020",
    "mh_workers_psych": "num_child_psychiatrists",
    "mh_workers_all": "num_child_mental_health_workers"
})

In [75]:
mental_health_workers_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- num_psychiatrists: float (nullable = true)
 |-- num_nurses: float (nullable = true)
 |-- num_psychologists: float (nullable = true)
 |-- num_social_workers: float (nullable = true)
 |-- num_other_specialized_workers: float (nullable = true)
 |-- num_total_mental_health_workers: float (nullable = true)
 |-- num_total_mental_health_workers_2014: float (nullable = true)
 |-- num_total_mental_health_workers_2017: float (nullable = true)
 |-- num_total_mental_health_workers_2020: float (nullable = true)
 |-- num_child_psychiatrists: float (nullable = true)
 |-- num_child_mental_health_workers: float (nullable = true)



In [76]:
mental_health_workers_info_df.write.csv(output_atlas_mental_health_workers_info_dims, mode="overwrite")

### Create dimension table for patient_admission_stats from 2014-2020

In [77]:
output_atlas_patient_admission_info_dims = "Datasets/temp_atlas_outputs/atlas_patient_admission_info_dims"

In [78]:
patient_admissions_info_df = countries.select([
    "country_code",
    "ip_adm_2014",
    "ip_adm_2017",
    "ip_adm_2020",
    "op_adm_2014",
    "op_adm_2017",
    "op_adm_2020",
    "ca_services_2014",
    "ca_services_2017",
    "ca_services_2020",
    "hosp_facilities_vists",
    "non_hosp_facilities_visits",
    "other_facilities_vists",
    "op_facilities_ca_visits",
    "mh_annual",
    "psych_admission",
    "community_admissions",
    "ca_adminssions",
    "mh_hosps_total_adm",
    "involuntary_adms",
    "follow_up",
    "ip_lt_1yr",
    "ip_1_5",
    "ip_gt_5",
    "ip_timely_servies",
    "total_pshycosis",
    "total_pshycosis_male",
    "total_pshycosis_female"
])

In [79]:
for col in patient_admissions_info_df.columns:
    if col == "country_code" or col == "follow_up" or col == "ip_timely_servies":
        continue
    patient_admissions_info_df = patient_admissions_info_df.withColumn(col, convert_dash_to_null_float(F.col(col)))

In [80]:
# patient_admissions_info_df = patient_admissions_info_df.withColumn("ip_adm_2014", convert_dash_to_null_float(F.col("ip_adm_2014"))).withColumn("ip_adm_2017", convert_dash_to_null_float(F.col("ip_adm_2017"))).withColumn("ip_adm_2020", convert_dash_to_null_float(F.col("ip_adm_2020"))).withColumn("op_adm_2014", convert_dash_to_null_float(F.col("op_adm_2014"))).withColumn("op_adm_2017", convert_dash_to_null_float(F.col("op_adm_2017"))).withColumn("op_adm_2020", convert_dash_to_null_float(F.col("op_adm_2020"))).withColumn("ca_services_2014", convert_dash_to_null_float(F.col("ca_services_2014"))).withColumn("ca_services_2017", convert_dash_to_null_float(F.col("ca_services_2017"))).withColumn("ca_services_2020", convert_dash_to_null_float(F.col("ca_services_2020"))).withColumn("hosp_facilities_vists", convert_dash_to_null_float(F.col("hosp_facilities_vists"))).withColumn("non_hosp_facilities_visits", convert_dash_to_null_float(F.col("non_hosp_facilities_visits"))).withColumn("other_facilities_vists", convert_dash_to_null_float(F.col("other_facilities_vists"))).withColumn("op_facilities_ca_visits", convert_dash_to_null_float(F.col("op_facilities_ca_visits"))).withColumn("mh_annual", convert_dash_to_null_float(F.col("mh_annual"))).withColumn("psych_admission", convert_dash_to_null_float(F.col("psych_admission"))).withColumn("community_admissions", convert_dash_to_null_float(F.col("community_admissions"))).withColumn("ca_adminssions", convert_dash_to_null_float(F.col("ca_adminssions"))).withColumn("mh_hosps_total_adm", convert_dash_to_null_float(F.col("mh_hosps_total_adm"))).withColumn("involuntary_adms", convert_dash_to_null_float(F.col("involuntary_adms"))).withColumn("ip_lt_1yr", convert_dash_to_null_float(F.col("ip_lt_1yr"))).withColumn("ip_1_5", convert_dash_to_null_float(F.col("ip_1_5"))).withColumn("ip_gt_5", convert_dash_to_null_float(F.col("ip_gt_5"))).withColumn("total_pshycosis", convert_dash_to_null_float(F.col("total_pshycosis"))).withColumn("total_pshycosis_male", convert_dash_to_null_float(F.col("total_pshycosis_male"))).withColumn("total_pshycosis_female", convert_dash_to_null_float(F.col("total_pshycosis_female")))

In [81]:
@F.udf(returnType=types.FloatType())
def convert_percentage_to_category_float(value):
    transformation_dictionary = {
        "less-25": 1,
        "25-less": 1,
        "25": 1,
        "26-50": 2,
        "51-75": 3,
        "more-75": 4,
        "More-75": 4
    }
    if value == "-" or value == "--":
        return None
    elif isinstance(value, type(None)):
        return
    elif isinstance(value, float):
        return value
    else:
        return float(transformation_dictionary[value])

In [82]:
patient_admissions_info_df = patient_admissions_info_df.withColumn("follow_up", convert_percentage_to_category_float(F.col("follow_up"))).withColumn("ip_timely_servies", convert_percentage_to_category_float(F.col("ip_timely_servies")))

In [83]:
patient_admissions_info_df = patient_admissions_info_df.withColumnsRenamed({
    "ip_adm_2014": "inpatient_admissions_2014",
    "ip_adm_2017": "inpatient_admissions_2017",
    "ip_adm_2020": "inpatient_admissions_2020",
    "op_adm_2014": "outpatient_admissions_2014",
    "op_adm_2017": "outpatient_admissions_2017",
    "op_adm_2020": "outpatient_admissions_2020",
    "ca_services_2014": "community_based_mental_health_services_2014",
    "ca_services_2017": "community_based_mental_health_services_2017",
    "ca_services_2020": "community_based_mental_health_services_2020",
    "hosp_facilities_vists": "hospital_attached_facilities_visits",
    "non_hosp_facilities_visits": "non_hospital_attached_facilities_visits",
    "op_facilities_ca_visits": "children_specific_facilities_visits",
    "other_facilities_vists": "other_facilities_visits",
    "mh_annual": "annual_mental_hospital_admissions",
    "involuntary_adms": "total_involuntary_admissions",
    "follow_up": "follow_up_post_discharge",
    "ip_lt_1yr": "inpatients_staying_less_1_year",
    "ip_1_5": "inpatients_staying_1_5_year",
    "ip_gt_5": "inpatients_staying_more_5_year",
    "ip_timely_servies": "received_timely_services",
    "total_pshycosis": "total_psychosis",
    "total_pshycosis_male": "total_psychosis_male",
    "total_pshycosis_female": "total_psychosis_female",
})

In [84]:
patient_admissions_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- inpatient_admissions_2014: float (nullable = true)
 |-- inpatient_admissions_2017: float (nullable = true)
 |-- inpatient_admissions_2020: float (nullable = true)
 |-- outpatient_admissions_2014: float (nullable = true)
 |-- outpatient_admissions_2017: float (nullable = true)
 |-- outpatient_admissions_2020: float (nullable = true)
 |-- community_based_mental_health_services_2014: float (nullable = true)
 |-- community_based_mental_health_services_2017: float (nullable = true)
 |-- community_based_mental_health_services_2020: float (nullable = true)
 |-- hospital_attached_facilities_visits: float (nullable = true)
 |-- non_hospital_attached_facilities_visits: float (nullable = true)
 |-- other_facilities_visits: float (nullable = true)
 |-- children_specific_facilities_visits: float (nullable = true)
 |-- annual_mental_hospital_admissions: float (nullable = true)
 |-- psych_admission: float (nullable = true)
 |-- community_admission

In [85]:
patient_admissions_info_df.write.csv(output_atlas_patient_admission_info_dims, mode="overwrite")

23/12/04 17:33:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


### Create dimension table for mental_health_facilities

In [86]:
output_atlas_mental_health_facilities_info_dims = "Datasets/temp_atlas_outputs/atlas_mental_health_facilities_info_dims"

In [87]:
atlas_mental_health_facilities_info_df = countries.select([
    "country_code",
    "mh_op_facilities_hosp",
    "non_hosp_op_facilities",
    "other_op_facilities",
    "op_facilities_ca",
    "ip_hosp",
    "ip_psych",
    "ip_community",
    "ip_ca",
    "mh_beds",
    "psych_bed",
    "community_beds",
    "caa_beds",
    "total_comm_facilities"
])

In [88]:
for col in atlas_mental_health_facilities_info_df.columns:
    if col == "country_code":
        continue
    atlas_mental_health_facilities_info_df = atlas_mental_health_facilities_info_df.withColumn(col, convert_dash_to_null_float(F.col(col)))

In [89]:
# atlas_mental_health_facilities_info_df = atlas_mental_health_facilities_info_df.withColumn("mh_op_facilities_hosp", convert_dash_to_null_float(F.col("mh_op_facilities_hosp"))).withColumn("non_hosp_op_facilities", convert_dash_to_null_float(F.col("non_hosp_op_facilities"))).withColumn("other_op_facilities", convert_dash_to_null_float(F.col("other_op_facilities"))).withColumn("op_facilities_ca", convert_dash_to_null_float(F.col("op_facilities_ca"))).withColumn("ip_hosp", convert_dash_to_null_float(F.col("ip_hosp"))).withColumn("ip_psych", convert_dash_to_null_float(F.col("ip_psych"))).withColumn("ip_community", convert_dash_to_null_float(F.col("ip_community"))).withColumn("ip_ca", convert_dash_to_null_float(F.col("ip_ca"))).withColumn("mh_beds", convert_dash_to_null_float(F.col("mh_beds"))).withColumn("psych_bed", convert_dash_to_null_float(F.col("psych_bed"))).withColumn("community_beds", convert_dash_to_null_float(F.col("community_beds"))).withColumn("caa_beds", convert_dash_to_null_float(F.col("caa_beds"))).withColumn("total_comm_facilities", convert_dash_to_null_float(F.col("total_comm_facilities")))

In [90]:
atlas_mental_health_facilities_info_df = atlas_mental_health_facilities_info_df.withColumnsRenamed({
    "mh_op_facilities_hosp": "outpatient_facilities_attached_to_hospitals",
    "non_hosp_op_facilities": "outpatient_facilities_not_attached_to_hospitals",
    "other_op_facilities": "other_outpatient_facilities",
    "op_facilities_ca": "children_specific_outpatient_facilities",
    "ip_hosp": "inpatient_hospitals",
    "ip_psych": "inpatient_psychiatric_units",
    "ip_community": "community_residential_facilities",
    "ip_ca": "children_specific_inpatient_facilities",
    "mh_beds": "mental_hospital_beds",
    "psych_beds": "psychiatric_beds",
    "community_beds": "community_beds",
    "caa_beds": "children_specific_beds",
    "total_comm_facilities": "total_community_facilities"
})

In [91]:
atlas_mental_health_facilities_info_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- outpatient_facilities_attached_to_hospitals: float (nullable = true)
 |-- outpatient_facilities_not_attached_to_hospitals: float (nullable = true)
 |-- other_outpatient_facilities: float (nullable = true)
 |-- children_specific_outpatient_facilities: float (nullable = true)
 |-- inpatient_hospitals: float (nullable = true)
 |-- inpatient_psychiatric_units: float (nullable = true)
 |-- community_residential_facilities: float (nullable = true)
 |-- children_specific_inpatient_facilities: float (nullable = true)
 |-- mental_hospital_beds: float (nullable = true)
 |-- psych_bed: float (nullable = true)
 |-- community_beds: float (nullable = true)
 |-- children_specific_beds: float (nullable = true)
 |-- total_community_facilities: float (nullable = true)



In [92]:
atlas_mental_health_facilities_info_df.write.csv(output_atlas_mental_health_facilities_info_dims, mode="overwrite")

### Create dimensions table for programs

In [93]:
input_mhdb_tables = "Datasets/MH Database - Table Combined.csv"
output_atlas_countries_programs_info_dims = "Datasets/temp_atlas_outputs/atlas_countries_programs_info_dims"

In [94]:
atlas_countries_programs_info_df = spark.read.option("header", True).option("multiline", "true").csv(input_mhdb_tables)

In [95]:
atlas_countries_programs_info_df.select(["Country"]).distinct().count()

164

In [96]:
atlas_countries_programs_info_df = atlas_countries_programs_info_df.join(countries_dict_df.hint("broadcast"), (atlas_countries_programs_info_df["Country"]==countries_dict_df["country_dict_country"]), how="left")

In [97]:
atlas_countries_programs_info_df.select(["country_dict_country_name"]).count()

164

In [98]:
atlas_countries_programs_info_df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Suicide prevention programme: string (nullable = true)
 |-- Mental Health Awareness /Anti- stigma: string (nullable = true)
 |-- Early Child Development: string (nullable = true)
 |-- School based mental health prevention and promotion: string (nullable = true)
 |-- Parental / Maternal mental health promotion and prevention: string (nullable = true)
 |-- Work-related mental health prevention and promotion: string (nullable = true)
 |-- Mental health and psychosocial component of disaster preparedness, disaster risk reduction: string (nullable = true)
 |-- country_dict_country: string (nullable = true)
 |-- country_dict_country_name: string (nullable = true)
 |-- country_dict_country_code: string (nullable = true)
 |-- country_dict_un_region: string (nullable = true)



In [99]:
atlas_countries_programs_info_df = atlas_countries_programs_info_df.drop("Country").drop("country_dict_country").drop("country_dict_country_name").drop("country_dict_un_region").withColumnsRenamed({
    "Suicide prevention programme": "suicide_prevention_program",
    "Mental Health Awareness /Anti- stigma": "awareness_anti_stigma_program",
    "Early Child Development": "early_child_development_program",
    "School based mental health prevention and promotion": "school_based_program",
    "Parental / Maternal mental health promotion and prevention": "parental_health_program",
    "Work-related mental health prevention and promotion": "work_related_program",
    "Mental health and psychosocial component of disaster preparedness, disaster risk reduction": "disaster_preparation_program",
    "country_dict_country_code": "country_code",
})

In [100]:
@F.udf(returnType=types.StringType())
def convert_dash_to_null_table_string(value):
    if value == "-" or value == "-,-,-,-" or value == "??":
        return None
    elif isinstance(value, type(None)):
        return
    else:
        return value

In [101]:
for col in atlas_countries_programs_info_df.columns:
    if col == "country_code":
        continue
    atlas_countries_programs_info_df = atlas_countries_programs_info_df.withColumn(col, convert_dash_to_null_table_string(F.col(col)))

In [102]:
atlas_countries_programs_info_df = atlas_countries_programs_info_df.withColumn("suicide_prevention_program", convert_dash_to_null_table_string(F.col("suicide_prevention_program"))).withColumn("awareness_anti_stigma_program", convert_dash_to_null_table_string(F.col("awareness_anti_stigma_program"))).withColumn("early_child_development_program", convert_dash_to_null_table_string(F.col("early_child_development_program"))).withColumn("school_based_program", convert_dash_to_null_table_string(F.col("school_based_program"))).withColumn("parental_health_program", convert_dash_to_null_table_string(F.col("parental_health_program"))).withColumn("work_related_program", convert_dash_to_null_table_string(F.col("work_related_program"))).withColumn("disaster_preparation_program", convert_dash_to_null_table_string(F.col("disaster_preparation_program")))

In [105]:
atlas_countries_programs_info_df.printSchema()

root
 |-- suicide_prevention_program: string (nullable = true)
 |-- awareness_anti_stigma_program: string (nullable = true)
 |-- early_child_development_program: string (nullable = true)
 |-- school_based_program: string (nullable = true)
 |-- parental_health_program: string (nullable = true)
 |-- work_related_program: string (nullable = true)
 |-- disaster_preparation_program: string (nullable = true)
 |-- country_code: string (nullable = true)



In [104]:
atlas_countries_programs_info_df.write.csv(output_atlas_countries_programs_info_dims, mode="overwrite")