#OBSERVAÇÕES
O código abaixo necessita do upload de um arquivo config.ini para iniciar variáveis sensíveis.

# Teste1

## Importando as bibliotecas

In [1]:
import requests
import configparser
import json
import pandas as pd

## Inicializando as variáveis (foi utilizado um arquivo config.ini para não expor as credenciais no escopo do Databricks é utilizado AKV para tal)

In [2]:
config = configparser.ConfigParser()
config.read('config.ini')

url = config['geo_api']['url']
token = config['geo_api']['token']

## Função para lidar com requisição GET

In [3]:
def get_command(url:str, params: object):
        """Fazer uma requisição get"""
        url = url
        response = requests.get(url=url, params=params)
        if response.status_code == 200:
            return response.text
        else:
            print(f"Error: {response.status_code}")

## Função para gerar os parâmetros da API dinamicamente

In [4]:
def generate_params(lat:str, long: str,format = 'json', token = token):
    """Gerar parâmetros necessário para API GEO"""
    params = {
        'key': token,
        'format': format,
        'lat': lat,
        'lng': long
    }

    return params

## Leitura de algumas latitude/longitudes + chamada API

In [6]:
data = requests.get('https://raw.githubusercontent.com/rodjribeiro/congenial-waffle/main/data/states.json').json()

db = []

for item in data:
    lat  = item['latitude']
    long = item['longitude']

    params = generate_params(lat=lat, long=long)
    db.append(get_command(url=url, params=params))

## Inicialização de dataframe para visualizar os dados

In [7]:
df = pd.DataFrame(db)

In [9]:
df.head()

Unnamed: 0,0
0,"{""country"":""BR"",""region"":""Rondonia"",""city"":""Pe..."
1,"{""country"":""BR"",""region"":""Acre"",""city"":""Porang..."
2,"{""country"":""BR"",""region"":""Amazonas"",""city"":""Al..."
3,"{""country"":""BR"",""region"":""Roraima"",""city"":""Car..."
4,"{""country"":""BR"",""region"":""Para"",""city"":""Novo A..."


## Transformação do objeto no formato de texto para colunar

In [10]:
df_explode = df[0].apply(json.loads).apply(pd.Series)

In [11]:
df_explode.head()

Unnamed: 0,country,region,city,latitude,longitude,currency_code,currency_name,currency_symbol,sunrise,sunset,time_zone,distance_km
0,BR,Rondonia,Peruanos,-10.6667,-63.1,BRL,Brazilian Real,R$,06:16,18:25,-04:00,31.8925
1,BR,Acre,Porangaba,-8.8,-70.6,BRL,Brazilian Real,R$,05:46,17:55,-05:00,6.4277
2,BR,Amazonas,Alvaraes,-3.22083,-64.8042,BRL,Brazilian Real,R$,06:24,18:31,-04:00,42.9629
3,BR,Roraima,Caracarai,1.81611,-61.1281,BRL,Brazilian Real,R$,06:10,18:16,-04:00,29.6195
4,BR,Para,Novo Acordo,-3.56667,-52.2667,BRL,Brazilian Real,R$,06:33,18:41,-03:00,34.306


## Agrupamento dos dados para ver qual 'time_zone' mais frenquente no Brasil

In [12]:
df_group_by = df_explode.groupby('time_zone').size().reset_index(name='count')

In [13]:
df_group_by

Unnamed: 0,time_zone,count
0,-03:00,21
1,-04:00,5
2,-05:00,1


# Teste 2

**1)	Com base na API do Teste 1, como poderíamos informar a latitude e longitude via DataFactory para um notebook no Databricks? Como fazemos para trocar variáveis entre os dois serviços?**



Na ação de Databricks do ADF é possível declarar **'base parameters'** na hora de chamar o notebook, então seria necessário incluir os parâmetros de latitude e longitude.

Já dentro do notebook databricks é necessário capturar esses parâmetros utilizando os códigos abaixo.


In [None]:
dbutils.widgets.text("latitude", "")
latitude = dbutils.widgets.get("latitude")

In [None]:
dbutils.widgets.text("longitude", "")
latitude = dbutils.widgets.get("longitude")

**2)	Quais seriam os passos para criar um pipeline que chame essa API e persista os dados no datalake?**

Incluir duas ações de 'Set variable' para iniciarmos as variáveis de latitude e longitude, depois adicionar uma ação do Databricks chamando o notebook. Dentro do notebook é possível fazer a chamada de API e tratar os dados. Para finalizar é possível a partir de um dataframe iniciado escrever dentro do Storage vinculado ao ambiente do ADB.

# Teste 3

***Considerando um banco SQL Server***

**1) Comando para consultar os acessos de determinado usuário dentro do banco de dados.**

In [None]:
SELECT *
FROM sys.fn_my_permissions('nome', 'DATABASE')
ORDER BY subentity_name, permission_name;

**2)	Comando para liberar acesso a determinado schema.**


In [None]:
GRANT USAGE ON SCHEMA::nome_do_schema TO nome_do_usuario;

**3)	Comando para remover o acesso a determinado schema.**





In [None]:
REVOKE SELECT ON SCHEMA::nome_do_schema FROM nome_do_usuario;

# Teste 4

**1)	Descreva resumidamente um projeto que você tenha realizado do início ao fim, tipo de dado, ferramenta de ETL e Data Viz. Foque nos principais desafios que teve durante a construção da solução, como foi determinado o consumo dos dados pelo Data Viz (acesso direto a base, base replicada, gateway...).**

**Projeto X - Lineage**

**Contexto:**

Arquitetura: Data Mesh

Time de DataOPS tem a necessidade de criar uma visão no PowerBI que seja capaz de mostrar visualmente todo o lineage dos 'dataflows' encontrados dentro do time de Engenharia de dados.

