In [85]:
from google.colab import drive
import requests
from tqdm import tqdm
import os
import json
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

In [86]:
# Mount Google Drive at ROOT_DIR (force_remount=True so re-running this cell mounts again).
ROOT_DIR = "/content/drive/"
drive.mount(mountpoint=ROOT_DIR, force_remount=True)

Mounted at /content/drive/


In [92]:
# Project folder on Drive; later cells open api_token.txt and redshift_credentials.json by
# relative name, so they depend on this working directory.
PROJECT_DIR = os.path.join(
    ROOT_DIR,
    "MyDrive",
    "DE",
    "CHPF",
)
os.chdir(PROJECT_DIR)

In [88]:
# Read the API bearer token. Strip surrounding whitespace: a trailing newline in the file
# would otherwise end up inside the Authorization header value, which requests rejects.
with open("api_token.txt", 'r') as f:
    api_token = f.read().strip()

# Endpoints to download, keyed by the table name they will be stored under.
API_BASE_URL = "https://api.estadisticasbcra.com"
api_urls_to_query = dict(
    milestones=f"{API_BASE_URL}/milestones",
    blue_usd=f"{API_BASE_URL}/usd",
    official_usd=f"{API_BASE_URL}/usd_of",
)
retrieved_tables = dict()
headers = {"Authorization": f"Bearer {api_token}"}

In [91]:
# The API returns single-letter keys; map them to readable column names.
API_COLUMN_NAMES = {"d": "date", "e": "event", "v": "value", "t": "event_type"}
REQUEST_TIMEOUT_S = 30  # seconds; without a timeout a stalled request blocks the cell forever

for tablename, api_url in tqdm(api_urls_to_query.items()):
    response = requests.get(api_url, headers=headers, timeout=REQUEST_TIMEOUT_S)
    # Fail loudly on an HTTP error status instead of building a DataFrame from an error payload.
    response.raise_for_status()
    table_df = pd.DataFrame(response.json()).rename(columns=API_COLUMN_NAMES)
    table_df["date"] = pd.to_datetime(table_df["date"], format='%Y-%m-%d')
    retrieved_tables[tablename] = table_df

100%|██████████| 3/3 [00:00<00:00,  4.21it/s]


In [93]:
# Sanity check: every endpoint in api_urls_to_query should have produced a table.
retrieved_tables.keys()

dict_keys(['milestones', 'blue_usd', 'official_usd'])

In [117]:
# Preview the parsed dates as strings to confirm pd.to_datetime read the API's '%Y-%m-%d' values.
retrieved_tables["official_usd"]["date"].astype(str)

0       2002-03-04
1       2002-03-05
2       2002-03-06
3       2002-03-07
4       2002-03-08
           ...    
5302    2023-10-05
5303    2023-10-06
5304    2023-10-09
5305    2023-10-10
5306    2023-10-11
Name: date, Length: 5307, dtype: object

In [95]:
# Credentials are passed as psycopg2.connect(**redshift_credentials), so the JSON object's keys
# must be connection keyword arguments (presumably host/port/dbname/user/password — confirm).
with open("redshift_credentials.json", "r") as credentials_file:
    redshift_credentials = json.load(credentials_file)

In [96]:
# Redshift schema that every table in this notebook is created in and queried from.
schema_name: str = "javier_coderhouse"

