# Imports + Read Files

In [16]:
import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    regexp_replace, coalesce, col, substring, length, row_number, split, to_date, trim, monotonically_increasing_id, broadcast
)
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType


In [3]:
spark = SparkSession.builder \
      .appName("dimsdataproc") \
      .getOrCreate()

25/05/17 20:14:10 WARN Utils: Your hostname, Muhameds-MacBook-Air-2.local resolves to a loopback address: 127.0.0.1; using 10.0.0.74 instead (on interface en0)
25/05/17 20:14:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/17 20:14:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.read.text("/Users/muhamedhashi/data/data_part1.txt").show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                          

In [5]:
dimdatapart1 = spark.read.csv("/Users/muhamedhashi/data/data_part1.txt", sep="\t", header = True)
dimdatapart1.show(3)

+--------------------+--------------------+--------------------+----------+----------+----------------------+---------------------------+--------------------+--------------------+----------+--------------------+---------------------------------------+
|     IATI Identifier|               Title|         Description|Start Date|  End Date|Reporting Organisation|Reporting Organisation Type|            Aid Type|        Finance Type| Flow Type|Value (USD) (Budget)|Value (USD) (Disbursement, Expenditure)|
+--------------------+--------------------+--------------------+----------+----------+----------------------+---------------------------+--------------------+--------------------+----------+--------------------+---------------------------------------+
|XI-IATI-WBTF-P178946|Public Expenditur...|To contribute to ...|2022-03-01|2025-04-30|  XI-IATI-WBTF - Wo...|          40 - Multilateral|C01 - Project-typ...|110 - Standard grant| - No data|         15959648660|                            22669

In [6]:
dimdatapart2 = spark.read.csv("/Users/muhamedhashi/data/data_part2.txt", sep="\t", header = True)
dimdatapart2.show(3)

+--------------------+---------------------+--------------------------+---------------------+--------------------------+--------------------+---------------------------------------+
|     IATI Identifier|Provider Organisation|Provider Organisation Type|Receiver Organisation|Receiver Organisation Type|Value (USD) (Budget)|Value (USD) (Disbursement, Expenditure)|
+--------------------+---------------------+--------------------------+---------------------+--------------------------+--------------------+---------------------------------------+
|XI-IATI-WBTF-P178946| World Bank Trust ...|                 - No data|              UKRAINE|                 - No data|                   0|                            22669600494|
|US-GOV-1-7200EE22...| United States Age...|           10 - Government|     World Bank Group|         40 - Multilateral|                   0|                             9900000040|
|    US-GOV-11-247780| United States Dep...|           10 - Government| United States Dep.

In [7]:
dimdatapart3 = spark.read.csv("/Users/muhamedhashi/data/data_part3.txt", sep="\t", header = True)
dimdatapart3.show(3)

+--------------------+---------------------------+--------------------+--------------------+------------+-----------------+--------------------+---------------------------------------+
|     IATI Identifier|Recipient Country or Region|     Sector Category|              Sector|Humanitarian|Transactiion Type|Value (USD) (Budget)|Value (USD) (Disbursement, Expenditure)|
+--------------------+---------------------------+--------------------+--------------------+------------+-----------------+--------------------+---------------------------------------+
|XI-IATI-WBTF-P178946|               UA - Ukraine|150 - Government ...|15110 - Public se...|       FALSE|                3|                   0|                            22668000000|
|XI-IATI-WBTF-P178946|               UA - Ukraine|150 - Government ...|15110 - Public se...|       FALSE|           budget|         15959648660|                                      0|
|       44000-P178946|               UA - Ukraine|           - No data|    

In [8]:
dimdatapart3 = dimdatapart3.withColumn("country_code", substring(col("Recipient Country or Region"), 1, 2))
dimdatapart3.show(5)

+--------------------+---------------------------+--------------------+--------------------+------------+-----------------+--------------------+---------------------------------------+------------+
|     IATI Identifier|Recipient Country or Region|     Sector Category|              Sector|Humanitarian|Transactiion Type|Value (USD) (Budget)|Value (USD) (Disbursement, Expenditure)|country_code|
+--------------------+---------------------------+--------------------+--------------------+------------+-----------------+--------------------+---------------------------------------+------------+
|XI-IATI-WBTF-P178946|               UA - Ukraine|150 - Government ...|15110 - Public se...|       FALSE|                3|                   0|                            22668000000|          UA|
|XI-IATI-WBTF-P178946|               UA - Ukraine|150 - Government ...|15110 - Public se...|       FALSE|           budget|         15959648660|                                      0|          UA|
|       44