Hoje em produção funciona quatro arquiteturas diferentes cada uma com seu 'metadata store', todos eles armazenados em databases (SQL Server) diferentes.


**Tipos de dependência presentes:**
As dependências seria basicamente um pipeline que depende de outro para completar um fluxo de dados por completo.

Dependência 1: Raw -> Bronze -> Silver

Dependência 2: Nó 1 do Mesh (tabela=invoice) > Nó 2 do Mesh (tabela=invoice(silver))

Dependência 3: Grupo de tabelas (Silver: customer, product, invoice) -> Gold


**ETL:**

Com o cenário complexo acima fica muito crítico para monitorar as execuções com falha. Portanto foi criada um ETL para compilar todas as informações de pipelines cadastrados e montar um schema padrão para encaixar as informações de metadados de 4 arquiteturas.

O ETL foi criado usando o Databricks Workflow.

O acesso as tabelas de metadado foi disponibilizado utilizando um AKV + SPN (para cada database).

Foi criada uma função para gerar a tabela raw onde basicamente tinhamos a informação do nome do fluxo de dados e as outras colunas foram chamadas de dags.

**schema:**
dataflow_name | dag_1 | dag_2 | dag_3 ...

Esse schema foi desenhado para usarmos um componente de 'Árvore Hierarquica' dentro do BI.

Todo esse processo era trigado no momento que um novo código era comitado no Github, a cada '.sql' comitado tinhamos um novo job cadastrado portanto poderíamos ter uma mudança no lineage, foi utilizado o Azure DevOps.

# Teste 5

**1)	O que você julga como boas práticas para criação, manutenção e consumo de dados em uma estrutura de datalake? Não existe uma única resposta, avaliaremos sua linha de visão sobre os temas.**

*   Arquitetura medalhão (raw, bronze, silver, gold)
*   Camada gold ser camada de consumo
*   Formato parquet de arquivos
*   Utilizar 'schema_evolution' pois arquivos parquet não lidam bem com schema diferentes
*   Catalogar os metadados
*   Particionamento das pastas do datalake
*   Quando possível usar 'enable data feed'(CDC no contexto de delta tables) e z-order
*   Utilizar time-travel para não permitir disrupção do negócio


# Teste 6

**1)	Um grupo de pessoas está tentando encontrar passagens de ônibus mais baratas de uma cidade para outra. Em certos cenários, os ônibus diretos podem ser mais caros do que fazer uma escala em outra cidade.
Escreva uma consulta que liste as rotas de ônibus mais baratas entre as cidades. Se houver duas rotas com o mesmo custo, mas com um número diferente de paradas, liste as rotas com menos paradas.
Considere o seguinte conjunto de dados para este problema. Como você pode ver neste conjunto de dados, o custo direto da passagem de San Marino para Pisa é de 50, mas uma opção mais barata é viajar de San Marino para Milão e depois viajar de Milão para Pisa, onde o custo somado desta viagem chega a 40.
Para efeitos de simplificação, assume-se que as pessoas podem comprar bilhetes de autocarro com um máximo de duas escalas (ou seja: A -> B, B -> C, C -> D)**


## Importando biblioteca do polars(similar ao pandas) e duckdb para conseguir rodar query no dataframe

### Importando as bibliotecas

Polar - substituto do pandas

Duckdb - engine para executar query

In [8]:
import polars as pl
import duckdb

### Criando dataframe

In [2]:
df = pl.DataFrame({
    'Origin': ['San Marino', 'San Marino', 'San Marino', 'Milan', 'Milan', 'Pisa', 'Pisa', 'Pisa'],
    'Destination': ['Pisa', 'Milan', 'Rome', 'Rome', 'Pisa', 'Venice', 'San Marino', 'Milan'],
    'TicketCost': [50, 20, 40, 10, 20, 100, 30, 50]
})

### Criando outro dataframe para selecionar as rotas diretas

In [3]:
routes = df.with_columns(pl.lit(0).alias('Stops'))
routes = routes.select(pl.col('Origin'), pl.col('Destination'), pl.col('Stops'), pl.col('TicketCost'))

### Lógica responsável por iterar todas as possibilidades de rotas

In [None]:
for _ in range(len(df) - 1):
    new_routes = routes.join(df, left_on='Destination', right_on='Origin', how='inner', suffix='_new')
    new_routes = new_routes.select([
        pl.col('Origin'),
        pl.col('Destination_new').alias('Destination'),
        (pl.col('Stops') + 1).alias('Stops'),
        (pl.col('TicketCost') + pl.col('TicketCost_new')).alias('TicketCost')
    ])

    routes = pl.concat([routes, new_routes]).filter(pl.col('Origin') != pl.col('Destination'))

### Iniciando duckdb

In [28]:
conn = duckdb.connect(database=':memory:', read_only=False)

### Query final + Resultado

In [29]:
conn.execute("""
 WITH step1 AS (
    SELECT *,
     RANK() OVER (PARTITION BY Origin, Destination ORDER BY TicketCost ASC, Stops ASC) as rank
    FROM routes
)
  SELECT DISTINCT Origin
  ,Destination
  ,Stops
  , TicketCost
  FROM step1
  WHERE rank=1
  AND Origin <> Destination
  ORDER BY Origin
  """).df()

Unnamed: 0,Origin,Destination,Stops,TicketCost
0,Milan,San Marino,1,50
1,Milan,Venice,1,120
2,Milan,Rome,0,10
3,Milan,Pisa,0,20
4,Pisa,Venice,0,100
5,Pisa,Milan,0,50
6,Pisa,San Marino,0,30
7,Pisa,Rome,1,60
8,San Marino,Pisa,1,40
9,San Marino,Milan,0,20