In [118]:
def create_table(table_name: str, schema_name: str, dataframe: pd.DataFrame, primary_key: str = None, foreign_keys: dict = None, diststyle: str = "EVEN", distkey: str = "", sortkeys: list = None) -> str:
    """
    Generates a CREATE TABLE statement for Redshift from a dataframe's column names and dtypes,
    with optional primary/foreign key constraints and DISTSTYLE / SORTKEY clauses.

    Parameters:
    - table_name (str): Name of the SQL table to be created.
    - schema_name (str): Name of the SQL schema where the table will be created.
    - dataframe (pd.DataFrame): DataFrame whose columns and dtypes define the table schema.
    - primary_key (str): Column name to be set as primary key. Optional.
    - foreign_keys (dict): Maps column names to (reference_table, reference_column) tuples. Optional.
    - diststyle (str): Distribution style of the table (EVEN, KEY, ALL); case-insensitive.
    - distkey (str): Column used as DISTKEY. Required when diststyle is KEY; ignored otherwise.
    - sortkeys (list | str): Column name(s) for the SORTKEY. A bare string is treated as a single
                             column; a list with several names produces a compound sort key.

    Returns:
    - str: A `CREATE TABLE IF NOT EXISTS schema.table (...)` statement including the column
           definitions, constraints, DISTSTYLE and (if given) SORTKEY clauses.

    Raises:
    - ValueError: if the dataframe has no columns, a column's dtype has no type mapping,
                  or diststyle is KEY without a distkey.

    Note:
    dtype -> Redshift type mapping:
    int64, int32 -> INT
    float64 -> FLOAT
    object, pandas string dtypes -> VARCHAR(300)
    bool -> BOOLEAN
    timezone-naive datetime64 (e.g. datetime64[ns]) -> TIMESTAMP
    Redshift treats PRIMARY KEY / FOREIGN KEY constraints as informational; they are not enforced.
    """
    # NOTE(review): Redshift INT is 4 bytes; int64 values outside that range would fail to load
    # (BIGINT would be safer) — kept as INT for compatibility with already-created tables.
    type_map = {'int64': 'INT', 'int32': 'INT', 'float64': 'FLOAT',
                'object': 'VARCHAR(300)', 'bool': 'BOOLEAN'}

    column_defs = []
    for col, dtype in dataframe.dtypes.items():
        if pd.api.types.is_datetime64_dtype(dtype):
            # Covers every naive datetime64 unit (ns, us, ...), not only datetime64[ns].
            sql_type = 'TIMESTAMP'
        elif isinstance(dtype, pd.StringDtype):
            sql_type = 'VARCHAR(300)'
        else:
            sql_type = type_map.get(str(dtype))
        if sql_type is None:
            raise ValueError(f"Column '{col}' has dtype {dtype} with no Redshift type mapping")
        column_defs.append(f"{col} {sql_type}")

    if not column_defs:
        raise ValueError(f"Cannot create {schema_name}.{table_name}: dataframe has no columns")

    if primary_key:
        column_defs.append(f"PRIMARY KEY ({primary_key})")

    if foreign_keys:
        for column, (reference_table, reference_column) in foreign_keys.items():
            column_defs.append(f"FOREIGN KEY ({column}) REFERENCES {reference_table}({reference_column})")

    style = diststyle.upper()
    if style == "KEY" and not distkey:
        # Redshift requires a DISTKEY column whenever DISTSTYLE KEY is used.
        raise ValueError("diststyle='KEY' requires a distkey column")
    dist_clause = f"DISTSTYLE {style}"
    if style == "KEY":
        dist_clause += f" DISTKEY({distkey})"

    # A bare string would otherwise be joined character by character ("date" -> "d, a, t, e").
    if isinstance(sortkeys, str):
        sortkeys = [sortkeys]
    sort_clause = f"SORTKEY({', '.join(sortkeys)})" if sortkeys else ""

    table_schema = f"""
        CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
            {', '.join(column_defs)}
        )
        {dist_clause}
        {sort_clause};
    """
    return table_schema

def prepare_insert_values(table_name: str, schema_name: str, dataframe: pd.DataFrame) -> str:
    """
    Generates a single multi-row INSERT statement for the dataframe's rows into schema.table.

    Parameters:
    - table_name (str): Name of the SQL table to insert the values.
    - schema_name (str): Name of the SQL schema where the table resides.
    - dataframe (pd.DataFrame): DataFrame whose rows will be rendered as SQL literals.
      It is not modified.

    Returns:
    - str: `INSERT INTO schema.table (cols) VALUES (...),\\n(...);`

    Raises:
    - ValueError: if the dataframe has no rows (an empty VALUES list is invalid SQL).

    Note:
    Every datetime column is rendered as a 'YYYY-MM-DD HH:MM:SS' string. Strings are single-quoted
    with embedded single quotes doubled; null-like values (None, NaN, NaT) become NULL; any other
    value is rendered with str(). Identifiers are not quoted and values are inlined, so this is
    meant for trusted data — NOTE(review): prefer psycopg2 parameterized inserts for untrusted input.
    """
    if dataframe.empty:
        raise ValueError(f"Cannot build an INSERT for {schema_name}.{table_name}: dataframe has no rows")

    # Work on a copy: converting datetimes in place used to change the caller's dtypes, so a
    # re-run of the load cell would see strings (VARCHAR) instead of datetimes (TIMESTAMP).
    frame = dataframe.copy()
    for col in frame.columns:
        if pd.api.types.is_datetime64_any_dtype(frame[col]):
            frame[col] = frame[col].dt.strftime('%Y-%m-%d %H:%M:%S')

    def _to_sql_literal(item) -> str:
        # Render one cell value as a SQL literal.
        if isinstance(item, str):
            # Escape outside the f-string: reusing the same quote type inside an f-string
            # expression is a SyntaxError before Python 3.12.
            escaped = item.replace("'", "''")
            return f"'{escaped}'"
        if pd.isnull(item):
            return 'NULL'
        return str(item)

    # itertuples keeps each column's own type (iterrows upcasts a row to a common dtype).
    rows_sql = [
        "(" + ", ".join(_to_sql_literal(item) for item in row) + ")"
        for row in frame.itertuples(index=False, name=None)
    ]
    values_str = ",\n".join(rows_sql)
    cols = ", ".join(map(str, frame.columns))
    return f"INSERT INTO {schema_name}.{table_name} ({cols}) VALUES {values_str};"

