In [190]:
!pip install beautifulsoup4



# ETL Process
다음은 wikipedia에서 표를 읽고 json파일로 저장하는 ETL 프로세스 입니다. 로그 파일도 같이 작성됩니다.

In [53]:
import requests
from bs4 import BeautifulSoup
from io import StringIO
import pandas as pd
import re
import datetime

LOG_DIR = "./etl_project_log.txt"

# urls
urlGDP = 'https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29'
urlRegion = 'https://en.wikipedia.org/wiki/List_of_countries_and_territories_by_the_United_Nations_geoscheme'

# backup file name
bakupFile = {urlGDP:"wikipediaGDP", urlRegion:"wikipediaRegion"}

# decorator for logging
# this decorator helps to log the ETL processes.
# function name, its start and end time, and running time will be logged.
def withLog(func):
    def wrapper(*args, **kwargs):
        with open(LOG_DIR, "a") as f:
            f.write(datetime.datetime.now().strftime("%Y-%B-%d-%H-%M-%S,"))
            f.write(f"{func.__name__},start\n")
            startTime = datetime.datetime.now()

            result = func(*args, **kwargs)

            endTime = datetime.datetime.now()
            f.write(endTime.strftime("%Y-%B-%d-%H-%M-%S,"))
            f.write(f"{func.__name__},end,{endTime-startTime}{","+args[0] if func.__name__ == "extract" else ""}\n")
        return result
    return wrapper   


# get the gdp data from wikipedia
@withLog
def extract(url):
    # find past extract log
    logs = pd.read_csv(LOG_DIR, header=None, names=["time", "function", "status" ,"taken", "url"])
    lastAccess = logs[logs["url"] == url]
    
    # get http responce
    if lastAccess.empty:
        response = requests.get(url)
    else:
        lastAccess = datetime.datetime.strptime(lastAccess.iloc[-1]['time'],"%Y-%B-%d-%H-%M-%S") - datetime.timedelta(hours=9)
        response = requests.get(url, headers={"if-Modified-Since":lastAccess.strftime('%a, %d %b %Y %H:%M:%S GMT')})
    
    table = ""

    # check reponse
    if response.status_code == 200:
        html = response.text
        soup = BeautifulSoup(html, 'html.parser')
        
        table = str(soup.select("table.wikitable")[0])
        
        # save the responce
        with open(bakupFile[url]+".bak", "w") as f:
            f.write(str(table))

    elif response.status_code == 304:
        # open from backup file
        try:
            with open(bakupFile[url]+".bak", "r") as f:
                table = f.read()
        except:
            print(response.status_code)        
    else : 
        print(response.status_code)

    return table
    

# transform the html table data to pandas data frame and process the data
@withLog
def transform(data):
    # html table to pandas data frame
    df = pd.read_html(StringIO(data))[0]

    # delete annotation
    for col in df.columns:
        df[col] = df[col].apply(lambda x: re.sub(r'\[.*?\]', '', str(x)))
    
    return df

@withLog
def transformGDPTable(df):
    df = df.droplevel(level=0, axis=1)  # drop level. because its data has multy-level columns
    df.columns = ['Country/Territory', 'IMF Forecast', 'IMF Year', 'World Bank Estimate', 'World BankYear', 'UN Estimate',
       'UN Year']
    cols = df.columns

    for col in cols[2::2]:  # for year columns, transform the data to integer
        df[col] = df[col].apply(lambda x: int(x) if x.isdigit() else -1)
    for col in cols[1::2]:  # for GDP data, make billion unit and float
        df[col] = df[col].apply(lambda x: round(float(x)/1000.0, 2) if x.isdigit() else 0.0)

    df = df.drop(index = [0])
            
    return df

@withLog
def transformConjungrate(gdp, region):
    #_gdp = gdp.reset_index(drop=True)
    #_region = region.reset_index(drop=True)

    res = gdp.merge(
        region[['Country or Area', 'Geographical subregion']],
        left_on='Country/Territory',
        right_on='Country or Area',
        how='left'
    )
    res = res.drop('Country or Area', axis=1)

    return res

# write the dataFrame to a json file
@withLog
def load(dataFrame):
    # extract dataFrame by json
    dataFrame.transpose().to_json('Countries_by_GDP.json')

def printOver100B(gdpData):
    print(gdpData[gdpData['IMF Forecast'] >= 100.0]['Country/Territory'])

def printTop5(gdpData):
    top5 = gdpData.groupby('Geographical subregion').apply(lambda x: x.sort_values('IMF Forecast').head(5))
    top5 = top5.reset_index(level = 0, drop=True)
    top5 = top5.groupby('Geographical subregion')['IMF Forecast'].mean()
    print(top5)

# main

# gdp ETL process
gdp = extract(urlGDP)
gdp = transform(gdp)
gdp = transformGDPTable(gdp)

