In [1]:
from google.colab import drive
import requests
from tqdm import tqdm
import os
import json
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

In [2]:
# Mount Google Drive under ROOT_DIR so the project files are reachable.
ROOT_DIR = "/content/drive/"
drive.mount(mountpoint=ROOT_DIR, force_remount=True)

Mounted at /content/drive/


In [3]:
# Make the project folder the working directory (later cells open files by
# relative name) and display its contents.
PROJECT_DIR = os.path.join(
    ROOT_DIR,
    "MyDrive",
    "DataEngineering - DE - Coderhouse",
    "Projecto Final - DE - Coderhouse",
)
os.chdir(PROJECT_DIR)
os.listdir(os.curdir)

['01_apiretriever.py',
 '01_apiretriever.ipynb',
 'api_token.txt',
 'redshift_credentials.json']

In [4]:
# Read the API token from disk. Surrounding whitespace is stripped so that a
# trailing newline in the file cannot end up inside the Authorization header.
with open("api_token.txt", 'r') as f:
    api_token = f.read().strip()

# Endpoints to query, keyed by the name of the table each one is loaded into.
api_urls_to_query = dict(
    milestones="https://api.estadisticasbcra.com/milestones",
    blue_usd="https://api.estadisticasbcra.com/usd",
    official_usd="https://api.estadisticasbcra.com/usd_of",
)

# Filled by the download cell: table name -> DataFrame.
retrieved_tables = dict()

# Bearer-token header sent with every API request.
headers = {"Authorization": f"Bearer {api_token}"}


In [5]:
# Retrieve each table and convert it to a pandas DataFrame.
for tablename, api_url in tqdm(api_urls_to_query.items()):
    # A timeout keeps a stalled connection from hanging the notebook forever.
    result = requests.get(api_url, headers=headers, timeout=30)
    # Fail loudly on a 4xx/5xx reply instead of building a DataFrame out of
    # an error payload and loading it into the database.
    result.raise_for_status()
    result_json = result.json()
    # The API uses one-letter keys; give the columns readable names.
    df = pd.DataFrame(result_json).rename(
        columns={"d": "date", "e": "event", "v": "value", "t": "event_type"}
    )
    retrieved_tables[tablename] = df


100%|██████████| 3/3 [00:02<00:00,  1.19it/s]


In [6]:
# Show the names of the tables that were retrieved.
retrieved_tables.keys()

dict_keys(['milestones', 'blue_usd', 'official_usd'])

In [7]:
# Inspect the column dtypes pandas inferred for one of the retrieved tables.
retrieved_tables["official_usd"].dtypes

date      object
value    float64
dtype: object

In [8]:
# Load the Redshift connection parameters from the JSON file in the project folder.
with open("redshift_credentials.json", 'r') as credentials_file:
    redshift_credentials = json.loads(credentials_file.read())

In [16]:
# Open the database connection used by the load and read-back cells.
try:
    conn = psycopg2.connect(**redshift_credentials)
except Exception as e:
    print("Unable to connect to Redshift.")
    print(e)
    # Re-raise: swallowing the error would leave `conn` undefined and make
    # the following cells fail with an unrelated NameError.
    raise
else:
    print("Connected to Redshift successfully!")

Connected to Redshift successfully!


