### 01. Reading Data from the API into a Spark DataFrame

In [0]:
import time
from datetime import datetime, timezone, timedelta

import math
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, DateType
import pyspark.sql.functions as F

In [0]:
def time_this(func):
    """Decorate *func* so each call reports its start time (IST) and duration.

    The wrapped function's arguments and return value are passed through
    unchanged. The report line is printed only when the call returns normally.
    """
    from functools import wraps

    # Fixed UTC+05:30 offset (India Standard Time).
    ist = timezone(timedelta(hours=5, minutes=30))

    # wraps() keeps __name__/__doc__/__wrapped__ of the decorated function, so
    # the printed name and any introspection refer to the real function.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()             # wall clock: used for the displayed start time
        start_counter = time.perf_counter()  # monotonic: used for the elapsed duration
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_counter

        # Render the start instant in IST, 12-hour clock with AM/PM.
        formatted_start_time = datetime.fromtimestamp(start_time, tz=ist).strftime("%Y-%m-%d %I:%M:%S %p %Z")

        print(f"⏱️ {func.__name__} started at {formatted_start_time} and executed in {elapsed_time:.2f} seconds")
        return result
    return wrapper

In [0]:
# Per-thread storage for a cached requests.Session. requests does not
# guarantee Session thread-safety, so each worker thread builds its own and
# reuses it (and its pooled connections) across calls.
_thread_local = threading.local()

def get_session():
    """Return the calling thread's requests.Session, creating it on first use.

    The session's adapter retries up to 3 times with backoff factor 0.5 on
    HTTP 429/500/502/503/504 for GET and POST. The same adapter is mounted for
    both "https://" and "http://".
    """
    cached = getattr(_thread_local, "session", None)
    if cached is not None:
        return cached

    new_session = requests.Session()
    retry_policy = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET", "POST"]),
    )
    adapter = HTTPAdapter(max_retries=retry_policy, pool_connections=1, pool_maxsize=10)
    for scheme in ("https://", "http://"):
        new_session.mount(scheme, adapter)
    _thread_local.session = new_session
    return new_session