# region ETL process
region = extract(urlRegion)
region = transform(region)

# merge two tables
gdpTable = transformConjungrate(gdp, region)

# and load merged table
load(gdpTable)

# print what we want
printOver100B(gdpTable)
printTop5(gdpTable)

Thu, 09 Jan 2025 08:54:04 GMT
304
Thu, 09 Jan 2025 08:54:04 GMT
304
0     United States
1             China
2           Germany
3             Japan
4             India
          ...      
68            Kenya
69           Angola
70        Guatemala
71             Oman
72        Venezuela
Name: Country/Territory, Length: 72, dtype: object
Geographical subregion
Australia and New Zealand    1072.030000
Caribbean                       0.142000
Central America                36.412000
Central Asia                  108.958000
Eastern Africa                  2.212000
Eastern Asia                 7982.053333
Eastern Europe                168.324000
Melanesia                       8.104000
Micronesia                      0.270000
Middle Africa                  21.324000
Northern Africa               112.238000
Northern America              776.770000
Northern Europe               104.852000
Polynesia                       0.417500
South America                  54.898000
South-eastern Asia     

  top5 = gdpData.groupby('Geographical subregion').apply(lambda x: x.sort_values('IMF Forecast').head(5))


# ETL Process with IMF API
다음은 IMF에서 GDP정보를 직접 받아와서 작업을 처리하는 ETL입니다.

In [1]:
import requests
import pandas as pd
import json
import datetime

# decorator for logging
# this decorator helps to log the ETL processes.
# function name, its start and end time, and running time will be logged.
def withLog(func):
    def wrapper(*args, **kwargs):
        with open("./etl_project_log.txt", "a") as f:
            f.write(datetime.datetime.now().strftime("%Y-%B-%d-%H-%M-%S,"))
            f.write(f"{func.__name__},start\n")
            startTime = datetime.datetime.now()

            result = func(*args, **kwargs)

            endTime = datetime.datetime.now()
            f.write(endTime.strftime("%Y-%B-%d-%H-%M-%S, "))
            f.write(f"{func.__name__},end,{endTime-startTime}\n")
        return result
    return wrapper   

# get the gdp data from wikipedia
#@withLog
def extract_IMF_GDP(url):
    # get http response
    response = requests.get(url)

    # check reponse
    if response.status_code == 200:
        GDPJson= response.text
    else : 
        print(response.status_code)

    return GDPJson

# transform the html table data to pandas data frame and process the data
#@withLog
def transform_IMF_GDP(data):
    # html table to pandas data frame
    df = json.loads(data)
    df = df['values']['NGDPD']
    df = pd.DataFrame(df).transpose()
    #df = df.fillna(0.0)
    df = df.dropna()    
    return df

# merge  imfData with ISO_Countries format data
def mergeCountries(imfData, ISO_Countries):
    iso = ISO_Countries.drop(columns=['alpha-2', 'country-code', 'iso_3166-2', 'region-code', 'sub-region-code', 'intermediate-region-code'])
    imfData = imfData.reset_index()
    result = pd.merge(
        left=iso,
        right=imfData,
        left_on='alpha-3',
        right_on='index',
        how='inner'
    )
    return result

def printOver100B(gdpData, year):
    print(gdpData[gdpData[str(year)] >= 100.0]['name'])

# prin top 5 countries' gdp data average, year is integer and selectable
def printTop5(gdpData, year):
    top5 = gdpData.groupby('sub-region').apply(lambda x: x.sort_values(str(year)).head(5))
    top5 = top5.reset_index(level = 0, drop=True)
    top5 = top5.groupby('sub-region')[str(year)].mean()
    print(top5)

# write the dataFrame to a json file
#@withLog
def load(dataFrame):
    # extract dataFrame by json
    dataFrame.transpose().to_json('Countries_by_GDP.json')        

ISO_Countries = pd.read_csv('./ISO_3166_Countries.csv')
url = 'https://www.imf.org/external/datamapper/api/v1/NGDPD'

a = transform_IMF_GDP(extract_IMF_GDP(url))
b = mergeCountries(a, ISO_Countries)

printOver100B(b, 2024)
printTop5(b, 2024)

load(a)

1                                                Algeria
2                                                 Angola
4                                              Argentina
5                                              Australia
6                                                Austria
                             ...                        
131                                              Türkiye
133                                 United Arab Emirates
134    United Kingdom of Great Britain and Northern I...
135                             United States of America
138                                             Viet Nam
Name: name, Length: 61, dtype: object
sub-region
Australia and New Zealand           1027.1210
Eastern Asia                        4927.7574
Eastern Europe                       395.1750
Latin America and the Caribbean        1.3438
Melanesia                             10.1420
Micronesia                             0.3120
Northern Africa                      108.8924
No

  top5 = gdpData.groupby('sub-region').apply(lambda x: x.sort_values(str(year)).head(5))