In [17]:
  def create_table(table_name: str, dataframe:pd.DataFrame) -> str:
    """
    Generates the SQL statement that creates a table matching the dataframe's structure.

    Parameters:
        - table_name (str): Name of the SQL table to be created. It is inserted
          into the statement verbatim, so it must come from a trusted source.
        - dataframe (pd.DataFrame): DataFrame whose columns and dtypes define the schema.

    Returns:
        - str: ``CREATE TABLE IF NOT EXISTS`` statement with one column per dataframe column.

    Raises:
        - KeyError: if a column has a dtype that has no SQL mapping.

    Note:
    The dtype name is matched case-insensitively, so pandas nullable dtypes
    such as ``Int64`` use the same mapping:
        int8, int16 -> SMALLINT
        int32 -> INT
        int64 -> BIGINT (a 4-byte INT cannot hold every int64 value)
        float32 -> REAL
        float64 -> FLOAT
        object, string, str -> VARCHAR(300)
        bool, boolean -> BOOLEAN
        datetime64[...] -> DATE (any resolution or timezone; time of day is not stored)
    """
    type_map = {
        'int8': 'SMALLINT',
        'int16': 'SMALLINT',
        'int32': 'INT',
        'int64': 'BIGINT',
        'float32': 'REAL',
        'float64': 'FLOAT',
        'object': 'VARCHAR(300)',
        'string': 'VARCHAR(300)',
        'str': 'VARCHAR(300)',
        'bool': 'BOOLEAN',
        'boolean': 'BOOLEAN',
    }
    column_defs = []
    for name, dtype in dataframe.dtypes.items():
        dtype_name = str(dtype).lower()
        if dtype_name.startswith('datetime64'):
            # Covers datetime64[ns], other resolutions and tz-aware variants.
            sql_type = 'DATE'
        elif dtype_name in type_map:
            sql_type = type_map[dtype_name]
        else:
            # Name the offending column instead of surfacing a bare dict KeyError.
            raise KeyError(f"Unsupported dtype {str(dtype)!r} for column {name!r}")
        column_defs.append(f"{name} {sql_type}")
    table_schema = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            {', '.join(column_defs)}
        );
        """
    return table_schema

def _sql_literal(value) -> str:
    """Render a single Python/pandas scalar as a SQL literal."""
    # None, NaN, NaT and pd.NA all mean "missing" and become SQL NULL.
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return "NULL"
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(int(value))
    if isinstance(value, float):
        # float() first so numpy float subclasses print as plain numbers.
        return repr(float(value))
    # Everything else (strings, timestamps, ...) is sent as a quoted string.
    # Single quotes are doubled per the SQL standard. Backslashes are doubled
    # as well, which matches what the previous repr()-based output produced.
    # NOTE(review): assumes the server treats backslash as an escape character
    # inside string literals - confirm for the target database.
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{text}'"


def prepare_insert_values(table_name:str, dataframe:pd.DataFrame) -> str:
    """
    Generates a single multi-row SQL INSERT statement for the dataframe's rows.

    Values are rendered as SQL literals: missing values become NULL, booleans
    become TRUE/FALSE, numbers are written bare and everything else is written
    as a quoted, escaped string.

    Parameters:
        - table_name (str): Name of the SQL table to insert the values into. It is
          inserted verbatim (as are the column names), so it must be trusted.
        - dataframe (pd.DataFrame): DataFrame whose values will be prepared for insertion.

    Returns:
        - str: SQL query string to insert the dataframe's rows into the specified table.

    Note:
    An empty dataframe yields a statement with no rows after VALUES, which is
    not valid SQL; callers should skip empty frames.
    """
    cols = [str(col) for col in dataframe.columns.tolist()]
    # itertuples yields plain tuples of scalars, one per row, in column order.
    rendered_rows = [
        "(" + ", ".join(_sql_literal(value) for value in row) + ")"
        for row in dataframe.itertuples(index=False, name=None)
    ]
    values_str = ",\n\t\t".join(rendered_rows)
    insert_sql = f"""INSERT INTO {table_name}
              ({', '.join( cols )})
     VALUES {values_str};
    """
    return insert_sql

def query_to_df(conn, query):
    """
    Executes a SQL query using a given connection and returns the result as a pandas DataFrame.

    The cursor is always closed, even when executing or fetching raises.

    Parameters:
        - conn: DB-API database connection object (anything exposing ``cursor()``).
        - query (str): SQL query string to be executed; it must return rows.

    Returns:
        - pd.DataFrame: One row per result row, with columns named after the
          result set's columns.
    """
    cur = conn.cursor()
    try:
        cur.execute(query)
        retrieved_query = cur.fetchall()
        # The first item of each description entry is the column name in the
        # DB-API; indexing works for plain tuples as well as named columns.
        colnames = [col[0] for col in cur.description]
    finally:
        cur.close()
    df_query = pd.DataFrame(retrieved_query, columns=colnames)
    return df_query

In [18]:
# Create each table (if missing) and insert the retrieved rows.
# Note: re-running this cell appends the same rows again, because the table
# is only created when absent and the INSERT is unconditional.
for table_name, dataframe in retrieved_tables.items():
    table_schema = create_table(table_name=table_name, dataframe=dataframe)
    insert_sql = prepare_insert_values(table_name=table_name, dataframe=dataframe)
    cur = conn.cursor()
    try:
        cur.execute(table_schema)
        cur.execute(insert_sql)
        # Without a commit the CREATE/INSERT stay in an open transaction and
        # are discarded when the connection is closed.
        conn.commit()
    except Exception:
        # Leave the connection usable for the next statement, then surface the error.
        conn.rollback()
        raise
    finally:
        cur.close()

In [19]:
# Read back the first rows of every loaded table as a quick sanity check.
for table_name in retrieved_tables:
    print(f"Retrieving data (first 5 rows) from table: {table_name}")
    example = query_to_df(conn, f"SELECT * FROM {table_name} LIMIT 5")
    print("=" * 30)
    print(example)

Retrieving data (first 5 rows) from table: milestones
         date              event event_type
0  1991-02-05    Roque Fernández       bcra
1  1991-04-01    Domingo Cavallo       econ
2  1995-08-08  Carlos Saúl Menem       pres
3  1996-08-05          Pedro Pou       bcra
4  1996-08-06    Roque Fernández       econ
Retrieving data (first 5 rows) from table: blue_usd
         date   value
0  2000-05-24  1.0005
1  2000-05-25  1.0005
2  2000-05-26  1.0004
3  2000-05-29  1.0007
4  2000-05-30  1.0009
Retrieving data (first 5 rows) from table: official_usd
         date  value
0  2002-03-04   2.01
1  2002-03-05   1.99
2  2002-03-06   2.05
3  2002-03-07   2.14
4  2002-03-08   2.20


In [20]:
# Close the database connection now that the load and read-back cells are done.
conn.close()