# SPRINT - 2

## Item - Levantamento dos cálculos dos indicadores.

## Item - Levantamento do procedimento de consolidação dos dados de produção e apontamentos.

## Item - Desenvolvimento do processamento de Limpeza, Formatação e Classificação dos dados, e gravação na "Refined Data".

<br>
<br>

Autor.: Sérgio C. Medina

#### Declaração dos Pacotes, Libs ou Classes utilizadas no processo.

In [1]:
# Declaração dos Pacotes, Libs ou Classes utilizadas no processo.
import os
import io
import pandas as pd
from google.cloud import storage
from datetime import datetime, timedelta


# configurando variavel de ambiente com o arquivo de credenciais para conexão GCP
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "..\..\secrets\edc-igti-smedina-4920e12ac565.json"

# Conexão com o Cloud Storage e criação do objeto bucket referente a área de dados
storage_client = storage.Client()
bucket = storage_client.get_bucket("edc-pa-i4-data")

In [2]:
# Função para leitura do arquivo CSV no GCS convertando para Pandas Dataframe
def gcp_csv_to_df(folder, dtexec, lineprod=None, sep=";"):

    # formata o padrão do nome do arquivo
    file_name = None
    if (dtexec is not None and lineprod is None):
        file_name = f"{dtexec}.csv"
    elif (dtexec is not None and lineprod is not None):
        file_name = f"{lineprod}-{dtexec}.csv"
    else:
        raise Exception("Invalid parameters.")

    df = None

    try:
        # define o objeto blob com o path para o arquivo no GCS
        blob = bucket.blob(f"raw-data-zone/{folder}/{file_name}")

        # Faz o download no formato em bytes e posiciona no index 0
        byte_stream = io.BytesIO()
        blob.download_to_file(byte_stream)
        byte_stream.seek(0)

        # Converte para o Dataframe em Pandas
        df = pd.read_csv(byte_stream, sep=sep)
    except Exception as e:
        print("Exception:", e)

    return df


# Função para realizar o delete do blob no GCS
def delete_blob(path_name):
    if path_name is not None:
        print('SEARCHING TO DELETE:', path_name)
        blobs = bucket.list_blobs(prefix=path_name)
        for blob in blobs:
            if blob.exists():
                print('DELETE:',blob.path)
                blob.delete()
    else:
        print('ERROR: path_name is None!')



In [3]:
# Função para calcular data turno com base em uma data e hora
def calc_dtprod(dt):

    # Se dt for string
    if isinstance(dt,str):
        dt = datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")

    # separa a data e hora
    my_date = dt.date()
    my_hora = dt.time()

    # Formato hora 
    FMT = '%H:%M:%S'

    # Verifica periodo entre 06:00:00 e 23:59:59
    start_time = datetime.strptime('06:00:00', '%H:%M:%S').time()
    end_time = datetime.strptime('23:59:59', '%H:%M:%S').time()

    if (my_hora >= start_time and my_hora <= end_time):
        return my_date
    elif (my_hora < start_time):
        prevDay = my_date + timedelta(days=-1)
        return prevDay
    else:
        return None


# Compara se a data inicio e final informada estão na mesma data de produção
# se sim retorna a data de produção, senão retorna None
def compate_dtprod(di, df):
    di = calc_dtprod(di)
    df = calc_dtprod(df)

    if (di == df):
        return di
    else:
        return None


#### Dados na "raw-data-zone" pasta "DATAOP"

In [12]:
dtexec = "2021-11-10"

df_dataop = gcp_csv_to_df(folder="dataop", dtexec=dtexec, sep=";")

if df_dataop is not None:
    # Limpa os dados nulos
    df_dataop.dropna(inplace=True)

    # Define os tipos de cada coluna
    df_dataop['OP'] = df_dataop['OP'].astype(str)
    df_dataop['CODMAT'] = df_dataop['CODMAT'].astype(str)
    df_dataop['LOTEFAB'] = df_dataop['LOTEFAB'].astype(str)
    df_dataop['DTINI'] = pd.to_datetime(df_dataop['DTINI'])
    df_dataop['DTFIM'] = pd.to_datetime(df_dataop['DTFIM'])
    df_dataop['QTDPLAN'] = df_dataop['QTDPLAN'].astype(int)

    # Definindo a Data de Produção DTPROD
    df_dataop.insert(0, 'DTPROD', None)
    df_dataop['DTPROD'] = df_dataop.apply(lambda row:compate_dtprod(row['DTINI'], row['DTFIM']),axis=1)
    
      

In [None]:
df_dataop.dtypes

In [10]:
df_dataop

# Somarizando os dados Nuloes
#df_dataop.isnull().sum()

# Limpando os dados Nulos
#df_dataop.dropna(inplace=True)

Unnamed: 0,DTPROD,OP,DTINI,DTFIM,CODMAT,LOTEFAB,QTDPLAN
0,2021-11-09,211109L101,2021-11-09 06:00:00,2021-11-10 05:59:59,TB70PVC,TB70395,1320


In [13]:
# save parquet - gcp
# https://gist.github.com/lpillmann/fa1874c7deb8434ca8cba8e5a045dde2
# https://blog.datasyndrome.com/python-and-parquet-performance-e71da65269ce

import gcsfs
import pyarrow
import pyarrow.parquet as pq

folder="dataop"
gs_path=f"edc-pa-i4-data/processing-zone/{folder}"
gs_blobs=f"processing-zone/{folder}/DTPROD={dtexec}"

delete_blob(path_name=gs_blobs)

gs = gcsfs.GCSFileSystem()

table = pyarrow.Table.from_pandas(df_dataop)
pq.write_to_dataset(
    table,
    gs_path,
    partition_cols=['DTPROD'],
    filesystem=gs,
    compression='snappy',
    flavor='spark'
)


SEARCHING TO DELETE: processing-zone/dataop/DTPROD=2021-11-10


In [18]:
dataset = pq.ParquetDataset(
    f"edc-pa-i4-data/processing-zone/{folder}", 
    filesystem=gs
#    filters=[('DTPROD', '=', '2021-11-08')]
#    use_threads=True
)
df = dataset.read_pandas().to_pandas()

In [19]:
df

Unnamed: 0,OP,DTINI,DTFIM,CODMAT,LOTEFAB,QTDPLAN,DTPROD
0,211108L101,2021-11-08 06:00:00,2021-11-09 05:59:59,TB70PVC,TB70394,1320,2021-11-08
0,211109L101,2021-11-09 06:00:00,2021-11-10 05:59:59,TB70PVC,TB70395,1320,2021-11-09
0,211110L101,2021-11-10 06:00:00,2021-11-11 05:59:59,TB70PVC,TB70401,1320,2021-11-10


#### Dados na "raw-data-zone" pasta "DATAPROD"

In [None]:
dtexec = "2021-11-08"
lineid = "101"

nextDay = datetime.strptime(dtexec, "%Y-%m-%d") + timedelta(days=1)
print("dtexec:", dtexec, " next:", nextDay)

df_dataprod = gcp_csv_to_df(folder="dataprod", dtexec=dtexec, lineprod=lineid, sep=";")

if df_dataprod is not None:
    df = gcp_csv_to_df(folder="dataprod", dtexec=nextDay.strftime("%Y-%m-%d"), lineprod=lineid, sep=";")
    df_dataprod = df_dataprod.append(df, sort=False, ignore_index=True)

In [None]:
df_dataprod