def fetch_page(page, top, dataset_id, resource_id, timeout=20):
    """Fetch one page from the World Bank Data Catalog API and return its JSON.

    Pages are addressed by row offset: page ``page`` starts at
    ``skip = top * page`` and asks for ``top`` rows. The request goes through
    the calling thread's session from ``get_session``.

    Returns the decoded JSON body. On any ``requests.RequestException`` the
    error is printed and ``None`` is returned, so the caller can carry on with
    the remaining pages.
    """
    endpoint = 'https://datacatalogapi.worldbank.org/dexapps/fone/api/apiservice'
    query = {
        'datasetId': dataset_id,
        'resourceId': resource_id,
        'top': top,
        'type': 'json',
        'skip': top * page,
    }

    try:
        response = get_session().get(endpoint, params=query, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as err:
        # Best-effort: report and let the caller decide what a missing page means.
        print(f"[fetch_page] Error fetching page {page}: {err}")
        return None

@time_this
def fetch_all_data_optimized(dataset_id='DS00975', resource_id='RS00905', top=5000, max_workers_cap=20):
    """Download every record of a World Bank Data Catalog resource.

    Page 0 is fetched first to learn the API-reported total (``count``).
    The remaining pages are then fetched concurrently with ``fetch_page``.

    Args:
        dataset_id: Data Catalog dataset identifier.
        resource_id: Data Catalog resource identifier.
        top: Page size sent as the ``top`` query parameter.
        max_workers_cap: Upper bound on the number of worker threads.

    Returns:
        ``{'count': total_count, 'data': records}``, where ``records`` are in
        page order. If the initial request fails or reports zero rows, returns
        ``{'count': 0, 'data': []}``. Pages that fail are skipped and reported,
        so ``len(data)`` can then be smaller than ``count``.
    """
    # Step 1: fetch page 0 via fetch_page so the initial request uses the same
    # URL and the same retrying, pooled session as every other page.
    initial_data = fetch_page(0, top, dataset_id, resource_id, timeout=10)
    if not initial_data:
        return {'count': 0, 'data': []}

    total_count = initial_data.get('count', 0)
    if total_count == 0:
        return {'count': 0, 'data': []}
    # `or []` guards against a payload carrying 'data': None.
    all_data = list(initial_data.get('data') or [])

    # Compute how many additional pages are needed.
    already = len(all_data)
    remaining = max(0, total_count - already)
    num_pages_to_fetch = math.ceil(remaining / top)
    if num_pages_to_fetch <= 0:
        return {'count': total_count, 'data': all_data}

    if already < top:
        # fetch_page offsets by `top * page`, which assumes full pages. A short
        # first page while more rows exist suggests a server-side page cap,
        # in which case those offsets would skip rows.
        print(f"[warning] first page returned {already} rows (< top={top}) but count={total_count}; "
              f"subsequent pages may leave gaps.")

    # Don't create more threads than pages; keep an upper cap.
    max_workers = max(1, min(max_workers_cap, num_pages_to_fetch))
    print(f'Total pages to fetch: {num_pages_to_fetch}, using max_workers: {max_workers} and top rows: {top}')

    # Step 2: concurrent fetch. Results are buffered per page and assembled in
    # page order afterwards, so the output doesn't depend on completion order.
    rows_by_page = {}
    failed_pages = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_page = {
            executor.submit(fetch_page, page, top, dataset_id, resource_id): page
            for page in range(1, num_pages_to_fetch + 1)
        }
        print(f'Created {len(future_to_page)} futures.')

        for fut in as_completed(future_to_page):
            page = future_to_page[fut]
            try:
                # The future is already finished here, so no timeout is needed.
                page_result = fut.result()
            except Exception as exc:
                print(f"[as_completed] page {page} failed: {exc}")
                failed_pages.append(page)
                continue

            if page_result and 'data' in page_result:
                rows_by_page[page] = page_result['data'] or []
            else:
                # fetch_page already logged the error (or the payload had no 'data').
                failed_pages.append(page)

    for page in sorted(rows_by_page):
        all_data.extend(rows_by_page[page])

    if failed_pages:
        print(f"[fetch_all_data_optimized] {len(failed_pages)} page(s) failed: {sorted(failed_pages)}; "
              f"fetched {len(all_data)} of {total_count} records.")

    return {'count': total_count, 'data': all_data}

# Run the download when this file executes as the main module. In a notebook
# the top-level __name__ is normally "__main__" as well, so `results` is
# defined here for the cells below. NOTE(review): confirm for the notebook
# runtime in use; otherwise later cells hit a NameError on `results`.
if __name__ == "__main__":
    results = fetch_all_data_optimized()
    #print(f"Fetched {len(results['data'])} records (API count={results['count']}).")

In [0]:
@time_this
def data_size_api(results):
    """Summarize how many records were fetched versus the API-reported count."""
    fetched = len(results['data'])
    reported = results['count']
    return f"Fetched {fetched} records (API count={reported})."

print(data_size_api(results))

In [0]:
# Checking first row of data
#results['data'][0]

In [0]:
@time_this
def converting_to_spark_dataframe(results):
    """Turn the API payload into a Spark DataFrame with an explicit schema.

    The records in ``results['data']`` are passed to ``spark.createDataFrame``
    with the schema below. Every field is nullable, and date-like fields stay
    strings at this stage. An ``ingestion_date`` column holding
    ``F.current_timestamp()`` is appended.
    """
    # (column name, Spark type), in output column order.
    # NOTE(review): amount columns mix LongType and DoubleType. If the API ever
    # sends a fractional value for a LongType column, createDataFrame's schema
    # verification will likely reject it; confirm against real payloads.
    column_types = [
        ("agreement_signing_date", StringType()),
        ("board_approval_date", StringType()),
        ("borrower", StringType()),
        ("borrowers_obligation", LongType()),
        ("cancelled_amount", LongType()),
        ("closed_date_most_recent", StringType()),
        ("country", StringType()),
        ("country_code", StringType()),
        ("currency_of_commitment", StringType()),
        ("disbursed_amount", DoubleType()),
        ("due_3rd_party", LongType()),
        ("due_to_ibrd", LongType()),
        ("effective_date_most_recent", StringType()),
        ("end_of_period", StringType()),
        ("exchange_adjustment", LongType()),
        ("first_repayment_date", StringType()),
        ("guarantor", StringType()),
        ("guarantor_country_code", StringType()),
        ("interest_rate", DoubleType()),
        ("last_disbursement_date", StringType()),
        ("last_repayment_date", StringType()),
        ("loan_number", StringType()),
        ("loan_status", StringType()),
        ("loan_type", StringType()),
        ("loans_held", LongType()),
        ("original_principal_amount", DoubleType()),
        ("project_id", StringType()),
        ("project_name_", StringType()),
        ("region", StringType()),
        ("repaid_3rd_party", DoubleType()),
        ("repaid_to_ibrd", DoubleType()),
        ("sold_3rd_party", DoubleType()),
        ("undisbursed_amount", DoubleType()),
    ]
    schema = StructType([StructField(name, spark_type, True) for name, spark_type in column_types])

    records = results['data']
    base_df = spark.createDataFrame(records, schema=schema)
    return base_df.withColumn('ingestion_date', F.current_timestamp())

api_df = converting_to_spark_dataframe(results)
# api_df.show(5)

In [0]:
# Print api_df's schema (column names, types, nullability) to check the declared types.
api_df.printSchema()

In [0]:
# Checking dataframe size
@time_this
def data_shape(df):
    """Return a "<rows> rows and <columns> columns" summary of *df*.

    ``df.count()`` runs a Spark job over the whole DataFrame.
    """
    # Use the argument, not the global `api_df`, so this works for any DataFrame.
    return f'Dataframe has {df.count()} rows and {len(df.columns)} columns'

data_shape(api_df)

In [0]:
@time_this
def date_conversion(df):
    """Parse the date-like columns of *df* using the pattern ``dd-MMM-yyyy``.

    A column is date-like when its lower-cased name contains "date" or
    "period". Each such column is replaced, keeping its name and position, by
    ``F.to_date(col, "dd-MMM-yyyy")``. The schema of the converted columns is
    printed and the new DataFrame is returned.
    """
    def _is_date_like(name):
        lowered = name.lower()
        return "date" in lowered or "period" in lowered

    date_columns = list(filter(_is_date_like, df.columns))

    # NOTE(review): this also matches `ingestion_date`, a timestamp column
    # appended when the DataFrame is built; to_date presumably truncates it to
    # a date rather than parsing it with the pattern. Confirm that is intended.
    projected = [
        F.to_date(F.col(name), "dd-MMM-yyyy").alias(name) if name in date_columns else F.col(name)
        for name in df.columns
    ]
    converted = df.select(*projected)

    converted.select(*date_columns).printSchema()
    return converted

api_df = date_conversion(api_df)

In [0]:
@time_this
def checking_header(df):
    """Render the first five rows of *df* with ``DataFrame.display()`` and return its result."""
    first_rows = df.limit(5)
    return first_rows.display()

checking_header(api_df)

### Reading Existing Data and Getting the Last Ingested Period

In [0]:
# Load the existing Parquet data from the bronze volume, the same path the
# incremental append writes to.
df_before_append = spark.read.parquet('/Volumes/worldbank/bronze/ibrd_data_parquet')

In [0]:
# Row count before the append; used later in the consistency check.
count_before_append = df_before_append.count()
# Bare expression so the notebook displays the value.
count_before_append

In [0]:
# Highest end_of_period value already stored; only later periods get loaded.
latest_partition_date = df_before_append.agg({"end_of_period": "max"}).first()[0]
print(f"Latest partition: {latest_partition_date}")

In [0]:
# Keep only periods newer than those already stored. If the existing data has
# no non-null end_of_period, the max is None. Comparing against a NULL literal
# yields NULL for every row, which would silently drop everything, so load all
# rows in that case instead.
if latest_partition_date is None:
    new_data_df = api_df
else:
    new_data_df = api_df.filter(F.col('end_of_period') > F.lit(latest_partition_date))

In [0]:
# Number of rows to append; reused by the final consistency check.
count_new_data = new_data_df.count()
print(f'Count of new data -> {count_new_data}')
# Preview a few of the new rows.
new_data_df.limit(5).display()

### Writing Incremental Data to the Existing Parquet Location

In [0]:
# Append only the new rows, partitioned by end_of_period. new_data_df is not
# cached in the visible cells, so Spark re-evaluates it for this write.
new_data_df.write.format('parquet').mode('append').partitionBy('end_of_period').save('/Volumes/worldbank/bronze/ibrd_data_parquet')

In [0]:
# Re-read the same path so the final check counts what is actually stored after the append.
df_after_append = spark.read.parquet('/Volumes/worldbank/bronze/ibrd_data_parquet')

In [0]:
count_after_append = df_after_append.count()
print(f'Count before append -> {count_before_append}, Count after append -> {count_after_append}')

# The load is consistent when exactly the new rows were added to the old total.
expected_count = count_before_append + count_new_data
print('Data Load Successful' if count_after_append == expected_count else 'Data Load Not Successful')