In [41]:
import os
import sqlite3

import pandas as pd

## Function definitions

In [42]:
def get_data_from_api():
    """Read the raw records from ``api/data.json`` into a DataFrame.

    Prints a progress message and then lets pandas parse the JSON file.

    Returns:
        pandas.DataFrame: the parsed contents of ``api/data.json``.
    """
    source_path = 'api/data.json'
    print('Reading...')
    return pd.read_json(source_path)

In [43]:
def write_data(data):
    """Echo ``data`` to stdout and dump it to ``csv/data.csv``.

    The frame is written with pandas' default CSV settings, so its index is
    included as the first (unnamed) column of the file.

    Args:
        data: DataFrame to persist.
    """
    target_path = 'csv/data.csv'
    print('Writing...', data, sep='\n')
    data.to_csv(target_path)

In [44]:
def agg_data():
    """Aggregate the staged transactions into revenue per store and day.

    Reads ``csv/data.csv``, truncates each ``date`` timestamp to its calendar
    date, and sums ``price`` for every (store, date) pair.

    Only the ``store``, ``date`` and ``price`` columns are used, so the result
    does not depend on which other columns (the dumped index,
    ``transaction_id``, text fields, ...) happen to be present in the file.

    Returns:
        pandas.DataFrame: columns ``store``, ``date`` and ``revenue``, one row
        per (store, date) pair, sorted by those keys.
    """
    df = pd.read_csv('csv/data.csv')
    print('Aggregating...')
    select_col = ['store', 'date', 'price']
    # Select the needed columns up front instead of summing everything and
    # dropping leftovers by name afterwards: that approach raised KeyError
    # when 'Unnamed: 0' / 'transaction_id' were absent and summed
    # (or string-concatenated) unrelated columns.
    sales = df[select_col].assign(
        date=lambda frame: pd.to_datetime(frame['date']).dt.date
    )
    new_df = (
        sales.groupby(['store', 'date'], as_index=False)['price']
        .sum()
        .rename(columns={'price': 'revenue'})
    )
    print(new_df)
    return new_df

In [45]:
def load_data(pdf):
    """Write ``pdf`` to the ``agg_data`` table of the local SQLite database.

    Opens ``db/database.db`` (10 second lock timeout), replaces the
    ``agg_data`` table with the contents of ``pdf`` (the frame's index is
    written as well, pandas' default), and always closes the connection.

    The load is best-effort: ordinary errors are reported on stdout and not
    re-raised. ``KeyboardInterrupt`` and ``SystemExit`` are allowed to
    propagate so the notebook can still be interrupted.

    Args:
        pdf: DataFrame to load.
    """
    con = None
    try:
        con = sqlite3.connect("db/database.db", timeout=10)
        print('Loading...')
        print(pdf)
        pdf.to_sql(name='agg_data', con=con, if_exists="replace")
    except Exception as exc:
        # Keep the original "report and carry on" behaviour, but say what
        # actually went wrong instead of hiding it.
        print('Exception..', repr(exc))
    finally:
        # The connection used to be left open, keeping the database file
        # handle (and any lock) alive for the lifetime of the kernel.
        if con is not None:
            con.close()

In [46]:
# !pip install sqlalchemy

In [55]:
from sqlalchemy import create_engine

def load_data_postgresql(pdf, conn_string=None):
    """Replace the ``agg_data`` table in PostgreSQL with ``pdf``.

    The connection string is never stored in the notebook: it is taken from
    ``conn_string`` when given, otherwise from the ``DATABASE_URL``
    environment variable. (A connection string with a real password used to
    be hardcoded here; that credential should be considered leaked and be
    rotated.)

    Args:
        pdf: DataFrame to load; written without its index.
        conn_string: optional SQLAlchemy URL, e.g.
            ``postgresql://user:password@host/dbname?sslmode=require``.

    Raises:
        RuntimeError: if no connection string is configured.

    Errors from SQLAlchemy / the database driver propagate to the caller.
    """
    if conn_string is None:
        conn_string = os.environ.get('DATABASE_URL')
    if not conn_string:
        raise RuntimeError(
            'No PostgreSQL connection string configured: pass conn_string '
            'or set the DATABASE_URL environment variable.'
        )

    engine = create_engine(conn_string)
    try:
        print('Loading...')
        print(pdf)
        pdf.to_sql('agg_data', engine, if_exists='replace', index=False)
        print('Loaded!!')
    finally:
        # Release pooled connections; the engine is only needed for this load.
        engine.dispose()

## ETL Steps

In [48]:
# Extract: get_data_from_api() reads the raw records from the local file
# api/data.json; the bare `data` on the last line displays the resulting frame.
data = get_data_from_api()
data

Reading...


Unnamed: 0,transaction_id,store,date,price
0,1,10,2020-05-01 10:05:01,100.5
1,2,10,2020-05-01 10:06:02,120.5


In [49]:
# Stage the raw data as csv/data.csv; agg_data() reads it back from that file.
write_data(data)

Writing...
   transaction_id  store                date  price
0               1     10 2020-05-01 10:05:01  100.5
1               2     10 2020-05-01 10:06:02  120.5


In [50]:
# Transform: aggregate the staged CSV into revenue per store and calendar date.
proc_data = agg_data()

Aggregating...
   store        date  revenue
0     10  2020-05-01    221.0


In [51]:
# Check which columns the aggregated frame carries before loading it.
proc_data.columns

Index(['store', 'date', 'revenue'], dtype='object')

In [56]:
# Load: replace the agg_data table in PostgreSQL with the aggregated frame.
load_data_postgresql(proc_data)

Loading...
   store        date  revenue
0     10  2020-05-01    221.0
Loaded!!
