In [None]:
#!pip install azure-storage-blob
#!pip install xml-python
#!pip install bs4
#!pip install BeautifulSoup
#!pip install lxml
#!pip install pandas
#pip install fastparquet
#!pip install BytesIO
!pip install pyarrow

In [None]:
!c:/Users/marco/AppData/Local/Programs/Python/Python39/python.exe -m pip install ipykernel -U --user --force-reinstall'

In [None]:
from azure.storage.blob import BlobServiceClient

# Connect to the storage account
connection_string = "DefaultEndpointsProtocol=https;AccountName=<your_account_name>;AccountKey=<your_account_key>;EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Get a reference to the raw container and file
raw_container_name = "<your_raw_container_name>"
raw_blob_name = "<your_raw_blob_name>"
raw_blob_client = blob_service_client.get_blob_client(container=raw_container_name, blob=raw_blob_name)

# Download the file contents as a string
raw_file_contents = raw_blob_client.download_blob().content_as_text()


### Reading XML Data Danfe

In [None]:
# Imports

import xml.etree.ElementTree as ET
from xml.dom import minidom
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import pytz
from azure.storage.blob import BlobServiceClient
import io
import pyarrow.parquet as pq

In [None]:
# FUNÇÕES GLOBAIS
def read_context(json_path: str):
    """
    Read json content in order to get context variables from data process.
    Args:
        json_path: path from where json file containing project
            information is.
    """

    context_file = open(json_path)
    context = json.load(context_file)

    return context

def set_dtypes(df, dtypes):
    for column in dtypes:
        #print(column)
        df[column] = df[column].astype(dtypes[column])
    return df

def convert_float_to_int(df, columns):
    for column in columns:
        df[column] = df[column].astype(np.int64)
    return df

def data_load():
    brasil_fuso_horario = pytz.timezone("America/Sao_Paulo")
    data_atual = datetime.now(brasil_fuso_horario).strftime("%Y-%m-%d %H:%M:%S")
    return data_atual

def cria_data_fonte_e_carga(df,coluna):
    df['data_fonte'] = pd.to_datetime(df[coluna], format='%Y-%m-%d %H:%M:%S',errors='ignore').dt.strftime('%Y-%m-%d %H:%M')
    df['data_carga']  = data_load()
    return df

def cria_coluna_year_month_day(df,coluna: str):
    df["year"] = df[coluna].str.slice(0,4)
    df["month"] = df[coluna].str.slice(5,7)
    df["day"] = df[coluna].str.slice(8,10)
    return df

def connect_storage_account():
    context = read_context('../context/transform-info.json')
    connect_str = context['transform-info'][0]['connect_string']
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    return blob_service_client

def get_container_xml_raw(container : str, blob : str):
    blob_service_client = connect_storage_account()
    raw_container_client = blob_service_client.get_container_client(container)

    # Especificando o caminho do arquivo CSV no seu bucket
    blob_client = raw_container_client.get_blob_client(blob)

    # Lendo o arquivo CSV em um objeto pandas DataFrame
    #csv_data = blob_client.download_blob().content_as_text()
    xml = blob_client.download_blob()
    return xml

