# INGESTÃO DE DADOS NO AZURE STORAGE ACCOUNT USANDO PYTHON

## PARA REALIZAR ESTE PROCEDIMENTO, É NECESSÁRIO COLETAR A CADEIA DE CONEXÃO (CONNECTION STRING) DA CONTA DE ARMAZENAMENTO

### VAMOS UTILIZAR DADOS PÚBLICOS DE CASOS DE COVID NO BRASIL
#### LINK PARA ACESSO: https://brasil.io/dataset/covid19/files/
IREMOS UTILIZAR OS CASOS CONFIRMADOS DE COVID
URL: https://data.brasil.io/dataset/covid19/caso_full.csv.gz

## CONFIGURAÇÃO DO AMBIENTE

### INSTALAÇÃO DE BIBLIOTECAS

In [None]:
pip install azure-storage-blob

### IMPORTAÇÃO DE BIBLIOTECAS

In [2]:
import requests
import gzip
import shutil
from azure.storage.blob import BlobClient

### FUNÇÃO PARA REALIZAR DOWNLOAD DE DADOS

In [3]:
def download_dados(url, nome_arquivo, timeout=60):
    """Download the resource at ``url`` and save it to ``nome_arquivo``.

    The body is streamed to disk in chunks, so a large file is never held
    entirely in memory, and both the HTTP connection and the output file are
    closed even when an error occurs part-way through.

    Args:
        url: Address of the file to download.
        nome_arquivo: Local path where the downloaded bytes are written
            (opened in binary mode, overwriting any existing file).
        timeout: Seconds to wait for the server to connect / send data before
            giving up. Passed straight to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
            Checked before the output file is opened, so an error page is
            never saved under ``nome_arquivo``.
    """
    with requests.get(url, stream=True, timeout=timeout) as resposta:
        resposta.raise_for_status()
        with open(nome_arquivo, 'wb') as arquivo:
            # 1 MiB chunks keep memory use flat regardless of file size.
            for bloco in resposta.iter_content(chunk_size=1024 * 1024):
                arquivo.write(bloco)

### REALIZAR O DOWNLOAD DOS DADOS

In [4]:
# Fetch the gzip-compressed case file and store it next to the notebook.
download_dados(
    url='https://data.brasil.io/dataset/covid19/caso_full.csv.gz',
    nome_arquivo='caso_full.csv.gz',
)

#### DESCOMPACTAR ARQUIVO

In [5]:
# Decompress the downloaded archive into a plain CSV file, copying
# stream-to-stream so the data is not loaded into memory at once.
with gzip.open('caso_full.csv.gz', 'rb') as arquivo_compactado, \
        open('caso_full.csv', 'wb') as arquivo_descompactado:
    shutil.copyfileobj(arquivo_compactado, arquivo_descompactado)

### CARREGAR O ARQUIVO NO DATALAKE

#### VARIÁVEL DO ENDPOINT DE CONEXÃO

In [6]:
import os

# Azure Storage connection string handed to BlobClient.from_connection_string.
# It is read from the environment so the secret never has to be typed into
# (and saved with) the notebook; when the variable is not set the value falls
# back to the empty string that was hard-coded here before.
endpoint = os.environ.get('AZURE_STORAGE_CONNECTION_STRING', '')

#### CRIANDO CONEXÃO COM O STORAGE DA PLATAFORMA MICROSOFT AZURE

In [9]:
# Client bound to the blob that will receive the decompressed CSV.
blob = BlobClient.from_connection_string(
    conn_str=endpoint,
    container_name="datalake-aulas",
    blob_name="raw/brazil.io/caso_full.csv",
)

#### CARREGAR O ARQUIVO PARA O DATALAKE

In [10]:
# Send the decompressed CSV to the blob configured above.
# overwrite=True makes the cell re-runnable: without it upload_blob raises
# ResourceExistsError as soon as the blob already exists.
with open("caso_full.csv", "rb") as data:
    blob.upload_blob(data, overwrite=True)

PySpark

In [None]:
!pip install pyspark 
 
from pyspark.sql import SQLContext, SparkSession, functions as F 
from pyspark import SparkFiles
from pyspark.sql.types import * 
from pyspark.sql.functions import * 

spark = SparkSession.builder.getOrCreate() 
sql = SQLContext(spark)

Leitura do Dataset

In [12]:
# Read the decompressed CSV (header row, comma-separated) and register it
# as the temporary view `caso_full` so it can be queried with SQL.
df_caso_full = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('sep', ',')
    .load('caso_full.csv')
)
df_caso_full.createOrReplaceTempView('caso_full')

In [13]:
# City dimension: one row per distinct (IBGE code, city, state, place type)
# combination found in the `caso_full` view.
df_dim_cidade = sql.sql(
    ''' SELECT DISTINCT city_ibge_code, city, state, place_type FROM caso_full '''
)

In [14]:
# Write the city dimension as CSV into the `dim_cidade` directory.
# coalesce(1) collapses the data into one partition so a single part file is
# produced; mode('overwrite') lets the cell be re-run, since the default save
# mode fails when the output directory already exists.
(
    df_dim_cidade
    .coalesce(1)
    .write
    .format('csv')
    .options(header=True, sep=',')
    .mode('overwrite')
    .save('dim_cidade')
)

Carregando as tabelas processadas no datalake (camada consume)

In [16]:
import glob