25/05/17 20:14:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [8]:
hdi = spark.read.csv("/Users/muhamedhashi/data/HDR25_Statistical_Annex_HDI_Table.csv", header=True)

hdi = hdi.select(
    trim(col("HDI rank")).alias("hdi_rank"),
    trim(col("Country Code")).alias("hdi_country_code"),
    trim(col("Country")).alias("hdi_country"),
    trim(col("Human Development Index (HDI) ")).alias("hdi"),
    trim(col("Human Development Level")).alias("hdi_level"), 
)
hdi.show(10)

+--------+----------------+--------------------+-----+---------+
|hdi_rank|hdi_country_code|         hdi_country|  hdi|hdi_level|
+--------+----------------+--------------------+-----+---------+
|       1|              IS|             Iceland|0.972|Very high|
|       2|              NO|              Norway|0.970|Very high|
|       2|              CH|         Switzerland|0.970|Very high|
|       4|              DK|             Denmark|0.962|Very high|
|       5|              DE|             Germany|0.959|Very high|
|       5|              SE|              Sweden|0.959|Very high|
|       7|              AU|           Australia|0.958|Very high|
|       8|              HK|Hong Kong, China ...|0.955|Very high|
|       8|              NL|         Netherlands|0.955|Very high|
|      10|              BE|             Belgium|0.951|Very high|
+--------+----------------+--------------------+-----+---------+
only showing top 10 rows



# Aid DataFrame

In [9]:
aid_type_dim = dimdatapart1.select("Aid Type").distinct()
aid_type_dim.show(3)

[Stage 10:>                                                         (0 + 8) / 8]

+--------------------+
|            Aid Type|
+--------------------+
|21 - Non-export c...|
|H02 - Refugees/as...|
|B033 - Contributi...|
+--------------------+
only showing top 3 rows



                                                                                

In [10]:
aid_type_df = (
    dimdatapart1
      .select(trim(col("Aid Type")).alias("aid_type"))
      .distinct()
      .filter(col("aid_type").isNotNull())
      .filter(to_date(col("aid_type"), "yyyy-MM-dd").isNull())
      .withColumn("aid_code",        split(col("aid_type"), " - ")[0])
      .withColumn("aid_description", split(col("aid_type"), " - ")[1])
      .dropDuplicates(["aid_type", "aid_code", "aid_description"])
      .replace("- No data", "Unknown", subset=["aid_type"])
      .orderBy("aid_type")
      .withColumn("aid_type_id", monotonically_increasing_id() + 1)
      .select("aid_type_id", "aid_type", "aid_code", "aid_description")
)

aid_type_df.show(10)

[Stage 13:>                                                         (0 + 8) / 8]

+-----------+--------------------+--------+--------------------+
|aid_type_id|            aid_type|aid_code|     aid_description|
+-----------+--------------------+--------+--------------------+
|          1|            10 - ODA|      10|                 ODA|
|          2|110 - Standard grant|     110|      Standard grant|
|          3|15 - Other Public...|      15| Other Public Sector|
|          4|            20 - OOF|      20|                 OOF|
|          5|21 - Non-export c...|      21|Non-export credit...|
|          6|30 - Private Deve...|      30|Private Developme...|
|          7| 35 - Private Market|      35|      Private Market|
|          8|36 - Private Fore...|      36|Private Foreign D...|
|          9|       40 - Non flow|      40|            Non flow|
|         10|410 - Aid loan ex...|     410|Aid loan excludin...|
+-----------+--------------------+--------+--------------------+
only showing top 10 rows



                                                                                

# Finance Type Data Frame

In [9]:
finance_type_df = (
    dimdatapart1
      .select(trim(col("Finance Type")).alias("finance_type"))
      .distinct()
      .filter(col("finance_type").isNotNull())
      .filter(~col("finance_type").rlike("^-?\\d+(\\.\\d+)?$"))
      .withColumn("finance_code",        split(col("finance_type"), " - ")[0])
      .withColumn("finance_description", split(col("finance_type"), " - ")[1])
      .replace("- No data", "Unknown", subset=["finance_type"])
      .dropDuplicates(["finance_type", "finance_code", "finance_description"])
      .orderBy("finance_type")
      .withColumn("finance_type_id", monotonically_increasing_id() + 1)
      .select("finance_type_id", "finance_type", "finance_code", "finance_description")
)

