In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; the raw CSV (local_path) and the cleaned
# Parquet output (output_path) are both stored under /content/drive/MyDrive.
drive.mount("/content/drive", force_remount=False)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark



Create the Spark session

In [4]:
from pyspark.sql import SparkSession


# Reuse the active SparkSession if one exists; otherwise start a new one
# named "WorldBankEconomicIndicators".
spark = SparkSession.builder.appName("WorldBankEconomicIndicators").getOrCreate()


Load the data into the Spark session

In [5]:
# Paths for the raw dataset: the object in the Cloud Storage bucket and its
# working copy on the mounted Google Drive (both use the same file name).
_raw_file_name = "world_bank_data_2025.csv"
gcs_path = f"gs://suraj-world-bank-etl/{_raw_file_name}"
local_path = f"/content/drive/MyDrive/{_raw_file_name}"


In [6]:
!gsutil cp {gcs_path} {local_path}

ServiceException: 401 Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object. Permission 'storage.objects.get' denied on resource (or it may not exist).


In [7]:
# Read the raw CSV: the first row supplies the column names and Spark infers
# each column's type (see the printed schema below).
df = spark.read.csv(local_path, header=True, inferSchema=True)
df.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- country_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- Inflation (CPI %): double (nullable = true)
 |-- GDP (Current USD): double (nullable = true)
 |-- GDP per Capita (Current USD): double (nullable = true)
 |-- Unemployment Rate (%): double (nullable = true)
 |-- Interest Rate (Real, %): double (nullable = true)
 |-- Inflation (GDP Deflator, %): double (nullable = true)
 |-- GDP Growth (% Annual): double (nullable = true)
 |-- Current Account Balance (% GDP): double (nullable = true)
 |-- Government Expense (% of GDP): double (nullable = true)
 |-- Government Revenue (% of GDP): double (nullable = true)
 |-- Tax Revenue (% of GDP): double (nullable = true)
 |-- Gross National Income (USD): double (nullable = true)
 |-- Public Debt (% of GDP): double (nullable = true)



In [8]:
# Preview the first five raw rows.
df.show(n=5)

+------------+----------+----+-----------------+------------------+----------------------------+---------------------+-----------------------+---------------------------+---------------------+-------------------------------+-----------------------------+-----------------------------+----------------------+---------------------------+----------------------+
|country_name|country_id|year|Inflation (CPI %)| GDP (Current USD)|GDP per Capita (Current USD)|Unemployment Rate (%)|Interest Rate (Real, %)|Inflation (GDP Deflator, %)|GDP Growth (% Annual)|Current Account Balance (% GDP)|Government Expense (% of GDP)|Government Revenue (% of GDP)|Tax Revenue (% of GDP)|Gross National Income (USD)|Public Debt (% of GDP)|
+------------+----------+----+-----------------+------------------+----------------------------+---------------------+-----------------------+---------------------------+---------------------+-------------------------------+-----------------------------+----------------------------

Data preprocessing and cleaning steps

In [9]:
# Updating the column names

from pyspark.sql.functions import col, regexp_replace

# Map the raw CSV headers to snake_case names. country_name, country_id and year
# are already snake_case, so they need no entry.
column_renames = {
    "Inflation (CPI %)": "inflation_cpi_percent",
    "GDP (Current USD)": "gdp_current_usd",
    "GDP per Capita (Current USD)": "gdp_per_capita_usd",
    "Unemployment Rate (%)": "unemployment_rate_percent",
    "Interest Rate (Real, %)": "interest_rate_percent",
    "Inflation (GDP Deflator, %)": "inflation_gdp_deflator_percent",
    "GDP Growth (% Annual)": "gdp_growth_annual_percent",
    "Current Account Balance (% GDP)": "current_account_balance_percent_gdp",
    "Government Expense (% of GDP)": "government_expense_percent_gdp",
    "Government Revenue (% of GDP)": "government_revenue_percent_gdp",
    "Tax Revenue (% of GDP)": "tax_revenue_percent_gdp",
    "Gross National Income (USD)": "gross_national_income_usd",
    "Public Debt (% of GDP)": "public_debt_percent_gdp",
}

# withColumnRenamed is a silent no-op when the source column is absent, so a
# changed CSV header would otherwise go unnoticed. Accept either the old or the
# new name so that re-running this cell stays idempotent.
unresolved_renames = [
    old_name
    for old_name, new_name in column_renames.items()
    if old_name not in df.columns and new_name not in df.columns
]
assert not unresolved_renames, f"columns not found in raw data: {unresolved_renames}"

for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)



In [10]:
# Count missing values per year (nothing is removed in this cell)

from pyspark.sql.functions import col, sum as spark_sum, when

# Per-row null count: one 0/1 flag per column, added together. The builtin sum
# works on Column objects because 0 + Column dispatches to Column.__radd__.
missing_row = sum(
    when(col(name).isNull(), 1).otherwise(0) for name in df.columns
)

