In [1]:
import os
from dotenv import load_dotenv
# Work from the project root so that the `src` package imported below resolves.
# The guard makes this cell idempotent: once `src` is visible from the current
# directory, re-running the cell does not climb another level.
if not os.path.isdir("src"):
  os.chdir("..")

In [2]:
from src.collect.collect import SDMXCollector
import pandas as pd
from io import StringIO

In [3]:
from sqlalchemy import create_engine, URL

### Get the balance of payments

In [4]:
# Load variables from the local .env file into the process environment.
load_dotenv()

pg_user = os.environ["POSTGRES_USER"]
pg_db = os.environ["POSTGRES_DB"]

# Only the user and database are specified; no host, port or password is passed,
# so those are left to the driver's defaults.
url = URL.create(
  "postgresql+psycopg2",
  username=pg_user,
  database=pg_db
)

# `con` is a SQLAlchemy Engine (not an open connection); it is handed to `DataFrame.to_sql` below.
con = create_engine(url)

In [5]:
def sample_to_pandas(sample, 
                     parse_dates: list[str] = None):
  """Parse CSV text into a DataFrame using the pyarrow CSV engine.

  Args:
    sample: CSV document held in memory as a string.
    parse_dates: Names of the columns pandas should try to parse as dates;
      ``None`` is forwarded unchanged to ``pd.read_csv``.

  Returns:
    The parsed ``pd.DataFrame``.
  """
  # read_csv wants a path or a file-like object, so wrap the text in a buffer.
  buffer = StringIO(sample)
  return pd.read_csv(buffer, parse_dates=parse_dates, engine="pyarrow")

In [6]:
def factorize(df: pd.DataFrame):
  """Integer-encode every text column of ``df``.

  Each object- or string-typed column is replaced by the integer codes
  returned by ``pd.factorize``; the distinct values of each column are
  collected so they can be stored separately as lookup tables.

  The input frame is left untouched: the encoding is applied to a copy. This
  makes the function safe to call on a filtered slice of another frame (no
  ``SettingWithCopyWarning``) and lets the encoded columns take a real integer
  dtype instead of remaining ``object``.

  Args:
    df: Frame whose text columns should be encoded.

  Returns:
    A tuple ``(encoded, factor_array)``. ``encoded`` is the encoded copy of
    ``df``. ``factor_array`` is a list of ``(column_name, uniques)`` pairs, one
    per encoded column, in column order. Code ``i`` in a column refers to
    ``uniques[i]``; missing values are encoded as ``-1`` and do not appear in
    ``uniques``.
  """
  encoded = df.copy()

  # Accept both classic object columns and pandas string dtypes.
  text_cols = [
    col for col, dtype in df.dtypes.items()
    if pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype)
  ]
  factor_array = []

  for col in text_cols:
    codes, uniques = pd.factorize(df[col])
    # Plain column assignment replaces the column, so it adopts the integer dtype of `codes`.
    encoded[col] = codes
    factor_array.append((col, uniques))

  return encoded, factor_array

In [7]:
# Client for the OECD SDMX endpoint (host and service path as given below).
collector = SDMXCollector("sdmx.oecd.org/public", "rest")

# NOTE(review): `n_args` is passed straight to `collector.get`; presumably the number of
# key dimensions of the dataflow — confirm against SDMXCollector.
n_args = 8
# Three-part flow reference; looks like agency, dataflow id and version, with the
# empty version presumably meaning "latest" — TODO confirm.
flow_ref = ["OECD.SDD.TPS", "DSD_BOP@DF_BOP", ""]

# Request the dataflow in CSV format; the result is parsed as CSV text further down.
sample = collector.get(flow_ref, n_args=n_args, params={"format": "csv"})

