O processo será todo pensado em usar orquestração de dados para poder deixar o pipeline rodando automáticamente e sozinho conforme a nescessidade de atualização (D-1, near real tima, e afins.


#EXTRACT
- Estração dos dados de arquivos Jsons salvos em pastas diferentes dentro de um bucket da GCP
- Os dados são carregados nesse bucket através de uma aplicação nativa da GCP chamada PUB/SUB
- Será usado Python, Spark, PySpark e pandas.

In [None]:
# ################################
# #### Bibliotecas            ####
# ################################

from google.cloud import bigquery
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark import SQLContext
import argparse
from pyspark.sql.types import *
import os
import pandas as pd

# ################################
# #### Parametros de Execucao ####
# ################################

script_name = os.path.basename(__file__)
args = argparse.ArgumentParser(prog=script_name)
args.add_argument('--p_dataref', required=True,
                  help="Data de referencia para leitura dos dados de origem no formato YYYYMMDD (e.g., 20220530)")
args.add_argument('--p_ambiente', required=True, choices=['dev', 'hml', 'prd'],
                  help="Sigla do ambiente de execucao (opcoes: dev, hml, prd)")
params = args.parse_args()
print(">>> Parametros de execucao informados: {}".format(params))

# prepara os parametros
dataref = params.p_dataref
dataref_ano = params.p_dataref[0:4]
dataref_mes = params.p_dataref[4:6]
dataref_dia = params.p_dataref[6:8]
ambiente = params.p_ambiente
fila = ['produto', 'produto_oferta', 'produto_status', 'emitir_nota']

# ###########################################################################
# #### Inicia Spark Session e Spark Context                              ####
# ###########################################################################
spark = SparkSession.builder \
    .config("spark.logConf", "True") \
    .getOrCreate()

sc = SparkContext.getOrCreate(SparkConf())
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

sqlContext.setConf("spark.sql.caseSensitive", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set('spark.sql.session.timeZone', 'America/Sao_Paulo')

# ###########################################################################
# #### Inicializacao do Logger                                           ####
# ###########################################################################
Logger = sc._jvm.org.apache.log4j.Logger
LOGGER = Logger.getLogger(__name__)

LOGGER.info("Iniciando Processamento")


# ##################################################################
# #### Mapa de campos de destino apos aplicar o flat das colunas####
# #### (campo_destino)                                          ####
# ##################################################################

colunas = [ 'nomeProduto',
            'dataInclusao',
            'dataEstoque',
            'dataHoraExecucao',
            'quantidadeEstoque',
            'quantidadeItens',
            'codigoTipo',
            'tipoProduto_codigo',
            'tipoProduto_descricao',
            'tipoProduto_descricaoReduzida',
            'fornecedorProduto_nomeFornecedor',
            'fornecedorProduto_empresa_codigo',
            'fornecedorProduto_empresa_descricao',
            'fornecedorProduto_empresa_descricaoReduzida',
            'fornecedorProduto_responsavel_nome',
            'fornecedorProduto_responsavel_telefone',
            'fornecedorProduto_responsavel_email',
            'fornecedorProduto_responsavel_cargo',
            'fornecedorProduto_responsavel_endereco',
            'fornecedorProduto_cnpj',
            'fornecedorProduto_nomeFantasia',
            'fornecedorProduto_dataInicio',
            'flagPedidoMaximo',
            'flagPedidoMinimo',
            'quantidadeMinimaEstoque',
            'quantidadeMaximaEstoque',
            'valor_valorProduto',
            'valor_valorProdutoDesconto',
            'valor_comissaoVendedor',
            'valor_maximoComissaoVendedor',
            'valor_minimoComissaoVendedor'
           ]


# ##################################################################
# #### Schema Json para leitura padronizada                     ####
# ##################################################################

structureSchema = StructType([
    StructField('nomeProduto', StringType(), True),
    StructField('dataInclusao', StringType(), True),
    StructField('dataEstoque', StringType(), True),
    StructField('dataHoraExecucao', StringType(), True),
    StructField('quantidadeEstoque', StringType(), True),
    StructField('quantidadeItens', StringType(), True),
    StructField('codigoTipo', StringType(), True),
    StructField('tipoProduto', StructType([
        StructField('codigo', StringType(), True),
        StructField('descricao', StringType(), True),
        StructField('descricaoReduzida', StringType(), True)
    ]), True),
    StructField('fornecedorProduto', ArrayType(StructType([
        StructField('nomeFornecedor', StringType(), True),
        StructField('empresa', StructType([
            StructField('codigo', StringType(), True),
            StructField('descricao', StringType(), True),
            StructField('descricaoReduzida', StringType(), True)
        ]), True),
        StructField('responsavel', StructType([
            StructField('nome', StringType(), True),
            StructField('telefone', StringType(), True),
            StructField('email', StringType(), True),
            StructField('cargo', StringType(), True),
            StructField('endereco', StringType(), True)
        ]), True),
        StructField('cnpj', StringType(), True),
        StructField('nomeFantasia', StringType(), True),
        StructField('dataInicio', StringType(), True)
    ]), True), True),
    StructField('flagPedidoMaximo', StringType(), True),
    StructField('flagPedidoMinimo', StringType(), True),
    StructField('quantidadeMinimaEstoque', StringType(), True),
    StructField('quantidadeMaximaEstoque', StringType(), True),
    StructField('valor', StructType([
        StructField('valorProduto', StringType(), True),
        StructField('valorProdutoDesconto', StringType(), True),
        StructField('comissaoVendedor', StringType(), True),
        StructField('maximoComissaoVendedor', StringType(), True),
        StructField('minimoComissaoVendedor', StringType(), True),
    ]), True),
])




#TRANSFORM
- Aplicar flatten e explode para poder deixar de forma cartesiana todos os dados dos Jsons
- Tranformar os dados: 
    - datatypes
    - limpeza de sujeias
    - verificação de validade

In [None]:
# ###########################################################################
# #### Metodo: type_cols                                                 ####
# #### Args: df_dtypes - dataframe a ser mapeado                         ####
# ####       filter_type - tipo de campo se struct ou array              ####
# ####                                                                   ####
# #### Gera um array com todos os campos do dataframe com seus           ####
# #### respectivos dataypes                                              ####
# ###########################################################################

def type_cols(df_dtypes, filter_type):
    cols = []
    for col_name, col_type in df_dtypes:
        if col_type.startswith(filter_type):
            cols.append(col_name)
    return cols


# ###########################################################################
# #### Metodo: flatten_df                                                ####
# #### Args: nested_df - dataframe que devera sofrer o flatten           ####
# ####       sep - separador de campos                                   ####
# ####                                                                   ####
# #### Retorna um dataframe colunado quando tipo de estrutura for struct ####
# ###########################################################################

def flatten_df(nested_df, sep='_'):
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    flatten_cols = [fc for fc, _ in nested_df.dtypes if fc not in nested_cols]
    for nc in nested_cols:
        for cc in nested_df.select(f'{nc}.*').columns:
            if sep is None:
                flatten_cols.append(F.col(f'{nc}.{cc}').alias(f'{cc}'))
            else:
                flatten_cols.append(F.col(f'{nc}.{cc}').alias(f'{nc}{sep}{cc}'))
    return nested_df.select(flatten_cols)


# ###########################################################################
# #### Metodo: explode_df                                                ####
# #### Args: nested_df - dataframe que devera sofrer o explode           ####
# ####                                                                   ####
# #### Retorna um dataframe colunado quando tipo de estrutura for array  ####
# ###########################################################################

def explode_df(nested_df):
    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    exploded_df = nested_df
    for nc in array_cols:
        exploded_df = exploded_df.withColumn(nc, F.explode_outer(F.col(nc)))
    return exploded_df


# ###########################################################################
# #### Metodo: flatten_explode_df                                        ####
# #### Args: nested_df - dataframe que origem                            ####
# ####                                                                   ####
# #### Separa o dataframe origem por tipo de estrutura array ou struct   ####
# #### para aplicacao do metodo mais adequado de padronizacao            ####
# ###########################################################################

def flatten_explode_df(nested_df):
    df = nested_df
    struct_cols = type_cols(nested_df.dtypes, 'struct')
    array_cols = type_cols(nested_df.dtypes, 'array')
    if array_cols:
        df = explode_df(df)
        return flatten_explode_df(df)
    if struct_cols:
        df = flatten_df(df)
        return flatten_explode_df(df)
    return df


# ##################################################################
# #### Cria um dataframe vazio com o schema Json definido       ####
# ##################################################################
LOGGER.info("Criando um dataframe vazio com o schema Json definido")

schemaflat = StructType([
    StructField('nomeProduto', StringType(), True),
    StructField('dataInclusao', StringType(), True),
    StructField('dataEstoque', StringType(), True),
    StructField('dataHoraExecucao', StringType(), True),
    StructField('quantidadeEstoque', StringType(), True),
    StructField('quantidadeItens', StringType(), True),
    StructField('codigoTipo', StringType(), True),
    StructField('tipoProduto_codigo', StringType(), True),
    StructField('tipoProduto_descricao', StringType(), True),
    StructField('tipoProduto_descricaoReduzida', StringType(), True),
    StructField('fornecedorProduto_nomeFornecedor', StringType(), True),
    StructField('fornecedorProduto_empresa_codigo', StringType(), True),
    StructField('fornecedorProduto_empresa_descricao', StringType(), True),
    StructField('fornecedorProduto_empresa_descricaoReduzida', StringType(), True),
    StructField('fornecedorProduto_responsavel_nome', StringType(), True),
    StructField('fornecedorProduto_responsavel_telefone', StringType(), True),
    StructField('fornecedorProduto_responsavel_email', StringType(), True),
    StructField('fornecedorProduto_responsavel_cargo', StringType(), True),
    StructField('fornecedorProduto_responsavel_endereco', StringType(), True),
    StructField('fornecedorProduto_cnpj', StringType(), True),
    StructField('fornecedorProduto_nomeFantasia', StringType(), True),
    StructField('fornecedorProduto_dataInicio', StringType(), True),
    StructField('flagPedidoMaximo', StringType(), True),
    StructField('flagPedidoMinimo', StringType(), True),
    StructField('quantidadeMinimaEstoque', StringType(), True),
    StructField('quantidadeMaximaEstoque', StringType(), True),
    StructField('valor_valorProduto', StringType(), True),
    StructField('valor_valorProdutoDesconto', StringType(), True),
    StructField('valor_comissaoVendedor', StringType(), True),
    StructField('valor_maximoComissaoVendedor', StringType(), True),
    StructField('valor_minimoComissaoVendedor', StringType(), True),
    StructField('NOM_EVENTO', StringType(), True)
])

df_cota = spark.createDataFrame([], schemaflat)

# ##################################################################
# #### Criando um dataframe formato texto com jsons do dia      ####
# #### anterior para cada fila                                  ####
# ##################################################################

LOGGER.info("Aplicando tratamentos de flatten e explode para os topicos de cotacao")

for line in fila:
    topico = line
    path_orig = "gs://business-analytics-{}_staging/auto20/cotacao/cotacao/api/{}/json/{}/{}/{}/*.json".format(
        ambiente, topico, dataref_ano, dataref_mes, dataref_dia)
    df = sqlContext.read.json(path_orig, schema=structureSchema)
    df_flat = flatten_explode_df(df)
    for column in [column for column in colunas if column not in df_flat.columns]:
        df_flat = df_flat.withColumn(column, F.lit(''))
    dfnew_flat = df_flat.select(colunas)
    dfnew_flat = dfnew_flat.withColumn('NOM_EVENTO', F.lit(topico))
    df_unic = dfnew_flat.dropDuplicates()
    df_cota = df_cota.union(df_unic)

# ##################################################################
# #### Filtra json com dataHoraExecucao null - layout errado    ####
# #### Filtra json com nomeProduto null - layout errado         ####
# ##################################################################

df_cota = df_cota.filter(df_cota.dataHoraExecucao.isNotNull())
df_cota = df_cota.filter(df_cota.nomeProduto.isNotNull())

# ##################################################################
# #### Tratamento de mapeamento do produto                      ####
# ##################################################################

LOGGER.info("Renomeando DataFrame colunado e organizando os DataTypes")

df_fim = df_cota.select(F.coalesce(F.upper(df_cota.nomeProduto.cast('string').alias("NM_PRODUTO"))),
                        (df_cota.dataInclusao).cast("timestamp").alias("DAT_INCLUSAO_SISTEMA"),
                        (df_cota.dataEstoque).cast("timestamp").alias("DAT_ENTRADA_ESTOQUE"),
                        (df_cota.dataHoraExecucao / 1000).cast("timestamp").alias("DAT_EXECUCAO"),
                        df_cota.quantidadeEstoque.cast('long').alias("QNT_ESTOQUE"),
                        df_cota.quantidadeItens.cast('long').alias("QNT_ITENS_ESTOQUE"),
                        df_cota.codigoTipo.cast('long').alias("COD_TIPO"),
                        df_cota.tipoProduto_codigo.cast('long').alias("COD_TIPO_PRODUTO"),
                        F.coalesce(F.upper(df_cota.tipoProduto_codigo.cast('string').alias("DES_TIPO_PRODUTO"))),
                        F.coalesce(F.upper(df_cota.tipoProduto_codigo.cast('string').alias("DES_ABREVIADO_TIPO_PRODUTO"))),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_nomeFornecedor.cast('string').alias("NM_FORNECEDOR"))),
                        df_cota.fornecedorProduto_empresa_codigo.cast('long').alias("COD_EMPRESA"),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_empresa_descricao.cast('string').alias("DES_EMPRESA"))),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_empresa_descricaoReduzida.cast('string').alias("DES_ABREVIADOR_EMPRESA"))),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_responsavel_nome.cast('string').alias("NM_RESPONSAVEL_FORNECEDOR"))),
                        (df_cota.fornecedorProduto_responsavel_telefone.cast('long').alias("NUM_TELEFONE_RESPONSAVEL_FORNECEDOR"),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_responsavel_email.cast('string').alias("NM_EMAIL_RESPONSAVEL_FORNECEDOR"))),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_responsavel_cargo.cast('string').alias("NM_CARGO_RESPONSAVEL_FORNECEDOR"))),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_responsavel_endereco.cast('string').alias("NM_ENDERECO_RESPONSAVEL_FORNECEDOR"))),
                        (df_cota.fornecedorProduto_cnpj.cast('long').alias("NUM_CNPJ_FORNECEDOR"),
                        F.coalesce(F.upper(df_cota.fornecedorProduto_nomeFantasia.cast('string').alias("NM_FANTASIA_FORNECEDOR"))),
                        (df_cota.fornecedorProduto_dataInicio).cast("timestamp").alias("DAT_INICIO_FORNECEDOR"),
                        F.coalesce(F.upper(df_cota.flagPedidoMaximo.cast('string').alias("FLG_PEDIDO_MAXIMO"))),
                        F.coalesce(F.upper(df_cota.flagPedidoMinimo.cast('string').alias("FLG_PEDIDO_MINIMO"))),
                        (df_cota.quantidadeMinimaEstoque.cast('long').alias("QNT_MINIMA_ESTOQUE"),
                        (df_cota.quantidadeMaximaEstoque.cast('long').alias("QNT_MAXIMA_ESTOQUE"),
                        (df_cota.valor_valorProduto.cast('decimal(19,5)').alias("VLR_PRODUTO"),
                        (df_cota.valor_valorProdutoDesconto.cast('decimal(19,5)').alias("VLR_PRODUTO_DESCONTO"),
                        (df_cota.valor_comissaoVendedor.cast('decimal(19,5)').alias("VLR_COMISSAO_VENDEDOR"),
                        (df_cota.valor_maximoComissaoVendedor.cast('decimal(19,5)').alias("VLR_MAXIMO_COMISSAO_VENDEDOR"),
                        (df_cota.valor_minimoComissaoVendedor.cast('decimal(19,5)').alias("VLR_MINIMO_COMISSAO_VENDEDOR"),
                        F.upper(df_cota.NOM_EVENTO).alias("NOM_EVENTO"),
                        F.date_format(F.from_unixtime(df_cota.dataHoraExecucao / 1000), "yyyyMMdd").cast("string").alias("COD_PARTICIONAMENTO")
                        )


#LOAD
- Carregar os dados após transformados em uma tabela do BigQuery, assim permitindo consulta das areas interessadas

In [None]:
# ##################################################################
# #### Converte dataframe unificado em Pandas                   ####
# ##################################################################

dfP_fim = df_fim.select("*").toPandas()

# ##################################################################
# #### Insere registros de emissao na tabela do BigQuery        ####
# #### Dataset: ANALITICO_CONTROLE_LOJA                         ####
# #### tabela: CONSOLIDADO_PRODUTO                              ####
# ##################################################################
LOGGER.info("Iniciando Insert")
bigqueryClient = bigquery.Client()
tableRef = bigqueryClient.dataset("ANALITICO_CONTROLE_LOJA").table("CONSOLIDADO_PRODUTO")
bigqueryJob = bigqueryClient.load_table_from_dataframe(dfP_fim, tableRef)
bigqueryJob.result()

LOGGER.info("Registros inseridos com sucesso")

LOGGER.info("Fim de processamento")