def query_to_df(query, params=None):
    """
    Executes a SQL query on Redshift and returns the result set as a pandas DataFrame.

    Parameters:
    - query (str): SQL query string to be executed.
    - params (sequence | dict): Optional values for psycopg2 placeholders (%s / %(name)s) in
      `query`. When None (the default), the query is sent as-is with no placeholder substitution.

    Returns:
    - pd.DataFrame: One column per result column (named after cursor.description). If the statement
      produces no result set, an empty DataFrame is returned.

    Note:
    Opens a new connection from the global `redshift_credentials` on every call and always closes
    it, even on error. Nothing is committed, so changes made by non-SELECT statements are discarded.
    """
    conn = psycopg2.connect(**redshift_credentials)
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            if cur.description is None:
                # Statement had no result set (e.g. DDL/DML): fetchall() would raise here.
                return pd.DataFrame()
            colnames = [column.name for column in cur.description]
            rows = cur.fetchall()
    finally:
        conn.close()
    return pd.DataFrame(rows, columns=colnames)

In [120]:
# Create each table (if missing) and fully reload it, one transaction per table.
# Redshift does not enforce PRIMARY KEY uniqueness and the API returns the full history on every
# call, so existing rows are deleted before inserting; this keeps re-runs from duplicating data.
# DELETE (not TRUNCATE) is used because TRUNCATE commits implicitly in Redshift, breaking atomicity.
conn = psycopg2.connect(**redshift_credentials)
try:
    for table_name, dataframe in retrieved_tables.items():
        table_schema = create_table(
            table_name=table_name,
            schema_name=schema_name,
            dataframe=dataframe,
            primary_key="date",
            foreign_keys=None,
            diststyle="EVEN",
            sortkeys=["date"],  # was `sortkey='sort_column'`: wrong keyword (TypeError) and not a column
        )
        insert_sql = prepare_insert_values(table_name=table_name, schema_name=schema_name, dataframe=dataframe)
        # `with conn` commits on success and rolls back on error; it does not close the connection.
        with conn, conn.cursor() as cur:
            cur.execute(table_schema)
            cur.execute(f"DELETE FROM {schema_name}.{table_name}")
            cur.execute(insert_sql)
finally:
    conn.close()

In [121]:
# Spot-check each loaded table. Without ORDER BY, Redshift returns rows in no guaranteed order,
# so sort by `date` (every retrieved table has one) to actually get the earliest rows.
for table_name in retrieved_tables:
    print(f"Retrieving data (first 5 rows) from table: {table_name}")
    example = query_to_df(f"SELECT * FROM {schema_name}.{table_name} ORDER BY date LIMIT 5")
    print("=" * 30)
    print(example)

Retrieving data (first 5 rows) from table: milestones
         date              event event_type
0  1991-02-05    Roque Fernández       bcra
1  1991-04-01    Domingo Cavallo       econ
2  1995-08-08  Carlos Saúl Menem       pres
3  1996-08-05          Pedro Pou       bcra
4  1996-08-06    Roque Fernández       econ
Retrieving data (first 5 rows) from table: blue_usd
         date   value
0  2000-05-24  1.0005
1  2000-05-25  1.0005
2  2000-05-26  1.0004
3  2000-05-29  1.0007
4  2000-05-30  1.0009
Retrieving data (first 5 rows) from table: official_usd
         date  value
0  2002-03-04   2.01
1  2002-03-05   1.99
2  2002-03-06   2.05
3  2002-03-07   2.14
4  2002-03-08   2.20