# Spark names the part file with a fresh UUID on every write, so the file is
# looked up by pattern instead of hard-coding one run's name. The path is
# relative, matching the relative 'dim_cidade' directory the CSV is saved to.
arquivos_dim_cidade = glob.glob('dim_cidade/part-*.csv')
if len(arquivos_dim_cidade) != 1:
    raise FileNotFoundError(
        "expected exactly one part-*.csv file in 'dim_cidade', found %d"
        % len(arquivos_dim_cidade)
    )

blob = BlobClient.from_connection_string(
    conn_str=endpoint,
    container_name="datalake-aulas",
    blob_name="consume/brazil.io/dim_cidade/dim_cidade.csv",
)
with open(arquivos_dim_cidade[0], "rb") as data:
    # overwrite=True: replace the blob on re-runs instead of raising
    # ResourceExistsError.
    blob.upload_blob(data, overwrite=True)

In [17]:
# Date dimension: one row per distinct `date` in the `caso_full` view, with
# the week of year, year/month/day sliced out of the date text, the quarter
# derived from the month number, and the Portuguese weekday name
# (Spark's dayofweek returns 1 for Sunday through 7 for Saturday).
# All weekday names are lowercase; 'sábado' used to be the only capitalised one.
df_dim_periodo = sql.sql('''
select  distinct date, 
        weekofyear(cast(date as timestamp)) AS week, 
        left(date, 4) AS year, 
        substring(date, 6, 2) AS month, 
        right(date, 2) AS day,
        case 
          when cast(substring(date, 6, 2) as integer) in (1,2,3) then 1
          when cast(substring(date, 6, 2) as integer) in (4,5,6) then 2
          when cast(substring(date, 6, 2) as integer) in (7,8,9) then 3
          else 4
        end as quarter,
        case
          when dayofweek(cast(date as timestamp)) = 1 then 'domingo'
          when dayofweek(cast(date as timestamp)) = 2 then 'segunda-feira'
          when dayofweek(cast(date as timestamp)) = 3 then 'terça-feira'
          when dayofweek(cast(date as timestamp)) = 4 then 'quarta-feira'
          when dayofweek(cast(date as timestamp)) = 5 then 'quinta-feira'
          when dayofweek(cast(date as timestamp)) = 6 then 'sexta-feira'
          else 'sábado'
        end as name_day
from caso_full 
''')

In [18]:
# Write the date dimension as CSV into the `dim_periodo` directory.
# coalesce(1) collapses the data into one partition so a single part file is
# produced; mode('overwrite') lets the cell be re-run, since the default save
# mode fails when the output directory already exists.
(
    df_dim_periodo
    .coalesce(1)
    .write
    .format('csv')
    .options(header=True, sep=',')
    .mode('overwrite')
    .save('dim_periodo')
)

In [21]:
import glob

# Spark names the part file with a fresh UUID on every write, so the file is
# looked up by pattern instead of hard-coding one run's name. The path is
# relative, matching the relative 'dim_periodo' directory the CSV is saved to.
arquivos_dim_periodo = glob.glob('dim_periodo/part-*.csv')
if len(arquivos_dim_periodo) != 1:
    raise FileNotFoundError(
        "expected exactly one part-*.csv file in 'dim_periodo', found %d"
        % len(arquivos_dim_periodo)
    )

blob = BlobClient.from_connection_string(
    conn_str=endpoint,
    container_name="datalake-aulas",
    blob_name="consume/brazil.io/dim_periodo/dim_periodo.csv",
)
with open(arquivos_dim_periodo[0], "rb") as data:
    # overwrite=True: replace the blob on re-runs instead of raising
    # ResourceExistsError.
    blob.upload_blob(data, overwrite=True)

In [23]:
# Fact table: the distinct per-city, per-date measures (population figures,
# cumulative and new confirmed cases and deaths, plus the bookkeeping flags)
# selected from the `caso_full` view.
df_dim_fatos = sql.sql(
'''
select  distinct 
        city_ibge_code,
        date, 
        estimated_population, 
        estimated_population_2019, 
        is_last,
        is_repeated, 
        last_available_confirmed, 
        last_available_confirmed_per_100k_inhabitants, 
        last_available_date, 
        last_available_death_rate, 
        last_available_deaths,
        order_for_place,
        new_confirmed, 
        new_deaths
from caso_full 
'''
)

In [24]:
# Write the fact table as CSV into the `dim_fatos` directory.
# coalesce(1) collapses the data into one partition so a single part file is
# produced; mode('overwrite') lets the cell be re-run, since the default save
# mode fails when the output directory already exists.
(
    df_dim_fatos
    .coalesce(1)
    .write
    .format('csv')
    .options(header=True, sep=',')
    .mode('overwrite')
    .save('dim_fatos')
)

In [25]:
import glob

# Spark names the part file with a fresh UUID on every write, so the file is
# looked up by pattern instead of hard-coding one run's name. The path is
# relative, matching the relative 'dim_fatos' directory the CSV is saved to.
arquivos_dim_fatos = glob.glob('dim_fatos/part-*.csv')
if len(arquivos_dim_fatos) != 1:
    raise FileNotFoundError(
        "expected exactly one part-*.csv file in 'dim_fatos', found %d"
        % len(arquivos_dim_fatos)
    )

blob = BlobClient.from_connection_string(
    conn_str=endpoint,
    container_name="datalake-aulas",
    blob_name="consume/brazil.io/dim_fatos/dim_fatos.csv",
)
with open(arquivos_dim_fatos[0], "rb") as data:
    # overwrite=True: replace the blob on re-runs instead of raising
    # ResourceExistsError.
    blob.upload_blob(data, overwrite=True)