In [5]:
import requests as req
import pandas as pd
import os
import psycopg2 as ps
from sqlalchemy import create_engine


def get_data(file_name, columns,
             base_url='https://raw.githubusercontent.com/annexare/Countries/master/data/{}.json',
             timeout=30):
    """Download a top-level ``{key: value}`` JSON file and return it as a 2-column DataFrame.

    Args:
        file_name: Name of the JSON file under ``data/`` without the ``.json``
            suffix, e.g. ``'continents'``.
        columns: Two column names; the first receives the JSON keys, the
            second the corresponding JSON values.
        base_url: URL template with a single ``{}`` placeholder that is
            filled with ``file_name``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A DataFrame with one row per top-level JSON entry.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    url = base_url.format(file_name)
    # Without a timeout a stalled connection would block the whole ETL forever.
    response = req.get(url, timeout=timeout)
    # Fail loudly on e.g. a 404 instead of trying to parse an error page as JSON.
    response.raise_for_status()
    data = response.json()
    return pd.DataFrame(list(data.items()), columns=columns)

def get_data_with_header(file_name, key_column='country_code',
                         base_url='https://raw.githubusercontent.com/annexare/Countries/master/data/{}.json',
                         timeout=30):
    """Download a ``{key: record}`` JSON file and return one row per record.

    The JSON values become the DataFrame rows (presumably each value is a
    mapping of field name -> value, so its fields become columns; TODO
    confirm against the upstream files). The JSON keys are added as an
    extra column named ``key_column``.

    Args:
        file_name: Name of the JSON file under ``data/`` without the ``.json``
            suffix, e.g. ``'countries'``.
        key_column: Name of the column that receives the JSON keys. It
            defaults to ``'country_code'`` for backward compatibility, even for
            files such as ``languages`` whose keys are presumably not country
            codes.
        base_url: URL template with a single ``{}`` placeholder that is
            filled with ``file_name``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A DataFrame built from the JSON values plus the ``key_column`` column.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    url = base_url.format(file_name)
    response = req.get(url, timeout=timeout)
    response.raise_for_status()
    data = response.json()
    df = pd.DataFrame(list(data.values()))
    # dicts preserve insertion order, so keys line up with the rows built from values().
    df[key_column] = list(data.keys())
    return df

def save_staging_file(df, file_dir=None, file_name='staged_data.csv'):
    """Write ``df`` to a CSV staging file, replacing any previous file of that name.

    Args:
        df: DataFrame to stage.
        file_dir: Directory for the staging file. Defaults to
            ``<cwd>/staging_output``. It is created (with parents) if missing.
        file_name: Name of the CSV file inside ``file_dir``.

    Returns:
        Path of the written CSV file.
    """
    if file_dir is None:
        file_dir = os.path.join(os.path.abspath(os.getcwd()), 'staging_output')
    file_staged = os.path.join(file_dir, file_name)
    # exist_ok avoids the check-then-create race of os.path.exists() + makedirs().
    os.makedirs(file_dir, exist_ok=True)
    if os.path.exists(file_staged):
        print('Staging file found. \n Removing staging file: ' + file_staged)
        os.remove(file_staged)
    print('saving new staging file...')
    df.to_csv(file_staged, index=False)
    print('saved results to file: ' + file_staged)

    return file_staged

def stage_monitoring_data(file_staged, staging_tbl, prod_tbl):
    """Load a staged CSV file into a Postgres staging table.

    The table is dropped and recreated on every call (``if_exists='replace'``).

    Connection settings are read from the standard libpq environment
    variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). They fall
    back to the previously hard-coded local defaults, so behavior is
    unchanged when none of them are set.

    Args:
        file_staged: Path to the CSV file to load.
        staging_tbl: Name of the table to (re)create and fill.
        prod_tbl: Production table name. Not used by this function; kept so
            existing callers keep working.

    Returns:
        The DataFrame read from ``file_staged`` (what was written to the table).
    """
    from urllib.parse import quote_plus

    host = os.environ.get('PGHOST', 'localhost')
    port = int(os.environ.get('PGPORT', 5438))
    database = os.environ.get('PGDATABASE', 'postgres')
    username = os.environ.get('PGUSER', 'postgres')
    password = os.environ.get('PGPASSWORD', 'postgres')

    # Quote credentials so characters like '@' or ':' cannot corrupt the URL.
    engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(
        quote_plus(username), quote_plus(password), host, port, database))
    try:
        my_df = pd.read_csv(file_staged)
        # index=False: the RangeIndex from read_csv is not a real data column.
        my_df.to_sql(staging_tbl, con=engine, if_exists='replace', index=False)
    finally:
        # Release pooled connections instead of leaking them on every call.
        engine.dispose()

    print("hey i am completed")
    return my_df


