In [0]:
from pyspark.sql import SparkSession
spark = SparkSession. \
    builder. \
    appName('World Bank ETL'). \
    master('yarn'). \
    getOrCreate() 

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

In [0]:
cust_schema = StructType([\
                    StructField("pid",StringType(),True),\
                    StructField("regionname",StringType(),True),\
                    StructField("countryname",StringType(),False),\
                    StructField("boardapprovaldate",TimestampType(),True),\
                    StructField("closingdate",TimestampType(),True),\
                    StructField("curr_project_cost",IntegerType(),True),\
                    StructField("curr_ibrd_commitment",IntegerType(),True),\
                    StructField("curr_ida_commitment",IntegerType(),True),\
                    StructField("curr_total_commit",IntegerType(),True),\
                    StructField("grantamt",IntegerType(),True)])

In [0]:
# Data imported with custom schema applied.
df = spark.read.schema(cust_schema).csv('/FileStore/tables/wb_projects_demo.csv')
                   

In [0]:
df.show()

+--------------------+--------------------+--------------------+-------------------+-------------------+-----------------+--------------------+-------------------+-----------------+--------+
|                 pid|          regionname|         countryname|  boardapprovaldate|        closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|
+--------------------+--------------------+--------------------+-------------------+-------------------+-----------------+--------------------+-------------------+-----------------+--------+
|World Bank Projec...|                null|                null|               null|               null|             null|                null|               null|             null|    null|
|          Project ID|              Region|             Country|               null|               null|             null|                null|               null|             null|    null|
|                  id|          regionname|  

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, col, lit, split
from pyspark.sql.types import *

In [0]:
#df.withColumn("Index",monotonically_increasing_id)

 #.filter('Index > 2)

 #.drop("Index")
df1 = df.withColumn("Index",monotonically_increasing_id())
    

In [0]:
df1.select("Index").show()

+-----+
|Index|
+-----+
|    0|
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
|   13|
|   14|
|   15|
|   16|
|   17|
|   18|
|   19|
+-----+
only showing top 20 rows



In [0]:
#First three columns will be discarded.
df2 = df1.filter('index > 2').drop("Index")

In [0]:
df2.show()

+-------+--------------------+--------------------+-------------------+-------------------+-----------------+--------------------+-------------------+-----------------+--------+
|    pid|          regionname|         countryname|  boardapprovaldate|        closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|
+-------+--------------------+--------------------+-------------------+-------------------+-----------------+--------------------+-------------------+-----------------+--------+
|P172078|         Africa East|  Republic of Uganda|2022-02-20 00:00:00|2021-08-31 00:00:00|           440000|                   0|                  0|                0|  440000|
|P175694|Middle East and N...|    Republic of Iraq|2022-02-16 00:00:00|               null|          5000000|                   0|                  0|                0| 5000000|
|P171997|         Africa West| Republic of Liberia|2022-02-09 00:00:00|2027-04-30 00:00:00|         40000000| 

In [0]:
df2.printSchema()

root
 |-- pid: string (nullable = true)
 |-- regionname: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- boardapprovaldate: timestamp (nullable = true)
 |-- closingdate: timestamp (nullable = true)
 |-- curr_project_cost: integer (nullable = true)
 |-- curr_ibrd_commitment: integer (nullable = true)
 |-- curr_ida_commitment: integer (nullable = true)
 |-- curr_total_commit: integer (nullable = true)
 |-- grantamt: integer (nullable = true)



In [0]:
# Remove empty rows

In [0]:
# Remove duplicate data

In [0]:
#checking for duplicates
df2.count()

Out[15]: 21427

In [0]:
df2.distinct().count()

Out[16]: 21427

In [0]:
# Remove null country name fields

In [0]:
#from pyspark.sql.functions import when
#from pyspark.sql.functions import expr,split

In [0]:
#check for null country name rows
df2.filter("countryname IS NULL").show()

+-------+----------+-----------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+
|    pid|regionname|countryname|boardapprovaldate|closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|
+-------+----------+-----------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+
|P175659|      null|       null|             null|       null|         26000000|                   0|           20000000|         20000000|       0|
|P070918|      null|       null|             null|       null|                0|                   0|                  0|                0|       0|
+-------+----------+-----------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+



In [0]:
#filter out null countryname rows
df3 = df2.filter("countryname IS NOT NULL")