finance_type_df.show(10)

                                                                                

+---------------+--------------------+------------+--------------------+
|finance_type_id|        finance_type|finance_code| finance_description|
+---------------+--------------------+------------+--------------------+
|              1|            10 - ODA|          10|                 ODA|
|              2|110 - Standard grant|         110|      Standard grant|
|              3|1100 - Guarantees...|        1100|Guarantees/insurance|
|              4|       2 - ODA % GNI|           2|           ODA % GNI|
|              5|            20 - OOF|          20|                 OOF|
|              6|          2022-12-01|  2022-12-01|                NULL|
|              7|21 - Non-export c...|          21|Non-export credit...|
|              8|210 - Interest su...|         210|    Interest subsidy|
|              9|30 - Private Deve...|          30|Private Developme...|
|             10|310 - Capital sub...|         310|Capital subscript...|
+---------------+--------------------+------------+

# Flow Type Data Frame

In [10]:
flow_type_df = (
    dimdatapart1
      .select(trim(col("Flow Type")).alias("flow_type"))
      .distinct()
      .filter(col("flow_type").isNotNull())
      .filter(~col("flow_type").rlike("^-?\\d+(\\.\\d+)?([eE]-?\\d+)?$"))
      .replace("- No data", "Unknown", subset=["flow_type"])
      .dropDuplicates(["flow_type"])
      .orderBy("flow_type")                             
      .withColumn("flow_type_id", monotonically_increasing_id() + 1)
      .withColumn("flow_code",        split(col("flow_type"), " - ")[0])
      .withColumn("flow_description", split(col("flow_type"), " - ")[1])
      .select("flow_type_id", "flow_type", "flow_code", "flow_description")
)

flow_type_df.show(5)

+------------+--------------------+----------+--------------------+
|flow_type_id|           flow_type| flow_code|    flow_description|
+------------+--------------------+----------+--------------------+
|           1|            10 - ODA|        10|                 ODA|
|           2|110 - Standard grant|       110|      Standard grant|
|           3|            20 - OOF|        20|                 OOF|
|           4|          2026-11-30|2026-11-30|                NULL|
|           5|21 - Non-export c...|        21|Non-export credit...|
+------------+--------------------+----------+--------------------+
only showing top 5 rows



# Date Data Frame

In [11]:
date_order = Window.orderBy("Start Date", "End Date")
date_df = (
    dimdatapart1
    .select("Start Date", "End Date")
    .distinct()
    .dropDuplicates(["Start Date", "End Date"])
    .filter(
        to_date("End Date", "yyyy-MM-dd").isNotNull() | col("End Date").isNull()
    )
    .withColumn("date_id", row_number().over(date_order))
    .selectExpr("date_id", "`Start Date` AS start_date", "`End Date` AS end_date")
)
date_df.show(10)

25/05/17 20:16:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/17 20:16:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/17 20:16:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/17 20:16:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/17 20:16:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/17 20:16:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------+--------------------+----------+
|date_id|          start_date|  end_date|
+-------+--------------------+----------+
|      1|                NULL|      NULL|
|      2|                NULL|2023-10-31|
|      3|                NULL|2023-12-31|
|      4|                NULL|2024-01-12|
|      5|"<p>Expenditure v...|2023-11-28|
|      6|           12 Months|      NULL|
|      7|          1899-12-31|2029-01-01|
|      8|          1899-12-31|2029-12-31|
|      9|          1900-01-01|      NULL|
|     10|          1951-01-01|2025-12-31|
+-------+--------------------+----------+
only showing top 10 rows



                                                                                

# Sector Data Frame

In [12]:
sector_df = (
    dimdatapart3
      .select(
          trim(col("Sector Category")).alias("sector_category"),
          trim(col("Sector")).alias("sector_type")
      )
      .distinct()
      .dropDuplicates(["sector_category", "sector_type"])
      .orderBy("sector_category", "sector_type")    
      .withColumn("sector_id", monotonically_increasing_id() + 1)
      .select("sector_id", "sector_type")
)