def create_data_frame_danfe(xml):
    # define o namespace
    namespace = {'nfe': 'http://www.portalfiscal.inf.br/nfe'}

    # carrega o arquivo XML
    tree = ET.parse(xml)
    root = tree.getroot()

    # extrai os dados das tags
    cUF = root.find('.//nfe:cUF', namespace).text
    cNF = root.find('.//nfe:cNF', namespace).text
    numero_nf = root.find('.//nfe:nNF', namespace).text
    data_emissao = root.find('.//nfe:dhEmi', namespace).text
    natOp = root.find('.//nfe:natOp', namespace).text
    CNPJ_emit = root.find('.//nfe:CNPJ', namespace).text
    nome_emitente = root.find('.//nfe:emit/nfe:xNome', namespace).text
    logadouro_emitente = root.find('.//nfe:emit/nfe:enderEmit/nfe:xLgr', namespace).text
    numero_emitente = root.find('.//nfe:emit/nfe:enderEmit/nfe:nro', namespace).text
    bairro_emitente  = root.find('.//nfe:emit/nfe:enderEmit/nfe:xBairro', namespace).text
    fone_emitente  = root.find('.//nfe:emit/nfe:enderEmit/nfe:fone', namespace).text
    nome_cliente = root.find('.//nfe:dest/nfe:xNome', namespace).text

    # cria o DataFrame com as colunas desejadas
    df = pd.DataFrame(columns=['codigo_produto', 'descricao_produto', 'ncm', 'valor', 'quantidade','base_calculo_icms','aliquota_icms','valor_icms'])

    # itera sobre todas as tags <det> para extrair os dados de cada produto
    for det in root.findall('.//nfe:det', namespace):
        prod_dict = {
            'codigo_produto': det.find('nfe:prod/nfe:cProd', namespace).text,
            'descricao_produto': det.find('nfe:prod/nfe:xProd', namespace).text,
            'ncm': det.find('nfe:prod/nfe:NCM', namespace).text,
            'valor': det.find('nfe:prod/nfe:vProd', namespace).text,
            'quantidade': det.find('nfe:prod/nfe:qCom', namespace).text,
            'base_calculo_icms' : det.find('nfe:imposto/nfe:ICMS/nfe:ICMS00/nfe:vBC', namespace).text,
            'aliquota_icms' : det.find('nfe:imposto/nfe:ICMS/nfe:ICMS00/nfe:pICMS', namespace).text,
            'valor_icms' : det.find('nfe:imposto/nfe:ICMS/nfe:ICMS00/nfe:vICMS', namespace).text

        }
        
        # adiciona o dicionário ao DataFrame
        df = df.append(prod_dict, ignore_index=True)
    df['codigo_uf_emitente'] = cUF
    df['codigo_nf'] = cNF
    df['numero_nf'] = numero_nf
    df['data_emissao'] = data_emissao
    df['natureza_operacao'] = natOp
    df['cnpj_emitente'] = CNPJ_emit
    df['nome_emitente'] = nome_emitente
    df['logadouro_emitente'] = logadouro_emitente
    df['numero_emitente'] = numero_emitente       
    df['bairro_emitente'] = bairro_emitente       
    df['fone_emitente'] = fone_emitente         
    df['nome_cliente'] = nome_cliente
    return df
    
def save_container_processing_parquet(df,blob_service_client):
    # SAve parquet processing .get_blob_client('danfe.parquet')
    processing_container_name = blob_service_client.get_container_client("processing")
    parquet_blob_name = 'danfe.parquet'
    parquet_blob_name_bytes = bytes(parquet_blob_name, 'utf-8')
    parquet_blob_client = blob_service_client.get_container_client(processing_container_name).get_blob_client(parquet_blob_name_bytes)

    with io.BytesIO() as output:
        df.to_parquet(output, partition_cols=['year', 'month', 'day'], index=False, engine='fastparquet')
        data = output.getvalue()
    
    # Envia os dados para o blob storage
    with io.BytesIO(data) as input:
        parquet_dataset = pq.ParquetDataset(input)
        parquet_blob_client.upload_blob(parquet_dataset.read().to_pybytes(), overwrite=True)

    # Envia os dados para o blob storage
    #parquet_blob_client.upload_blob(data, overwrite=True)


    #parquet_blob_data = df.to_parquet(parquet_blob_client, partition_cols=['year', 'month', 'day'], index=False, engine='fastparquet')
    #parquet_blob_client.upload_blob(parquet_blob_data, overwrite=True)

In [None]:
columns_convert_int = ['quantidade','aliquota_icms']
data_types_nfe = {
      'codigo_produto' : np.int64,
      'descricao_produto' : object,                 
      'ncm' : object,
      'valor' : np.float64,
      'quantidade' : np.float64,
      'base_calculo_icms': np.float64,
      'aliquota_icms' : np.float64,
      'valor_icms' : np.float64,
      'codigo_uf_emitente' : np.int64,
      'codigo_nf' : np.int64,
      'numero_nf' : np.int64,
      'data_emissao' : np.datetime64,
      'natureza_operacao' : object ,
      'cnpj_emitente' : object,
      'nome_emitente' : object,
      'logadouro_emitente' : object,
      'numero_emitente' : object,
      'bairro_emitente' : object,
      'fone_emitente' : object,
      'nome_cliente' : object      
   }

