# Junior Data Engineer test task

This test task has three parts. You will build a simple data download pipeline and a dashboard that presents the data.

## Part 1. Data

You will need to download the data about several countries using Python.

1. Study this API: https://restcountries.com/#rest-countries. Data about countries, flags and population is essential, but try to download as much information about countries as possible.
2. The code should be production-ready, so follow best practices. If there is something you want to point out, or something you paid special attention to when working with this API, feel free to write comments in the code; they will be appreciated.

As a result of this step you should have code that loads the data into a pandas DataFrame.

## Part 2. Storage

You will need to implement persistent storage for your data.

1. Write a docker-compose file to start any DB of your choice (PostgreSQL/MySQL/other). If needed, provide a CLI command to start it.
2. Update your code to include a write operation to your DB.

As a result of this step you should have updated code that writes the data into the DB, as well as a docker-compose file for your DB and a command to run this Docker environment.

## Part 3. Visualization

You will need to create a simple visualization of your data. We recommend using Dash (https://dash.plotly.com/); it is an easy and powerful tool and will work well for this task. It does not matter if you have not used it before: the goal here is to learn a new technology and use it within the scope of your task.

1. Create a visualization: it should include a table with the data and a block with flags.
2. The table should support sorting and include all the data you have saved to the database.
3. The block with flags should display the flag of the country currently selected in the table (flag URLs are available from the API).

Layout and UX are not important here.
Example of what is expected:

![Alt Text](https://drive.usercontent.google.com/download?id=1wIZFI-O2E-VYbDCrSz9T-xayboT7mWRR)

## Finalization

As a result you should have:

- Python file for data download
- Docker-compose file for DB
- Python file for visualization
- Readme.md file that describes how to use everything

The reviewer should be able to reproduce everything using your instructions. Upload everything to GitHub and send us the link.

Good luck!

# STEP 1

In [None]:
import requests
import pandas as pd
from typing import List, Dict

In [None]:
# CONSTANTS
#
# according to the API documentation, a request to the ALL endpoint should be limited to 10 fields
# countries, flags and population are required; the others I chose based on potential use cases
FIELDS = ["ccn3", "name", "flags", "population", "capital", "region", "languages", "area", "currencies", "gini"]

# use ALL endpoint to request all information with selected fields
URL = f"https://restcountries.com/v3.1/all?fields={','.join(FIELDS)}"

In [None]:
# fetching data from API
def fetch_countries_data() -> List[Dict]:
    """
    Extract
    Fetching 10 selected fields from countries API

    Returns:
        List of dicts by countries
    """
    try:
        response = requests.get(URL, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as error:
        raise SystemExit(f"Fetching API's data ERROR: {error}")
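
A production-ready download step usually retries transient network failures before giving up. The block below is a minimal sketch of retrying with exponential backoff using only the standard library. The helper name `call_with_retries` and its parameters are hypothetical and not part of the solution above.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on a listed exception wait and retry, doubling the delay each time."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            # delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Since `fetch_countries_data` converts request errors into `SystemExit`, a helper like this would have to wrap the `requests.get` call itself, with `retry_on=(requests.RequestException,)`, rather than the whole function.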

In [None]:
# API's countries data response normalization
def normalize_countries_data(raw_countries_data: List[Dict]) -> Dict[str, pd.DataFrame]:
    """
    Transform
    Transforms JSON data into pandas DataFrames

    Returns:
        Dict of pandas DataFrames keyed by table name:
        countries, currencies, languages, countries_currencies, countries_languages
    """

    # this 10-field request gives us the following columns:
    #
    # common name, official name
    # currency name, currency symbol,
    # language name,
    # capital, region, area, flag, population, gini in a given year

    # Let's use a star schema for this data and create the following tables:
    # countries, currencies, languages, countries_currencies, countries_languages
    #
    # Countries schema:
    # country_code:    int
    # common_name:     String,
    # official_name:   String,
    # capital:         String,
    # region:          String,
    # flag_url:        String,
    # area:            float,
    # population       int
    # gini:            float
    #
    # Currencies schema:
    # currency_code:   String
    # currency_name:   String
    # currency_symbol: String
    #
    # Languages schema:
    # language_code:   String
    # language_name:   String
    #
    # Countries_currencies schema:
    # country_code:    int
    # currency_code:   String
    #
    # Countries_languages schema:
    # country_code:    int
    # language_code:   String

    # let's normalize this data first
    normalized_countries = []

    normalized_currencies = {}
    countries_currencies = []

    normalized_languages = {}
    countries_languages = []


    # check for duplicates
    country_codes = set()

    for country in raw_countries_data:
        try:
            # Skip if country code is missing or invalid
            ccn3 = country.get("ccn3")
            if not ccn3 or not str(ccn3).isdigit():
                continue
            country_code = int(ccn3)

            # Skip countries that were already processed
            if country_code in country_codes:
                print(f"Found a duplicate country code: {country_code}")
                continue
            country_codes.add(country_code)

            # handle gini
            gini_data = country.get("gini", {})
            gini_value = list(gini_data.values())[0] if gini_data else None

            # handle capitals, since some countries have none
            # for simplicity, let's ignore the rare cases with multiple capitals and keep the first one
            capital_data = country.get("capital")
            capital = capital_data[0] if capital_data else None

            # creates country row
            normalized_country = {
                "country_code":          country_code,
                "country_name":          country.get("name", {}).get("common"),
                "official_country_name": country.get("name", {}).get("official"),
                "capital":               capital,                      
                "region":                country.get("region"),
                "flag_url":              country.get("flags", {}).get("png"),
                "area":                  country.get("area"),
                "population":            country.get("population"),
                "gini":                  gini_value
            }
            normalized_countries.append(normalized_country)

            # creates currency row with unique values
            currencies = country.get("currencies", {})
            for cur_code, cur_data in currencies.items():
                if isinstance(cur_data, dict):
                    if cur_code not in normalized_currencies:
                        normalized_currencies[cur_code] = {
                            "currency_code":   cur_code,
                            "currency_name":   cur_data.get("name"),
                            "currency_symbol": cur_data.get("symbol")
                        }
                    # creates junction country_currency row
                    countries_currencies.append({
                        "country_code":  country_code,
                        "currency_code": cur_code
                    })
            
            # creates languages row with unique values
            languages = country.get("languages", {})
            for lan_code, lan_name in languages.items():
                if isinstance(lan_name, str):
                    if lan_code not in normalized_languages:
                        normalized_languages[lan_code] = {
                            "language_code": lan_code,
                            "language_name": lan_name
                        }
                    # creates junction country_language row
                    countries_languages.append({
                        "country_code": country_code,
                        "language_code": lan_code
                    })
        except Exception as error:
            print(f"Error while normalizing country: {error}. Proceeding to the next one.")
            print(f"Error from country: {country.get('name', {}).get('common')}")
            continue
            
    normalized_currencies = list(normalized_currencies.values())
    normalized_languages = list(normalized_languages.values())

    return {
        "countries":            pd.DataFrame(normalized_countries),
        "currencies":           pd.DataFrame(normalized_currencies),
        "languages":            pd.DataFrame(normalized_languages),
        "countries_currencies": pd.DataFrame(countries_currencies),
        "countries_languages":  pd.DataFrame(countries_languages)
    }
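
The normalization above keeps the first value found in the `gini` mapping. If a record ever carried more than one year, picking the most recent one would be more predictable. The block below is a small sketch of that idea. It assumes the mapping's keys are year strings such as `"2019"`, and the helper name `latest_gini` is hypothetical.

```python
from typing import Dict, Optional


def latest_gini(gini_by_year: Optional[Dict[str, float]]) -> Optional[float]:
    """Return the Gini value for the most recent year, or None when no value is available."""
    if not gini_by_year:
        return None
    # Keys are assumed to be year strings, so compare them as integers.
    latest_year = max(gini_by_year, key=int)
    return gini_by_year[latest_year]
```

Inside the loop this could replace the `gini_value` expression, for example `gini_value = latest_gini(country.get("gini"))`.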

In [None]:
%pip install sqlalchemy psycopg2-binary pandas requests dash

# STEP 2

In [None]:
# import pandas as pd
from sqlalchemy import create_engine, text, Integer, String, Text, Numeric, BigInteger

In [None]:
# load normalized data from the first step
fetched_data = fetch_countries_data()
normalized_dataframes = normalize_countries_data(fetched_data)

In [None]:
# connect to PostgreSQL
DB_URL = "postgresql://admin:admin@localhost:5432/countries_db"
engine = create_engine(DB_URL)
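
Hard-coded credentials are convenient for a local test database, but production code usually reads them from the environment. The block below is a minimal sketch of that approach. The variable names `DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT` and `DB_NAME` are assumptions, and the defaults mirror the URL used above.

```python
import os
from typing import Mapping
from urllib.parse import quote


def build_db_url(env: Mapping[str, str] = os.environ) -> str:
    """Build a PostgreSQL URL from environment variables, falling back to local defaults."""
    user = env.get("DB_USER", "admin")
    password = env.get("DB_PASSWORD", "admin")
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "5432")
    name = env.get("DB_NAME", "countries_db")
    # Percent-encode user and password so characters like "@" or "/" do not break the URL.
    return f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/{name}"
```

With this helper the connection line would become `engine = create_engine(build_db_url())`, and the same variables could be shared with the docker-compose file.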

In [None]:
# quick save of the normalized DataFrames into database tables (pandas infers the column types)
for table_name, df in normalized_dataframes.items():
    df.to_sql(
        table_name,
        engine,
        if_exists="replace",
        index=False
    )
    print(f"{table_name} saved!")

print("All DataFrames saved to PostgreSQL!")

In [None]:
# Recreate the tables with explicit column types, keys and constraints
#
# Drop existing tables with CASCADE to avoid dependency errors
with engine.begin() as con:
    con.execute(text("DROP TABLE IF EXISTS countries_languages CASCADE;"))
    con.execute(text("DROP TABLE IF EXISTS countries_currencies CASCADE;"))
    con.execute(text("DROP TABLE IF EXISTS countries CASCADE;"))
    con.execute(text("DROP TABLE IF EXISTS currencies CASCADE;"))
    con.execute(text("DROP TABLE IF EXISTS languages CASCADE;"))
    con.execute(text("DROP VIEW IF EXISTS countries_summary;"))

# MAIN tables first
# countries table
normalized_dataframes["countries"].to_sql(
    "countries",
    engine,
    if_exists="replace",
    index=False,
    dtype={
        "country_code":          Integer(),
        "country_name":          String(100),
        "official_country_name": Text(),
        "capital":               String(100),
        "region":                String(50),
        "flag_url":              Text(),
        "area":                  Numeric(12, 2),
        "population":            BigInteger(),
        "gini":                  Numeric(4, 2)
    }
)

# currencies table
normalized_dataframes["currencies"].to_sql(
    "currencies",
    engine,
    if_exists="replace",
    index=False,
    dtype={
        "currency_code":   String(3),
        "currency_name":   String(50),
        "currency_symbol": String(15)
    }
)

# languages table
normalized_dataframes["languages"].to_sql(
    "languages",
    engine,
    if_exists="replace",
    index=False,
    dtype={
        "language_code": String(3),
        "language_name": String(50)
    }
)

# Now JUNCTION tables
# Countries_currencies table
normalized_dataframes["countries_currencies"].to_sql(
    "countries_currencies",
    engine,
    if_exists="replace",
    index=False
)

# Countries_languages table
normalized_dataframes["countries_languages"].to_sql(
    "countries_languages",
    engine,
    if_exists="replace",
    index=False
)

with engine.begin() as con:
    # PRIMARY KEYS
    con.execute(text("ALTER TABLE countries ADD PRIMARY KEY (country_code);"))
    con.execute(text("ALTER TABLE currencies ADD PRIMARY KEY (currency_code);"))
    con.execute(text("ALTER TABLE languages ADD PRIMARY KEY (language_code);"))

    # primary keys for junction tables
    con.execute(text("""
        ALTER TABLE countries_currencies
        ADD PRIMARY KEY (country_code, currency_code);
    """))
    con.execute(text("""
        ALTER TABLE countries_languages
        ADD PRIMARY KEY (country_code, language_code);
    """))

    # add foreign keys countries_currencies
    con.execute(text("""
        ALTER TABLE countries_currencies
        ADD CONSTRAINT fk_countries FOREIGN KEY (country_code) REFERENCES countries(country_code) ON DELETE CASCADE,
        ADD CONSTRAINT fk_currencies FOREIGN KEY (currency_code) REFERENCES currencies(currency_code) ON DELETE CASCADE;
    """))

    # add foreign keys countries_languages
    con.execute(text("""
        ALTER TABLE countries_languages
        ADD CONSTRAINT fk_countries FOREIGN KEY (country_code) REFERENCES countries(country_code) ON DELETE CASCADE,
        ADD CONSTRAINT fk_languages  FOREIGN KEY (language_code) REFERENCES languages(language_code) ON DELETE CASCADE;
    """))

    # add constraints
    con.execute(text("""
        ALTER TABLE countries
        ADD CONSTRAINT chk_area_positive CHECK (area > 0),
        ADD CONSTRAINT chk_population_positive CHECK (population >= 0),
        ADD CONSTRAINT chk_gini_range CHECK (gini IS NULL OR (gini BETWEEN 0 AND 100));
    """))

    # create indexes
    con.execute(text("""
        CREATE INDEX idx_countries_region ON countries(region);
        CREATE INDEX idx_countries_population ON countries(population);
    """))

    # create a view (data mart) for visualization
    con.execute(text("""
        CREATE OR REPLACE VIEW countries_summary AS
        SELECT
            c.country_name          AS "Country Name",
            c.official_country_name AS "Official Country Name",
            CASE
                WHEN c.capital IS NULL
                    THEN 'N/A'
                ELSE c.capital
            END                     AS "Capital",
            c.region                AS "Region",
            c.area                  AS "Area",
            c.population            AS "Population",
            c.gini                  AS "GINI",
            c.flag_url,
            CASE
                WHEN COUNT(cr.currency_name)=0
                    THEN 'N/A'
                ELSE STRING_AGG(DISTINCT cr.currency_name, ', ')
            END AS "Currencies",
            CASE
                WHEN COUNT(l.language_name)=0
                    THEN 'N/A'
                ELSE STRING_AGG(DISTINCT l.language_name, ', ') 
            END AS "Languages"
        FROM countries c
        
        LEFT JOIN countries_currencies cc USING(country_code)
        LEFT JOIN currencies cr USING(currency_code)
        LEFT JOIN countries_languages cl USING(country_code)
        LEFT JOIN languages l USING(language_code)

        GROUP BY
            c.country_name, c.official_country_name, c.capital, c.region, c.area, c.population, c.gini, c.flag_url
    """))

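The view joins countries to their languages through a junction table and collapses the names into one string, with `'N/A'` when a country has none. The block below sketches the same pattern on made-up rows. It uses the standard-library `sqlite3` module instead of PostgreSQL so that it runs without a server, which means `GROUP_CONCAT` stands in for `STRING_AGG`. The function name and the sample data are hypothetical.

```python
import sqlite3
from typing import Dict


def languages_per_country() -> Dict[str, str]:
    """Aggregate language names per country through a junction table (in-memory SQLite)."""
    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE countries (country_code INTEGER PRIMARY KEY, country_name TEXT);
        CREATE TABLE languages (language_code TEXT PRIMARY KEY, language_name TEXT);
        CREATE TABLE countries_languages (
            country_code INTEGER REFERENCES countries(country_code),
            language_code TEXT REFERENCES languages(language_code),
            PRIMARY KEY (country_code, language_code)
        );
        INSERT INTO countries VALUES (1, 'Country A'), (2, 'Country B');
        INSERT INTO languages VALUES ('aaa', 'Language A');
        INSERT INTO countries_languages VALUES (1, 'aaa');
    """)
    # LEFT JOIN keeps countries without languages; the aggregate is NULL for them,
    # and COALESCE turns that NULL into the 'N/A' placeholder.
    rows = con.execute("""
        SELECT c.country_name,
               COALESCE(GROUP_CONCAT(l.language_name, ', '), 'N/A') AS languages
        FROM countries c
        LEFT JOIN countries_languages cl ON cl.country_code = c.country_code
        LEFT JOIN languages l ON l.language_code = cl.language_code
        GROUP BY c.country_code, c.country_name
        ORDER BY c.country_name
    """).fetchall()
    con.close()
    return dict(rows)
```

The PostgreSQL view additionally uses `DISTINCT` inside the aggregate because joining through two junction tables at once (currencies and languages) repeats each name once per combination.
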
# STEP 3

In [None]:
from typing import List, Optional, Dict, Any
from dash import Dash, dash_table, dcc, html, Input, Output
from dash.development.base_component import Component
from sqlalchemy import create_engine, text


# init dash app
app = Dash(__name__)

# Connect to PostgreSQL
DB_URL = "postgresql://admin:admin@localhost:5432/countries_db"
engine = create_engine(DB_URL)

# read data
def load_data() -> List[Dict]:
    """
    Load country summary data from the database view.

    Returns:
        List of dictionaries.
    """
    with engine.connect() as con:
        query_result = con.execute(text("SELECT * FROM countries_summary;"))
        return [dict(row) for row in query_result.mappings()]

# get regions for additional filtering
def get_regions() -> List[str]:
    """Get unique regions from the DB"""
    with engine.connect() as con:
        query_result = con.execute(text("SELECT DISTINCT \"Region\" FROM countries_summary ORDER BY \"Region\";"))
        return [row[0] for row in query_result] 

display_data = load_data()
columns = [{"name": key, "id": key} for key in display_data[0].keys() if key != "flag_url"]
regions = get_regions()

# layout
app.layout = html.Div([
    html.H1("Countries Dashboard.", style={'textAlign': 'center'}),
    # Main container
    html.Div([
        # Table container (left side)
        html.Div([
            dash_table.DataTable(
                id="country_table",
                columns=columns,
                data=display_data,
                sort_action="native",
                row_selectable="single",
                style_table={
                    "overflowX": "auto",
                    "height": "75vh",
                    "overflowY": "auto"
                },
                page_size=15,
                style_cell={
                    'textAlign': 'center',
                    'padding': '8px',
                    'whiteSpace': 'normal',
                    'height': 'auto'
                },
                style_header={
                    'backgroundColor': 'lightgrey',
                    'fontWeight': 'bold',
                    'position': 'sticky',
                    'top': 0
                },
                style_data={
                    'border': '1px solid lightgrey'
                }
            ),
        ], style={
            'width': '65%',
            'display': 'inline-block',
            'verticalAlign': 'top',
            'padding': '10px'
        }),
        # Side container (right side)
        html.Div([
            # Region filter container
            html.Div([
                html.H3("Filters", style={
                    "marginBottom": "20px",
                    'borderBottom': '1px solid #ddd',
                    'paddingBottom': '10px'
                }),
                html.Label("Filter by Region", style={'fontWeight': 'bold'}),
                dcc.RadioItems(
                    id="region-filter",
                    options=[{"label": "All Regions", "value": "all"}] + [{"label": region, "value": region} for region in regions],
                    value="all",
                    labelStyle={'display': 'block', 'margin': '5px 0'},
                    style={"marginBottom":"20px"}
                )
        ], style={
                'padding': '20px',
                'backgroundColor': '#f8f9fa',
                'borderRadius': '8px'
            }),
            # Flag section
            html.Div(id="flag_container", style={
                "marginTop": "20px",
                "padding": "20px",
                "textAlign": "center",
                "backgroundColor": "#f8f9fa",
                "borderRadius": "8px"
            })
        ], style={
            'width': '30%',
            'display': 'inline-block',
            'verticalAlign': 'top',
        })
    ], style={
        "width": "95%",
        "margin": "0 auto"
    })
])


@app.callback(
    Output("country_table", "data"),
    Input("region-filter", "value")
)
def update_table(selected_region: str) -> List[Dict]:
    """
    Update table based on selected region
    """
    filtered_data = display_data.copy()
    if selected_region != "all":
        filtered_data = [row for row in filtered_data if row["Region"] == selected_region]

    return filtered_data

@app.callback(
    Output("flag_container", "children"),
    Input("country_table", "selected_rows"),
    Input("country_table", "data")
)
def update_flag(selected_rows: Optional[List[int]], table_data: Optional[List[Dict]]) -> Component:
    """
    Displays the flag of the selected country.

    Returns:
        A dash component - a flag or a placeholder.
    """

    if not selected_rows or not table_data:
        return html.Div("Select a country to see its flag.")

    try:
        # selected_rows indexes the rows currently in the table (after region filtering),
        # so look the country up there rather than in the unfiltered display_data
        selected_country: Dict[str, Any] = table_data[selected_rows[0]]

        return html.Div([
            html.H3(
                f"Flag of {selected_country['Country Name']}.",
                style={"textAlign": "center"}
            ),
            html.Img(
                src=selected_country["flag_url"],
                style={
                    "height": "150px",
                    "border": "1px solid black",
                    "display": "block",
                    "margin": "0 auto"
                },
                alt=f"Flag of {selected_country['Country Name']}."
            ),
            html.P(
                f"Capital: {selected_country['Capital']} | Region: {selected_country['Region']}",
                style={"textAlign": "center"}
            )
        ])
    except (IndexError, KeyError):
        return html.Div("Error loading country data.", style={"color": "red"})
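
Because the region filter replaces the table's rows, a selected row index has to be resolved against the rows that are currently shown. The block below sketches that lookup in plain Python, assuming (as the Dash DataTable documentation describes) that `selected_rows` holds indices into the table's `data`. The helper names and sample rows are hypothetical.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def filter_by_region(rows: List[Row], region: str) -> List[Row]:
    """Return all rows for "all", otherwise only the rows of the given region."""
    if region == "all":
        return list(rows)
    return [row for row in rows if row["Region"] == region]


def pick_selected(visible_rows: List[Row], selected_rows: Optional[List[int]]) -> Optional[Row]:
    """Map the first selected index onto the rows currently shown, or None if nothing valid is selected."""
    if not selected_rows:
        return None
    index = selected_rows[0]
    if 0 <= index < len(visible_rows):
        return visible_rows[index]
    return None


# Made-up rows for illustration only.
rows = [
    {"Country Name": "A", "Region": "Europe"},
    {"Country Name": "B", "Region": "Asia"},
    {"Country Name": "C", "Region": "Asia"},
]
visible = filter_by_region(rows, "Asia")
selected = pick_selected(visible, [0])
```

Here index `0` resolves to country `B` in the filtered list, whereas the same index in the unfiltered list would point at country `A`.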

In [None]:
# launch app
if __name__ == "__main__":
    app.run(debug=True)