In [0]:
# Formatt the dates

In [0]:
df3.printSchema()


root
 |-- pid: string (nullable = true)
 |-- regionname: string (nullable = true)
 |-- countryname: string (nullable = true)
 |-- boardapprovaldate: timestamp (nullable = true)
 |-- closingdate: timestamp (nullable = true)
 |-- curr_project_cost: integer (nullable = true)
 |-- curr_ibrd_commitment: integer (nullable = true)
 |-- curr_ida_commitment: integer (nullable = true)
 |-- curr_total_commit: integer (nullable = true)
 |-- grantamt: integer (nullable = true)



In [0]:
#countryname has ',' separated values eg:Taiwan.Therefore we need to split() the values and clean the data  WARNING: cannot split as 2 taiwans present Taiwan and Taiwan, China
#also the boardapproval date and closing date has date with time, from which the time needs to be removed
df4 = df3.\
        withColumn("boardapprovaldate", split("boardapprovaldate", " ")[0]).\
        withColumn("closingdate", split("closingdate", " ")[0])
       
        
#select("pid","regionname","countryname","boardapprovaldate","closingdate","curr_project_cost","curr_ibrd_commitment","curr_ida_commitment","curr_total_commit","grantamt").


In [0]:
df4.show()

+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+
|    pid|          regionname|         countryname|boardapprovaldate|closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|
+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+
|P172078|         Africa East|  Republic of Uganda|       2022-02-20| 2021-08-31|           440000|                   0|                  0|                0|  440000|
|P175694|Middle East and N...|    Republic of Iraq|       2022-02-16|       null|          5000000|                   0|                  0|                0| 5000000|
|P171997|         Africa West| Republic of Liberia|       2022-02-09| 2027-04-30|         40000000|                   0|           40000000|         40000000|  

In [0]:
#Installing pycountry to match the country names to their aplha_3 ISO-3 codes and keeping them in a column
!pip install pycountry

Collecting pycountry
  Using cached pycountry-22.3.5-py2.py3-none-any.whl
Installing collected packages: pycountry
Successfully installed pycountry-22.3.5
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
from pycountry import countries

In [0]:
countries.get(name='Spain')
countries.lookup('taiwan').alpha_3

Out[27]: 'TWN'

In [0]:
df4.select("countryname").distinct().show(250,truncate = False)

+----------------------------------------+
|countryname                             |
+----------------------------------------+
|South Asia                              |
|Republic of Indonesia                   |
|EU Accession Countries                  |
|Republic of Turkey                      |
|Republic of Bulgaria                    |
|World                                   |
|Islamic Republic of Iran                |
|Middle East and North Africa            |
|Republic of South Africa                |
|Western Balkans                         |
|Republic of Namibia                     |
|Taiwan, China                           |
|Republic of Korea                       |
|Republic of the Marshall Islands        |
|Organization of Eastern Caribbean States|
|State of Israel                         |
|Latin America                           |
|Republic of Liberia                     |
|Malaysia                                |
|Republic of Chile                       |
|Kingdom of

In [0]:
from collections import defaultdict
country_not_found = [] # stores countries not found in the pycountry library
project_country_abbrev_dict = defaultdict(str) # set up an empty dictionary of string values

# iterate through the country names in df_projects. 
# Create a dictionary mapping the country name to the alpha_3 ISO code
for country in df3.select("countryname").distinct().collect():
    try: 
        # look up the country name in the pycountry library
        # store the country name as the dictionary key and the ISO-3 code as the value
        project_country_abbrev_dict[country[0]] = countries.lookup(country[0]).alpha_3
        #print(country["countryname"], 'FOUND')
    except:
        # If the country name is not in the pycountry library, then print out the country name
        # And store the results in the country_not_found list
        #print(country["countryname"], ' not found')
        country_not_found.append(country)

In [0]:
print(project_country_abbrev_dict)