### Testes Leitura XML

In [None]:
tree = ET.parse('cobase-nfe2.xml', parser = ET.XMLParser(encoding = 'utf-8'))
root = ET.parse('cobase-nfe2.xml',).getroot()

nsNFE = {'ns': "http://www.portalfiscal.inf.br/nfe"}

numero_nfe = root.find('ns:NFe/ns:infNFe/ns:ide/ns:cNF',nsNFE).text
natOp = root.find('ns:NFe/ns:infNFe/ns:ide/ns:natOp',nsNFE).text
chave_nfe = root.find('ns:NFe/ns:infNFe',nsNFE).attrib['Id'][3:]

# Dados Emissor
emit_cnpj = root.find('ns:NFe/ns:infNFe/ns:emit/ns:CNPJ',nsNFE).text
emit_nome = root.find('ns:NFe/ns:infNFe/ns:emit/ns:xNome',nsNFE).text
emit_lgr = root.find('ns:NFe/ns:infNFe/ns:emit/ns:enderEmit/ns:xLgr',nsNFE).text
emit_nro = root.find('ns:NFe/ns:infNFe/ns:emit/ns:enderEmit/ns:nro',nsNFE).text
emit_xBairro = root.find('ns:NFe/ns:infNFe/ns:emit/ns:enderEmit/ns:xBairro',nsNFE).text

cProd = root.findall('ns:NFe/ns:infNFe/ns:det/ns:prod/ns:cProd',nsNFE)


#print(numero_nfe.text, chave_nfe)

dados = {
    "numero_nfe" : [numero_nfe],
    "natOp" : [natOp],
    "chave_nfe" : [chave_nfe],
    "emit_cnpj":[emit_cnpj],
    "emit_nome":[emit_nome],
    "emit_lgr":[emit_lgr],
    "emit_nro":[emit_nro],
    "emit_xBairro":[emit_xBairro]
}
#dados
lista = []
dic = {}
for p in root.iter('nItem'):
    #dic['cProd'] = p.find('prod/cProd').text
    #lista.append(dic)
    print(p.text)
#print(lista)


In [None]:
xml = open('cobase-nfe2.xml')
nfe = minidom.parse(xml)

lista = []
produtos = nfe.getElementsByTagName('cProd')
xProd = nfe.getElementsByTagName('xProd')
vProd = nfe.getElementsByTagName('vProd')

for p in produtos:
    dic = {}
    dic['cod_prod'] = p.firstChild.data    
    lista.append(dic)
    #print(p.firstChild.data)
for xProd in xProd:
    dic = {}
    dic['xProd'] = xProd.firstChild.data    
    lista.append(dic)
for vProd in vProd:
    dic = {}
    dic['vProd'] = vProd.firstChild.data    
    lista.append(dic)
lista

In [None]:
produtos = nfe.getElementsByTagName('cProd')
xProd = nfe.getElementsByTagName('xProd')
lista = [{'cod_prod': p.firstChild.data} for p in produtos] + [{'xProd': x.firstChild.data} for x in xProd]
lista


In [None]:
with open('cobase-nfe2.xml') as xml:
    nfe = minidom.parse(xml)

produtos = nfe.getElementsByTagName('cProd') + \
           nfe.getElementsByTagName('xProd')
lista = [{'cod_prod': p.firstChild.data} if p.tagName == 'cProd' else {'xProd': p.firstChild.data} for p in produtos]
lista

In [None]:
import xml.etree.ElementTree as ET

tree = ET.parse('cobase-nfe2.xml')
root = tree.getroot()

produtos = root.findall('.//det')

lista = [{'cod_prod': p.find('prod/cProd').text,
          'xProd': p.find('prod/xProd').text,
          'vProd': p.find('prod/vProd').text} for p in produtos]
lista



In [None]:
xml = open('cobase-nfe2.xml')
nfe = minidom.parse(xml)

lista = []
produtos = nfe.getElementsByTagName('det')

for produto in produtos:
    cod_prod = produto.getElementsByTagName('cProd')[0].firstChild.data
    xProd = produto.getElementsByTagName('xProd')[0].firstChild.data
    vProd = produto.getElementsByTagName('vProd')[0].firstChild.data
    dic = {'cod_prod': cod_prod, 'xProd': xProd, 'vProd': vProd}
    lista.append(dic)

