In [1]:
# Install Java 8
!apt-get install openjdk-8-jdk-headless > /dev/null  
# Download Spark 3.0.1
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz 
# Extract the Spark archive
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

In [2]:
# Set environment variables
import os
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ["SPARK_HOME"] = '/content/spark-3.0.1-bin-hadoop2.7'

In [3]:
# Install findspark to use pyspark
!pip install -q findspark 

In [8]:

# Mount Google Drive 
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
!cp /content/drive/MyDrive/WDIData.csv /content/

In [10]:
# Make pyspark importable
import findspark
findspark.init()

In [11]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import DecimalType

In [12]:
# Create a SparkSession
spark = (SparkSession.builder
                     .master('local') 
                     .appName("WDI Analysis") 
                     .getOrCreate())

In [13]:
def show_size_shema(df):
    """Show the size and schema of a DataFrame."""
    print("Rows: {0}, columns: {1}".format(df.count(), len(df.columns)))
    df.printSchema()

In [14]:
# Create a WDI DataFrame
wdi_path = '/content/WDIData.csv'
df_wdi = spark.read.csv(wdi_path, inferSchema=True, header=True)

In [15]:
show_size_shema(df_wdi)

Rows: 383572, columns: 67
root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Indicator Code: string (nullable = true)
 |-- 1960: double (nullable = true)
 |-- 1961: double (nullable = true)
 |-- 1962: double (nullable = true)
 |-- 1963: double (nullable = true)
 |-- 1964: double (nullable = true)
 |-- 1965: double (nullable = true)
 |-- 1966: double (nullable = true)
 |-- 1967: double (nullable = true)
 |-- 1968: double (nullable = true)
 |-- 1969: double (nullable = true)
 |-- 1970: double (nullable = true)
 |-- 1971: double (nullable = true)
 |-- 1972: double (nullable = true)
 |-- 1973: double (nullable = true)
 |-- 1974: double (nullable = true)
 |-- 1975: double (nullable = true)
 |-- 1976: double (nullable = true)
 |-- 1977: double (nullable = true)
 |-- 1978: double (nullable = true)
 |-- 1979: double (nullable = true)
 |-- 1980: double (nullable = true)
 |-- 1981: double (nullable = tru

In [16]:
# Show the first 5 rows of the DataFrame in tabular form
df_wdi.select(df_wdi.columns[:7] + df_wdi.columns[-5:]).show(5)

+--------------------+------------+--------------------+-----------------+----+----+----+----------------+----------------+----------------+----+----+
|        Country Name|Country Code|      Indicator Name|   Indicator Code|1960|1961|1962|            2018|            2019|            2020|2021|_c66|
+--------------------+------------+--------------------+-----------------+----+----+----+----------------+----------------+----------------+----+----+
|Africa Eastern an...|         AFE|Access to clean f...|   EG.CFT.ACCS.ZS|null|null|null|18.6953056959887|19.1499422830713|19.5018373155115|null|null|
|Africa Eastern an...|         AFE|Access to clean f...|EG.CFT.ACCS.RU.ZS|null|null|null|7.25482807057078|7.46078295393556|7.59928856389562|null|null|
|Africa Eastern an...|         AFE|Access to clean f...|EG.CFT.ACCS.UR.ZS|null|null|null|38.4824089338151|38.6920529949576|38.7939833193673|null|null|
|Africa Eastern an...|         AFE|Access to electri...|   EG.ELC.ACCS.ZS|null|null|null|42.88

In [17]:
df_wdi.select('Country Code').distinct().count()

266

In [18]:
df_wdi.select('Indicator Code').distinct().count()

1442

In [19]:
!cp -r /content/drive/MyDrive/WDI_CSV /content/

In [20]:
# Create a DataFrame of countries
country_path = '/content/WDI_CSV/WDICountry.csv'
df_country = spark.read.csv(
    country_path, inferSchema=True, header=True, multiLine=True)
     

In [21]:
show_size_shema(df_country)

Rows: 265, columns: 31
root
 |-- Country Code: string (nullable = true)
 |-- Short Name: string (nullable = true)
 |-- Table Name: string (nullable = true)
 |-- Long Name: string (nullable = true)
 |-- 2-alpha code: string (nullable = true)
 |-- Currency Unit: string (nullable = true)
 |-- Special Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income Group: string (nullable = true)
 |-- WB-2 code: string (nullable = true)
 |-- National accounts base year: string (nullable = true)
 |-- National accounts reference year: integer (nullable = true)
 |-- SNA price valuation: string (nullable = true)
 |-- Lending category: string (nullable = true)
 |-- Other groups: string (nullable = true)
 |-- System of National Accounts: string (nullable = true)
 |-- Alternative conversion factor: string (nullable = true)
 |-- PPP survey year: string (nullable = true)
 |-- Balance of Payments Manual in use: string (nullable = true)
 |-- External debt Reporting status: string (nu

In [22]:
df_country.select('Country Code').distinct().count()

265

In [23]:
# Show the last 7 rows of the DataFrame as a list of Rows
df_country.select('Country Code', 'Short Name').tail(7)

[Row(Country Code='WLD', Short Name='World'),
 Row(Country Code='WSM', Short Name='Samoa'),
 Row(Country Code='XKX', Short Name='Kosovo'),
 Row(Country Code='YEM', Short Name='Yemen'),
 Row(Country Code='ZAF', Short Name='South Africa'),
 Row(Country Code='ZMB', Short Name='Zambia'),
 Row(Country Code='ZWE', Short Name='Zimbabwe')]

In [24]:
# Create a DataFrame of indicators (series)
series_path = '/content/WDI_CSV/WDISeries.csv'
df_series = spark.read.csv(
    series_path, inferSchema=True, header=True, multiLine=True, escape="\"")

In [25]:
show_size_shema(df_series)

Rows: 1442, columns: 21
root
 |-- Series Code: string (nullable = true)
 |-- Topic: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- Short definition: string (nullable = true)
 |-- Long definition: string (nullable = true)
 |-- Unit of measure: string (nullable = true)
 |-- Periodicity: string (nullable = true)
 |-- Base Period: string (nullable = true)
 |-- Other notes: string (nullable = true)
 |-- Aggregation method: string (nullable = true)
 |-- Limitations and exceptions: string (nullable = true)
 |-- Notes from original source: string (nullable = true)
 |-- General comments: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Statistical concept and methodology: string (nullable = true)
 |-- Development relevance: string (nullable = true)
 |-- Related source links: string (nullable = true)
 |-- Other web links: string (nullable = true)
 |-- Related indicators: string (nullable = true)
 |-- License Type: string (nullable = true)
 |-- : s

In [26]:
df_series.select('Series Code').distinct().count()

1442

In [27]:
# Show the first 5 rows of the DataFrame without truncation 
df_series.select('Indicator Name', 'Topic').show(5, False)

+-------------------------------------------------------------+------------------------------------+
|Indicator Name                                               |Topic                               |
+-------------------------------------------------------------+------------------------------------+
|Agricultural machinery, tractors                             |Environment: Agricultural production|
|Fertilizer consumption (% of fertilizer production)          |Environment: Agricultural production|
|Fertilizer consumption (kilograms per hectare of arable land)|Environment: Agricultural production|
|Agricultural land (sq. km)                                   |Environment: Land use               |
|Agricultural land (% of land area)                           |Environment: Land use               |
+-------------------------------------------------------------+------------------------------------+
only showing top 5 rows



In [28]:
df_country.select('Region', 'Income Group').show(7, False)

+-------------------------+-------------------+
|Region                   |Income Group       |
+-------------------------+-------------------+
|Latin America & Caribbean|High income        |
|null                     |null               |
|South Asia               |Low income         |
|null                     |null               |
|Sub-Saharan Africa       |Lower middle income|
|Europe & Central Asia    |Upper middle income|
|Europe & Central Asia    |High income        |
+-------------------------+-------------------+
only showing top 7 rows



In [29]:
df_series.select('Topic').limit(5).collect()

[Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Land use'),
 Row(Topic='Environment: Land use')]

In [30]:
# View the number of country names contained in both of these DataFrames 
# (with the removal of duplicates)
df_wdi.select('Country Code').intersect(
    df_country.select('Country Code')).count()

265

In [31]:
# View a country code that is in the WDI DataFrame but is not in the Dataframe 
# of countries (without the removal of duplicates)
(df_wdi.select('Country Code').distinct()
    .exceptAll(df_country.select('Country Code')).show())

+------------+
|Country Code|
+------------+
|         INX|
+------------+



In [32]:
# View what values in other columns
df_wdi.where(F.col('Country Code') == 'INX').select(df_wdi.columns[:7]).show(5)

+--------------+------------+--------------------+-----------------+----+----+----+
|  Country Name|Country Code|      Indicator Name|   Indicator Code|1960|1961|1962|
+--------------+------------+--------------------+-----------------+----+----+----+
|Not classified|         INX|Access to clean f...|   EG.CFT.ACCS.ZS|null|null|null|
|Not classified|         INX|Access to clean f...|EG.CFT.ACCS.RU.ZS|null|null|null|
|Not classified|         INX|Access to clean f...|EG.CFT.ACCS.UR.ZS|null|null|null|
|Not classified|         INX|Access to electri...|   EG.ELC.ACCS.ZS|null|null|null|
|Not classified|         INX|Access to electri...|EG.ELC.ACCS.RU.ZS|null|null|null|
+--------------+------------+--------------------+-----------------+----+----+----+
only showing top 5 rows



In [33]:
# Count the number of rows with values in the annual columns 
# by deleting all empty rows in them
(df_wdi.where(F.col('Country Code') == 'INX')
       .dropna('all', subset=df_wdi.columns[4:])
       .select('*').count())

0

In [34]:
# Count the number of missing values
for ncols in ['Region', 'Income Group']:
    print(f"Missing values in '{ncols}': ", df_country.where(df_country[ncols].isNull()).count())
print("Missing values in 'Topic': ", df_series.where(df_series['Topic'].isNull()).count())

Missing values in 'Region':  48
Missing values in 'Income Group':  49
Missing values in 'Topic':  0


In [35]:
# See how many countries do not have values in the "Region" 
# and "Income Group" columns
(df_country
    .filter(df_country['Income Group'].isNull() | df_country['Region'].isNull())
    .select('Country Code')
    .distinct()
    .count())

49

In [36]:
# Show the names of these countries
(df_country
    .filter(df_country['Region'].isNull())
    .select('Short Name', 'Country Code')
    .distinct()
    .collect())

[Row(Short Name='Euro area', Country Code='EMU'),
 Row(Short Name='Low income', Country Code='LIC'),
 Row(Short Name='Middle East & North Africa', Country Code='MEA'),
 Row(Short Name='Least developed countries: UN classification', Country Code='LDC'),
 Row(Short Name='Middle East & North Africa (excluding high income)', Country Code='MNA'),
 Row(Short Name='Middle East & North Africa (IDA & IBRD)', Country Code='TMN'),
 Row(Short Name='Europe & Central Asia (excluding high income)', Country Code='ECA'),
 Row(Short Name='East Asia & Pacific (IDA & IBRD)', Country Code='TEA'),
 Row(Short Name='IDA total', Country Code='IDA'),
 Row(Short Name='Middle income', Country Code='MIC'),
 Row(Short Name='Arab World', Country Code='ARB'),
 Row(Short Name='Latin America & Caribbean (excluding high income)', Country Code='LAC'),
 Row(Short Name='Sub-Saharan Africa (excluding high income)', Country Code='SSA'),
 Row(Short Name='Pacific island small states', Country Code='PSS'),
 Row(Short Name='IBRD

In [37]:
# Add rows with filled in missing values for the "World"
df_country_w = (df_country
                .unionByName(df_country
                                .filter(df_country['Country Code'] == 'WLD')
                                .fillna({'Region': 'World', 
                                         'Income group': 'World'})))

In [38]:
(df_country_w
    .filter(df_country_w['Country Code'] == 'WLD')
    .select('Country Code', 'Region')
    .show())

+------------+------+
|Country Code|Region|
+------------+------+
|         WLD|  null|
|         WLD| World|
+------------+------+



In [39]:
(df_wdi.select('Indicator Code')
    .distinct()
    .intersect(df_series.select('Series Code'))
    .count())

1442

In [40]:
df_wdi.filter(df_wdi['_c65'].isNotNull()).count()

AnalysisException: ignored

In [41]:
# Select columns in a specific order
cols_ord = (['Country Code', 'Country Name', 'Region', 'Income Group', 'Topic'] 
            + df_wdi.columns[2:len(df_wdi.columns)-1])

# Join all the selected DataFrames together
wdi_csdf = (df_wdi
            .join(df_country_w
                  .dropna(subset=['Region', 'Income Group'])
                  .select('Country Code', 'Region', 'Income group'), 
                  'Country Code')
            .join(df_series.select('Series Code', 'Topic'),  
                  df_wdi['Indicator Code'] == df_series['Series Code'], 
                  'left')
            .fillna({'Topic': 'Unknown'})
            .select(cols_ord))

In [42]:
# Calculate the final number of rows
rows_df = (df_wdi.where(df_wdi['Country Code'] != 'INX').count() 
           - (df_country.where(df_country['Region'].isNull()).count() - 1)
           * df_series.select('Series Code').count())

print("The correct number of rows: {0}, columns: {1}".format(
    rows_df, len(df_wdi.columns) + 2))
print("The resulting number of rows: {0}, columns: {1}".format(
    wdi_csdf.count(), len(wdi_csdf.columns)))

The correct number of rows: 314356, columns: 69
The resulting number of rows: 312914, columns: 69


In [43]:
# Display a sampled subset
wdi_csdf.sample(0.2).show(10)

+------------+------------+------+------------+--------------------+--------------------+--------------------+----+----+----+----+----+----+----+----+----+----+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+

In [44]:
# Divide the columns into numeric and string columns
years_cols = [col[0] for col in wdi_csdf.dtypes if col[1] == 'double']
name_cols = [col[0] for col in wdi_csdf.dtypes if col[1] == 'string']

In [45]:
print("Number of years in the dataset: {}".format(len(years_cols)))

Number of years in the dataset: 62


In [46]:
# Counts the number of records for each year
wdi_csdf.select(years_cols).summary('count').show()
     

+-------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+
|summary| 1960| 1961| 1962| 1963| 1964| 1965| 1966| 1967| 1968| 1969| 1970| 1971| 1972| 1973| 1974| 1975| 1976| 1977| 1978| 1979| 1980| 1981| 1982| 1983| 1984| 1985| 1986| 1987| 1988| 1989|  1990|  1991|  1992|  1993|  1994|  1995|  1996|  1997|  1998|  1999|  2000|  2001|  2002|  2003|  2004|  2005|  2006|  2007|  2008|  2009|  2010|  2011|  2012|  2013|  2014|  2015|  2016|  2017|  2018|  2019|  2020| 2021|
+-------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+---

In [47]:
# Counts the number of records in string columns
wdi_csdf.select(name_cols).summary('count').show()

+-------+------------+------------+------+------------+------+--------------+--------------+
|summary|Country Code|Country Name|Region|Income Group| Topic|Indicator Name|Indicator Code|
+-------+------------+------------+------+------------+------+--------------+--------------+
|  count|      312914|      312914|312914|      312914|312914|        312914|        312914|
+-------+------------+------------+------+------------+------+--------------+--------------+



In [48]:
wdi_csdf.select('Region').distinct().collect()

[Row(Region='South Asia'),
 Row(Region='World'),
 Row(Region='Sub-Saharan Africa'),
 Row(Region='Europe & Central Asia'),
 Row(Region='North America'),
 Row(Region=' the economy is still counted as IBRD in the IDA- and IBRD-relevant group data in this version of WDI."'),
 Row(Region='East Asia & Pacific'),
 Row(Region='Middle East & North Africa'),
 Row(Region='Latin America & Caribbean')]

In [49]:
(wdi_csdf.select('Country Code', 'Region').distinct()
         .groupBy('Region').count().orderBy('count')
         .show(truncate=False))

+-------------------------------------------------------------------------------------------------------+-----+
|Region                                                                                                 |count|
+-------------------------------------------------------------------------------------------------------+-----+
| the economy is still counted as IBRD in the IDA- and IBRD-relevant group data in this version of WDI."|1    |
|World                                                                                                  |1    |
|North America                                                                                          |3    |
|South Asia                                                                                             |7    |
|Middle East & North Africa                                                                             |21   |
|East Asia & Pacific                                                                                    

In [50]:
wdi_csdf.select('Income Group').distinct().collect()

[Row(Income Group='South Asia'),
 Row(Income Group='Lower middle income'),
 Row(Income Group='World'),
 Row(Income Group='High income'),
 Row(Income Group='Upper middle income'),
 Row(Income Group='Low income')]

In [51]:

(wdi_csdf.filter(wdi_csdf['Income Group'] != 'World')
         .select('Country Code', 'Income Group').distinct()
         .groupBy('Income Group').count()
         .orderBy(F.desc('Income Group')).show())

+-------------------+-----+
|       Income Group|count|
+-------------------+-----+
|Upper middle income|   54|
|         South Asia|    1|
|Lower middle income|   53|
|         Low income|   28|
|        High income|   80|
+-------------------+-----+



In [52]:
wdi_csdf.select('Topic').distinct().count()

89

In [53]:
(wdi_csdf.select('Indicator Code', 'Topic').distinct()
         .groupBy('Topic').count().orderBy('Topic')
         .show(10, False))

+--------------------------------------------------------------------------------------+-----+
|Topic                                                                                 |count|
+--------------------------------------------------------------------------------------+-----+
|Economic Policy & Debt: Balance of payments: Capital & financial account              |11   |
|Economic Policy & Debt: Balance of payments: Current account: Balances                |4    |
|Economic Policy & Debt: Balance of payments: Current account: Goods, services & income|22   |
|Economic Policy & Debt: Balance of payments: Current account: Transfers               |7    |
|Economic Policy & Debt: Balance of payments: Reserves & other items                   |6    |
|Economic Policy & Debt: External debt: Debt outstanding                               |10   |
|Economic Policy & Debt: External debt: Debt ratios & other items                      |11   |
|Economic Policy & Debt: External debt: Debt servi

In [54]:
# Create a list of columns to analyze
ind_nc_topic = ['Indicator Name', 'Indicator Code', 'Topic']

In [55]:
def rename_agg_cols(df, cols_rename, chars, new_form=None):
    """Return a new DataFrame with renamed aggregate columns.

    Parameters:
    df -- a PySpark DataFrame
    cols_rename -- names of columns to rename
    chars -- a string specifying the set of characters to be removed
    new_form -- a new format of column names
    """
    for old_name in cols_rename:
    
        if new_form:
            new_name = new_form.format(old_name.strip(chars)) 
        else:
            new_name = old_name.strip(chars)

        df = df.withColumnRenamed(old_name, new_name)

    return df

In [56]:
wdi_csdf.groupBy('Indicator Code').count().select('count').distinct().show()

+-----+
|count|
+-----+
|  217|
+-----+



In [65]:
# Count the number of countries with data for each indicator
# (the specific year does not matter)
country_ind = (wdi_csdf.dropna('all', subset=years_cols)
                       .groupBy(ind_nc_topic)
                       .count())

country_ind.select('Indicator Name', 'count').show(7, False)

+-----------------------------------------------------+-----+
|Indicator Name                                       |count|
+-----------------------------------------------------+-----+
|Depth of credit information index (0=low to 8=high)  |190  |
|Manufactures exports (% of merchandise exports)      |198  |
|People practicing open defecation (% of population)  |215  |
|Personal remittances, received (% of GDP)            |198  |
|Electricity production from coal sources (% of total)|142  |
|GNI growth (annual %)                                |167  |
|International tourism, number of arrivals            |203  |
+-----------------------------------------------------+-----+
only showing top 7 rows



In [66]:
# Count the number of indicators with data for each country
# (the specific indicator does not matter)
(wdi_csdf.dropna('all', subset=years_cols)
         .groupBy('Country Name').count()
         .select('Country Name', 'count')
         .limit(5).collect())

[Row(Country Name='Chad', count=1234),
 Row(Country Name='Paraguay', count=1342),
 Row(Country Name='World', count=941),
 Row(Country Name='Congo, Dem. Rep.', count=1323),
 Row(Country Name='Senegal', count=1374)]

In [67]:
# Set the lower and upper bounds (inclusive) for selecting indicators
sel_bounds = (200, 220)

In [68]:
# Find out indicators with data in a certain number of countries
country_ind_sel = country_ind.where(F.col('count').between(*sel_bounds))
country_ind_sel.count()

298

In [69]:
# View which topics these indicators relate to
country_ind_sel.select('Topic').distinct().limit(5).collect()

[Row(Topic='Education: Efficiency'),
 Row(Topic='Social Protection & Labor: Labor force structure'),
 Row(Topic='Environment: Density & urbanization'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Public Sector: Policy & institutions')]

In [70]:
country_ind_sel.select('Topic').distinct().count()

45

In [71]:
# Count the number of countries with data for each indicator in each year
agg_op = {year: 'count' for year in years_cols}
year_ind_agg = (wdi_csdf.groupBy(ind_nc_topic)
                        .agg(agg_op)
                        .orderBy('Indicator Name'))

# Use our defined function to rename aggregate columns
year_ind = rename_agg_cols(year_ind_agg.select(ind_nc_topic
                                               + sorted(year_ind_agg.columns[3:])), 
                           year_ind_agg.columns[3:], 'count()')

year_ind.show(10)

+--------------------+-----------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|      Indicator Name|   Indicator Code|               Topic|1960|1961|1962|1963|1964|1965|1966|1967|1968|1969|1970|1971|1972|1973|1974|1975|1976|1977|1978|1979|1980|1981|1982|1983|1984|1985|1986|1987|1988|1989|1990|1991|1992|1993|1994|1995|1996|1997|1998|1999|2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|2019|2020|2021|
+--------------------+-----------------+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

In [64]:
def collect_cols(df, cols, new_cols, bounds=None, exp_isnull=None):
    """Collects in a new column all the column names whose values match 
    the condition, and counts the number of such columns.

    Parameters:
    df -- a PySpark DataFrame
    cols -- columns whose value will be compared with the condition
    new_cols -- names for the two new columns
    bounds -- values for the PySpark Column method "between()" 
    to select columns (default None)
    exp_isnull -- whether to use the PySpark Column methods "isNull()" 
    (if True) or "isNotNull()" (if bounds is None) (default None). 
    This parameter is ignored when bounds have a value other than None
    
    Return a new DataFrame with the two new columns.
    """
    if bounds:
        cond = [F.when(df[col].between(*bounds), F.lit(col)) for col in cols]
    elif exp_isnull:
        cond = [F.when(df[col].isNull(), F.lit(col)) for col in cols]
    else:
        cond = [F.when(df[col].isNotNull(), F.lit(col)) for col in cols]

    new_df = (df.withColumn(new_cols[0], F.array_sort(F.array(cond)))
                .withColumn(new_cols[0], 
                            F.array_intersect(new_cols[0], 
                                              F.array([F.lit(col) for col in cols])))
                .withColumn(new_cols[1], F.size(new_cols[0]))
                .select('*'))

    return new_df
     


In [72]:
# Find out the years when the indicators have entries in a certain number 
# of countries
year_country_ind = collect_cols(year_ind, years_cols, 
                                ['Name_years', 'Num_years'], 
                                bounds=sel_bounds).drop(*years_cols)
                            
year_country_ind.select('*').show(5)

+--------------------+-----------------+--------------------+--------------------+---------+
|      Indicator Name|   Indicator Code|               Topic|          Name_years|Num_years|
+--------------------+-----------------+--------------------+--------------------+---------+
|ARI treatment (% ...|   SH.STA.ARIC.ZS|Health: Disease p...|                  []|        0|
|Access to clean f...|   EG.CFT.ACCS.ZS|Environment: Ener...|                  []|        0|
|Access to clean f...|EG.CFT.ACCS.RU.ZS|Environment: Ener...|                  []|        0|
|Access to clean f...|EG.CFT.ACCS.UR.ZS|Environment: Ener...|                  []|        0|
|Access to electri...|   EG.ELC.ACCS.ZS|Environment: Ener...|[2000, 2001, 2002...|       21|
+--------------------+-----------------+--------------------+--------------------+---------+
only showing top 5 rows



In [73]:
# Select indicators that have values in a certain number of countries and years
year_country_ind_sel = year_country_ind.filter(year_country_ind['Num_years'] >= 50)

# Create a list of these indicator names
year_country_ind_sel.select(ind_nc_topic[:2]).show(7, False)

+-------------------------------------------------------------+--------------+
|Indicator Name                                               |Indicator Code|
+-------------------------------------------------------------+--------------+
|Adjusted savings: mineral depletion (current US$)            |NY.ADJ.DMIN.CD|
|Adolescent fertility rate (births per 1,000 women ages 15-19)|SP.ADO.TFRT   |
|Age dependency ratio (% of working-age population)           |SP.POP.DPND   |
|Age dependency ratio, old (% of working-age population)      |SP.POP.DPND.OL|
|Age dependency ratio, young (% of working-age population)    |SP.POP.DPND.YG|
|Agricultural land (% of land area)                           |AG.LND.AGRI.ZS|
|Agricultural land (sq. km)                                   |AG.LND.AGRI.K2|
+-------------------------------------------------------------+--------------+
only showing top 7 rows



In [74]:
year_country_ind_sel.select('Topic').distinct().limit(5).collect()

[Row(Topic='Environment: Density & urbanization'),
 Row(Topic='Environment: Agricultural production'),
 Row(Topic='Environment: Land use'),
 Row(Topic='Economic Policy & Debt: National accounts: Adjusted savings & income'),
 Row(Topic='Health: Population: Structure')]

In [75]:
year_country_ind_sel.select('Topic').distinct().count()

10

In [76]:
# Find out in which years the indicators have records in certain countries
wdi_csdf_years = collect_cols(wdi_csdf, years_cols, 
                              ['Name_years_c', 'Num_years_c']).drop(*years_cols)

wdi_csdf_years.select('Country Name', 'Indicator Name',
                      'Name_years_c', 'Num_years_c').show(5)

+------------+--------------------+--------------------+-----------+
|Country Name|      Indicator Name|        Name_years_c|Num_years_c|
+------------+--------------------+--------------------+-----------+
|       World|Access to clean f...|[2000, 2001, 2002...|         21|
|       World|Access to clean f...|[2000, 2001, 2002...|         21|
|       World|Access to clean f...|[2000, 2001, 2002...|         21|
|       World|Access to electri...|[1998, 1999, 2000...|         23|
|       World|Access to electri...|[2000, 2001, 2002...|         21|
+------------+--------------------+--------------------+-----------+
only showing top 5 rows



In [77]:
# Create a new DataFrame with decimal values
pop_df1 = (wdi_csdf
          .filter(wdi_csdf['Indicator Code'].isin(['SP.POP.TOTL', 
                                                   'EN.POP.DNST']))
          .select('Income Group', 'Indicator Name', 
                  *[wdi_csdf[cname].astype(DecimalType()) 
                    for cname in wdi_csdf.columns[-6:-1]])
          .groupBy('Income Group', 'Indicator Name').sum()
          .withColumnRenamed('Income Group', 'IncomeGroup')
          .withColumnRenamed('Indicator Name', 'IndicatorName'))

pop_df = rename_agg_cols(pop_df1, pop_df1.schema.names[2:], 'sum()', 'YR{}')

# Create a local temporary view with this DataFrame
pop_df.createOrReplaceTempView('Population')

In [78]:
query1 = "SELECT * FROM Population ORDER BY IncomeGroup, IndicatorName"
spark.sql(query1).show(5)

+-------------------+--------------------+----------+----------+----------+----------+----------+
|        IncomeGroup|       IndicatorName|    YR2016|    YR2017|    YR2018|    YR2019|    YR2020|
+-------------------+--------------------+----------+----------+----------+----------+----------+
|        High income|Population densit...|     75139|     75516|     74794|     75442|     75766|
|        High income|   Population, total|1195037039|1201180693|1207103958|1212278212|1217317132|
|         Low income|Population densit...|      2789|      2852|      2922|      3002|      3079|
|         Low income|   Population, total| 626382245| 643485701| 661256583| 680013468| 699186538|
|Lower middle income|Population densit...|      8221|      8326|      8431|      8528|      8640|
+-------------------+--------------------+----------+----------+----------+----------+----------+
only showing top 5 rows



In [79]:
query2 = "SELECT *  FROM Population WHERE IncomeGroup != 'World' ORDER BY IncomeGroup, IndicatorName"
spark.sql(query2).show(5)
     

+-------------------+--------------------+----------+----------+----------+----------+----------+
|        IncomeGroup|       IndicatorName|    YR2016|    YR2017|    YR2018|    YR2019|    YR2020|
+-------------------+--------------------+----------+----------+----------+----------+----------+
|        High income|Population densit...|     75139|     75516|     74794|     75442|     75766|
|        High income|   Population, total|1195037039|1201180693|1207103958|1212278212|1217317132|
|         Low income|Population densit...|      2789|      2852|      2922|      3002|      3079|
|         Low income|   Population, total| 626382245| 643485701| 661256583| 680013468| 699186538|
|Lower middle income|Population densit...|      8221|      8326|      8431|      8528|      8640|
+-------------------+--------------------+----------+----------+----------+----------+----------+
only showing top 5 rows



In [80]:
pop_window = Window.partitionBy('IndicatorName').orderBy(pop_df.columns[2:])
        
pop_df.withColumn('Rank', F.dense_rank().over(pop_window)).show()  

+-------------------+--------------------+----------+----------+----------+----------+----------+----+
|        IncomeGroup|       IndicatorName|    YR2016|    YR2017|    YR2018|    YR2019|    YR2020|Rank|
+-------------------+--------------------+----------+----------+----------+----------+----------+----+
|              World|Population densit...|        58|        58|        59|        60|        60|   1|
|         South Asia|Population densit...|       343|       346|       350|       352|       354|   2|
|         Low income|Population densit...|      2789|      2852|      2922|      3002|      3079|   3|
|Upper middle income|Population densit...|      7077|      7146|      7218|      7275|      7323|   4|
|Lower middle income|Population densit...|      8221|      8326|      8431|      8528|      8640|   5|
|        High income|Population densit...|     75139|     75516|     74794|     75442|     75766|   6|
|         South Asia|   Population, total|  21203000|  21444000|  2167000

In [81]:
def transpose_cols(df, show_cols, unpivot_cols=None, new_names=('Key', 'Val'), 
                   pivot_col=None, group_cols=('Key', 'Val')):
    """Returns a new transpose DataFrame or Group data (if a pivot is used).

    Parameters:
    df -- a PySpark DataFrame
    show_cols -- columns that will be displayed along with the new columns
    unpivot_cols -- columns that will be unpivot
    new_names -- names for two new columns that will be created by unpivoting
    pivot_col -- a column whose values will be new columns 
    (aggregation is not used)
    group_cols -- a tuple of columns for grouping
    """
    if unpivot_cols:
        transp_df = (df.select('*', F.explode_outer(F.array(
            [F.create_map(F.lit(name), F.col(name)) 
            for name in unpivot_cols])).alias('New_cols'))
                       .select(*show_cols, 
                               F.explode_outer('New_cols').alias(*new_names)))

        if pivot_col: 
            transp_df = (transp_df.withColumn(pivot_col, df[pivot_col])
                                  .groupBy(*group_cols).pivot(pivot_col))
            
    elif pivot_col:
        transp_df = df.groupBy(*group_cols).pivot(pivot_col)

    return transp_df

In [82]:
# Transpose the DataFrame so that the indicators and the years are reversed
tr_pop_df = transpose_cols(pop_df, pop_df.columns[:2], 
                           unpivot_cols=pop_df.columns[2:], 
                           new_names=('Years', 'Vals'), 
                           pivot_col='IndicatorName', 
                           group_cols=('IncomeGroup', 'Years'))

tr_pop_df.max().orderBy('IncomeGroup', 'Years').show()
     

+-------------------+------+---------------------------------------------------+-----------------+
|        IncomeGroup| Years|Population density (people per sq. km of land area)|Population, total|
+-------------------+------+---------------------------------------------------+-----------------+
|        High income|YR2016|                                              75139|       1195037039|
|        High income|YR2017|                                              75516|       1201180693|
|        High income|YR2018|                                              74794|       1207103958|
|        High income|YR2019|                                              75442|       1212278212|
|        High income|YR2020|                                              75766|       1217317132|
|         Low income|YR2016|                                               2789|        626382245|
|         Low income|YR2017|                                               2852|        643485701|
|         

In [84]:
path_to_save = '/content/'

wdi_csdf.write.csv(path_to_save + 'Wdi_csdf.csv', header=True, mode='ignore')
country_ind.write.csv(path_to_save + 'Country_ind.csv', header=True, mode='ignore')
year_ind.write.csv(path_to_save + 'Year_ind.csv', header=True, mode='ignore')

# Save to JSON, because CSV does not support the array data type
year_country_ind.write.json(path_to_save + 'year_country_ind.json', mode='ignore')
wdi_csdf_years.write.json(path_to_save + 'wdi_csdf_years.json', mode='ignore')