<a href="https://colab.research.google.com/github/macintoxic/.Touch/blob/master/trabalho_pratico_2/trabalho_pratico_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Instala as dependencias

In [None]:
!pip install apache-airflow
!pip install pandas_datareader --upgrade
!pip install sidrapy

## Imports

In [35]:

import yfinance as yf
import requests
import pandas as pd
import sidrapy as sidra
import sqlite3
from pandas_datareader import data
from sqlalchemy import create_engine 



## Colete a série histórica dos dados de inflação de bebidas do IBGE usando a API sidra

In [36]:
def get_ipca():
    url_ipca = 'https://apisidra.ibge.gov.br/values/t/1705/n1/all/v/355/p/all/c315/7169,7170,7389,7396,7397,7440,7443,12394/d/v355%202'

    ipca_raw = sidra.get_table(table_code=1705,
                            territorial_level = "1",
                            ibge_territorial_code = "all",
                            period = "all",
                            categories=[7169,7170,7389,7396,7397,7440,7443,12394]                           
                            )


    return ipca_raw


get_ipca().head()

Unnamed: 0,NC,NN,MC,MN,V,D1C,D1N,D2C,D2N,D3C,D3N,D4C,D4N
0,Nível Territorial (Código),Nível Territorial,Unidade de Medida (Código),Unidade de Medida,Valor,Brasil (Código),Brasil,Mês (Código),Mês,Variável (Código),Variável,"Geral, grupo, subgrupo, item e subitem (Código)","Geral, grupo, subgrupo, item e subitem"
1,1,Brasil,2,%,0.53,1,Brasil,201202,fevereiro 2012,355,IPCA15 - Variação mensal,7169,Índice geral
2,1,Brasil,2,%,1.18,1,Brasil,201202,fevereiro 2012,356,IPCA15 - Variação acumulada no ano,7169,Índice geral
3,1,Brasil,,,...,1,Brasil,201202,fevereiro 2012,1120,IPCA15 - Variação acumulada em 12 meses,7169,Índice geral
4,1,Brasil,2,%,100.0000,1,Brasil,201202,fevereiro 2012,357,IPCA15 - Peso mensal,7169,Índice geral


## Colete a série histórica dos dados diários de preços e volume
(OHLCV) para a ação ABEV3 do provedor de sua preferência (yahoo
finance, marketstack, eodhistoricaldata, finnhub, alphavantage
etc.);


In [37]:
def get_abev3():
    yf.pdr_override()
    x  = data.get_data_yahoo("ABEV3.SA") 
    x['ticker'] = 'ABEV3.SA'

    return x

## Crie e armazene esses dados em um banco de dados SQL local ou em nuvem

In [46]:
def save_to_sqlite():
    engine = create_engine('sqlite:////content/market_data.db', echo=False)
    get_abev3().to_sql('data', con=engine)


## Esboce como seria a implementação em Python de uma DAG Airflow, que  executaria as coletas implementadas anteriormente. O código não precisa rodar com sucesso, é apenas um esboço.

### Inicializa o airflow

In [42]:
!airflow db init

DB: sqlite:////root/airflow/airflow.db
[[34m2023-04-06 01:45:40,966[0m] {[34mmigration.py:[0m207} INFO[0m - Context impl [01mSQLiteImpl[22m.[0m
[[34m2023-04-06 01:45:40,966[0m] {[34mmigration.py:[0m210} INFO[0m - Will assume [01mnon-transactional[22m DDL.[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> 290244fb8b83
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


### Imports do airflow

In [43]:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator

In [44]:
def get_ipca():
    url_ipca = 'https://apisidra.ibge.gov.br/values/t/1705/n1/all/v/355/p/all/c315/7169,7170,7389,7396,7397,7440,7443,12394/d/v355%202'

    ipca_raw = sidra.get_table(table_code=1705,
                            territorial_level = "1",
                            ibge_territorial_code = "all",
                            period = "all",
                            categories=[7169,7170,7389,7396,7397,7440,7443,12394]                           
                            )


    #ipca_raw.head()
    return ipca_raw



## Parâmetros default do DAG

In [45]:
args = {
            'owner': 'airflow',    
            #'start_date': airflow.utils.dates.days_ago(2),
            # 'end_date': datetime(),
            # 'depends_on_past': False,
            #'email': ['airflow@example.com'],
            #'email_on_failure': False,
            #'email_on_retry': False,
            # If a task fails, retry it once after waiting
            # at least 5 minutes
            #'retries': 1,
            'retry_delay': timedelta(minutes=5),
        }

## Instanciar o DAG

In [48]:
dag_python = DAG(
	dag_id = "pythonoperator_demo",
	default_args=args,
	# schedule_interval='0 0 * * *',
	schedule_interval='@once',	
	dagrun_timeout=timedelta(seconds=1),
	description='use case of python operator in airflow',
	start_date = airflow.utils.dates.days_ago(1))

In [49]:
python_task_ipca = PythonOperator(task_id='get_ipca', python_callable=get_ipca, dag=dag_python)
python_task_get_abev3 = PythonOperator(task_id='get_abev3', python_callable=get_abev3, dag=dag_python)
python_task_save_to_sqlite = PythonOperator(task_id='save_to_sqlite', python_callable=save_to_sqlite, dag=dag_python)
