SAP 
Addresses
BusinessPartners
ProductCategories
ProductCategoryText
Products
ProductTexts
SalesOrderItems
SalesOrders_dsp1
SalesOrders

In [0]:
import os

import great_expectations as ge
import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan



In [0]:
# Obtain the Spark session for this extraction job (getOrCreate returns an
# existing session when one is available instead of building a new one).
spark = (
    SparkSession.builder
    .appName("EXT_SAP_API")
    .getOrCreate()
)

In [0]:
class Ext_SAP_API:
    """Fetch one dataset from the SAP demo REST API and expose it as a Spark DataFrame.

    Parameters
    ----------
    area : str
        First path segment appended to the base URL (e.g. ``'SAP'``).
    arquivo : str
        Second path segment appended to the base URL (e.g. ``'Products'``).
    chave : str
        API key; sent as the password of HTTP basic auth with an empty username.
    spark : SparkSession
        Session used to build the DataFrame from the downloaded payload.
    timeout : float, optional
        Seconds to wait for the HTTP request before giving up. Defaults to 30.
    """

    URL_BASE = "https://datasap-293251100165.herokuapp.com/dados"

    def __init__(self, area, arquivo, chave, spark, timeout=30):
        self.area = area
        self.arquivo = arquivo
        self.chave = chave
        self.spark = spark
        self.timeout = timeout

    def get_dados_archive(self):
        """Download the dataset and return the decoded JSON payload.

        Returns
        -------
        object or None
            Whatever ``response.json()`` decodes on HTTP 200; ``None`` when the
            request fails, the status is not 200, or the body is not valid JSON.
            Failures are reported with ``print`` rather than raised.
        """
        url = f"{self.URL_BASE}/{self.area}/{self.arquivo}"
        auth = ("", str(self.chave))

        try:
            # Without a timeout a stalled connection would block the notebook forever.
            response = requests.get(url, auth=auth, timeout=self.timeout)
        except requests.RequestException as exc:
            print(f"Erro: Não foi possível obter dados da API. ({exc})")
            return None

        if response.status_code != 200:
            print(
                "Erro: Não foi possível obter dados da API. "
                f"(status {response.status_code})"
            )
            return None

        try:
            return response.json()
        except ValueError as exc:
            # A 200 response whose body is not JSON is still an unusable result.
            print(f"Erro: Não foi possível obter dados da API. ({exc})")
            return None

    def create_dataframe(self):
        """Build a Spark DataFrame from the API payload.

        Returns
        -------
        DataFrame or None
            The result of ``spark.createDataFrame`` on the payload, or ``None``
            when the download failed or the payload is empty.
        """
        dados = self.get_dados_archive()
        # An empty payload is treated like a failed download: there are no rows
        # from which Spark could infer a schema.
        if not dados:
            print("Não foi possível obter dados da API.")
            return None
        return self.spark.createDataFrame(dados)

In [0]:
# Path segments of the dataset to extract.
area = 'SAP'
arquivo = 'Products'

# The API key is read from the SAP_API_KEY environment variable so the secret
# is never stored in (or committed with) the notebook.
chave = os.environ.get("SAP_API_KEY", "")
if not chave:
    print("Aviso: variável de ambiente SAP_API_KEY não definida; a autenticação na API deve falhar.")

In [0]:
# Configure the extractor for the dataset selected in the configuration cell.
extract = Ext_SAP_API(area=area, arquivo=arquivo, chave=chave, spark=spark)

# Build the Spark DataFrame; create_dataframe() returns None when extraction fails.
df = extract.create_dataframe()

if df is None:
    print("Não foi possível criar o Dataframe.")
else:
    print('Dataframe criado com sucesso!')
    display(df)

Dataframe criado com sucesso!