def update_all(sql_path="update.sql"):
    """Execute the SQL script at ``sql_path`` in a single transaction.

    Per the log messages, the script updates the production tables. Its
    contents are not visible here.

    Connection settings are read from the standard libpq environment
    variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). They fall
    back to the previously hard-coded local defaults.

    Args:
        sql_path: Path to the SQL script, relative to the current working
            directory unless absolute.

    Raises:
        FileNotFoundError: if ``sql_path`` does not exist.
        psycopg2.Error: if connecting or executing the script fails; the
            transaction is rolled back and the connection closed.
    """
    host = os.environ.get('PGHOST', 'localhost')
    port = int(os.environ.get('PGPORT', 5438))
    database = os.environ.get('PGDATABASE', 'postgres')
    username = os.environ.get('PGUSER', 'postgres')
    password = os.environ.get('PGPASSWORD', 'postgres')

    print(">>>>>>>>>>>>>>>>> updating prod table and ensuring idempotence")
    # Read the script with a context manager so the file handle is closed.
    with open(sql_path, "r") as sql_file:
        sql = sql_file.read()

    conn = ps.connect(host=host,
                      database=database,
                      user=username,
                      password=password,
                      port=port)
    try:
        # `with conn` commits on success and rolls back on error; it does NOT close.
        with conn:
            with conn.cursor() as cur:
                cur.execute(sql)
    finally:
        conn.close()
    print(">>>>>>>>>>>>>>>>> ETL complete and all records successfully pulled and updated")
    


def data_without_headers():
    """Stage the JSON files that are plain ``{key: value}`` maps.

    Each entry maps a source file name to
    ``(production table, staging table, [key column, value column])``.
    Files are processed one at a time: download, write the staging CSV,
    then load that CSV into the file's staging table.
    """
    sources = {
        'continents': ('continent', 'continent_stg', ['continent_code', 'continent_full_name']),
        'countries.2to3': ('country2to3', 'country2to3_stg', ['country_code_2', 'country_code_3']),
        'countries.3to2': ('country3to2', 'country3to2_stg', ['country_code_3', 'country_code_2']),
    }
    for source_name, (prod_name, staging_name, column_names) in sources.items():
        raw_frame = get_data(source_name, column_names)
        staged_path = save_staging_file(raw_frame)
        stage_monitoring_data(
            file_staged=staged_path,
            staging_tbl=staging_name,
            prod_tbl=prod_name,
        )

def data_with_headers():
    """Stage the JSON files whose values are expanded into columns by ``get_data_with_header``.

    Each row of ``targets`` is ``(source file, production table, staging table)``.
    Files are processed in order: download, write the staging CSV, then load
    that CSV into the staging table.
    """
    targets = (
        ('countries', 'country', 'country_stg'),
        ('languages', 'language', 'language_stg'),
    )
    for source_name, prod_name, staging_name in targets:
        staged_path = save_staging_file(get_data_with_header(source_name))
        stage_monitoring_data(
            file_staged=staged_path,
            staging_tbl=staging_name,
            prod_tbl=prod_name,
        )

if __name__ == "__main__":
    # Stage every source file first, then run update.sql against the database.
    for pipeline_step in (data_without_headers, data_with_headers, update_all):
        pipeline_step()
    
    

Staging file found. 
 Removing staging file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
saving new staging file...
saved results to file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
hey i am completed
Staging file found. 
 Removing staging file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
saving new staging file...
saved results to file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
hey i am completed
Staging file found. 
 Removing staging file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
saving new staging file...
saved results to file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
hey i am completed
Staging file found. 
 Removing staging file: /Users/profbiyi/Desktop/snowflake_profiling/bayo/staging_output/staged_data.csv
saving new staging file...
saved results to file: /Us

In [11]:
pip freeze

aiofiles==0.8.0
aiohttp @ file:///opt/concourse/worker/volumes/live/9e4bd7d9-f814-4862-5681-30883b9b8d6a/volume/aiohttp_1646806385231/work
aiohttp-cors==0.7.0
aiosignal @ file:///tmp/build/80754af9/aiosignal_1637843061372/work
aiosqlite==0.17.0
alabaster @ file:///home/ktietz/src/ci/alabaster_1611921544520/work
alembic==1.8.0
altair==4.2.0
anaconda-client @ file:///opt/concourse/worker/volumes/live/ce6e20ef-c449-4fb2-5463-03f67f4a0b31/volume/anaconda-client_1635342572171/work
anaconda-project @ file:///tmp/build/80754af9/anaconda-project_1637161053845/work
antlr4-python3-runtime==4.9.3
anyio @ file:///opt/concourse/worker/volumes/live/fdfc134d-03e4-4e6b-4eab-c131ac108813/volume/anyio_1644481717647/work/dist
appdirs==1.4.4
applaunchservices @ file:///Users/ktietz/demo/mc3/conda-bld/applaunchservices_1630511705208/work
appnope @ file:///opt/concourse/worker/volumes/live/5f13e5b3-5355-4541-5fc3-f08850c73cf9/volume/appnope_1606859448618/work
appscript @ file:///opt/concourse

Note: you may need to restart the kernel to use updated packages.
