In [146]:
#pip install pyspark

In [147]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('WorldBankAnalysis').getOrCreate()

In [148]:
df = spark.read.csv('/content/drive/MyDrive/WB_Data/IBRD_Statement_Of_Loans_-_Historical_Data_20240320.csv', header=True, inferSchema=True,dateFormat="MM/dd/yyyy hh:mm:ss a")

In [149]:
df.show()

+--------------------+-----------+--------------------+------------+-----------+--------------------+----------------------+---------+---------+---------------+-------------+----------------------+----------+--------------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------+----------------+-------------+----------+--------------------+-------------------+----------------------+-------------------+----------------------------+-------------------------+----------------------+
|       End of Period|Loan Number|              Region|Country Code|    Country|            Borrower|Guarantor Country Code|Guarantor|Loan Type|    Loan Status|Interest Rate|Currency of Commitment|Project ID|       Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Sold 3rd Party|Repaid 3rd 

In [150]:
df.dtypes

[('End of Period', 'string'),
 ('Loan Number', 'string'),
 ('Region', 'string'),
 ('Country Code', 'string'),
 ('Country', 'string'),
 ('Borrower', 'string'),
 ('Guarantor Country Code', 'string'),
 ('Guarantor', 'string'),
 ('Loan Type', 'string'),
 ('Loan Status', 'string'),
 ('Interest Rate', 'double'),
 ('Currency of Commitment', 'int'),
 ('Project ID', 'string'),
 ('Project Name ', 'string'),
 ('Original Principal Amount', 'string'),
 ('Cancelled Amount', 'double'),
 ('Undisbursed Amount', 'double'),
 ('Disbursed Amount', 'double'),
 ('Repaid to IBRD', 'double'),
 ('Due to IBRD', 'double'),
 ('Exchange Adjustment', 'double'),
 ("Borrower's Obligation", 'string'),
 ('Sold 3rd Party', 'string'),
 ('Repaid 3rd Party', 'string'),
 ('Due 3rd Party', 'string'),
 ('Loans Held', 'string'),
 ('First Repayment Date', 'string'),
 ('Last Repayment Date', 'date'),
 ('Agreement Signing Date', 'date'),
 ('Board Approval Date', 'date'),
 ('Effective Date (Most Recent)', 'date'),
 ('Closed Date (M

In [151]:
df.select('Country').distinct().count()

158

In [152]:
df.select('*').count()

1296381

Calculating Total Original Principal Amount disbursed by World Bank to countries

In [153]:
from pyspark.sql.functions import sum, col
total_sum = df.agg(sum('Disbursed Amount').alias('Total Sum')).collect()
total_sum

[Row(Total Sum=84786557845144.34)]

In [154]:
print(f'Total amount lent by World Bank is {total_sum[0]["Total Sum"]/(10 ** 12):.2f} trillion')

Total amount lent by World Bank is 84.79 trillion


In [155]:
total_repaid = df.agg(sum('Repaid to IBRD').alias('repaid_to_IBRD'), sum('Repaid 3rd Party').alias('repaid_to_3rd')).collect()

In [156]:
total_repaid

[Row(repaid_to_IBRD=55312997979464.336, repaid_to_3rd=577125513576.4141)]

In [157]:
total_repaid_sum = total_repaid[0]['repaid_to_IBRD'] + total_repaid[0]['repaid_to_3rd']

In [158]:
print(f'Total amount repaid to World Bank is {total_repaid_sum/(10 ** 12):.2f} trillion')

Total amount repaid to World Bank is 55.89 trillion


In [159]:
from pyspark.sql.functions import col, desc_nulls_last

In [160]:
from os import truncate
df.select('Country', 'Loan Number', 'Borrower', 'Loan Status', 'Interest Rate', 'Project Name ', 'Original Principal Amount', 'Disbursed Amount', 'Repaid to IBRD', 'Repaid 3rd Party', 'Due to IBRD', 'Due 3rd Party' , 'Last Disbursement Date')\
.filter("Country = 'India' AND `Loan Status` = 'Disbursed'")\
.dropDuplicates(['Loan Number'])\
.orderBy(col('Last Disbursement Date').desc_nulls_last())\
.show(truncate = False)

+-------+-----------+----------------------------------------+-----------+-------------+-----------------------------------+-------------------------+----------------+--------------+----------------+-------------+-------------+----------------------+
|Country|Loan Number|Borrower                                |Loan Status|Interest Rate|Project Name                       |Original Principal Amount|Disbursed Amount|Repaid to IBRD|Repaid 3rd Party|Due to IBRD  |Due 3rd Party|Last Disbursement Date|
+-------+-----------+----------------------------------------+-----------+-------------+-----------------------------------+-------------------------+----------------+--------------+----------------+-------------+-------------+----------------------+
|India  |IBRD79230  |SMALL INDUSTRIES DEVELOPMENT BK OF INDIA|Disbursed  |0.0          |IN: Microf-Scaling Up Sustnble & Re|200000000                |2.0E8           |0.0           |0               |2.0E8        |0            |2016-01-25          

In [161]:
'Country', 'Borrower', 'Loan Status', 'Interest Rate', 'Project Name ', 'Original Principal Amount' '`Disbursed Amount`', '`Repaid to IBRD`', 'Due to IBRD'

('Country',
 'Borrower',
 'Loan Status',
 'Interest Rate',
 'Project Name ',
 'Original Principal Amount`Disbursed Amount`',
 '`Repaid to IBRD`',
 'Due to IBRD')

In [162]:
from pyspark.sql.functions import sum, round
df.groupBy('Country')\
.agg(round((sum('Disbursed Amount')/1e12),2)\
     .alias('Total amount in trillions'))\
.orderBy(col('Total amount in trillions').desc())\
.show()


+--------------------+-------------------------+
|             Country|Total amount in trillions|
+--------------------+-------------------------+
|              Brazil|                     7.86|
|              Mexico|                     7.58|
|           Indonesia|                      6.3|
|               India|                     6.08|
|               China|                     5.96|
|              Turkey|                     4.34|
|           Argentina|                     4.27|
|            Colombia|                     3.43|
|         Philippines|                     2.42|
|  Korea, Republic of|                     2.33|
|             Morocco|                     2.14|
|              Poland|                     2.13|
|Egypt, Arab Repub...|                     1.81|
|             Romania|                     1.73|
|  Russian Federation|                      1.6|
|             Ukraine|                      1.4|
|                Peru|                     1.38|
|            Thailan

In [163]:
df_debt = df.groupBy('Country')\
.agg(round((sum('Disbursed Amount')/1e12),2)\
     .alias('Debt_in_trillions'))

In [164]:
df_debt.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Debt_in_trillions: double (nullable = true)



In [165]:
from pyspark.sql.functions import avg
df.groupBy('Country')\
.agg(avg('Interest Rate').alias('Average Interest Rate'))\
.orderBy(col('Average Interest Rate').desc())\
.show()

+--------------------+---------------------+
|             Country|Average Interest Rate|
+--------------------+---------------------+
|              Repaid|   3734380.6100000017|
|           Disbursed|           3734380.61|
|                Togo|    8.853846153846154|
|             Senegal|    8.146724282129885|
|Syrian Arab Republic|     8.04030701754391|
|               Ghana|    8.027873015873038|
|             Liberia|    7.910518648018673|
|      Western Africa|    7.859285714285693|
|Western and Centr...|    7.855000000000005|
|              Zambia|   7.7431219951923165|
|               Kenya|   7.7261886195994345|
|              Guyana|   7.7133547008546826|
|            Tanzania|     7.45522317188986|
|             Ireland|   7.4245192307692305|
|        Bahamas, The|   7.4032432432431845|
|              Greece|    7.332621028744356|
|            Zimbabwe|    7.263411541824501|
|            Portugal|    7.240875507942425|
|                Oman|    7.200452991452963|
|         

In [166]:
df.filter('Country = "Repaid"').show(5)

+--------------+-----------+-------+------------+-------+--------+----------------------+---------+-------------------+-----------+-------------+----------------------+----------+-------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------------+-------------------+----------------------------+-------------------------+----------------------+
| End of Period|Loan Number| Region|Country Code|Country|Borrower|Guarantor Country Code|Guarantor|          Loan Type|Loan Status|Interest Rate|Currency of Commitment|Project ID|Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|      Sold 3rd Party|    Repaid 3rd Party|       Due 3rd Party|        

In [167]:
df = df.filter((col('Country') != "Repaid") & (col('Country') != "Disbursed"))


In [168]:
df.filter('Country = "Repaid"').show(5)

+-------------+-----------+------+------------+-------+--------+----------------------+---------+---------+-----------+-------------+----------------------+----------+-------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------+----------------+-------------+----------+--------------------+-------------------+----------------------+-------------------+----------------------------+-------------------------+----------------------+
|End of Period|Loan Number|Region|Country Code|Country|Borrower|Guarantor Country Code|Guarantor|Loan Type|Loan Status|Interest Rate|Currency of Commitment|Project ID|Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Sold 3rd Party|Repaid 3rd Party|Due 3rd Party|Loans Held|First Repayment Date|Last Repayment Date|Agreement Signing Date|B

In [169]:
df.groupBy('Country')\
.agg(round(avg('Interest Rate'),2).alias('Average Interest Rate'))\
.orderBy(col('Average Interest Rate').desc())\
.show()

+--------------------+---------------------+
|             Country|Average Interest Rate|
+--------------------+---------------------+
|                Togo|                 8.85|
|             Senegal|                 8.15|
|Syrian Arab Republic|                 8.04|
|               Ghana|                 8.03|
|             Liberia|                 7.91|
|      Western Africa|                 7.86|
|Western and Centr...|                 7.86|
|              Zambia|                 7.74|
|               Kenya|                 7.73|
|              Guyana|                 7.71|
|            Tanzania|                 7.46|
|             Ireland|                 7.42|
|        Bahamas, The|                  7.4|
|              Greece|                 7.33|
|            Zimbabwe|                 7.26|
|            Portugal|                 7.24|
|                Oman|                  7.2|
|              Malawi|                  7.2|
|              Cyprus|                 7.13|
|        S

In [170]:
total_repaid = df.filter('Country = "India"')\
.agg({
    'Repaid to IBRD':'sum',
    'Repaid 3rd Party':'sum'
    })\
.collect()

In [171]:
total_repaid

[Row(sum(Repaid to IBRD)=3798826074929.1987, sum(Repaid 3rd Party)=21002616619.550037)]

In [172]:
total_repaid_sum = total_repaid[0]['sum(Repaid to IBRD)'] + total_repaid[0]['sum(Repaid 3rd Party)']

In [173]:
total_repaid_sum

3819828691548.7485

In [174]:
print(f'Total loan repaid is {total_repaid_sum/(10 ** 12):.2f} trillion USD')

Total loan repaid is 3.82 trillion USD


In [175]:
total_due = df.filter('Country = "India"')\
.agg(
    sum('Due to IBRD').alias('due_to_IBRD'),
    sum('Due 3rd Party').alias('due_3rd_party'))\
.collect()

In [176]:
total_due_sum = total_due[0]['due_to_IBRD'] + total_due[0]['due_3rd_party']

In [177]:
print(f'Total loan amount due is {total_due_sum/(10 ** 12):.2f} trillion USD')

Total loan amount due is 2.26 trillion USD


In [178]:
from pyspark.sql.functions import desc, sum, rank
from pyspark.sql.window import Window
window_spec = Window.partitionBy("Country").orderBy(sum("Disbursed Amount").desc())


In [179]:
df_1 = df.groupBy('Country').agg(
    sum('Disbursed Amount').alias('Disbursed Amount')
    )


In [180]:
df_1.show()

+------------------+--------------------+
|           Country|    Disbursed Amount|
+------------------+--------------------+
|              Chad| 5.828176581239996E9|
|          Paraguay| 2.19359903840371E11|
|           Senegal|2.023023515663971...|
|     Taiwan, China|4.863627259274998...|
|        Cabo Verde| 6.219164082819994E9|
|            Guyana|1.196520893515998...|
|       Philippines|2.418722022453353E12|
|          Malaysia|5.102331510383356...|
|         Singapore|2.815845939434989...|
|              Fiji|3.162642482627992...|
|            Turkey|4.335420830010125...|
|            Malawi|1.640907883333001...|
|              Iraq|3.520561131425398...|
|            Jordan|6.788526293701582E11|
|             Sudan|2.182079405579997...|
|            France|            3.925E10|
|            Greece|5.845748365453957E10|
|            Kosovo|5.591854442784003...|
|Yugoslavia, former| 8.97100158561601E10|
|         Sri Lanka|6.506987738941989E10|
+------------------+--------------

In [181]:
window_spec = Window.orderBy(df_1["Disbursed Amount"].desc())


In [182]:
ranked_df = df_1.select(
    "Country",
    "Disbursed Amount",
    rank().over(window_spec).alias("Loan Rank")
)

In [183]:
ranked_df.show()

+--------------------+--------------------+---------+
|             Country|    Disbursed Amount|Loan Rank|
+--------------------+--------------------+---------+
|              Brazil|7.858129598973741E12|        1|
|              Mexico|7.578693649406748E12|        2|
|           Indonesia|6.296142982180327E12|        3|
|               India|6.076481996997463E12|        4|
|               China| 5.96343202387073E12|        5|
|              Turkey|4.335420830010125...|        6|
|           Argentina| 4.26955781699128E12|        7|
|            Colombia|3.432944677713122E12|        8|
|         Philippines|2.418722022453353E12|        9|
|  Korea, Republic of|2.334320011551867...|       10|
|             Morocco|2.141130484417415E12|       11|
|              Poland|2.125257762680814...|       12|
|Egypt, Arab Repub...|1.812073095382333...|       13|
|             Romania|1.733573920506731E12|       14|
|  Russian Federation|1.604793399069028...|       15|
|             Ukraine|1.4004

In [184]:
df.select('Region').distinct().show(truncate=False)

+----------------------------+
|Region                      |
+----------------------------+
|AFRICA WEST                 |
|AFRICA EAST                 |
|LATIN AMERICA AND CARIBBEAN |
|SOUTH ASIA                  |
|EASTERN AND SOUTHERN AFRICA |
|AFRICA                      |
|MIDDLE EAST AND NORTH AFRICA|
|EAST ASIA AND PACIFIC       |
|EUROPE AND CENTRAL ASIA     |
|WESTERN AND CENTRAL AFRICA  |
|Western and Central Africa  |
|Eastern and Southern Africa |
+----------------------------+



In [185]:
from pyspark.sql.functions import countDistinct,round
df.groupBy('Region')\
.agg(countDistinct('Country').alias('Country Count by Region'))\
.orderBy(col('Country Count by Region').desc())\
.show(truncate = False)

+----------------------------+-----------------------+
|Region                      |Country Count by Region|
+----------------------------+-----------------------+
|EUROPE AND CENTRAL ASIA     |48                     |
|AFRICA                      |39                     |
|LATIN AMERICA AND CARIBBEAN |33                     |
|AFRICA EAST                 |22                     |
|EASTERN AND SOUTHERN AFRICA |22                     |
|Eastern and Southern Africa |22                     |
|EAST ASIA AND PACIFIC       |18                     |
|AFRICA WEST                 |15                     |
|Western and Central Africa  |15                     |
|WESTERN AND CENTRAL AFRICA  |15                     |
|MIDDLE EAST AND NORTH AFRICA|12                     |
|SOUTH ASIA                  |4                      |
+----------------------------+-----------------------+



In [186]:
df.select('Region').filter('Country = "India"').distinct().show()

+----------+
|    Region|
+----------+
|SOUTH ASIA|
+----------+



In [187]:
df.groupBy('Region')\
.agg(round((sum('Disbursed Amount')/1e12),2).alias('Loan lent by Region in trillions'))\
.orderBy(col('Loan lent by Region in trillions').desc())\
.show(truncate = False)

+----------------------------+--------------------------------+
|Region                      |Loan lent by Region in trillions|
+----------------------------+--------------------------------+
|LATIN AMERICA AND CARIBBEAN |29.31                           |
|EAST ASIA AND PACIFIC       |19.49                           |
|EUROPE AND CENTRAL ASIA     |17.85                           |
|MIDDLE EAST AND NORTH AFRICA|7.7                             |
|SOUTH ASIA                  |7.32                            |
|AFRICA                      |2.04                            |
|EASTERN AND SOUTHERN AFRICA |0.31                            |
|WESTERN AND CENTRAL AFRICA  |0.27                            |
|AFRICA WEST                 |0.24                            |
|AFRICA EAST                 |0.22                            |
|Western and Central Africa  |0.01                            |
|Eastern and Southern Africa |0.01                            |
+----------------------------+----------

Calculate the total number of fully repaid projects for each region

In [188]:
df.filter('`Loan Status` = "Fully Repaid"' )\
.groupBy('Region')\
.count()\
.orderBy(col('count').desc())\
.show(truncate = False)

+----------------------------+------+
|Region                      |count |
+----------------------------+------+
|LATIN AMERICA AND CARIBBEAN |180372|
|EAST ASIA AND PACIFIC       |125724|
|EUROPE AND CENTRAL ASIA     |97421 |
|MIDDLE EAST AND NORTH AFRICA|67796 |
|AFRICA                      |30613 |
|SOUTH ASIA                  |26520 |
|WESTERN AND CENTRAL AFRICA  |7876  |
|AFRICA WEST                 |7490  |
|EASTERN AND SOUTHERN AFRICA |6549  |
|AFRICA EAST                 |6223  |
|Western and Central Africa  |358   |
|Eastern and Southern Africa |297   |
+----------------------------+------+



In [189]:
df.select('Loan Status').distinct().show()

+-------------------+
|        Loan Status|
+-------------------+
|         Disbursing|
|Disbursing&Repaying|
|           Approved|
|    Fully Cancelled|
|          Cancelled|
|             Signed|
|          Effective|
|         Terminated|
|    Fully Disbursed|
|          Disbursed|
|           Repaying|
|  Fully Transferred|
|             Repaid|
|       Fully Repaid|
|         Negotiated|
|              Draft|
|               NULL|
+-------------------+



In [190]:
df_gdp = spark.read.csv('/content/drive/MyDrive/WB_Data/API_NY.GDP.MKTP.CD_DS2_en_csv_v2_2.csv', header = True)

In [191]:
df_gdp.show(truncate = False)

+---------------------------+----------------------------+-----------------+
|Data Source                |World Development Indicators|_c2              |
+---------------------------+----------------------------+-----------------+
|Last Updated Date          |2024-02-21                  |NULL             |
|Country Name               |Country Code                |Indicator Name   |
|Aruba                      |ABW                         |GDP (current US$)|
|Africa Eastern and Southern|AFE                         |GDP (current US$)|
|Afghanistan                |AFG                         |GDP (current US$)|
|Africa Western and Central |AFW                         |GDP (current US$)|
|Angola                     |AGO                         |GDP (current US$)|
|Albania                    |ALB                         |GDP (current US$)|
|Andorra                    |AND                         |GDP (current US$)|
|Arab World                 |ARB                         |GDP (current US$)|

In [192]:
rdd = spark.sparkContext.textFile('/content/drive/MyDrive/WB_Data/API_NY.GDP.MKTP.CD_DS2_en_csv_v2_2.csv')

In [193]:
rdd.take(30)[28]

'"Bahamas, The","BHS","GDP (current US$)","NY.GDP.MKTP.CD","169803921.568627","190098039.215686","212254901.960784","237745098.039216","266666666.666667","300392156.862745","340000000","390196078.431373","444901960.784314","528137254.901961","538423153.692615","573400000","590900000","670900000","632400000","596200000","642100000","713000000","832400000","1139800100","1335300000","1426500000","1578300000","1732800000","2041100000","2320699900","2472500000","2713999900","2817900000","3062000000","3166000000","3111160000","3109000000","3092000000","3259000000","3429000000","3609000000","6332360000","6833220000","7683870000","8076470000","8317830000","8881160000","8870090000","9055290000","9836200000","10167250000","10618340000","10526000000","9981960000","10095760000","10070450000","10720400000","10395500000","10974800000","11672900000","11750800000","12253500000","12653600000","13058700000","9754600000","11527600000","12897400000",'

In [194]:
header = rdd.filter(lambda x: "Country Name" in x).first()

In [195]:
header

'"Country Name","Country Code","Indicator Name","Indicator Code","1960","1961","1962","1963","1964","1965","1966","1967","1968","1969","1970","1971","1972","1973","1974","1975","1976","1977","1978","1979","1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997","1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010","2011","2012","2013","2014","2015","2016","2017","2018","2019","2020","2021","2022",'

In [196]:
type(header)

str

In [197]:
indexed_rdd = rdd.zipWithIndex()


In [198]:
indexed_rdd.take(5)[4]

('"Country Name","Country Code","Indicator Name","Indicator Code","1960","1961","1962","1963","1964","1965","1966","1967","1968","1969","1970","1971","1972","1973","1974","1975","1976","1977","1978","1979","1980","1981","1982","1983","1984","1985","1986","1987","1988","1989","1990","1991","1992","1993","1994","1995","1996","1997","1998","1999","2000","2001","2002","2003","2004","2005","2006","2007","2008","2009","2010","2011","2012","2013","2014","2015","2016","2017","2018","2019","2020","2021","2022",',
 4)

In [199]:
header_index = indexed_rdd.filter(lambda x: "Country Name" in x[0]).first()[1]

In [200]:
header_index

4

In [201]:
original_rdd_with_index = indexed_rdd.filter(lambda x: x[1]>header_index)

In [202]:
original_rdd_with_index.take(5)

[('"Aruba","ABW","GDP (current US$)","NY.GDP.MKTP.CD","","","","","","","","","","","","","","","","","","","","","","","","","","","405586592.178771","487709497.206704","596648044.692737","695530726.256983","764804469.273743","872067039.106145","958659217.877095","1083240223.46369","1245810055.86592","1320670391.06145","1379888268.15642","1531843575.41899","1665363128.49162","1722905027.93296","1873452513.96648","1896456983.24022","1961843575.41899","2044111731.84358","2254830726.25698","2360017318.43575","2469782681.56425","2677641340.78212","2843024581.00559","2553793296.08939","2453597206.70391","2637859217.87709","2615208379.88827","2727849720.67039","2790849720.67039","2962907262.56983","2983635195.53073","3092429050.27933","3276184357.5419","3395798882.68156","2558906303.88098","3103184101.51354","3544707788.05664",',
  5),
 ('"Africa Eastern and Southern","AFE","GDP (current US$)","NY.GDP.MKTP.CD","18478095142.1781","19366314294.0882","20506467177.8699","22242734491.0409","2429

In [203]:
original_rdd = original_rdd_with_index.map(lambda x: x[0])

In [204]:
(original_rdd.take(3)[0])

'"Aruba","ABW","GDP (current US$)","NY.GDP.MKTP.CD","","","","","","","","","","","","","","","","","","","","","","","","","","","405586592.178771","487709497.206704","596648044.692737","695530726.256983","764804469.273743","872067039.106145","958659217.877095","1083240223.46369","1245810055.86592","1320670391.06145","1379888268.15642","1531843575.41899","1665363128.49162","1722905027.93296","1873452513.96648","1896456983.24022","1961843575.41899","2044111731.84358","2254830726.25698","2360017318.43575","2469782681.56425","2677641340.78212","2843024581.00559","2553793296.08939","2453597206.70391","2637859217.87709","2615208379.88827","2727849720.67039","2790849720.67039","2962907262.56983","2983635195.53073","3092429050.27933","3276184357.5419","3395798882.68156","2558906303.88098","3103184101.51354","3544707788.05664",'

In [205]:
column_names = header.split(',')

In [206]:
column_names = [column.strip('"') for column in column_names]

In [207]:
import re

def regex_csv_line_split(line):
    pattern = re.compile(r'"((?:[^"\\]|\\.)*)"|([^,]+)')
    matches = pattern.findall(line)
    x= ["".join(match).strip() for match in matches]
    print(x)
    return x



In [208]:
original_rdd = original_rdd.map(regex_csv_line_split)

In [209]:
len(original_rdd.take(30)[0])

67

In [210]:
len(column_names)

68

In [211]:
column_names = column_names[:-1]

In [212]:
len(column_names)

67

In [213]:
len(original_rdd.take(1)[0])

67

In [214]:
rdd_row_length = 67


check_length = original_rdd.filter(lambda x: len(x)!=rdd_row_length)

In [215]:
check_length.take(5)

[]

In [216]:
df_gdp = original_rdd.toDF(column_names)

In [217]:
df_gdp.show(5)

+--------------------+------------+-----------------+--------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+--------------

In [218]:
df_gdp.select('Country Name').distinct().count()

266

In [219]:
drop_columns = column_names[3:55]

In [220]:
df_gdp = df_gdp.drop(*drop_columns)

In [221]:
df_gdp.show(truncate = False, n = 4)

+---------------------------+------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|Country Name               |Country Code|Indicator Name   |2011            |2012            |2013            |2014            |2015            |2016            |2017            |2018            |2019            |2020            |2021            |2022            |
+---------------------------+------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
|Aruba                      |ABW         |GDP (current US$)|2637859217.87709|2615208379.88827|2727849720.67039|2790849720.67039|2962907262.56983|2983635195.53073|3092429050.27933|3276184357.5419 |339579888

In [222]:
df_gdp.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- 2011: string (nullable = true)
 |-- 2012: string (nullable = true)
 |-- 2013: string (nullable = true)
 |-- 2014: string (nullable = true)
 |-- 2015: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- 2018: string (nullable = true)
 |-- 2019: string (nullable = true)
 |-- 2020: string (nullable = true)
 |-- 2021: string (nullable = true)
 |-- 2022: string (nullable = true)



In [223]:
df_gdp = df_gdp.drop("")

In [224]:
df_gdp.printSchema()

root
 |-- Country Name: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Indicator Name: string (nullable = true)
 |-- 2011: string (nullable = true)
 |-- 2012: string (nullable = true)
 |-- 2013: string (nullable = true)
 |-- 2014: string (nullable = true)
 |-- 2015: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- 2018: string (nullable = true)
 |-- 2019: string (nullable = true)
 |-- 2020: string (nullable = true)
 |-- 2021: string (nullable = true)
 |-- 2022: string (nullable = true)



In [225]:
df_gdp.show(1)

+------------+------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+---------------+----------------+----------------+----------------+----------------+
|Country Name|Country Code|   Indicator Name|            2011|            2012|            2013|            2014|            2015|            2016|            2017|           2018|            2019|            2020|            2021|            2022|
+------------+------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+----------------+---------------+----------------+----------------+----------------+----------------+
|       Aruba|         ABW|GDP (current US$)|2637859217.87709|2615208379.88827|2727849720.67039|2790849720.67039|2962907262.56983|2983635195.53073|3092429050.27933|3276184357.5419|3395798882.68156|2558906303.88098|3103184101.51354|3544707788.05664|
+---

In [226]:
df_gdp.select('Country Name').distinct().count()

266

In [227]:
df_gdp = df_gdp.withColumn('2022', col('2022').cast('double'))

In [228]:
df_gdp.select('Country Name', col('2022'))\
.orderBy(col('2022').desc())\
.show(truncate= False, n=5)

+-------------------------+-------------------+
|Country Name             |2022               |
+-------------------------+-------------------+
|World                    |1.00879560825388E14|
|High income              |6.17295121237778E13|
|OECD members             |5.98498783817075E13|
|Post-demographic dividend|5.62115846996494E13|
|IDA & IBRD total         |4.05739621636075E13|
+-------------------------+-------------------+
only showing top 5 rows



In [229]:

#list of non-country names
non_countries = ["World", "High income", "OECD members", "Post-demographic dividend", "IDA & IBRD total",
                 "Low & middle income", "Middle income", "IBRD only", "East Asia & Pacific",
                 "Upper middle income", "Late-demographic dividend", "North America",
                 "Europe & Central Asia", "East Asia & Pacific (excluding high income)",
                 "East Asia & Pacific (IDA & IBRD countries)", "European Union",
                 "Early-demographic dividend", "Euro area", "Lower middle income", "Latin America & Caribbean", "Latin America & the Caribbean (IDA & IBRD countries)",
                 "Latin America & Caribbean (excluding high income)", "Europe & Central Asia (IDA & IBRD countries)", "Middle East & North Africa",
                 "South Asia", "South Asia (IDA & IBRD)", "Europe & Central Asia (excluding high income)", "IDA total", "Sub-Saharan Africa", "Arab World",
                 "Sub-Saharan Africa (IDA & IBRD countries)", "Sub-Saharan Africa (excluding high income)", "Fragile and conflict affected situations",
                 "Middle East & North Africa (excluding high income)", "Middle East & North Africa (IDA & IBRD countries)", "Central Europe and the Baltics",
                 "IDA only", "Pre-demographic dividend", "Least developed countries: UN classification", "Africa Eastern and Southern",
                 "Heavily indebted poor countries (HIPC)"]


df_gdp = df_gdp.filter(~(col('Country Name').isin(non_countries)))

df_gdp.select('Country Name', round((col('2022')/1e12), 2).alias("GDP in trillions USD")).orderBy(col('2022').desc()).show(truncate= False)

+------------------+--------------------+
|Country Name      |GDP in trillions USD|
+------------------+--------------------+
|United States     |25.44               |
|China             |17.96               |
|Japan             |4.26                |
|Germany           |4.08                |
|India             |3.42                |
|United Kingdom    |3.09                |
|France            |2.78                |
|Russian Federation|2.24                |
|Canada            |2.16                |
|Italy             |2.05                |
|Brazil            |1.92                |
|Australia         |1.69                |
|Korea, Rep.       |1.67                |
|Mexico            |1.47                |
|Spain             |1.42                |
|Indonesia         |1.32                |
|IDA blend         |1.18                |
|Saudi Arabia      |1.11                |
|Netherlands       |1.01                |
|Turkiye           |0.91                |
+------------------+--------------

In [230]:
df_gdp_current = df_gdp.select(col('Country Name').alias('Country_Name'), round((col('2022')/1e12), 2).alias("current_gdp"))

In [231]:
df_gdp_current.printSchema()

root
 |-- Country_Name: string (nullable = true)
 |-- current_gdp: double (nullable = true)



In [232]:
df_debt_gdp = df_debt.join(df_gdp_current, df_debt['Country']==df_gdp_current['Country_Name'])

In [233]:
df_debt_gdp.orderBy(col('current_gdp').desc()).show()

+------------------+-----------------+------------------+-----------+
|           Country|Debt_in_trillions|      Country_Name|current_gdp|
+------------------+-----------------+------------------+-----------+
|             China|             5.96|             China|      17.96|
|             Japan|             0.13|             Japan|       4.26|
|             India|             6.08|             India|       3.42|
|            France|             0.04|            France|       2.78|
|Russian Federation|              1.6|Russian Federation|       2.24|
|             Italy|             0.06|             Italy|       2.05|
|            Brazil|             7.86|            Brazil|       1.92|
|         Australia|             0.07|         Australia|       1.69|
|            Mexico|             7.58|            Mexico|       1.47|
|             Spain|             0.07|             Spain|       1.42|
|         Indonesia|              6.3|         Indonesia|       1.32|
|       Netherlands|