xml.close()
lista


#pd.DataFrame(lista)

In [None]:
import xml.etree.ElementTree as ET
from lxml import etree

#numero_nfe = root.find()
parser = etree.XMLParser(recover=True,encoding='utf-8') #iso-8859-5
root = ET.parse('cobase-nfe.xml',parser=parser).getroot()

nsNFE = {'ns': "http://www.portalfiscal.inf.br/nfe"}
numero_nfe = root.find('ns:NFe/ns:infNFe/ns:ide/ns:nNF',nsNFE)
chave_nfe = root.find('ns:NFe/ns:infNFe',nsNFE).attrib['Id'][3:]

#tree = etree.parse('danfe.xml')

In [None]:
#print(numero_nfe.text)
#print(chave_nfe.attrib['Id'][3:])
print(chave_nfe)
#numero_nfe


In [None]:
xml = open('danfe.xml')
nfe = minidom.parse(xml)

In [None]:
import pandas as pd
import xml.etree.ElementTree as ET

# Carrega o XML
tree = ET.parse('cobase-nfe2.xml')
root = tree.getroot()

# Obtém as informações da nota fiscal
cUF = root.find('.//{http://www.portalfiscal.inf.br/nfe}cUF').text
cNF = root.find('.//{http://www.portalfiscal.inf.br/nfe}cNF').text
natOp = root.find('.//{http://www.portalfiscal.inf.br/nfe}natOp').text
CNPJ_emit = root.find('.//{http://www.portalfiscal.inf.br/nfe}emit/{http://www.portalfiscal.inf.br/nfe}CNPJ').text
xNome_emit = root.find('.//{http://www.portalfiscal.inf.br/nfe}emit/{http://www.portalfiscal.inf.br/nfe}xNome').text

# Cria uma lista com as informações dos produtos
produtos = []
for det in root.findall('.//{http://www.portalfiscal.inf.br/nfe}det'):
    cProd = det.find('{http://www.portalfiscal.inf.br/nfe}prod/{http://www.portalfiscal.inf.br/nfe}cProd').text
    xProd = det.find('{http://www.portalfiscal.inf.br/nfe}prod/{http://www.portalfiscal.inf.br/nfe}xProd').text
    NCM = det.find('{http://www.portalfiscal.inf.br/nfe}prod/{http://www.portalfiscal.inf.br/nfe}NCM').text
    vUnCom = det.find('{http://www.portalfiscal.inf.br/nfe}prod/{http://www.portalfiscal.inf.br/nfe}vUnCom').text
    produtos.append({'cProd': cProd, 'xProd': xProd, 'NCM': NCM, 'vUnCom': vUnCom})

# Cria o DataFrame pandas
df = pd.DataFrame(produtos)
df['cUF'] = cUF
df['cNF'] = cNF
df['natOp'] = natOp
df['CNPJ_emit'] = CNPJ_emit
df['xNome_emit'] = xNome_emit
df

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd

def extract_data_from_xml(xml_string):
    root = ET.fromstring(xml_string)

    # Extrair informações do cabeçalho
    header = root.find('.//{http://www.portalfiscal.inf.br/nfe}ide')
    data = {
        'cUF': header.find('cUF').text,
        'cNF': header.find('cNF').text,
        'natOp': header.find('natOp').text,
        'CNPJ': header.find('.//{http://www.portalfiscal.inf.br/nfe}CNPJ').text,
        'xNome': header.find('.//{http://www.portalfiscal.inf.br/nfe}xNome').text
    }

    # Extrair informações dos produtos
    products = root.iter('{http://www.portalfiscal.inf.br/nfe}det')
    for product in products:
        data.update({
            f"cProd_{product.get('nItem')}": product.find('prod/cProd').text,
            f"xProd_{product.get('nItem')}": product.find('prod/xProd').text,
            f"NCM_{product.get('nItem')}": product.find('prod/NCM').text,
            f"vUnCom_{product.get('nItem')}": product.find('prod/vUnCom').text
        })

    # Criar o DataFrame a partir do dicionário de dados
    df = pd.DataFrame.from_dict([data])

    return df
extract_data_from_xml('cobase-nfe2.xml')

