# Modelo para acesso e uso do Pyspark (Python com Spark)

## Bibliotecas (importação)

In [None]:
import os
import gc
import sys
import glob
import time
import psutil
import shutil
import findspark
import pathlib as path

### Data
from datetime import datetime

## Data
import pandas as pd
import pyspark.pandas as ps

### GCP
from google.cloud import bigquery

### Spark
from pyspark               import SparkContext, SparkConf
from pyspark.sql           import SparkSession

## Suporte ao IPYTHON
from IPython.core.display import display

# Variáveis globais

In [None]:
__SPARK = None

## Constantes

In [None]:
TIME_START       = time.time()
DATE_TIME        = datetime.now()
SPARK_PATH       = f'/opt/data/spark'
PROJECT_NAME     = '[NOME DO PROJETO]' ## Utilize nessa variável o nome do projeto do GCP
BUCKET_NAME      = '[NOME DO STORAGE]' ## Utilize nessa variável o nome do seu storage name do GCP
PATH_APPLICATION = os.path.abspath(os.getcwd())

## Funções de apoio

In [None]:
def folder_create(folder:str):
    """
    Verifica se a pasta existe localmente, e caso não exista a mesma será criada
    ----------

    Parâmetros
    ----------
    folder : str
        Caminho completo com o nome da pasta de criação caso a pasta não exista será criada
    """

    # Verifica se o diretório destino existe, caso contrário cria o caminho
    path.Path(folder).mkdir(parents=True, exist_ok=True)

# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def folder_delete(folder:str):
    """
    Verifica se o arquivos existe, e caso exista o mesmo será excluído da pasta local
    ----------

    Parâmetros
    ----------
    folder : str
        Caminho completo com o nome da pasta de criação caso a pasta não exista será excluído
    """

    ## Verifica se o arquivo existe
    shutil.rmtree(folder, ignore_errors=True)

# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def file_split(file:str):
    """
    Fragamenta o arquivo em partes: caminho + extensão
    ----------

    Parâmetros
    ----------
    file : str
        Caminho completo com o nome do arquivo ['/home/ricardo/Downloads/email_statistics.sql']

    Retornos
    ----------
    folder : str
        Caminho completo composto pelo nome do arquivo sem a extensão ['/home/ricardo/Downloads/email_statistics']
    extension : str
        Extensão do arquivo sem separador ['sql']
    """
    
    ## Fragmenta o caminho e arquivo em partes
    folder, extension = os.path.splitext(file)
    extension = extension.replace('.', '')

    ## Retorna a pasta base + nome da pasta com o nome do arquivo + nome do arquivo + extensão do arquivo
    return folder, extension

# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def file_extension(file:str):
    """
    Extrai a extensão do arquivo
    ----------

    Parâmetros
    ----------
    file : str
        Caminho completo com o nome do arquivo

    Retornos
    ----------
    extension : str
        Extensão do arquivo com o separador ['.sql']
    """

    ## Variável
    extension = None

    try:
        ## Extrai a extensão do arquivo
        extension = path.Path(file).suffix.lower().strip()
    except:
        pass 

    ## Retorna o valor obtido
    return extension

# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def get_memory():
    """
    Retorna a quantidade de memória total disponível do sistema (reduzindo 1g para controle)
    ----------

    Retornos
    ----------
    mem_gib : string
        Memória total disponível (formato 8g)
    """

    ## Variável local
    memory = '4g'

    ## Recupera a memória total em bytes
    mem_tot = psutil.virtual_memory().total
    ## Converte a memória total em bytes para gigabytes
    mem_gib = round(int(mem_tot/(1024.**3)) * 0.7)

    ## Verifica o tamanho mínimo e ajusta se for o caso
    if mem_gib <= 4: mem_gib = 4

    ## Reduz 1g da memória para o sistema
    memory = f'{mem_gib}g'

    ## retorna a memória total do sistema
    return memory