defaultdict(<class 'str'>, {'Republic of Indonesia': 'IDN', 'Republic of Turkey': 'TUR', 'Republic of Bulgaria': 'BGR', 'Islamic Republic of Iran': 'IRN', 'Republic of South Africa': 'ZAF', 'Republic of Namibia': 'NAM', 'Republic of the Marshall Islands': 'MHL', 'State of Israel': 'ISR', 'Republic of Liberia': 'LBR', 'Malaysia': 'MYS', 'Republic of Chile': 'CHL', 'Kingdom of the Netherlands': 'NLD', 'Commonwealth of The Bahamas': 'BHS', 'Republic of Cabo Verde': 'CPV', 'Republic of Burundi': 'BDI', 'Republic of Sierra Leone': 'SLE', 'Gabonese Republic': 'GAB', 'Republic of Benin': 'BEN', 'Commonwealth of Dominica': 'DMA', 'Grand Duchy of Luxembourg': 'LUX', 'Kingdom of Eswatini': 'SWZ', 'Republic of Peru': 'PER', 'Republic of Azerbaijan': 'AZE', "People's Democratic Republic of Algeria": 'DZA', 'Hashemite Kingdom of Jordan': 'JOR', 'Republic of Haiti': 'HTI', 'Republic of Costa Rica': 'CRI', 'Republic of the Philippines': 'PHL', 'Republic of Malta': 'MLT', 'Kingdom of Norway': 'NOR', '

In [0]:
#The missing countries in the pycountries need to be manually mapped 
country_not_found_mapping = {'Co-operative Republic of Guyana': 'GUY',
             'Commonwealth of Australia':'AUS',
             'Democratic Republic of Sao Tome and Prin':'STP',
             'Democratic Republic of the Congo':'COD',
             'Democratic Socialist Republic of Sri Lan':'LKA',
             'East Asia and Pacific':'EAS',
             'Europe and Central Asia': 'ECS',
             'Islamic  Republic of Afghanistan':'AFG',
             'Latin America':'LCN',
              'Caribbean':'CBN', #changed previously LCN
             'Macedonia':'MKD',
             'Middle East and North Africa':'MEA',
             'Oriental Republic of Uruguay':'URY',
             'Republic of Congo':'COG',
             "Republic of Cote d'Ivoire":'CIV',
             'Republic of Korea':'KOR',
             'Republic of Niger':'NER',
             'Republic of Kosovo':'XKX',
             'Republic of Rwanda':'RWA',
              'Republic of The Gambia':'GMB',
              'Republic of Togo':'TGO',
              'Republic of the Union of Myanmar':'MMR',
              'Republica Bolivariana de Venezuela':'VEN',
              'Sint Maarten':'SXM',
              "Socialist People's Libyan Arab Jamahiriy":'LBY',
              'Socialist Republic of Vietnam':'VNM',
              'Somali Democratic Republic':'SOM',
              'South Asia':'SAS',
              'St. Kitts and Nevis':'KNA',
              'St. Lucia':'LCA',
              'St. Vincent and the Grenadines':'VCT',
              'State of Eritrea':'ERI',
              'The Independent State of Papua New Guine':'PNG',
              'West Bank and Gaza':'PSE',
              'World':'WLD',
              'EU Accession Countries':'EAC', #from here to below added by me
              'Western Balkans':'WBL',                         
              'Taiwan, China':'TWC', #originally Taiwan, China                            
              'Organization of Eastern Caribbean States':'ECB',
              'Socialist Federal Republic of Yugoslavia':'YSL',
              'Africa':'AFR',                                  
              'Macedonia, Republic of North':'MCD',            
              'Multi-Regional':'MRG',                          
              'Andean Countries':'AND',                        
              'Central Asia':'CAS',                            
              'Pacific Islands':'PIS',                         
              'Central America':'CAM',                         
              'Aral Sea':'ASE',                                
              'Western Africa':'WEA',                          
              'Eastern Africa':'EAA',                          
              'Southern Africa':'SOA',                         
              'Mercosur':'MER',                                
              'Caucasus':'CAU',                                
              'Asia':'ASA',                                    
              'Red Sea and Gulf of Aden':'RAA',                
              'Central Africa':'CEA',                          
              'Mekong':'MEK'             }

In [0]:
# Update the country and code dictionary with the missing values
project_country_abbrev_dict.update(country_not_found_mapping)


In [0]:
#creating the schema for the dataframe to be created
#ddf = spark.createDataFrame(data_dict, schema)
scm = StructType([\
                StructField("country", StringType(), True),\
                StructField("country_code", StringType(), True)\
                 ])
 

In [0]:
ddf = spark.createDataFrame(project_country_abbrev_dict.items(), scm)

In [0]:
ddf2 = ddf.select("country_code").filter(ddf.country_code == 'TCW')

In [0]:
ddf2.show()

+------------+
|country_code|
+------------+
+------------+



In [0]:
#checking if there are any duplicate country_codes
ddf.select("country_code").distinct().count() 

Out[37]: 207

In [0]:
ddf.select("country_code").count()

Out[38]: 207

In [0]:
#finding the repeated value
ddf1 = ddf.groupby("country_code").count().filter("count>1").show()

+------------+-----+
|country_code|count|
+------------+-----+
+------------+-----+



In [0]:
#joining the dataframes on the countryname and country column to add the countrycode 
dfc = df4.join(ddf, df4.countryname == ddf.country,"leftouter").drop("country")#.withColumn("code", ddf.value)

In [0]:
dfc.distinct().count()

Out[41]: 21425

In [0]:
dfc.count()

Out[42]: 21425

In [0]:
dfc.show()


+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+------------+
|    pid|          regionname|         countryname|boardapprovaldate|closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|country_code|
+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+------------+
|P172078|         Africa East|  Republic of Uganda|       2022-02-20| 2021-08-31|           440000|                   0|                  0|                0|  440000|         UGA|
|P175694|Middle East and N...|    Republic of Iraq|       2022-02-16|       null|          5000000|                   0|                  0|                0| 5000000|         IRQ|
|P171997|         Africa West| Republic of Liberia|       2022-02-09| 2027-04-30|         40000

In [0]:
#checking if any countrycode has null value
dfc.select("countryname").filter("country_code IS NULL").distinct().show(300, truncate = False)

+-----------+
|countryname|
+-----------+
+-----------+



In [0]:
#read the country_metadata 
#drop columns "SpecialNotes" & "_c5"
#join wb_projects and country_metadata on country_code
#add the IncomeGroup column
#if any country remains with null value in Income group then join by region to fill the nulls

In [0]:
DF_country_meta = spark.read.csv("/FileStore/tables/country_metadata.csv", inferSchema = True, header = True)

In [0]:
DF_meta = DF_country_meta.drop("SpecialNotes", "_c5")

In [0]:
DF_joined = dfc.join(DF_meta, dfc["country_code"] == DF_meta["Country Code"], "leftouter").drop("Country Code", "Region", "TableName")

In [0]:
DF_joined.show()

+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+------------+-------------------+
|    pid|          regionname|         countryname|boardapprovaldate|closingdate|curr_project_cost|curr_ibrd_commitment|curr_ida_commitment|curr_total_commit|grantamt|country_code|        IncomeGroup|
+-------+--------------------+--------------------+-----------------+-----------+-----------------+--------------------+-------------------+-----------------+--------+------------+-------------------+
|P172078|         Africa East|  Republic of Uganda|       2022-02-20| 2021-08-31|           440000|                   0|                  0|                0|  440000|         UGA|         Low income|
|P175694|Middle East and N...|    Republic of Iraq|       2022-02-16|       null|          5000000|                   0|                  0|                0| 5000000|         IRQ|Upper middle inc

In [0]:
DF_joined.filter("IncomeGroup IS NULL").select(DF_joined.countryname).distinct().show(250, truncate = False)

+----------------------------------------+
|countryname                             |
+----------------------------------------+
|South Asia                              |
|EU Accession Countries                  |
|World                                   |
|Middle East and North Africa            |
|Western Balkans                         |
|Taiwan, China                           |
|Organization of Eastern Caribbean States|
|Latin America                           |
|Socialist Federal Republic of Yugoslavia|
|Republica Bolivariana de Venezuela      |
|Africa                                  |
|Macedonia, Republic of North            |
|Multi-Regional                          |
|Central Asia                            |
|Pacific Islands                         |
|Central America                         |
|East Asia and Pacific                   |
|Aral Sea                                |
|Europe and Central Asia                 |
|Caribbean                               |
|Western Af

In [0]:
DF_joined.select(DF_joined.pid).count()

Out[65]: 21425

In [0]:
DF = DF_joined.filter("Incomegroup IS NOT NULL").groupby("IncomeGroup").sum("grantamt").sort("sum(grantamt)")

In [None]:
x = DF.toPandas()["IncomeGroup"].values.tolist()
y = DF.toPandas()["sum(grantamt)"].values.tolist()

In [None]:
plt.axis("equal")
plt.pie(y,labels=x, autopct='%.1f%%')
plt.show()