In [8]:
# Parse the CSV payload and discard the DATAFLOW column in one chained step.
df = sample_to_pandas(sample, parse_dates=["TIME_PERIOD"]).drop(columns="DATAFLOW")
df.head()

  df = pd.read_csv(StringIO(sample),


Unnamed: 0,REF_AREA,COUNTERPART_AREA,MEASURE,ACCOUNTING_ENTRY,FS_ENTRY,FREQ,UNIT_MEASURE,ADJUSTMENT,TIME_PERIOD,OBS_VALUE,OBS_STATUS,UNIT_MULT,CURRENCY,DECIMALS
0,NLD,WXD,S,C,T,Q,XDC,Y,2003-04-01,16209.56,A,6,EUR,2
1,NLD,WXD,S,C,T,Q,XDC,Y,2003-07-01,16491.17,A,6,EUR,2
2,NLD,WXD,S,C,T,Q,XDC,Y,2003-10-01,16172.25,A,6,EUR,2
3,NLD,WXD,S,C,T,Q,XDC,Y,2004-01-01,16250.9,A,6,EUR,2
4,NLD,WXD,S,C,T,Q,XDC,Y,2004-04-01,16740.05,A,6,EUR,2


In [9]:
# Replace the text columns with integer codes; `factor_array` keeps the
# original labels of every encoded column for the lookup tables written later.
df, factor_array = factorize(df)
df.head()

Unnamed: 0,REF_AREA,COUNTERPART_AREA,MEASURE,ACCOUNTING_ENTRY,FS_ENTRY,FREQ,UNIT_MEASURE,ADJUSTMENT,TIME_PERIOD,OBS_VALUE,OBS_STATUS,UNIT_MULT,CURRENCY,DECIMALS
0,0,0,0,0,0,0,0,0,2003-04-01,16209.56,0,6,0,2
1,0,0,0,0,0,0,0,0,2003-07-01,16491.17,0,6,0,2
2,0,0,0,0,0,0,0,0,2003-10-01,16172.25,0,6,0,2
3,0,0,0,0,0,0,0,0,2004-01-01,16250.9,0,6,0,2
4,0,0,0,0,0,0,0,0,2004-04-01,16740.05,0,6,0,2


In [10]:
# Write the encoded frame to the database; an existing `balance_of_pay` table is replaced.
df.to_sql(name="balance_of_pay", con=con, if_exists='replace')

695

### Get interest rates 

In [11]:
# Reuses the `collector` created for the balance-of-payments request; only the
# flow reference and `n_args` change.
# NOTE(review): `n_args` presumably matches the number of key dimensions of this dataflow — confirm.
n_args = 7
flow_ref = ["OECD.SDD.STES", "DSD_KEI@DF_KEI", "4.0"]

# Request the dataflow in CSV format.
sample = collector.get(flow_ref, n_args=n_args, params={"format": "csv"})

In [12]:
# Parse the CSV payload and discard the DATAFLOW column in one chained step.
df = sample_to_pandas(sample, parse_dates=["TIME_PERIOD"]).drop(columns="DATAFLOW")

# NOTE(review): the recorded output shows TIME_PERIOD as integer codes, i.e. it
# was not parsed into datetimes and got encoded together with the text columns.
# Confirm that this is intended.
df, factor_array_ir = factorize(df)
df.head()

Unnamed: 0,REF_AREA,FREQ,MEASURE,UNIT_MEASURE,ACTIVITY,ADJUSTMENT,TRANSFORMATION,TIME_PERIOD,OBS_VALUE,OBS_STATUS,UNIT_MULT,DECIMALS,BASE_PER
0,0,0,0,0,0,0,0,0,0.962752,0,0,1,
1,0,0,0,0,0,0,0,1,2.519469,0,0,1,
2,0,0,0,0,0,0,0,2,4.938903,0,0,1,
3,0,0,0,0,0,0,0,3,5.057707,0,0,1,
4,0,0,0,0,0,0,0,4,5.473367,0,0,1,


In [13]:
# Write the encoded frame to the database; an existing `interest_rate` table is replaced.
df.to_sql(name="interest_rate", con=con, if_exists='replace')

488

### Get exchange rates

In [14]:
# Rebind `collector` to the ECB data API; from here on it no longer points at the OECD endpoint.
collector = SDMXCollector("data-api.ecb.europa.eu", "service")
# Unlike the OECD requests, the flow reference here is a single string.
flow_ref = "EXR"

# Display the URL built for this request.
collector.make_url(flow_ref, params={"format": "csvdata"})

'https://data-api.ecb.europa.eu/service/data/EXR?format=csvdata'

In [15]:
# Fetch the EXR dataflow as CSV text. This is a large payload: the parsed frame has about 3.3 million rows.
sample = collector.get(flow_ref, params={"format": "csvdata"})

In [16]:
# Parse the CSV payload (no date parsing yet) and discard the KEY column.
df = sample_to_pandas(sample).drop(columns="KEY")
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3334957 entries, 0 to 3334956
Data columns (total 31 columns):
 #   Column           Dtype  
---  ------           -----  
 0   FREQ             object 
 1   CURRENCY         object 
 2   CURRENCY_DENOM   object 
 3   EXR_TYPE         object 
 4   EXR_SUFFIX       object 
 5   TIME_PERIOD      object 
 6   OBS_VALUE        float64
 7   OBS_STATUS       object 
 8   OBS_CONF         object 
 9   OBS_PRE_BREAK    float64
 10  OBS_COM          object 
 11  TIME_FORMAT      object 
 12  BREAKS           float64
 13  COLLECTION       object 
 14  COMPILING_ORG    float64
 15  DISS_ORG         float64
 16  DOM_SER_IDS      float64
 17  PUBL_ECB         float64
 18  PUBL_MU          float64
 19  PUBL_PUBLIC      float64
 20  UNIT_INDEX_BASE  object 
 21  COMPILATION      object 
 22  COVERAGE         float64
 23  DECIMALS         int64  
 24  NAT_TITLE        float64
 25  SOURCE_AGENCY    object 
 26  SOURCE_PUB       float64
 27  TITLE       

In [17]:
# Restrict to rows with FREQ == 'D', then convert TIME_PERIOD to datetime64[ns].
df = (
  df.loc[df["FREQ"].eq("D")]
    .astype({"TIME_PERIOD": "datetime64[ns]"})
)
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2800549 entries, 37314 to 2837862
Data columns (total 31 columns):
 #   Column           Dtype         
---  ------           -----         
 0   FREQ             object        
 1   CURRENCY         object        
 2   CURRENCY_DENOM   object        
 3   EXR_TYPE         object        
 4   EXR_SUFFIX       object        
 5   TIME_PERIOD      datetime64[ns]
 6   OBS_VALUE        float64       
 7   OBS_STATUS       object        
 8   OBS_CONF         object        
 9   OBS_PRE_BREAK    float64       
 10  OBS_COM          object        
 11  TIME_FORMAT      object        
 12  BREAKS           float64       
 13  COLLECTION       object        
 14  COMPILING_ORG    float64       
 15  DISS_ORG         float64       
 16  DOM_SER_IDS      float64       
 17  PUBL_ECB         float64       
 18  PUBL_MU          float64       
 19  PUBL_PUBLIC      float64       
 20  UNIT_INDEX_BASE  object        
 21  COMPILATION      object        


In [19]:
# Preview the first rows of the daily exchange-rate frame.
df.head(3)

Unnamed: 0,FREQ,CURRENCY,CURRENCY_DENOM,EXR_TYPE,EXR_SUFFIX,TIME_PERIOD,OBS_VALUE,OBS_STATUS,OBS_CONF,OBS_PRE_BREAK,...,COMPILATION,COVERAGE,DECIMALS,NAT_TITLE,SOURCE_AGENCY,SOURCE_PUB,TITLE,TITLE_COMPL,UNIT,UNIT_MULT
37314,D,ARS,EUR,SP00,A,2000-01-13,1.02745,A,,,...,,,5,,4F0,,Argentine peso/Euro,"Indicative exchange rate, Argentine peso/Euro,...",ARS,0
37315,D,ARS,EUR,SP00,A,2000-01-14,1.02232,A,,,...,,,5,,4F0,,Argentine peso/Euro,"Indicative exchange rate, Argentine peso/Euro,...",ARS,0
37316,D,ARS,EUR,SP00,A,2000-01-17,1.0087,A,,,...,,,5,,4F0,,Argentine peso/Euro,"Indicative exchange rate, Argentine peso/Euro,...",ARS,0


In [20]:
# Keep observations dated strictly after 2015-01-01, then integer-encode the
# text columns; `factor_array_exr` holds the original labels per column.
mask = df["TIME_PERIOD"].gt(pd.Timestamp("2015-01-01"))
df, factor_array_exr = factorize(df.loc[mask])

df.head()

Unnamed: 0,FREQ,CURRENCY,CURRENCY_DENOM,EXR_TYPE,EXR_SUFFIX,TIME_PERIOD,OBS_VALUE,OBS_STATUS,OBS_CONF,OBS_PRE_BREAK,...,COMPILATION,COVERAGE,DECIMALS,NAT_TITLE,SOURCE_AGENCY,SOURCE_PUB,TITLE,TITLE_COMPL,UNIT,UNIT_MULT
41205,0,0,0,0,0,2015-01-02,10.3,0,-1,,...,-1,,5,,0,,0,0,0,0
41206,0,0,0,0,0,2015-01-05,10.1963,0,-1,,...,-1,,5,,0,,0,0,0,0
41207,0,0,0,0,0,2015-01-06,10.1883,0,-1,,...,-1,,5,,0,,0,0,0,0
41208,0,0,0,0,0,2015-01-07,10.1131,0,-1,,...,-1,,5,,0,,0,0,0,0
41209,0,0,0,0,0,2015-01-08,10.0887,0,-1,,...,-1,,5,,0,,0,0,0,0


In [22]:
# Write the encoded frame to the database; an existing `exchange_rates` table is replaced.
# NOTE(review): `to_sql` writes the DataFrame index by default, and here the index still
# carries the row labels left over from the earlier filtering (it does not start at 0).
df.to_sql(name="exchange_rates", con=con, if_exists='replace')

621

In [23]:
def index_to_df(index_names: list[str]) -> pd.DataFrame:
  """Wrap a sequence of labels in a single-column DataFrame.

  Args:
    index_names: The labels to store, in order.

  Returns:
    A frame with one column called ``name`` holding the labels; the default
    integer index gives each label its position.
  """
  return pd.DataFrame({"name": index_names})

In [24]:
def factor_arr_to_df_list(name: str, arr: list[tuple[str, pd.Index]]) -> list[tuple[str, pd.DataFrame]]:
  """Turn per-column label sets into named lookup tables.

  Args:
    name: Prefix shared by all generated table names.
    arr: ``(column_name, labels)`` pairs, as produced by ``factorize``.

  Returns:
    One ``(table_name, frame)`` pair per input pair, in the same order. The
    table name is ``"<name>_<column_name>"`` in lower case and the frame is
    built by ``index_to_df``.
  """
  tables = []
  for col, index in arr:
    table_name = f"{name}_{col}".lower()
    tables.append((table_name, index_to_df(index)))
  return tables

In [25]:
# Collect the lookup tables of all three datasets, each under its own name prefix.
dimension_tables = [
  *factor_arr_to_df_list("bop", factor_array),
  *factor_arr_to_df_list("int_rates", factor_array_ir),
  *factor_arr_to_df_list("ex_rates", factor_array_exr),
]

In [26]:
# Write every lookup table, replacing any existing table of the same name.
# `to_sql` also writes the DataFrame index by default; here that index is the
# position of each label, which is the integer code used in the encoded fact tables.
for tbl_name, tbl in dimension_tables:
  tbl.to_sql(name=tbl_name, con=con, if_exists="replace")