# --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def get_cores():
    """
    Retorna a quantidade de cores (cpu) do sistema (reduzindo 1core para controle)
    ----------

    Retornos
    ----------
    cores : int
        CPU Cores total disponível
    """

    ## Variável local
    cpu = 1

    ## Recupera a quantidade de cores do sistema
    cpus = psutil.cpu_count()
    ## Reduz 1 core da capacidade total do sistema
    cpu = round(int(cpus) * 0.7)

    ## Verifica o tamanho mínimo e ajusta se for o caso
    if cpu <= 0: cpu = 1

    ## retorna a memória total do sistema
    return cpu


## Configuração Spark e instância

In [None]:

def start_spark(name:str=None):
    """
    Função para iniciar a instância do contexto spark
    ----------

    Parâmetros
    ----------
    name : str, optional, default = DATE_TIME.strftime('%Y-%m%d') 
        Nome da estrutura de carga do yarn - spark

    Retornos
    ----------
    spark : sparkSession
        Sessão spark para execução das instruções spark no contexto
    """

    ## Variáveis global
    global __SPARK

    ## Variáveis local
    sc = None
    
    ## Verifica se a sessão spark já está ativa e válida
    if __SPARK: return __SPARK

    ## Verifica a instalação da jvm para o spark
    findspark.init()

    ### Cria a variável de ambiente
    environment = ['PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON']
    for var in environment:
        os.environ[var] = sys.executable

    ## Define o nome do contexto
    if not name: name = DATE_TIME.strftime('%Y-%m%d') 
    name = f'process_{name}'

    ## Verifica se o diretório destino existe, caso contrário cria o caminho
    folder_create(folder=SPARK_PATH)

    ## Configura o processo spark
    conf = SparkConf() \
        .setAppName(name) \
        .setMaster('local[*]') \
        .setAll([
            ("spark.submit.deployMode", "client"),
            ("spark.local.dir", SPARK_PATH),
            ("spark.network.timeout", "39000"),
            ("spark.executor.memory", get_memory()), 
            ("spark.executor.cores", get_cores()), 
            ("spark.driver.cores", get_cores()), 
            ("spark.driver.memoryOverhead", "2048"),
            ("spark.memory.offHeap.enabled", "true"),
            ("spark.memory.offHeap.size", get_memory()),
            ("spark.dynamicAllocation.enabled", "false"),
            ("spark.default.parallelism", "50"),    
            ("spark.driver.maxResultSize", "0"), 
            ("spark.driver.supervise", "true"),
            ("spark.sql.debug.maxToStringFields", 2000),
            ("spark.sql.caseSensitive", "false"),
            ("spark.driver.userClassPathFirst", "false"),
            ("spark.standalone.submit.waitAppCompletion", 'true'),
            ("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS"),
            ("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY"),
            ("spark.sql.parquet.mergeSchema", "false"),
            ("spark.sql.parquet.filterPushdown", 'false'),
            ("spark.sql.parquet.enableVectorizedReader", "false"),
            ("spark.jars", f" \
                {PATH_APPLICATION}/drivers/gcs-connector-hadoop3-latest.jar, \
                {PATH_APPLICATION}/drivers/bigquery-connector-hadoop3-latest.jar, \
                {PATH_APPLICATION}/drivers/spark-bigquery-with-dependencies_2.12-0.22.0.jar, \
                {PATH_APPLICATION}/drivers/spark-bigquery-latest_2.12.jar, \
                {PATH_APPLICATION}/drivers/ngdbc-latest.jar, \
                {PATH_APPLICATION}/drivers/jetty-util-11.0.5.jar"
            ),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.fallback.enabled", "true"),
            ("spark.history.fs.cleaner.enabled", "true"),
            ("spark.eventLog.enabled", "false"),
            ("spark.eventLog.overwrite", "true"),
            ("spark.logConf", "false")
        ])

    ## Cria o contexto spark
    sc = SparkContext.getOrCreate(conf=conf)

    ## Ajusta o log de erro
    sc.setLogLevel("ERROR")
    ## drivers para leitura de arquivos em google cloud
    sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")
    sc._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", f"{PATH_APPLICATION}/keys/{PROJECT_NAME}_key_google.json")
    
    ## Cria o sessão spark
    __SPARK = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

    ## Retorna a sessão spark
    return __SPARK

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def stop_spark():
    """
    Encerra a operação com spark e limpa os temporários
    ----------

    Parâmetros
    ----------
    spark : SparkSession, default = None
        Sessão spark existente
    """

    ## Variáveis global
    global __SPARK

    try:
        ## Encerra a sessao
        __SPARK.stop()
    finally:
        ## Limpa o GC
        gc.collect()

    ## Limpa as pastas temporárias
    folder_delete(SPARK_PATH)

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def read_sql(project:str, dataset:str, sql:str):
    """
    Carrega os dados via instrução sql via operação spark
    ----------

    Parâmetros
    ----------
    spark : SparkSession
        Sessão spark existente
    project : str
        Nome do Project_id de hospedagem no bigquery [project-XXXXXX]
    dataset : str
        Nome do Dataset_id de hospedagem no bigquery [operation]
    sql : str
        Instrução sql para execução no banco de dados

    Retornos
    ----------
    data : spark rdd (dataframe)
        Retorno do dataframe contendo os dados da extração
    total : int
        Total de registros encontrados no dataframe
    """

    ## Variáveis local
    data  = None
    total = -1

    ## Verifica se a sessão está aberta
    spark = start_spark()

    ## Captação dos dados de uma tabela
    data = spark.read \
        .format("bigquery") \
        .option('viewsEnabled', 'true') \
        .option('materializationProject', project) \
        .option('materializationDataset', dataset) \
        .option('optimizedEmptyProjection', 'true') \
        .option('type', 'direct') \
        .load(sql)

    ## Organiza e padroniza os nomes das colunas
    data = data.toDF(*[col.upper() for col in data.columns])
    ## Contabiliza o total de registros do pacote
    total = data.count()

    ## Retorna o total do arquivo
    return data, total

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def read_table(project:str, table:str, columns=None):
    """
    Carrega os dados de uma tabela do bigquery
    ----------

    Parâmetros
    ----------
    project : str
        Nome do Project_id de hospedagem no bigquery [project-XXXXXX]
    table : str
        Nome da tabela de hospedagem no bigquery [files]
    columns : list, optional, default = None
        Sequência e nome das colunas do arquivo para subscrição ['ID', 'TIPO', 'NOME', 'DESCRICAO']

    Retornos
    ----------
    data : spark rdd (dataframe)
        Retorno do dataframe contendo os dados da extração
    total : int
        Total de registros encontrados no dataframe
    """

    ## Variáveis local
    data  = None
    total = -1

    ## Verifica se a sessão está aberta
    spark = start_spark()

    ## Captação dos dados de uma tabela
    data = spark.read \
        .format('bigquery') \
        .option("parentProject", project) \
        .option('project', project) \
        .option('table', table) \
        .option('viewsEnabled', 'true') \
        .option('optimizedEmptyProjection', 'false') \
        .load()

    ## Organiza e padroniza os nomes das colunas
    data = data.toDF(*columns) if columns else data.toDF(*[col.upper() for col in data.columns])
    ## Contabiliza o total de registros do pacote
    total = data.count()

    ## Retorna o total do arquivo
    return data, total

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def read_data(file:object, columns:str=None, header:bool=True, sep:str=None, encoding:str='utf-8', quote:str=None):
    """
    Carrega os dados de um AQUIVO CSV OU PARQUET
    ----------

    Parâmetros
    ----------
    file : object
        Caminho + arquivo para leitura do arquivo ['/opt/data//programs_2021-03-18.parquet' or gs://[bucket_name]/landing/programs_2021-03-18.parquet]
    columns : list, optional, default = None
        Sequência e nome das colunas do arquivo para subscrição ['ID', 'TIPO', 'NOME', 'DESCRICAO']
    header : bool, optional, default = True
        Se o arquivo é lido com cabeçalho ou não [True or False] 
    sep : str, optional, default = None
        Tipo do separador do arquivo [',', ';', '\t', '|']
    encoding : str, optional, default = 'utf-8'
        Código de codificação da leitura do arquivo ['utf-8', 'utf-16', 'iso-8859-1']
    quote : str, optional, default = "
        Caracter para escapar de linhas com erro

    Retornos
    ----------
    data : spark rdd (dataframe)
        Retorno do dataframe contendo os dados da extração
    total : int
        Total de registros encontrados no dataframe
    """

    ## Variáveis local
    total     = -1
    data      = None
    extension = file_extension(file=file)

    ## Verifica se a sessão está aberta
    spark = start_spark()

    ## Verifica se a sessão está aberta e ativa no Spark
    if not spark:
        raise Exception('Sessão com o Spark não iniciada.')

    ## O tipo do arquivo
    if extension in ('.parquet'):
        ## Realiza a leitura do arquivo parquet para validar a quantidade
        data = spark.read \
            .option("mergeSchema", "true") \
            .parquet(file)
    elif extension in ('.json'):
        data = spark.read \
            .option('multiline','true') \
            .json(file)
    elif extension in ('.xlsx', '.xls', '.xlsm'):
        ## Leitira do arquivo excel via pandas
        data = pd.read_excel(
            file, 
            engine='openpyxl'
        )
    elif extension in ('.csv', '.txt'):
        ## Ajusta o caracter de quote
        if quote == '': quote = None

        ## Realiza a leitura dos dados e coloca em disco
        data = spark.read.csv(
            path=file,
            header=header,
            sep=sep,
            encoding=encoding,
            inferSchema=False,
            ignoreLeadingWhiteSpace=True,
            ignoreTrailingWhiteSpace=True,
            multiLine=True,
            charToEscapeQuoteEscaping=quote,
            quote=quote,
            escape=quote,
            mode='FAILFAST',
            unescapedQuoteHandling='RAISE_ERROR',
        )
    else:
        return data

    ## Converte o pyspark.pandas dataframe para RDD
    data = to_rdd(
        data=data
    )
    ## Carrega o nome das colunas ou converte para maiúsculo caso exista
    data = data.toDF(*columns) if columns else data.toDF(*[col.upper() for col in data.columns])
    ## Total de registros do dataframe
    total = data.count()

    ## Retorna o total do arquivo
    return data, total

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def save_data(data, bucket:str, table:str, mode:bool):
    """
    Salva os dados de um rdd (dataframe) em uma tabela no bigquery
    ----------

    Parâmetros
    ----------
    data : pandas.core.frame.DataFrame ou pyspark.sql.dataframe.DataFrame
        Dataframe em memória / disco
    bucket : str
        Nome do bucket de destino dos dados [project-XXXXXX]
    table : str
        Nome da tabela de destino dos dados [project-XXXXXX.dataset.table]
    mode : bool
        Módulo para append = false ou overwrite = true
    """

    ## Verifica se a sessão está aberta
    spark = start_spark()

    try:
        ## Verifica se há a necessidade de conversão do dataframe para rdd
        data = to_rdd(
            data=data
        )
            
        ## Monta a variável para adição ou subescrição
        mode_type = 'overwrite' if mode else 'append'

        ## Salva o dataframe em parquet local
        data.write \
            .format('bigquery') \
            .option("temporaryGcsBucket", BUCKET_NAME) \
            .option('table', table) \
            .mode(mode_type) \
            .save()
    except Exception as ex:
        raise Exception(f'DATABASE: Falha na gravação {bucket}.{table} no (MODULO: save_data) - {ex}')       

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def save_file(data, file:str, mode:bool=True):
    """
    Salva os dados de um rdd (dataframe) em um arquivo local
    ----------

    Parâmetros
    ----------
    data : pandas.core.frame.DataFrame ou pyspark.sql.dataframe.DataFrame
        Dataframe (rdd) em memória / disco
    file : str
        Nome do arquivo para armazenamento em disco [/opt/data/programs_2021-03-18.csv]
    mode : bool, optional, default = True
        Módulo para append = false ou overwrite = true
    """

    ## Verifica se a sessão está aberta
    spark = start_spark()

    ## Valida se o arquivo foi criado corretamente
    try:
        ## Verifica se o dataframe é do tipo pandas ou rdd
        data = to_rdd(
            data=data
        )

        ## Variáveis local
        folder_base, extension = file_split(file=file)
        mode_type = 'overwrite' if mode else 'append'

        ## Verifica se a exportação é no formato excel
        if extension.lower() in ('xlsx', 'xls', 'excel'):
            extension = 'com.crealytics.spark.excel'

        ## Criando pasta de destino caso não exista
        folder_create(folder=folder_base)
        
        ## Salva o dataframe em parquet local
        data.repartition(1) \
            .write.format(extension) \
            .mode(mode_type) \
            .save(
                folder_base, 
                header=True,
                encoding='utf-8'
            )

        ## Verifica se o arquivo foi criado e ajusta o nome final
        for name in path.Path(folder_base).glob('_SUCCESS'):

            ## Varre o diretório criado "parquet" para validar os arquivos gerados
            for item in glob.glob(f'{folder_base}/part-*.{extension}'):
                ## Renomei o arquivo criado parquet part para o nome de exportação
                os.rename(item, file)
                
                ## Verifica se o arquivo foi renomeado corretamente
                if glob.glob(file):
                    ## Apaga 
                    folder_delete(folder=folder_base)
                    ## Remove o dataframe da memória
                    data.unpersist(True)

    except Exception as ex:
        raise Exception(f'DATABASE: Falha na gravação do arquivo {file} no (MODULO: save_file) - {ex}')

# ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- #

def to_rdd(data:pd.DataFrame):
    """
    Carrega os dados de um pandas dataframe para um spark dataframe
    ----------

    Parâmetros
    ----------
    data : pd.Dataframe
        Dados de um pandas dataframe

    Retornos
    ----------
    df : spark dataframe
        Retorno do dataframe contendo os dados da extração
    """

    ## Variáveis local
    df = None 

    ## Verifica se a sessão está aberta
    spark = start_spark()

    if type(data) == pd.DataFrame:
        ## Ajusta os tipos de colunas para string
        data = data.astype('str')
        ## Converte o pandas dataframe para spark RDD
        df = spark.createDataFrame(data=data)
    elif type(data) == ps.DataFrame:
        ## Ajusta os tipos de colunas para string
        data = data.astype('str')
        ## Converte de pyspark.pandas para spark RDD
        df = data.to_spark()
    else:
        ## Atualiza a variável local com o conteúdo 
        df = data

    ## Retorna o spark datafrem
    return df
    

## Exemplos de execução

#### Lendo um arquivo parquet de dentro do storage da Google em parquet

In [None]:
### Caminho do arquivo
file = 'gs://bucket_name/staging/file_2022-02-19/*.parquet'
### Leitura do arquivo no storage e retorno do dataframe e total de registros
data, total = read_data(file=file)

### Apresentação do resultado
display(f'Total de registros lidos: {total}', 'Dados:', data.show(5, False))

#### Lendo uma tabela do BigQuery

In [None]:
### Lendo a tabela do BigQuery completa e retorno do dataframe e total de registros
data, total = read_table(project=[project_XXXXX], table=[dataset.tabela]])

### Apresentação do resultado
display(f'Total de registros lidos: {total}', 'Dados:', data.show(5, False))

### Lendo um sql no BigQuery

In [None]:
### Instrução sql
sql = 'SELECT DISTINCT ID,GENDER FROM `project-XXXXX.dataset.table` WHERE ID > 0'

## Captação dos dados de uma tabela
data, total = read_sql(project=[project_XXXXX], dataset=[dataset_name], sql=sql)

### Apresentação do resultado
display(f'Total de registros lidos: {total}', 'Dados:', data.show(5, False))