### Código final Transformação Nota fiscal XML

In [None]:
def main ():
    blob_service_client = connect_storage_account()
    xml = get_container_xml_raw('raw','cobase-nfe2.xml')
    df = create_data_frame_danfe(xml)
    df = set_dtypes(df, data_types_nfe)
    df = convert_float_to_int(df,columns_convert_int)
    df = cria_data_fonte_e_carga(df, 'data_emissao')
    df = cria_coluna_year_month_day(df, 'data_fonte')    
    save_container_processing_parquet(df,blob_service_client)
    return df


main()
#if __name__ == "__main__":
#    main()

In [None]:
#df['data_fonte'] = pd.to_datetime(df['data_emissao'], format='%Y-%m-%d %H:%M:%S',errors='ignore').dt.strftime('%Y-%m-%d %H:%M')
#df['data_carga']  = data_load()
df.head()
#df.dtypes
#df.to_parquet(destination_bucket_name, partition_cols=['YEAR', 'MONTH', 'DAY'],engine="fastparquet")

In [None]:
context_file = open('../context/transform-info.json')
context = json.load(context_file)
a = context['transform-info'][0]['connect_string']
a

In [None]:
connect_str = ""
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

raw_container_client = blob_service_client.get_container_client("raw")
# Especificando o caminho do arquivo CSV no seu bucket
blob_client = raw_container_client.get_blob_client("cobase-nfe2.xml")

# Lendo o arquivo CSV em um objeto pandas DataFrame
#csv_data = blob_client.download_blob().content_as_text()
xml = blob_client.download_blob()

tree = ET.parse(xml)
root = tree.getroot()

#df = pd.read_csv(csv_data, sep=';')
processing_container_name = blob_service_client.get_container_client("processing")
parquet_blob_client = blob_service_client.get_container_client(processing_container_name).get_blob_client('file.parquet')
df.to_parquet(parquet_blob_client, partition_cols=['year', 'month', 'day'], index=False)

## Gerando data lake azure

In [None]:
# Importando as bibliotecas necessárias
from azure.storage.blob import BlobServiceClient
import pandas as pd

# Configurando a conexão com o seu bucket na Azure
connect_str = ""
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
container_client = blob_service_client.get_container_client("landing")

# Especificando o caminho do arquivo CSV no seu bucket
blob_client = container_client.get_blob_client("teste.csv")

# Lendo o arquivo CSV em um objeto pandas DataFrame
#csv_data = blob_client.download_blob().content_as_text()
csv_data = blob_client.download_blob()
df = pd.read_csv(csv_data, sep=';')

# Exibindo o DataFrame
df.head()


In [None]:
# Movendo arquivo teste.csv da landing para RAW
landing_container_client = blob_service_client.get_container_client("landing")
landing_blob_client = landing_container_client.get_blob_client("teste.csv")

raw_container_client = blob_service_client.get_container_client("raw")
raw_blob_client = raw_container_client.get_blob_client("teste.csv")
raw_blob_client.start_copy_from_url(landing_blob_client.url)


#landing_blob_client.delete_blob()

In [None]:
data = raw_blob_client.get_blob_properties()
status = raw_blob_client.get_blob_properties().copy.status

if raw_blob_client.get_blob_properties().copy.status == 'success':
    print("sucesso!")

In [None]:
data

In [None]:
data_modificacao = data['last_modified']
data_modificacao

In [None]:
from fastparquet import *
# LENDRO DA RAW E SALVANDO NA PROCESSING EM FORMATO PARQUET

raw_container_client = blob_service_client.get_container_client("raw")
# Especificando o caminho do arquivo CSV no seu bucket
raw_blob_client = raw_container_client.get_blob_client("teste.csv")

processing_container_client = blob_service_client.get_container_client("processing")
processing_blob_client = processing_container_client.get_blob_client("teste.parquet")

# Lendo o arquivo CSV em um objeto pandas DataFrame
#csv_data = blob_client.download_blob().content_as_text()
csv_data = raw_blob_client.download_blob()
df = pd.read_csv(csv_data, sep=';')

# Exibindo o DataFrame

parquet_blob_data = df.to_parquet('teste.parquet',engine="fastparquet")
processing_blob_client.upload_blob(parquet_blob_data, overwrite=True)