# ETL Process with sql
다음은 IMF 에서 받은 데이터를 SQL에 load하는 코드입니다.

In [None]:
import requests
import pandas as pd
import json
import datetime
import sqlite3

# data base settings
database = "World_Economies.db"

# function to send a query
def sendQuery(sql):
    try:
        with sqlite3.connect(database) as conn:
            cursor = conn.cursor()
            cursor.execute(sql)
            conn.commit()
            return cursor.fetchall()
        
    except sqlite3.Error as e:
        print(e)

LOG_DIR = "./etl_project_log.txt"

# urls
url = 'https://www.imf.org/external/datamapper/api/v1/NGDPD'

# backup file name
bakupFile = {url:"imfGDP"}

# decorator for logging
# this decorator helps to log the ETL processes.
# function name, its start and end time, and running time will be logged.
def withLog(func):
    def wrapper(*args, **kwargs):
        with open(LOG_DIR, "a") as f:
            f.write(datetime.datetime.now().strftime("%Y-%B-%d-%H-%M-%S,"))
            f.write(f"{func.__name__},start\n")
            startTime = datetime.datetime.now()

            result = func(*args, **kwargs)

            endTime = datetime.datetime.now()
            f.write(endTime.strftime("%Y-%B-%d-%H-%M-%S,"))
            f.write(f"{func.__name__},end,{endTime-startTime}{","+args[0] if func.__name__ == "extract" else ""}\n")
        return result
    return wrapper   

# get the gdp data from wikipedia
@withLog
def extract(url):
    # find past extract log
    logs = pd.read_csv(LOG_DIR, header=None, names=["time", "function", "status" ,"taken", "url"])
    lastAccess = logs[logs["url"] == url]
    
    # get response
    if lastAccess.empty:
        response = requests.get(url)
    else:
        lastAccess = datetime.datetime.strptime(lastAccess.iloc[-1]['time'],"%Y-%B-%d-%H-%M-%S") - datetime.timedelta(hours=9)
        response = requests.get(url, headers={"if-Modified-Since":lastAccess.strftime('%a, %d %b %Y %H:%M:%S GMT')})
    
    GDPJson = ""

    # check reponse
    if response.status_code == 200:
        GDPJson = response.text
        
        # save the response
        with open(bakupFile[url]+".bak", "w") as f:
            f.write(GDPJson)

    elif response.status_code == 304:
        # read saved response
        try:
            with open(bakupFile[url]+".bak", "r") as f:
                GDPJson = f.read()
        except:
            print(response.status_code) 
    else : 
        print(response.status_code)

    return GDPJson

# transform the html table data to pandas data frame and process the data
@withLog
def transform_IMF_GDP(data):
    # html table to pandas data frame
    df = json.loads(data)
    df = df['values']['NGDPD']
    df = pd.DataFrame(df).transpose()
    return df

# write the dataFrame to a json file
@withLog
def load(dataFrame:pd.DataFrame, tableName:str, index:bool = False, index_label:str = ""):
    # extract dataFrame by jsontry:
    try:
        with sqlite3.connect(database) as conn:
            # Add table name 'gdp_data' and if_exists parameter
            dataFrame.to_sql(tableName, conn, if_exists='replace', index=index, index_label="Country")
            return True
    except Exception as e:
        print(f"Error: {e}")
        return False
    
# main

# import gdp data from imf
imf_gdp = transform_IMF_GDP(extract(url))
load(imf_gdp, "imf_gdp", index=True, index_label="country")

# import iso countries' name data from saved csv file
ISO_Countries = pd.read_csv('./ISO_3166_Countries.csv')
load(ISO_Countries, "iso_country_name")

year = 2024

# print countries whose GDP is over 100B
sql = f"""SELECT DISTINCT i.name
         FROM imf_gdp g
         JOIN iso_country_name i ON i."alpha-3" = g.country
         WHERE g."{year}" >= 100.0;"""
print(*sendQuery(sql), sep="\n")

# print top 5 GDP mean of group by region
sql =f"""WITH ranked_countries AS (
            SELECT 
                g.country,
                i."sub-region",
                g."{year}" as gdp,
                RANK() OVER (PARTITION BY i."sub-region" ORDER BY g."{year}" DESC) as rank
            FROM imf_gdp g
            JOIN iso_country_name i ON i."alpha-3" = g.country
            WHERE g."{year}" IS NOT NULL
        )
        SELECT 
            "sub-region",
            COUNT(country) as country_count,
            ROUND(AVG(gdp), 2) as avg_gdp
        FROM ranked_countries
        WHERE rank <= 5
        GROUP BY "sub-region"
        ORDER BY avg_gdp DESC;"""
print(*sendQuery(sql), sep="\n")

304