total_missing_per_year = (
    df
    .withColumn("null_count", missing_row)
    .groupBy("year")
    .agg(spark_sum("null_count").alias("total_nulls"))
    .orderBy("year")
)
total_missing_per_year.show()

+----+-----------+
|year|total_nulls|
+----+-----------+
|2010|        630|
|2011|        611|
|2012|        602|
|2013|        608|
|2014|        573|
|2015|        580|
|2016|        590|
|2017|        599|
|2018|        630|
|2019|        631|
|2020|        660|
|2021|        709|
|2022|        792|
|2023|        982|
|2024|       2514|
|2025|       2821|
+----+-----------+



In [11]:
# Keep only years up to 2023: the per-year null counts above show 2024 and 2025
# have several times more missing values than any earlier year.
last_complete_year = 2023
df_cleaned = df.filter(col("year") <= last_complete_year)

In [12]:
# GDP, GDP per capita and gross national income cannot be negative: drop rows
# with a negative value in any of these columns, but keep rows where they are null.

invalid_negative_cols = [
    "gdp_current_usd",
    "gdp_per_capita_usd",
    "gross_national_income_usd",
]

non_negative_or_null = " AND ".join(
    f"({name} >= 0 OR {name} IS NULL)" for name in invalid_negative_cols
)
df_cleaned = df_cleaned.filter(non_negative_or_null)

In [13]:
from pyspark.sql.functions import count

# Null count per column after cleaning. when() without otherwise() yields null
# for non-null cells, and count() skips nulls, so each count is that column's nulls.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(f"{name}_nulls")
    for name in df_cleaned.columns
]
df_cleaned.select(null_count_exprs).show(truncate=False)

+------------------+----------------+----------+---------------------------+---------------------+------------------------+-------------------------------+---------------------------+------------------------------------+-------------------------------+-----------------------------------------+------------------------------------+------------------------------------+-----------------------------+-------------------------------+-----------------------------+
|country_name_nulls|country_id_nulls|year_nulls|inflation_cpi_percent_nulls|gdp_current_usd_nulls|gdp_per_capita_usd_nulls|unemployment_rate_percent_nulls|interest_rate_percent_nulls|inflation_gdp_deflator_percent_nulls|gdp_growth_annual_percent_nulls|current_account_balance_percent_gdp_nulls|government_expense_percent_gdp_nulls|government_revenue_percent_gdp_nulls|tax_revenue_percent_gdp_nulls|gross_national_income_usd_nulls|public_debt_percent_gdp_nulls|
+------------------+----------------+----------+---------------------------+--

In [14]:
# Drop the public debt indicator. NOTE(review): the reason is not recorded here —
# presumably its null count in the table above is too high; confirm and document.
df_cleaned = df_cleaned.drop("public_debt_percent_gdp")

df_cleaned.show(n=10)

+------------+----------+----+---------------------+------------------+------------------+-------------------------+---------------------+------------------------------+-------------------------+-----------------------------------+------------------------------+------------------------------+-----------------------+-------------------------+
|country_name|country_id|year|inflation_cpi_percent|   gdp_current_usd|gdp_per_capita_usd|unemployment_rate_percent|interest_rate_percent|inflation_gdp_deflator_percent|gdp_growth_annual_percent|current_account_balance_percent_gdp|government_expense_percent_gdp|government_revenue_percent_gdp|tax_revenue_percent_gdp|gross_national_income_usd|
+------------+----------+----+---------------------+------------------+------------------+-------------------------+---------------------+------------------------------+-------------------------+-----------------------------------+------------------------------+------------------------------+-------------------

In [15]:
# Save the cleaned data as Parquet on Google Drive, replacing any previous
# output in the same directory.
output_path = "/content/drive/MyDrive/economic_indicators_cleaned"
(
    df_cleaned.write
    .format("parquet")
    .mode("overwrite")
    .save(output_path)
)

In [16]:
!gsutil -m cp -r /content/drive/MyDrive/economic_indicators_cleaned gs://suraj-world-bank-etl

Copying file:///content/drive/MyDrive/economic_indicators_cleaned/part-00000-9e635a6b-f594-4a0c-a1ef-6bb978cdb16f-c000.snappy.parquet [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/235.1 KiB]   0% Done                                    Copying file:///content/drive/MyDrive/economic_indicators_cleaned/.part-00000-9e635a6b-f594-4a0c-a1ef-6bb978cdb16f-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/235.1 KiB]   0% Done                                    Copying file:///content/drive/MyDrive/economic_indicators_cleaned/_SUCCESS [Content-Type=application/octet-stream]...
Copying file:///content/drive/MyDrive/economic_indicators_cleaned/._SUCCESS.crc [Content-Type=application/octet-stream]...
/ [0/4 files][    0.0 B/235.1 KiB]   0% Done                                    / [0/4 files][    0.0 B/235.1 KiB]   0% Done                                    ServiceException: 401 Anonymous caller does not have storage.objects.create 