Data Exploration

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType
import pandas as pd

In [2]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows = 20, truncate = True):
    if(truncate):
        pd.set_option('display.max_colwidth',50)
        pd.reset_option('display.max_colwidth')      
    else:
        pd.set_option('display.max_colwidth',None)
    pd.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pd.reset_option('display.max_rows')

In [None]:
#Creating a spark session
spark = SparkSession.builder.appName("WDI Data Exploration").getOrCreate()

In [4]:
#Read the file "WDICountry.csv" into a Spark Data Frame
df_country = spark.read.format("csv") \
            .option("header","true") \
            .option("inferSchema","true") \
            .load("WDICountry.csv")

In [5]:
#Inspect the schema
df_country.printSchema()

root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: string (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nullable = true)
 |-- Syst

In [6]:
# Use the helper function to display the DataFrame with all columns and content fully visible
showDF(df_country, truncate = False)

24/07/27 23:09:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,Country Code,Short Name,Table Name,Long Name,2-alpha code,Currency Unit,Special Notes,Region,Income Group,WB-2 code,...,System of trade,Government Accounting concept,IMF data dissemination standard,Latest population census,Latest household survey,Source of most recent Income and expenditure data,Vital registration complete,Latest agricultural census,Latest industrial data,Latest trade data
0,ABW,Aruba,Aruba,Aruba,AW,Aruban florin,,Latin America & Caribbean,High income,AW,...,General trade system,,Enhanced General Data Dissemination System (e-GDDS),2020 (expected),,,Yes,,,2018.0
1,AFE,Africa Eastern and Southern,Africa Eastern and Southern,Africa Eastern and Southern,ZH,,"26 countries, stretching from the Red Sea in the North to the Cape of Good Hope in the South (https://www.worldbank.org/en/region/afr/eastern-and-southern-africa)",,,ZH,...,,,,,,,,,,
2,AFG,Afghanistan,Afghanistan,Islamic State of Afghanistan,AF,Afghan afghani,"The reporting period for national accounts data is designated as either calendar year basis (CY) or fiscal year basis (FY). For this country, it is fiscal year-based (fiscal year-end: March 20). Also, an estimate (PA.NUS.ATLS) of the exchange rate covers the same period and thus differs from the official exchange rate (CY).",,,,...,,,,,,,,,,
3,In addition,the World Bank systematically assesses the appropriateness of official exchange rates as conversion factors. In this country,multiple or dual exchange rate activity exists and must be accounted for appropriately in underlying statistics. An alternative estimate (“alternative conversion factor” - PA.NUS.ATLS) is thus calculated as a weighted average of the different exchange rates in use in the country. Doing so better reflects economic reality and leads to more accurate cross-country comparisons and country classifications by income level. For this country,"this applies to the period 1960-2006. Alternative conversion factors are used in the Atlas methodology and elsewhere in World Development Indicators as single-year conversion factors.""",South Asia,Low income,AF,2016,,Value added at basic prices (VAB),...,1979,"Demographic and Health Survey, 2015","Integrated household survey (IHS), 2016/17",,,,2018,,,
4,AFW,Africa Western and Central,Africa Western and Central,Africa Western and Central,ZI,,"22 countries, stretching from the westernmost point of Africa, across the equator, and partly along the Atlantic Ocean till the Republic of Congo in the South (https://www.worldbank.org/en/region/afr/western-and-central-africa)",,,ZI,...,,,,,,,,,,
5,AGO,Angola,Angola,People's Republic of Angola,AO,Angolan kwanza,"The World Bank systematically assesses the appropriateness of official exchange rates as conversion factors. In this country, multiple or dual exchange rate activity exists and must be accounted for appropriately in underlying statistics. An alternative estimate (“alternative conversion factor” - PA.NUS.ATLS) is thus calculated as a weighted average of the different exchange rates in use in the country. Doing so better reflects economic reality and leads to more accurate cross-country comparisons and country classifications by income level. For this country, this applies to the period 1994-2021. Alternative conversion factors are used in the Atlas methodology and elsewhere in World Development Indicators as single-year conversion factors.",Sub-Saharan Africa,Lower middle income,AO,...,General trade system,Budgetary central government,Enhanced General Data Dissemination System (e-GDDS),2014,"Demographic and Health Survey, 2015/16","Integrated household survey (IHS), 2008/09",,,,2018.0
6,ALB,Albania,Albania,Republic of Albania,AL,Albanian lek,,Europe & Central Asia,Upper middle income,AL,...,Special trade system,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2020 (expected),"Demographic and Health Survey, 2017/18","Living Standards Measurement Study Survey (LSMS), 2012",Yes,2012,2013.0,2018.0
7,AND,Andorra,Andorra,Principality of Andorra,AD,Euro,,Europe & Central Asia,High income,AD,...,General trade system,,,2011. Population figures compiled from administrative registers.,,,Yes,,,2018.0
8,ARB,Arab World,Arab World,Arab World,1A,,Arab World aggregate. Arab World is composed of members of the League of Arab States.,,,1A,...,,,,,,,,,,
9,ARE,United Arab Emirates,United Arab Emirates,United Arab Emirates,AE,U.A.E. dirham,,Middle East & North Africa,High income,AE,...,Special trade system,Consolidated central government,Enhanced General Data Dissemination System (e-GDDS),2020 (expected),"World Health Survey, 2003",,Yes,2012,1985.0,2018.0


In [7]:
# Get some basic stats
df_country.count()

278

Examining Dimensions

How many different regions do the various countries belong to ?

In [8]:
showDF(df_country.select('Region').distinct(), truncate = False)

Unnamed: 0,Region
0,South Asia
1,2016
2,2016/2017
3,2016/17
4,Sub-Saharan Africa
5,Europe & Central Asia
6,North America
7,East Asia & Pacific
8,Middle East & North Africa
9,Latin America & Caribbean


How many different income groups do we have across countries?

In [9]:
showDF(df_country.select('Income Group').distinct(), truncate = False)

Unnamed: 0,Income Group
0,Lower middle income
1,Value added at producer prices (VAP)
2,Value added at basic prices (VAB)
3,High income
4,Upper middle income
5,Low income
6,


In [10]:
#Read the file "WDISeries.csv" into a Spark Data Frame
df_series = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("WDISeries.csv")