sector_df.show(5)

[Stage 27:>                                                         (0 + 8) / 8]

+---------+--------------------+
|sector_id|         sector_type|
+---------+--------------------+
|        1|           - No data|
|        2|23010 - Energy po...|
|        3|23020 - Power gen...|
|        4|23030 - Power gen...|
|        5|23040 - Electrica...|
+---------+--------------------+
only showing top 5 rows



                                                                                

# Aid Purpose Data Frame

In [13]:
aid_purpose_dim = dimdatapart3.select("Humanitarian").distinct()

aid_purpose_df = (
    aid_purpose_dim
      .dropDuplicates(["Humanitarian"])
      .orderBy("Humanitarian")                        
      .withColumn("purpose_id", monotonically_increasing_id() + 1) 
      .select("purpose_id", "Humanitarian")
)

aid_purpose_df.show(5)

+----------+------------+
|purpose_id|Humanitarian|
+----------+------------+
|         1|       FALSE|
|         2|        TRUE|
+----------+------------+



# Activity Data Frame

In [17]:
activity_df = (
    dimdatapart1
      .select(
          trim(regexp_replace(
                regexp_replace(col("Title"), r'^[^a-zA-Z0-9]+', ''),   # Remove bad symbols from start
                r'[^a-zA-Z0-9]+$', ''                                  # Remove bad symbols from end
            )).alias("title"))
      .distinct()
      .filter(col("title").isNotNull() & (col("title") != "") & (length(col("title")) <= 100))
      .dropDuplicates(["title"])
      .orderBy("title")                               
      .withColumn("activity_id", monotonically_increasing_id() + 1)
      .select("activity_id", "title")
)

activity_df.show(10)

                                                                                

+-----------+--------------------+
|activity_id|               title|
+-----------+--------------------+
|          1|                0.00|
|          2|0.2. STEWARDSHIP ...|
|          3|     0.75 KG of Salt|
|          4|001 CP SYSTEM STR...|
|          5|001 EARLY CHILDHO...|
|          6|001 EVIDENCE GENE...|
|          7|001 GOVERNANCE AN...|
|          8|001 HEALTH SYSTEM...|
|          9|001 HEALTH SYSTEM...|
|         10|001 POLICY & MULT...|
+-----------+--------------------+
only showing top 10 rows



# Receiver Data Frame

In [20]:
receiver_df = (
  dimdatapart2
    .select(
      trim(regexp_replace(col("Receiver Organisation"), r'"', '')).alias("receiver_org_name"),
      trim(col("Receiver Organisation Type")).alias("receiver_org_type")
    )
    .filter(col("receiver_org_name").isNotNull() & col("receiver_org_type").isNotNull())
    .dropDuplicates(["receiver_org_name", "receiver_org_type"])
    .orderBy("receiver_org_type", "receiver_org_name")          
    .withColumn("receiver_org_id", monotonically_increasing_id() + 1) 
    .select("receiver_org_id", "receiver_org_name", "receiver_org_type")
)

receiver_df.show(10)

                                                                                