CHANGEDAT,CHANGEDBY,CREATEDAT,CREATEDBY,CURRENCY,DEPTH,DIMENSIONUNIT,HEIGHT,PRICE,PRODCATEGORYID,PRODUCTID,PRODUCTPICURL,QUANTITYUNIT,SUPPLIER_PARTNERID,TAXTARIFFCODE,TYPECODE,WEIGHTMEASURE,WEIGHTUNIT,WIDTH
20181003,9,20181003,9,USD,,,,525,RO,RO-1001,,EA,100000000,1,PR,7.7,KG,
20181003,9,20181003,9,USD,,,,689,RO,RO-1002,,EA,100000001,1,PR,8.0,KG,
20181003,12,20181003,12,USD,,,,721,RO,RO-1003,,EA,100000002,1,PR,9.1,KG,
20181003,9,20181003,9,USD,,,,249,BX,BX-1011,,EA,100000003,1,PR,11.1,KG,
20181003,6,20181003,6,USD,,,,399,BX,BX-1012,,EA,100000004,1,PR,12.0,KG,
20181003,7,20181003,7,USD,,,,449,BX,BX-1013,,EA,100000005,1,PR,13.1,KG,
20181003,11,20181003,11,USD,,,,799,BX,BX-1014,,EA,100000006,1,PR,11.8,KG,
20181003,8,20181003,8,USD,,,,299,BX,BX-1015,,EA,100000007,1,PR,12.5,KG,
20181003,11,20181003,11,USD,,,,319,BX,BX-1016,,EA,100000008,1,PR,12.8,KG,
20181003,10,20181003,10,USD,,,,1144,CC,CC-1021,,EA,100000009,1,PR,8.1,KG,


Verificação de qualidade

In [0]:
# Inspect the column names of the extracted DataFrame

df.columns

Out[6]: ['CHANGEDAT',
 'CHANGEDBY',
 'CREATEDAT',
 'CREATEDBY',
 'CURRENCY',
 'DEPTH',
 'DIMENSIONUNIT',
 'HEIGHT',
 'PRICE',
 'PRODCATEGORYID',
 'PRODUCTID',
 'PRODUCTPICURL',
 'QUANTITYUNIT',
 'SUPPLIER_PARTNERID',
 'TAXTARIFFCODE',
 'TYPECODE',
 'WEIGHTMEASURE',
 'WEIGHTUNIT',
 'WIDTH']

In [0]:
# Wrap the Spark DataFrame so Great Expectations can declare and check expectations on it.
df_ge = ge.dataset.SparkDFDataset(df)

# Expectation: the table exposes exactly these columns, in this order.
colunas_esperadas = [
    'CHANGEDAT',
    'CHANGEDBY',
    'CREATEDAT',
    'CREATEDBY',
    'CURRENCY',
    'DEPTH',
    'DIMENSIONUNIT',
    'HEIGHT',
    'PRICE',
    'PRODCATEGORYID',
    'PRODUCTID',
    'PRODUCTPICURL',
    'QUANTITYUNIT',
    'SUPPLIER_PARTNERID',
    'TAXTARIFFCODE',
    'TYPECODE',
    'WEIGHTMEASURE',
    'WEIGHTUNIT',
    'WIDTH',
]

expectativa_colunas = df_ge.expect_table_columns_to_match_ordered_list(column_list=colunas_esperadas)

# Validate the dataset against the expectations declared above.
resultado_validacao = df_ge.validate()

# Stop the notebook on failure: merely printing would let the cells that follow
# persist data that did not pass the quality check.
if not resultado_validacao['success']:
    print('Erro na validação!')
    raise ValueError("A validação de qualidade do DataFrame falhou; execução interrompida.")

print('Arquivo Ok!')

Arquivo Ok!


In [0]:
# Transient output location. Despite "delta" in the variable name, the data is
# written in Parquet format.
tmp_delta_path = "/tmp/transient/tb_Products"

# Overwrite whatever already exists at the path with the current DataFrame.
(
    df.write
    .mode("overwrite")
    .parquet(tmp_delta_path)
)