In [11]:
# Inspect the schema
df_series.printSchema()

root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)



In [12]:
# Use the helper function to display the DataFrame with all columns
showDF(df_series)

Unnamed: 0,Series Code,Topic,Indicator Name,Short definition,Long definition,Unit of measure,Periodicity,Base Period,Other notes,Aggregation method,Limitations and exceptions,Notes from original source,General comments,Source,Statistical concept and methodology,Development relevance,Related source links,Other web links,Related indicators,License Type
0,AG.AGR.TRAC.NO,Environment: Agricultural production,"Agricultural machinery, tractors",,Agricultural machinery refers to the number of...,,Annual,,,Sum,The data are collected by the Food and Agricul...,,,"Food and Agriculture Organization, electronic ...","""A tractor provides the power and traction to ...",for plowing,tilling,disking,harrowing,planting
1,A substantial contribution to agriculture in t...,steel plows,mowers,mechanical reapers,seed drills,and threshers contributed to the development ...,tractors enabled the farmer to sow and harves...,powered machinery such as tractors,has replaced many jobs formerly carried out b...,horses and mules. FAO estimates that most far...,seeds or agrochemicals.,,,,,,,,,
2,Agriculture is still a major sector in many ec...,and agricultural activities provide developin...,,,,,,,,,,,,,,,,,,
3,There is no single correct mix of inputs to th...,as it is dependent on local climate,land quality,and economic development; appropriate levels ...,the climate and soils,"and the production process used.""",,,,CC BY-4.0,,,,,,,,,,
4,AG.CON.FERT.PT.ZS,Environment: Agricultural production,Fertilizer consumption (% of fertilizer produc...,,Fertilizer consumption measures the quantity o...,,Annual,,,Weighted average,The FAO has revised the time series for fertil...,,,,,,,,,
5,The data are collected by the Food and Agricul...,but complete consistency across countries and...,national publications and related country dat...,,,"Food and Agriculture Organization, electronic ...",Fertilizer consumption measures the quantity o...,,,,,,,,,,,,,
6,Most fertilizers that are commonly used in agr...,phosphorus,and potassium. Some fertilizers also contain ...,""""" such as zinc and other metals that are nece...",composts and other organic matter,and wastes,such as sewage sludge and certain industrial ...,,,,,,,,,,,,,
7,FAO defines arable land as land under temporar...,temporary meadows for mowing or for pasture,land under market or kitchen gardens,and land temporarily fallow; land abandoned a...,"Factors such as the green revolution, has led ...",,,,,,,,,,,,,,,
8,Agriculture is still a major sector in many ec...,and agricultural activities provide developin...,pesticides,and intensive irrigation have environmental c...,inappropriate use of inputs for agricultural ...,,,,,,,,,,,,,,,
9,In many developed countries,excessive nitrogen fertilizer applications ha...,longevity and overall fitness of certain agri...,such as aphids. Further,excessive use of fertilizers emits significan...,"as """"fertilizer burn"""" can occur when too muc...",resulting in drying out of the leaves and dam...,overuse of fertilizers has resulted in contam...,,,,,,,,,,,,


In [13]:
# Get some basic stats
df_series.count()

4284

In [14]:
#Read the file "WDIData.csv" into a Spark Data Frame
df_indicators = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load("WDIData.csv")

                                                                                

In [15]:
# Inspect the schema
df_indicators.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = true)
 |-- 1982: double (null

In [16]:
# Use the helper function to display the DataFrame with all columns
showDF(df_indicators)

Unnamed: 0,Country Name,Country Code,Indicator Name,Indicator Code,1960,1961,1962,1963,1964,1965,...,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021
0,Africa Eastern and Southern,AFE,Access to clean fuels and technologies for coo...,EG.CFT.ACCS.ZS,,,,,,,...,16.559819,16.936004,17.337896,17.687093,18.140971,18.491344,18.82552,19.272212,19.628009,
1,Africa Eastern and Southern,AFE,Access to clean fuels and technologies for coo...,EG.CFT.ACCS.RU.ZS,,,,,,,...,6.281667,6.499471,6.680066,6.85911,7.016238,7.180364,7.322294,7.517191,7.651598,
2,Africa Eastern and Southern,AFE,Access to clean fuels and technologies for coo...,EG.CFT.ACCS.UR.ZS,,,,,,,...,37.601816,37.855399,38.046781,38.326255,38.468426,38.670044,38.722783,38.927016,39.042839,
3,Africa Eastern and Southern,AFE,Access to electricity (% of population),EG.ELC.ACCS.ZS,,,,,,,...,31.844384,31.79416,32.001027,33.87191,38.880173,40.261358,43.061877,44.27086,45.803485,
4,Africa Eastern and Southern,AFE,"Access to electricity, rural (% of rural popul...",EG.ELC.ACCS.RU.ZS,,,,,,,...,19.402592,18.663502,17.633986,16.464681,24.531436,25.345111,27.449908,29.64176,30.404935,
5,Africa Eastern and Southern,AFE,"Access to electricity, urban (% of urban popul...",EG.ELC.ACCS.UR.ZS,,,,,,,...,66.945277,67.112206,66.283426,67.080235,69.132292,70.928567,71.866136,73.332842,73.942949,
6,Africa Eastern and Southern,AFE,Account ownership at a financial institution o...,FX.OWN.TOTL.ZS,,,,,,,...,,,,,,,,,,
7,Africa Eastern and Southern,AFE,Account ownership at a financial institution o...,FX.OWN.TOTL.FE.ZS,,,,,,,...,,,,,,,,,,
8,Africa Eastern and Southern,AFE,Account ownership at a financial institution o...,FX.OWN.TOTL.MA.ZS,,,,,,,...,,,,,,,,,,
9,Africa Eastern and Southern,AFE,Account ownership at a financial institution o...,FX.OWN.TOTL.OL.ZS,,,,,,,...,,,,,,,,,,


In [17]:
# Get some basic stats
df_indicators.count()

383572


Data Transformation, Modeling and Quality

In [18]:
# Select and rename columns needed for our data model for data ingestion.
countryDim = df_country \
             .select("2-alpha code", "Country Code", "Short Name", "Long Name", "Region", "Income Group") \
             .withColumnRenamed("2-alpha code", "country_iso_code") \
             .withColumnRenamed("Country Code", "wb_country_code") \
             .withColumnRenamed("Short Name", "name") \
             .withColumnRenamed("Long Name", "long_name") \
             .withColumnRenamed("Region", "region") \
             .withColumnRenamed("Income Group", "income_group")
showDF(countryDim)

Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
2,AF,AFG,Afghanistan,Islamic State of Afghanistan,,
3,South Asia,In addition,the World Bank systematically assesses the ap...,this applies to the period 1960-2006. Alterna...,2016,
4,ZI,AFW,Africa Western and Central,Africa Western and Central,,
5,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
6,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
7,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
8,1A,ARB,Arab World,Arab World,,
9,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income


In [19]:
# Filter the DataFrame to include only rows where the country_iso_code is not null
countryDimNotNull = countryDim.filter("country_iso_code is not null")

Data Quality Check

In [20]:
# Do all countries have 2 character country_iso_codes ?
showDF( \
    countryDimNotNull \
       .select(F.col("country_iso_code"), \
               F.length(F.col("country_iso_code")).alias("column_length")) \
       .groupBy("column_length") \
       .agg(F.count("*").alias("cnt")) \
       .filter("cnt > 1") \
)

Unnamed: 0,column_length,cnt
0,19,4
1,2,265


In [21]:
# Filter the DataFrame to keep only rows where country_iso_code length is 2
fixIsoCountryCode = countryDimNotNull.filter("length(country_iso_code) = 2")
countAfterFiltering = fixIsoCountryCode.count()
print(countAfterFiltering)

265


In [22]:
# Check the length distribution of wb_country_code after filtering
WbCountryCodeLength = (fixIsoCountryCode \
    .select("wb_country_code",
            F.length("wb_country_code").alias("column_length"))\
    .groupBy("column_length") \
    .agg(F.count("*").alias("cnt")) 
                      )
showDF(WbCountryCodeLength)    

Unnamed: 0,column_length,cnt
0,3,264
1,20,1


In [23]:
# Display rows where `wb_country_code` has a length of 20 to assess if these entries should be retained or removed
showDF(fixIsoCountryCode.filter("length(wb_country_code) = 20"))

Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,F1,For more information,please visit https://www.worldbank.org/en/top...,,,


In [24]:
# Filter the DataFrame to keep only rows where wb_country_code length is 3
fixWbCountryCode = fixIsoCountryCode.filter("length(wb_country_code) = 3")
print(fixWbCountryCode .count())
showDF(fixWbCountryCode)

264


Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
2,AF,AFG,Afghanistan,Islamic State of Afghanistan,,
3,ZI,AFW,Africa Western and Central,Africa Western and Central,,
4,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
5,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
6,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
7,1A,ARB,Arab World,Arab World,,
8,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
9,AR,ARG,Argentina,Argentine Republic,Latin America & Caribbean,Upper middle income


In [25]:
# Check duplicate records for the columns
duplicateISOCountryCode = fixWbCountryCode.groupby("country_iso_code").agg(F.count("*").alias("cnt")).filter("cnt>1")
showDF(duplicateISOCountryCode)               
duplicateWBCountryCode = fixWbCountryCode.groupby("wb_country_code").agg(F.count("*").alias("cnt")).filter("cnt>1")
showDF(duplicateWBCountryCode)                
duplicateCountryName = fixWbCountryCode.groupby("name").agg(F.count("*").alias("cnt")).filter("cnt>1")
showDF(duplicateCountryName)

Unnamed: 0,country_iso_code,cnt


Unnamed: 0,wb_country_code,cnt


Unnamed: 0,name,cnt


In [26]:
countryDimFinal = fixWbCountryCode
print(countryDimFinal.count())
showDF(countryDimFinal)

264


Unnamed: 0,country_iso_code,wb_country_code,name,long_name,region,income_group
0,AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
1,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
2,AF,AFG,Afghanistan,Islamic State of Afghanistan,,
3,ZI,AFW,Africa Western and Central,Africa Western and Central,,
4,AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
5,AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
6,AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
7,1A,ARB,Arab World,Arab World,,
8,AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income
9,AR,ARG,Argentina,Argentine Republic,Latin America & Caribbean,Upper middle income


Write the data to the destination

In [27]:
# write the country dimension to an output csv file
countryDimFinal \
    .coalesce(1) \
    .write.csv('output/CountryDim', mode='overwrite', header='true')

In [28]:
%%bash
cat output/CountryDim/*csv | head

country_iso_code,wb_country_code,name,long_name,region,income_group
AW,ABW,Aruba,Aruba,Latin America & Caribbean,High income
ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
AF,AFG,Afghanistan,Islamic State of Afghanistan,,
ZI,AFW,Africa Western and Central,Africa Western and Central,,
AO,AGO,Angola,People's Republic of Angola,Sub-Saharan Africa,Lower middle income
AL,ALB,Albania,Republic of Albania,Europe & Central Asia,Upper middle income
AD,AND,Andorra,Principality of Andorra,Europe & Central Asia,High income
1A,ARB,Arab World,Arab World,,
AE,ARE,United Arab Emirates,United Arab Emirates,Middle East & North Africa,High income


Transform the Series Dataset

In [29]:
# Filter only for series that have Annual periodicity
# Select and rename columns needed for our data model for data ingestion.
seriesDim = df_series\
            .select("Series Code", "Indicator Name", "Periodicity", "Aggregation method") \
            .withColumnRenamed("Series Code", "indicator_code") \
            .withColumnRenamed("Indicator Name", "indicator_name") \
            .withColumnRenamed("Periodicity", "periodicity") \
            .withColumnRenamed("Aggregation method", "Aggregation_method") \
            .filter(F.col("Periodicity") == "Annual")
showDF(seriesDim)
seriesDim.count()

Unnamed: 0,indicator_code,indicator_name,periodicity,Aggregation_method
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors",Annual,Sum
1,AG.CON.FERT.PT.ZS,Fertilizer consumption (% of fertilizer produc...,Annual,Weighted average
2,AG.CON.FERT.ZS,Fertilizer consumption (kilograms per hectare ...,Annual,Weighted average
3,AG.LND.AGRI.K2,Agricultural land (sq. km),Annual,Sum
4,AG.LND.AGRI.ZS,Agricultural land (% of land area),Annual,Weighted average
5,AG.LND.ARBL.HA,Arable land (hectares),Annual,
6,AG.LND.ARBL.HA.PC,Arable land (hectares per person),Annual,Weighted average
7,AG.LND.ARBL.ZS,Arable land (% of land area),Annual,Weighted average
8,AG.LND.CREL.HA,Land under cereal production (hectares),Annual,Sum
9,AG.LND.CROP.ZS,Permanent cropland (% of land area),Annual,Weighted average


1377

In [33]:
# Filter rows where `indicator_code` is not null
seriesNotNull = seriesDim.filter("indicator_code is Not Null")
seriesNotNull.count()

1377

In [41]:
# Check for duplicate entries
duplicate_check_indicator_code = seriesNotNull.groupBy("indicator_code").agg(F.count("*").alias("cnt")).filter("cnt>1")
showDF(duplicate_check_indicator_code)

Unnamed: 0,indicator_code,cnt
0,be caused by household chemicals,2


In [43]:
# Drop duplicate entries
seriesNoDuplicates = seriesNotNull.dropDuplicates(["indicator_code"])
seriesNoDuplicates.count()

1376

Cellular and Broadband Penetration Analysis:

We aim to measure cellular and broadband penetration in comparison to population demographics for each country. Additionally, we seek insights on annual global aggregates.

In [44]:
# Our dataset has multiple types of metrics. The only ones that we care about are simple aggregates.
simpleAggIndicators = seriesNoDuplicates \
                    .filter("lower(Aggregation_method) = 'sum'") \
                    .select("indicator_code", "indicator_name") \
                    .orderBy("indicator_code")
                    
showDF(simpleAggIndicators, limitRows=500, truncate = False)

Unnamed: 0,indicator_code,indicator_name
0,AG.AGR.TRAC.NO,"Agricultural machinery, tractors"
1,AG.LND.AGRI.K2,Agricultural land (sq. km)
2,AG.LND.CREL.HA,Land under cereal production (hectares)
3,AG.LND.EL5M.RU.K2,Rural land area where elevation is below 5 meters (sq. km)
4,AG.LND.EL5M.UR.K2,Urban land area where elevation is below 5 meters (sq. km)
5,AG.LND.FRST.K2,Forest area (sq. km)
6,AG.LND.TOTL.K2,Land area (sq. km)
7,AG.LND.TOTL.RU.K2,Rural land area (sq. km)
8,AG.LND.TOTL.UR.K2,Urban land area (sq. km)
9,AG.PRD.CREL.MT,Cereal production (metric tons)


In [45]:
# Only keep the indicators that are relevant to requirements i.e. Population indicators and Cellular and Broadband penetration
targetIndicators = simpleAggIndicators.filter("lower(indicator_name) like '%population%total%'" +
                            "or lower(indicator_name) like '%cellular%' " +
                            "or lower(indicator_name) like '%broadband%' ")
showDF(targetIndicators)

Unnamed: 0,indicator_code,indicator_name
0,IT.CEL.SETS,Mobile cellular subscriptions
1,IT.NET.BBND,Fixed broadband subscriptions
2,SP.POP.0014.TO,"Population ages 0-14, total"
3,SP.POP.1564.TO,"Population ages 15-64, total"
4,SP.POP.65UP.TO,"Population ages 65 and above, total"
5,SP.POP.TOTL,"Population, total"


In [46]:
indicatorsData = df_indicators \
                .withColumnRenamed("Indicator Code", "indicator_code") \
                .withColumnRenamed("Country Code", "wb_country_code") \
                .drop("Indicator Name") \
                .drop("Country Name") 
showDF(indicatorsData)
indicatorsData.count()

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,...,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021
0,AFE,EG.CFT.ACCS.ZS,,,,,,,,,...,16.559819,16.936004,17.337896,17.687093,18.140971,18.491344,18.82552,19.272212,19.628009,
1,AFE,EG.CFT.ACCS.RU.ZS,,,,,,,,,...,6.281667,6.499471,6.680066,6.85911,7.016238,7.180364,7.322294,7.517191,7.651598,
2,AFE,EG.CFT.ACCS.UR.ZS,,,,,,,,,...,37.601816,37.855399,38.046781,38.326255,38.468426,38.670044,38.722783,38.927016,39.042839,
3,AFE,EG.ELC.ACCS.ZS,,,,,,,,,...,31.844384,31.79416,32.001027,33.87191,38.880173,40.261358,43.061877,44.27086,45.803485,
4,AFE,EG.ELC.ACCS.RU.ZS,,,,,,,,,...,19.402592,18.663502,17.633986,16.464681,24.531436,25.345111,27.449908,29.64176,30.404935,
5,AFE,EG.ELC.ACCS.UR.ZS,,,,,,,,,...,66.945277,67.112206,66.283426,67.080235,69.132292,70.928567,71.866136,73.332842,73.942949,
6,AFE,FX.OWN.TOTL.ZS,,,,,,,,,...,,,,,,,,,,
7,AFE,FX.OWN.TOTL.FE.ZS,,,,,,,,,...,,,,,,,,,,
8,AFE,FX.OWN.TOTL.MA.ZS,,,,,,,,,...,,,,,,,,,,
9,AFE,FX.OWN.TOTL.OL.ZS,,,,,,,,,...,,,,,,,,,,


383572

In [47]:
#Keep only the indicators that we care about
targetIndicatorsData = indicatorsData.join(targetIndicators, indicatorsData.indicator_code == targetIndicators.indicator_code).drop(targetIndicators.indicator_code)
showDF(targetIndicatorsData)

                                                                                

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,...,2013,2014,2015,2016,2017,2018,2019,2020,2021,indicator_name
0,AFE,IT.NET.BBND,,,,,,,,,...,2572220.0,3123787.0,3086933.0,3129499.0,3187389.0,3286161.0,4119216.0,4929182.0,,Fixed broadband subscriptions
1,AFE,IT.CEL.SETS,0.0,,,,,0.0,,,...,327628287.0,361512328.0,403793199.0,396261205.0,406413955.0,386012231.0,458769966.0,486868958.0,,Mobile cellular subscriptions
2,AFE,SP.POP.0014.TO,57144288.0,58943932.0,60748348.0,62553938.0,64347711.0,66128065.0,68203382.0,70220874.0,...,241896989.0,247356893.0,252787720.0,258306434.0,263744012.0,269079146.0,274282381.0,279340111.0,284463145.0,"Population ages 0-14, total"
3,AFE,SP.POP.1564.TO,69648709.0,71064806.0,72614824.0,74300783.0,76131179.0,78107040.0,79886565.0,81867716.0,...,303709474.0,313182403.0,322989451.0,332917687.0,343225705.0,353905517.0,364955873.0,376366386.0,387857061.0,"Population ages 15-64, total"
4,AFE,SP.POP.65UP.TO,4043770.0,4151048.0,4251472.0,4347316.0,4441300.0,4534866.0,4662727.0,4787863.0,...,16995114.0,17536074.0,18094678.0,18754826.0,19423165.0,20105467.0,20808019.0,21536800.0,22344911.0,"Population ages 65 and above, total"
5,AFE,SP.POP.TOTL,130836765.0,134159786.0,137614644.0,141202036.0,144920186.0,148769974.0,152752671.0,156876454.0,...,562601578.0,578075373.0,593871847.0,609978946.0,626392880.0,643090131.0,660046272.0,677243299.0,694665117.0,"Population, total"
6,AFW,IT.NET.BBND,,,,,,,,,...,399466.0,489777.0,565867.0,971317.0,980402.0,1084105.0,1209936.0,1743931.0,,Fixed broadband subscriptions
7,AFW,IT.CEL.SETS,0.0,,,,,0.0,,,...,290449890.0,319468155.0,348206501.0,356136490.0,358816819.0,384382192.0,422941609.0,457048002.0,,Mobile cellular subscriptions
8,AFW,SP.POP.0014.TO,40179920.0,41258443.0,42322255.0,43381248.0,44435635.0,45485151.0,46815458.0,48101375.0,...,168071948.0,172375593.0,176628546.0,181074105.0,185398355.0,189619289.0,193768241.0,197861564.0,202086389.0,"Population ages 0-14, total"
9,AFW,SP.POP.1564.TO,53387204.0,54235983.0,55195109.0,56251904.0,57401041.0,58639680.0,59627220.0,60751064.0,...,201761298.0,207655193.0,213860470.0,220036067.0,226593701.0,233498281.0,240697092.0,248156675.0,255592231.0,"Population ages 15-64, total"



Model the DataFrame to have a row for each value instead of a column, allowing easier future data augmentation.

In [48]:
yearList = [x for x in targetIndicatorsData.schema.names \
             if x != 'wb_country_code' and x != 'indicator_code' and x != 'indicator_name'] 

print(yearList)

['1960', '1961', '1962', '1963', '1964', '1965', '1966', '1967', '1968', '1969', '1970', '1971', '1972', '1973', '1974', '1975', '1976', '1977', '1978', '1979', '1980', '1981', '1982', '1983', '1984', '1985', '1986', '1987', '1988', '1989', '1990', '1991', '1992', '1993', '1994', '1995', '1996', '1997', '1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021']


In [50]:
exprs = [F.struct(F.lit(year).alias("year"), F.col(year).alias("indicator_value")) for year in yearList]
tempDF = targetIndicatorsData.withColumn("years", F.explode(F.array(*exprs)))
indicatorsDF = tempDF.select(
    "wb_country_code", 
    "indicator_code", 
    F.col("years.year").alias("year"), 
    F.col("years.indicator_value").alias("indicator_value")
)
showDF(indicatorsDF)

Unnamed: 0,wb_country_code,indicator_code,year,indicator_value
0,AFE,IT.NET.BBND,1960,
1,AFE,IT.NET.BBND,1961,
2,AFE,IT.NET.BBND,1962,
3,AFE,IT.NET.BBND,1963,
4,AFE,IT.NET.BBND,1964,
5,AFE,IT.NET.BBND,1965,
6,AFE,IT.NET.BBND,1966,
7,AFE,IT.NET.BBND,1967,
8,AFE,IT.NET.BBND,1968,
9,AFE,IT.NET.BBND,1969,


In [52]:
yearPivot = indicatorsDF.groupBy("year").pivot("indicator_code").sum('indicator_value')
showDF(yearPivot.orderBy("year"))

                                                                                

Unnamed: 0,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1960,0.0,,11770900000.0,17740230000.0,1419879000.0,30945380000.0
1,1961,,,11989950000.0,17897340000.0,1451559000.0,31353510000.0
2,1962,,,12284090000.0,18127130000.0,1483848000.0,31910120000.0
3,1963,,,12629820000.0,18438790000.0,1517585000.0,32601630000.0
4,1964,,,12949340000.0,18783770000.0,1549776000.0,33298710000.0
5,1965,0.0,,13237530000.0,19177670000.0,1581610000.0,34013030000.0
6,1966,,,13568440000.0,19544890000.0,1634962000.0,34764920000.0
7,1967,,,13848900000.0,19960800000.0,1688033000.0,35514760000.0
8,1968,,,14103200000.0,20420980000.0,1741251000.0,36282840000.0
9,1969,,,14359470000.0,20906780000.0,1794466000.0,37078540000.0


In [53]:
yearPivot.printSchema()

root
 |-- year: string (nullable = false)
 |-- IT.CEL.SETS: double (nullable = true)
 |-- IT.NET.BBND: double (nullable = true)
 |-- SP.POP.0014.TO: double (nullable = true)
 |-- SP.POP.1564.TO: double (nullable = true)
 |-- SP.POP.65UP.TO: double (nullable = true)
 |-- SP.POP.TOTL: double (nullable = true)



In [55]:
yearPivotDF = yearPivot.orderBy('year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population')

In [56]:
# You can iterate over a dataframe that is already computed by caching it onces and using it repeatedly
yearPivotDF.cache()

#Forces the data to be cached
yearPivotDF.count()

                                                                                

62

In [57]:
yearPivotDF.filter('population_age_0_to_14 < 0').count()

0

In [58]:
yearPivotDF.filter('population_age_15_64 < 0').count()

0

In [59]:
yearPivotDF.filter('population_age_65_and_above < 0').count()

0

In [60]:
yearPivotDF.filter('population < 0').count()

0

In [61]:
yearPivotDF.filter('cellular_subscriptions < 0').count()

0

In [62]:
yearPivotDF.filter('broadband_subscriptions < 0').count()

0

In [63]:
yearPivotDF.filter('population_age_0_to_14 > population').count()

0

In [64]:
yearPivotDF.filter('population_age_15_64 > population').count()

0

In [65]:
yearPivotDF.filter('population_age_65_and_above > population').count()

0

In [66]:
yearPivotDF.filter('(population_age_0_to_14 + population_age_15_64 + population_age_65_and_above) > population').count()

0

In [67]:
#Write the yearly totals to a CSV File
yearPivotDF \
    .select('year'
            , F.col('population').cast(DecimalType(38, 2))
            , F.col('population_age_0_to_14').cast(DecimalType(38, 2))
            , F.col('population_age_15_64').cast(DecimalType(38, 2))
            , F.col('population_age_65_and_above').cast(DecimalType(38, 2))
            , F.col('broadband_subscriptions').cast(DecimalType(38, 2))
            , F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('output/YearlyStats', mode='overwrite', header='true')

Getting yearly regional totals

In [73]:
regionalIndicators = indicatorsDF.join(countryDimFinal
                                       , indicatorsDF.wb_country_code == countryDim.wb_country_code
                                       , "inner") \
    .select(countryDim.region
            , indicatorsDF.wb_country_code
            , indicatorsDF.year
            , indicatorsDF.indicator_code
            , indicatorsDF.indicator_value)

In [74]:
regionalPivot = regionalIndicators.groupBy('region', 'year').pivot('indicator_code').sum('indicator_value')

In [75]:
showDF(regionalPivot.orderBy('region', 'year'), limitRows=100)

                                                                                

Unnamed: 0,region,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,,1960,0.0,,10952820000.0,16417730000.0,1297286000.0,28680820000.0
1,,1961,,,11150850000.0,16555780000.0,1325576000.0,29045430000.0
2,,1962,,,11424640000.0,16764580000.0,1354457000.0,29557270000.0
3,,1963,,,11750930000.0,17053430000.0,1384791000.0,30203090000.0
4,,1964,,,12052010000.0,17373730000.0,1413591000.0,30853620000.0
5,,1965,0.0,,12322900000.0,17741280000.0,1442054000.0,31520880000.0
6,,1966,,,12635470000.0,18084300000.0,1491255000.0,32226040000.0
7,,1967,,,12899140000.0,18473660000.0,1540245000.0,32928420000.0
8,,1968,,,13137910000.0,18905420000.0,1589418000.0,33648470000.0
9,,1969,,,13379320000.0,19361330000.0,1638569000.0,34395300000.0


In [76]:
#Write the regional-yearly totals to a CSV File
regionalPivot.filter('region is not null') \
    .orderBy('region','year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select('region'
            , F.col('year')
            , F.col('population').cast(DecimalType(38, 2))
            , F.col('population_age_0_to_14').cast(DecimalType(38, 2))
            , F.col('population_age_15_64').cast(DecimalType(38, 2))
            , F.col('population_age_65_and_above').cast(DecimalType(38, 2))
            , F.col('broadband_subscriptions').cast(DecimalType(38, 2))
            , F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('output/RegionalStats', mode='overwrite', header='true')

                                                                                

Becky finds the regional metrics interesting, but she wants to look at these metrics at a country level for each year. Can you adapt the regional pivot that we computed earlier to get the metrics for each year and country ?

In [78]:
countryIndicators = indicatorsDF.join(countryDimFinal
                                       , indicatorsDF.wb_country_code == countryDim.wb_country_code
                                       , "inner") \
    .select(indicatorsDF.wb_country_code
            , countryDim.country_iso_code
            , countryDim.name
            , indicatorsDF.year
            , indicatorsDF.indicator_code
            , indicatorsDF.indicator_value)

showDF(countryIndicators)

Unnamed: 0,wb_country_code,country_iso_code,name,year,indicator_code,indicator_value
0,AFE,ZH,Africa Eastern and Southern,1960,IT.NET.BBND,
1,AFE,ZH,Africa Eastern and Southern,1961,IT.NET.BBND,
2,AFE,ZH,Africa Eastern and Southern,1962,IT.NET.BBND,
3,AFE,ZH,Africa Eastern and Southern,1963,IT.NET.BBND,
4,AFE,ZH,Africa Eastern and Southern,1964,IT.NET.BBND,
5,AFE,ZH,Africa Eastern and Southern,1965,IT.NET.BBND,
6,AFE,ZH,Africa Eastern and Southern,1966,IT.NET.BBND,
7,AFE,ZH,Africa Eastern and Southern,1967,IT.NET.BBND,
8,AFE,ZH,Africa Eastern and Southern,1968,IT.NET.BBND,
9,AFE,ZH,Africa Eastern and Southern,1969,IT.NET.BBND,


In [80]:
countryPivot = countryIndicators.groupBy('country_iso_code', 'name', 'year') \
    .pivot('indicator_code').sum('indicator_value')

In [83]:
showDF(countryPivot.orderBy('country_iso_code', 'name', 'year'), limitRows=100)

                                                                                

Unnamed: 0,country_iso_code,name,year,IT.CEL.SETS,IT.NET.BBND,SP.POP.0014.TO,SP.POP.1564.TO,SP.POP.65UP.TO,SP.POP.TOTL
0,1A,Arab World,1960,0.0,,39900270.0,49063220.0,3234225.0,92197720.0
1,1A,Arab World,1961,,,41339720.0,50032210.0,3352609.0,94724540.0
2,1A,Arab World,1962,,,42792880.0,51072090.0,3469470.0,97334440.0
3,1A,Arab World,1963,,,44248950.0,52200460.0,3584776.0,100034200.0
4,1A,Arab World,1964,,,45685300.0,53449220.0,3698270.0,102832800.0
5,1A,Arab World,1965,0.0,,47089850.0,54836640.0,3809935.0,105736400.0
6,1A,Arab World,1966,,,48668390.0,56150920.0,3939319.0,108758600.0
7,1A,Arab World,1967,,,50184670.0,57648450.0,4066217.0,111899300.0
8,1A,Arab World,1968,,,51657730.0,59286790.0,4191645.0,115136200.0
9,1A,Arab World,1969,,,53108290.0,61012240.0,4316656.0,118437200.0


In [86]:
#Write the country-yearly totals to a CSV File
countryPivot.filter('country_iso_code is not null') \
    .orderBy('country_iso_code','name', 'year') \
    .withColumnRenamed('IT.CEL.SETS', 'cellular_subscriptions') \
    .withColumnRenamed('IT.NET.BBND', 'broadband_subscriptions') \
    .withColumnRenamed('SP.POP.0014.TO', 'population_age_0_to_14') \
    .withColumnRenamed('SP.POP.1564.TO', 'population_age_15_64') \
    .withColumnRenamed('SP.POP.65UP.TO', 'population_age_65_and_above') \
    .withColumnRenamed('SP.POP.TOTL', 'population') \
    .select('country_iso_code'
            , 'name'
            , 'year'
            , F.col('population').cast(DecimalType(38, 2))
            , F.col('population_age_0_to_14').cast(DecimalType(38, 2))
            , F.col('population_age_15_64').cast(DecimalType(38, 2))
            , F.col('population_age_65_and_above').cast(DecimalType(38, 2))
            , F.col('broadband_subscriptions').cast(DecimalType(38, 2))
            , F.col('cellular_subscriptions').cast(DecimalType(38, 2))) \
    .coalesce(1) \
    .write.csv('output/CountryStats', mode='overwrite', header='true')

                                                                                

Business Environment Analysis:

Kat wants to identify the countries that are conducive to starting a business. She is interested in the most recent metrics for the following indicators:

Gross National Income (GNI)

Cost of business start-up procedures

Number of days required to start a business

Number of start-up procedures to register a business

GDP

GDP per capita

Business Regulatory Environment

Ease of doing business index (available only for 2017)

The data should be written to a CSV file.

In [87]:
businessIndicators = seriesDim.filter(
    (F.lower(F.col("indicator_name")).like("%start%up%procedures%") |
     F.lower(F.col("indicator_name")).like("%start%business%") |
     F.lower(F.col("indicator_name")).like("%business%regulatory%environment%") |
     F.lower(F.col("indicator_name")).like("%gdp%constant%us%") |
     F.lower(F.col("indicator_name")).like("%gdp%per%capita%constant%us%") |
     F.lower(F.col("indicator_name")).like("%gni, atlas%us%")) &
    ~F.lower(F.col("indicator_name")).contains(", female") &
    ~F.lower(F.col("indicator_name")).contains(", male")
)
showDF(businessIndicators)
businessIndicators.count()

Unnamed: 0,indicator_code,indicator_name,periodicity,Aggregation_method
0,IC.REG.COST.PC.ZS,Cost of business start-up procedures (% of GNI...,Annual,Unweighted average
1,IC.REG.DURS,Time required to start a business (days),Annual,Unweighted average
2,IC.REG.PROC,Start-up procedures to register a business (nu...,Annual,Unweighted average
3,IQ.CPA.BREG.XQ,CPIA business regulatory environment rating (1...,Annual,Unweighted average
4,NY.GDP.MKTP.KD,GDP (constant 2015 US$),Annual,Gap-filled total
5,NY.GDP.PCAP.KD,GDP per capita (constant 2015 US$),Annual,Weighted average
6,NY.GNP.ATLS.CD,"GNI, Atlas method (current US$)",Annual,Gap-filled total


7

In [88]:
businessIndicatorsData = indicatorsData.join(businessIndicators, indicatorsData.indicator_code == businessIndicators.indicator_code).drop(businessIndicators.indicator_code)
showDF(businessIndicatorsData)

Unnamed: 0,wb_country_code,indicator_code,1960,1961,1962,1963,1964,1965,1966,1967,...,2015,2016,2017,2018,2019,2020,2021,indicator_name,periodicity,Aggregation_method
0,AFE,IC.REG.COST.PC.ZS,,,,,,,,,...,55.09615,55.63077,51.07692,41.73846,34.23077,,,Cost of business start-up procedures (% of GNI...,Annual,Unweighted average
1,AFE,IQ.CPA.BREG.XQ,,,,,,,,,...,3.027778,3.027778,2.973684,3.027778,2.921053,2.921053,,CPIA business regulatory environment rating (1...,Annual,Unweighted average
2,AFE,NY.GDP.MKTP.KD,153829400000.0,154198000000.0,166504100000.0,175103000000.0,183122600000.0,192878900000.0,200424500000.0,210960400000.0,...,924252500000.0,944743400000.0,968901700000.0,992981300000.0,1013099000000.0,983851800000.0,1026194000000.0,GDP (constant 2015 US$),Annual,Gap-filled total
3,AFE,NY.GDP.PCAP.KD,1175.735,1149.361,1209.93,1240.088,1263.61,1296.491,1312.085,1344.755,...,1556.316,1548.813,1546.796,1544.078,1534.89,1452.73,1477.249,GDP per capita (constant 2015 US$),Annual,Weighted average
4,AFE,NY.GNP.ATLS.CD,,,,,,,,,...,947673100000.0,900316600000.0,914231400000.0,942124100000.0,994512800000.0,946476900000.0,1025115000000.0,"GNI, Atlas method (current US$)",Annual,Gap-filled total
5,AFE,IC.REG.PROC,,,,,,,,,...,9.307692,8.884615,8.461538,8.346154,8.307692,,,Start-up procedures to register a business (nu...,Annual,Unweighted average
6,AFE,IC.REG.DURS,,,,,,,,,...,32.03846,31.57692,29.28846,27.82692,26.82692,,,Time required to start a business (days),Annual,Unweighted average
7,AFW,IC.REG.COST.PC.ZS,,,,,,,,,...,63.6,57.43636,53.52273,48.01364,38.74545,,,Cost of business start-up procedures (% of GNI...,Annual,Unweighted average
8,AFW,IQ.CPA.BREG.XQ,,,,,,,,,...,3.15,3.1,3.075,3.025,3.025,3.05,,CPIA business regulatory environment rating (1...,Annual,Unweighted average
9,AFW,NY.GDP.MKTP.KD,104844600000.0,106782900000.0,110808900000.0,118867400000.0,125281900000.0,130355500000.0,128026000000.0,115804000000.0,...,760734500000.0,761707000000.0,779361000000.0,802369200000.0,828065400000.0,820637500000.0,852763800000.0,GDP (constant 2015 US$),Annual,Gap-filled total


In [89]:
recentIndicatorsData = businessIndicatorsData \
                        .select("wb_country_code", "indicator_code",'2019') \
                        .withColumnRenamed("2019", "indicator_value") \
                        .withColumn("indicator_value", F.col("indicator_value").cast(DecimalType(38, 2)))
showDF(recentIndicatorsData)

Unnamed: 0,wb_country_code,indicator_code,indicator_value
0,AFE,IC.REG.COST.PC.ZS,34.23
1,AFE,IQ.CPA.BREG.XQ,2.92
2,AFE,NY.GDP.MKTP.KD,1013098519257.66
3,AFE,NY.GDP.PCAP.KD,1534.89
4,AFE,NY.GNP.ATLS.CD,994512787038.08
5,AFE,IC.REG.PROC,8.31
6,AFE,IC.REG.DURS,26.83
7,AFW,IC.REG.COST.PC.ZS,38.75
8,AFW,IQ.CPA.BREG.XQ,3.03
9,AFW,NY.GDP.MKTP.KD,828065395939.52


In [97]:
businessIndicatorsCount = recentIndicatorsData.groupBy("wb_country_code").agg(F.count("*").alias("cnt")).filter("cnt>9")
showDF(businessIndicatorsCount)

                                                                                

Unnamed: 0,wb_country_code,cnt


In [98]:
countryBusinessIndicator = recentIndicatorsData.join(countryDimFinal, recentIndicatorsData.wb_country_code == countryDimFinal.wb_country_code)
showDF(countryBusinessIndicator)

Unnamed: 0,wb_country_code,indicator_code,indicator_value,country_iso_code,wb_country_code.1,name,long_name,region,income_group
0,AFE,IC.REG.COST.PC.ZS,34.23,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
1,AFE,IQ.CPA.BREG.XQ,2.92,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
2,AFE,NY.GDP.MKTP.KD,1013098519257.66,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
3,AFE,NY.GDP.PCAP.KD,1534.89,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
4,AFE,NY.GNP.ATLS.CD,994512787038.08,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
5,AFE,IC.REG.PROC,8.31,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
6,AFE,IC.REG.DURS,26.83,ZH,AFE,Africa Eastern and Southern,Africa Eastern and Southern,,
7,AFW,IC.REG.COST.PC.ZS,38.75,ZI,AFW,Africa Western and Central,Africa Western and Central,,
8,AFW,IQ.CPA.BREG.XQ,3.03,ZI,AFW,Africa Western and Central,Africa Western and Central,,
9,AFW,NY.GDP.MKTP.KD,828065395939.52,ZI,AFW,Africa Western and Central,Africa Western and Central,,


In [99]:
countryBusinessStartupPivot = countryBusinessIndicator \
                                .select("country_iso_code" \
                                        ,"name" \
                                        ,"indicator_code" \
                                        ,"indicator_value") \
                                .groupBy("country_iso_code","name").pivot("indicator_code").sum("indicator_value") \
                                .withColumnRenamed("country_iso_code", "Country ISO Code") \
                                .withColumnRenamed("name", "Country Name") \
                                .withColumnRenamed("IC.REG.COST.PC.ZS", "Startup Cost Pct of GNI") \
                                .withColumnRenamed("IC.REG.DURS", "Startup Time") \
                                .withColumnRenamed("IC.REG.PROC","Startup Procedures") \
                                .withColumnRenamed("IQ.CPA.BREG.XQ","Business Regulation") \
                                .withColumnRenamed("NY.GDP.MKTP.KD", "GDP") \
                                .withColumnRenamed("NY.GDP.PCAP.KD", "GDP per capita") \
                                .withColumnRenamed("NY.GNP.ATLS.CD","GNI") \
                                .withColumn('Startup Cost', (F.col('GNI') * F.col('Startup Cost Pct of GNI') / F.lit(100.0)).cast(DecimalType(38, 2))) \
                                .filter(F.col('GNI') > 0) \
                                .filter(F.col("Startup Cost").isNotNull())
                                
showDF(countryBusinessStartupPivot)

                                                                                

Unnamed: 0,Country ISO Code,Country Name,Startup Cost Pct of GNI,Startup Time,Startup Procedures,Business Regulation,GDP,GDP per capita,GNI,Startup Cost
0,BJ,Benin,3.5,8.5,6.0,3.5,14179807326.72,1201.56,14803390762.06,518118676.67
1,XC,Euro area,3.21,9.82,5.05,,12623114142874.7,36879.14,13798162524579.1,442921017038.99
2,LY,Libya,24.6,35.0,10.0,,60950218438.49,8993.09,69161249722.12,17013667431.64
3,KZ,Kazakhstan,0.2,5.0,4.0,,211106972039.01,11402.76,163361654737.11,326723309.47
4,JM,Jamaica,4.2,3.0,2.0,,14934128282.74,5065.37,15466765494.66,649604150.78
5,NO,Norway,0.8,4.0,4.0,,406468037614.96,76005.22,436746877155.38,3493975017.24
6,AG,Antigua and Barbuda,8.0,19.0,9.0,,1630215842.3,16786.45,1594604632.72,127568370.62
7,IR,Iran,1.1,72.5,10.0,,440183212449.35,5308.92,353025324793.28,3883278572.73
8,CG,Congo,62.2,49.5,11.0,2.0,9647394773.66,1793.03,9746848755.23,6062539925.75
9,EU,European Union,3.2,12.17,5.37,,14772269194026.8,33032.96,16101472914830.0,515247133274.56


In [100]:
#write the business startup data to an output csv file
countryBusinessStartupPivot \
    .select("Country ISO Code", "Country Name", "GDP", "GDP Per Capita", "GNI", \
            "Startup Cost", "Startup Cost Pct of GNI", "Startup Time", "Startup Procedures", \
            "Business Regulation") \
    .coalesce(1) \
    .write.csv('output/BusinessStartupData', mode='overwrite', header='true')

                                                                                