+---------------+--------------------+-----------------+
|receiver_org_id|   receiver_org_name|receiver_org_type|
+---------------+--------------------+-----------------+
|              1|(MULTINAT.) MINIS...|        - No data|
|              2|): Ministry of  E...|        - No data|
|              3|          **********|        - No data|
|              4|-Ministry of Econ...|        - No data|
|              5|.Red de Comercial...|        - No data|
|              6|100 BEDDED DISTRI...|        - No data|
|              7|100 BEDED DISTRIC...|        - No data|
|              8|1001 FONTAINES PO...|        - No data|
|              9|11 HOSPITALITY (L...|        - No data|
|             10|            11.11.11|        - No data|
+---------------+--------------------+-----------------+
only showing top 10 rows



# Reporting Data Frame

In [18]:
reporting_df = (
    dimdatapart1
      .select(
          trim(col("Reporting Organisation")).alias("reporting_org_name"),
          trim(col("Reporting Organisation Type")).alias("reporting_org_type")
      )
      .filter(
          ~(
            (col("reporting_org_name") == "- No data") &
            (col("reporting_org_type") == "- No data")
          )
      )
      .filter(col("reporting_org_name").isNotNull() &
              col("reporting_org_type").isNotNull())
      .dropDuplicates(["reporting_org_name", "reporting_org_type"])
      .orderBy("reporting_org_type", "reporting_org_name")
      .withColumn("reporting_org_id", monotonically_increasing_id() + 1)
      .select("reporting_org_id", "reporting_org_name", "reporting_org_type")
)

reporting_df.show(10)

+----------------+--------------------+------------------+
|reporting_org_id|  reporting_org_name|reporting_org_type|
+----------------+--------------------+------------------+
|               1|     10 - Government|         - No data|
|               2|21 - Internationa...|         - No data|
|               3|   22 - National NGO|         - No data|
|               4|   23 - Regional NGO|         - No data|
|               5|     60 - Foundation|         - No data|
|               6| 70 - Private Sector|         - No data|
|               7|80 - Academic, Tr...|         - No data|
|               8|B01 - Core suppor...|         - No data|
|               9|B03 - Contributio...|         - No data|
|              10|BE-BCE_KBO-041005...|         - No data|
+----------------+--------------------+------------------+
only showing top 10 rows



                                                                                

# Provider Data Frame

In [22]:
provider_df = (
    dimdatapart2
      .select(
          regexp_replace(trim(col("provider Organisation")), r'["\\]', '').alias("provider_org_name"),
          trim(col("provider Organisation Type")).alias("provider_org_type")
      )
      .filter(
        col("provider_org_name").isNotNull() &
        col("provider_org_type").isNotNull() &
        (col("provider_org_name") != "0 [0]")
      )
      .dropDuplicates(["provider_org_name", "provider_org_type"])
      .orderBy("provider_org_type", "provider_org_name")
      .withColumn("provider_org_id", monotonically_increasing_id() + 1)
      .select("provider_org_id", "provider_org_name", "provider_org_type")
)

provider_df.show(10)

                                                                                

+---------------+--------------------+-----------------+
|provider_org_id|   provider_org_name|provider_org_type|
+---------------+--------------------+-----------------+
|              1|1st Commitment  L...|        - No data|
|              2|AAWAG-Asian Ambas...|        - No data|
|              3|        ABB [DAB012]|        - No data|
|              4|         ABPF [ABPF]|        - No data|
|              5|               ACTED|        - No data|
|              6|                 ADB|        - No data|
|              7|ADD International...|        - No data|
|              8|AECOM [GB-COH-018...|        - No data|
|              9|AFREXIMBANK [XM-D...|        - No data|
|             10|AFRICAN DEV. BANK...|        - No data|
+---------------+--------------------+-----------------+
only showing top 10 rows



# Transaction Data Frame

In [20]:
transaction_df = (
    dimdatapart3
      .select(trim(col("Transactiion Type")).alias("transaction_label"))
      .distinct()
      .filter(col("transaction_label").isNotNull() & (col("transaction_label") != ""))
      .filter(~col("transaction_label").rlike("^[0-9]+$"))
      .replace("- No data", "Unknown", subset=["transaction_label"])
      .dropDuplicates(["transaction_label"])
      .orderBy("transaction_label") 
      .withColumn("transaction_id", monotonically_increasing_id() + 1)
      .select("transaction_id", "transaction_label")
)
transaction_df.show(10)

+--------------+-----------------+
|transaction_id|transaction_label|
+--------------+-----------------+
|             1|           budget|
+--------------+-----------------+



25/05/15 18:53:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


# Location Data Frame

In [26]:
location_df = (
    dimdatapart3
      .select(trim(col("Recipient Country or Region"))
                  .alias("recipient_country_region"))
      .distinct()
      .filter(col("recipient_country_region").isNotNull() &
              (col("recipient_country_region") != ""))
      .withColumn("location_code", split(col("recipient_country_region"), " - ")[0])
      .withColumn("location_name", split(col("recipient_country_region"), " - ")[1])
      .dropDuplicates(["recipient_country_region", "location_code", "location_name"])
      .orderBy("recipient_country_region")
      .withColumn("location_id", monotonically_increasing_id() + 1)
      .select(
          "location_id",
          "recipient_country_region",
          "location_code",
          "location_name"
      )
)

location_df.show(5)

                                                                                

+-----------+------------------------+-------------+--------------------+
|location_id|recipient_country_region|location_code|       location_name|
+-----------+------------------------+-------------+--------------------+
|          1|    1027 - Eastern Af...|         1027|Eastern Africa, r...|
|          2|    1028 - Middle Afr...|         1028|Middle Africa, re...|
|          3|    1029 - Southern A...|         1029|Southern Africa, ...|
|          4|    1030 - Western Af...|         1030|Western Africa, r...|
|          5|    1031 - Caribbean,...|         1031| Caribbean, regional|
+-----------+------------------------+-------------+--------------------+
only showing top 5 rows



# Fact Table

In [22]:
dimdatapart1 = dimdatapart1 \
    .withColumnRenamed("Value (USD) (Budget)", "usd_budget_part1") \
    .withColumnRenamed("Value (USD) (Disbursement, Expenditure)", "usd_disbursed_part1")

dimdatapart2 = dimdatapart2 \
    .withColumnRenamed("Value (USD) (Budget)", "usd_budget_part2") \
    .withColumnRenamed("Value (USD) (Disbursement, Expenditure)", "usd_disbursed_part2")

dimdatapart3 = dimdatapart3 \
    .withColumnRenamed("Value (USD) (Budget)", "usd_budget_part3") \
    .withColumnRenamed("Value (USD) (Disbursement, Expenditure)", "usd_disbursed_part3")

In [23]:
base_fact_df = (
    dimdatapart1
    .join(dimdatapart2, on="IATI Identifier", how="inner")
    .join(dimdatapart3, on="IATI Identifier", how="inner")
)


base_fact_df = base_fact_df.withColumn(
    "usd_budget",
    coalesce("usd_budget_part1", "usd_budget_part2", "usd_budget_part3")
).withColumn(
    "usd_disbursed",
    coalesce("usd_disbursed_part1", "usd_disbursed_part2", "usd_disbursed_part3")
)

base_fact_df = base_fact_df.drop(
    "usd_budget_part1", "usd_budget_part2", "usd_budget_part3",
    "usd_disbursed_part1", "usd_disbursed_part2", "usd_disbursed_part3"
)
base_fact_df = base_fact_df.dropDuplicates(["IATI Identifier"])

In [24]:
base_fact_df.show(5)

25/05/15 18:53:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 106:>                                                        (0 + 8) / 8]

+--------------------+--------------------+--------------------+----------+----------+----------------------+---------------------------+--------------------+--------------------+----------+---------------------+--------------------------+---------------------+--------------------------+---------------------------+--------------------+--------------------+------------+-----------------+------------+-----------+-------------+
|     IATI Identifier|               Title|         Description|Start Date|  End Date|Reporting Organisation|Reporting Organisation Type|            Aid Type|        Finance Type| Flow Type|Provider Organisation|Provider Organisation Type|Receiver Organisation|Receiver Organisation Type|Recipient Country or Region|     Sector Category|              Sector|Humanitarian|Transactiion Type|country_code| usd_budget|usd_disbursed|
+--------------------+--------------------+--------------------+----------+----------+----------------------+---------------------------+-----

                                                                                

In [25]:
fact_renamed = base_fact_df \
    .withColumnRenamed("Reporting Organisation",        "reporting_org_name") \
    .withColumnRenamed("Reporting Organisation Type",   "reporting_org_type") \
    .withColumnRenamed("Provider Organisation",         "provider_org_name") \
    .withColumnRenamed("Provider Organisation Type",    "provider_org_type") \
    .withColumnRenamed("Receiver Organisation",         "receiver_org_name") \
    .withColumnRenamed("Receiver Organisation Type",    "receiver_org_type") \
    .withColumnRenamed("Finance Type",                  "finance_type") \
    .withColumnRenamed("Flow Type",                     "flow_type") \
    .withColumnRenamed("Transactiion Type",             "transaction_label") \
    .withColumnRenamed("Recipient Country or Region",   "recipient_country_region") \
    .withColumnRenamed("Start Date",                    "start_date") \
    .withColumnRenamed("End Date",                      "end_date") \
    .withColumnRenamed("Title",                         "title") \
    .withColumnRenamed("Aid Type",                      "aid_type") \
    .withColumnRenamed("Humanitarian",                  "humanitarian") \
    .withColumnRenamed("Sector",                        "sector_type") \

aid_purpose_df = aid_purpose_df.withColumnRenamed("Humanitarian", "humanitarian")

In [26]:
fact_stage = (
    fact_renamed
      .join(broadcast(activity_df),      ["title"],                "inner")
      .join(broadcast(reporting_df),     ["reporting_org_name", "reporting_org_type"], "inner")
      .join(broadcast(provider_df),      ["provider_org_name",  "provider_org_type"],  "inner")
      .join(broadcast(receiver_df),      ["receiver_org_name",  "receiver_org_type"],  "inner")
      .join(broadcast(finance_type_df),  ["finance_type"],                         "inner")
      .join(broadcast(flow_type_df),     ["flow_type"],                            "inner")
      .join(broadcast(transaction_df),   ["transaction_label"],                    "inner")
      .join(broadcast(location_df),      ["recipient_country_region"],             "inner")
      .join(broadcast(date_df),          ["start_date", "end_date"],               "inner")
      .join(broadcast(sector_df),        ["sector_type"],                          "inner")
      .join(broadcast(aid_purpose_df),   ["humanitarian"],                         "inner")
      .join(broadcast(aid_type_df),      ["aid_type"],                             "inner")
)

In [27]:
fact_table = (
    fact_stage
      .withColumn("fact_id", monotonically_increasing_id() + 1)
      .select(
          "fact_id",
          "activity_id",
          "reporting_org_id",
          "purpose_id",
          "finance_type_id",
          "flow_type_id",
          "provider_org_id",
          "receiver_org_id",
          "sector_id",
          "transaction_id",
          "date_id",
          "location_id",
          "aid_type_id",
          "country_code",
          col("usd_budget").cast("double"),
          col("usd_disbursed").cast("double")
      )
)

In [28]:
fact_table.show(5)

25/05/15 18:54:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 1

+-------+-----------+----------------+----------+---------------+------------+---------------+---------------+---------+--------------+-------+-----------+-----------+------------+------------+-------------+
|fact_id|activity_id|reporting_org_id|purpose_id|finance_type_id|flow_type_id|provider_org_id|receiver_org_id|sector_id|transaction_id|date_id|location_id|aid_type_id|country_code|  usd_budget|usd_disbursed|
+-------+-----------+----------------+----------+---------------+------------+---------------+---------------+---------+--------------+-------+-----------+-----------+------------+------------+-------------+
|      1|17179876375|             667|         1|              2|           1|           3103|          37180|       70|             1|  32901|        101|         20|          GH|  36039.9311|   8017.76955|
|      2|      14794|             592|         1|              2|           1|           2811|          26992|      163|             1|  10817|         46|         26| 

In [29]:
fact_table_enriched = fact_table.join(
    hdi,
    trim(fact_table["country_code"]) == trim(hdi["hdi_country_code"]),
    how="inner"
)

In [30]:
fact_table_enriched.show(3)
# if create dimension for hdi, leave country code

25/05/15 18:54:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:54:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 1

+-------+-----------+----------------+----------+---------------+------------+---------------+---------------+---------+--------------+-------+-----------+-----------+------------+-----------+-------------+--------+----------------+------------+-----+---------+
|fact_id|activity_id|reporting_org_id|purpose_id|finance_type_id|flow_type_id|provider_org_id|receiver_org_id|sector_id|transaction_id|date_id|location_id|aid_type_id|country_code| usd_budget|usd_disbursed|hdi_rank|hdi_country_code| hdi_country|  hdi|hdi_level|
+-------+-----------+----------------+----------+---------------+------------+---------------+---------------+---------+--------------+-------+-----------+-----------+------------+-----------+-------------+--------+----------------+------------+-----+---------+
|      1|17179876382|             667|         1|              2|           1|           3103|          37180|       70|             1|  32901|        101|         20|          GH| 36039.9311|   8017.76955|     143

In [32]:
fact_table_enriched.coalesce(1).write.option("header", True).mode("overwrite").csv("fact_table_output4")

25/05/15 18:58:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:58:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:58:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:58:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:58:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 18:58:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 1

In [33]:
fact_table_enriched.printSchema()

root
 |-- fact_id: long (nullable = false)
 |-- activity_id: long (nullable = false)
 |-- reporting_org_id: long (nullable = false)
 |-- purpose_id: long (nullable = false)
 |-- finance_type_id: long (nullable = false)
 |-- flow_type_id: long (nullable = false)
 |-- provider_org_id: long (nullable = false)
 |-- receiver_org_id: long (nullable = false)
 |-- sector_id: long (nullable = false)
 |-- transaction_id: long (nullable = false)
 |-- date_id: integer (nullable = false)
 |-- location_id: long (nullable = false)
 |-- aid_type_id: long (nullable = false)
 |-- country_code: string (nullable = true)
 |-- usd_budget: double (nullable = true)
 |-- usd_disbursed: double (nullable = true)
 |-- hdi_rank: string (nullable = true)
 |-- hdi_country_code: string (nullable = true)
 |-- hdi_country: string (nullable = true)
 |-- hdi: string (nullable = true)
 |-- hdi_level: string (nullable = true)



In [46]:
base_fact_df.printSchema()

root
 |-- IATI Identifier: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- Reporting Organisation: string (nullable = true)
 |-- Reporting Organisation Type: string (nullable = true)
 |-- Aid Type: string (nullable = true)
 |-- Finance Type: string (nullable = true)
 |-- Flow Type: string (nullable = true)
 |-- Provider Organisation: string (nullable = true)
 |-- Provider Organisation Type: string (nullable = true)
 |-- Receiver Organisation: string (nullable = true)
 |-- Receiver Organisation Type: string (nullable = true)
 |-- Recipient Country or Region: string (nullable = true)
 |-- Sector Category: string (nullable = true)
 |-- Sector: string (nullable = true)
 |-- Humanitarian: string (nullable = true)
 |-- Transactiion Type: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- usd_budget: string (nullable = true)
 |-

In [1]:
finance_type_df.printSchema()

NameError: name 'finance_type_df' is not defined

In [47]:
aid_type_df.printSchema()

root
 |-- aid_type_id: long (nullable = false)
 |-- aid_type: string (nullable = true)
 |-- aid_code: string (nullable = true)
 |-- aid_description: string (nullable = true)



25/05/16 14:03:45 WARN TransportChannelHandler: Exception in connection from /10.19.104.4:58141
java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:340)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:294)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:269)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:425)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimiz

In [34]:
aid_type_df.coalesce(1).write.option("header", True).mode("overwrite").csv("aid_type_dim_output")

                                                                                

In [35]:
finance_type_df.coalesce(1).write.option("header", True).mode("overwrite").csv("finance_dim_output")

In [36]:
flow_type_df.coalesce(1).write.option("header", True).mode("overwrite").csv("flow_dim_output")

In [37]:
date_df.coalesce(1).write.option("header", True).mode("overwrite").csv("date_dim_output")

25/05/15 19:18:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 19:18:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 19:18:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 19:18:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 19:18:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 19:18:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/15 1

In [38]:
sector_df.coalesce(1).write.option("header", True).mode("overwrite").csv("sector_dim_output")

                                                                                

In [39]:
aid_purpose_df.coalesce(1).write.option("header", True).mode("overwrite").csv("aid_purpose_dim_output")

In [19]:
activity_df.coalesce(1).write.option("header", True).option("quote", '"').option("escape", '"').option("quoteAll", True).mode("overwrite").csv("activity_dim_output")


                                                                                

In [21]:
receiver_df.coalesce(1).write.option("header", True).option("quote", '"').option("escape", '"').option("quoteAll", True).mode("overwrite").csv("receiver_dim_output")

                                                                                

In [42]:
reporting_df.coalesce(1).write.option("header", True).mode("overwrite").csv("reporting_dim_output")

In [24]:
provider_df.coalesce(1).write.option("header", True).option("quote", '"').option("escape", '"').option("quoteAll", True).mode("overwrite").csv("provider_dim_output")

                                                                                

In [44]:
transaction_df.coalesce(1).write.option("header", True).mode("overwrite").csv("transaction_dim_output")

In [45]:
location_df.coalesce(1).write.option("header", True).mode("overwrite").csv("location_dim_output")

In [27]:
location_df.printSchema()

root
 |-- location_id: long (nullable = false)
 |-- recipient_country_region: string (nullable = true)
 |-- location_code: string (nullable = true)
 |-- location_name: string (nullable = true)

