In [1]:
# Bliblioteca que configura as variáveis de ambiente do spark pra nós
import findspark
findspark.init()

In [2]:
import os
# Configura a variável de ambiente PYARROW_IGNORE_TIMEZONE
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'

In [3]:
# Importando blibliotecas pyspark
import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext

In [4]:
# Demais bibliotecas
import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, DateTime, Date
from sqlalchemy import text
from sqlalchemy.sql import delete
from sqlalchemy.orm import sessionmaker

In [5]:
# Credenciais para acesso a API
load_dotenv()
app_id = os.environ.get('APP_ID')
access_key = os.environ.get('ACCESS_KEY')
table_name = os.environ.get('TABLE_NAME_1')

In [6]:
# Inicializa sessão Spark
spark_session = SparkSession \
        .builder \
        .appName("API_APPSHEET") \
        .master("local[*]") \
        .getOrCreate()

spark_session

In [7]:
# # Criando o Spark Context
# sc = SparkContext(appName = 'API_APPSHEET')

# # Criando a sessão do Spark
# spark_session = SparkSession.Builder().getOrCreate()

# #Visualiza o objeto spark_session
# spark_session

In [8]:
# URL da API AppSheet para ler registros
api_url = f'https://api.appsheet.com/api/v2/apps/{app_id}/tables/{table_name}/Action'

# Corpo da solicitação
payload = {
    "Action": "Find",
    "Properties": {
        "Locale": "en-US",
        "Timezone": "Pacific Standard Time"
    },
    "Rows": []
}

# Cabeçalho da solicitação com a chave de acesso do aplicativo
headers = {
    "applicationAccessKey": access_key,
    "Content-Type": "application/json"
}

try:

    # Fazendo a solicitação POST
    response = requests.post(api_url, json=payload, headers=headers)

    # Verifica se a solicitação foi bem-sucedida (código 200)
    if response.status_code == 200:
        # Retorna os dados da resposta (formatados como JSON)
        json_data = response.json()
    else:
        # Imprime uma mensagem de erro se a solicitação falhou
        print(f'Erro na solicitação. Código de Resposta: {response.status_code}')

# Tratamento de erro:
except json.JSONDecodeError as e:
    print(f'Erro ao decodificar JSON: {e}')
    print(f'Resposta da API: {response.text}')  # Imprime o conteúdo da resposta para debug
except requests.RequestException as e:
    print(f'Erro na requisição HTTP: {e}')


In [9]:
import pandas as pd
df = pd.DataFrame(json_data)
df.to_csv('dados/cadastro_funcionarios.csv')

In [10]:
df_spark = spark_session.read.csv('dados/cadastro_funcionarios.csv', header = 'true', inferSchema = 'true')

In [11]:
# Renomeando colunas uma a uma
df_spark = df_spark \
    .withColumnRenamed('_RowNumber', 'index') \
    .withColumnRenamed('Row ID', 'id') \
    .withColumnRenamed('Nome', 'nome') \
    .withColumnRenamed('RG', 'rg') \
    .withColumnRenamed('CPF', 'cpf') \
    .withColumnRenamed('Coordenador', 'coordenador') \
    .withColumnRenamed('Senioridade', 'senioridade') \
    .withColumnRenamed('Classificação', 'classificacao') \
    .withColumnRenamed('Tipo Contratação', 'tipo_de_contratacao') \
    .withColumnRenamed('Área', 'area') \
    .withColumnRenamed('Indicado Por', 'indicado_por') \
    .withColumnRenamed('Gênero', 'genero') \
    .withColumnRenamed('Restrições/Observações', 'observacoes') \
    .withColumnRenamed('Telefone', 'telefone') \
    .withColumnRenamed('Telefone 2', 'telefone_2') \
    .withColumnRenamed('Data de Nascimento', 'data_de_nascimento') \
    .withColumnRenamed('Data Aceite', 'data_aceite') \
    .withColumnRenamed('Início da Jornada', 'inicio_da_jornada') \
    .withColumnRenamed('Fim da Jornada', 'fim_da_jornada') \
    .withColumnRenamed('Idade', 'idade') \
    .withColumnRenamed('Bairro', 'bairro') \
    .withColumnRenamed('Endereço', 'endereco') \
    .withColumnRenamed('Número', 'numero') \
    .withColumnRenamed('CEP', 'cep') \
    .withColumnRenamed('Cidade', 'cidade') \
    .withColumnRenamed('Estado', 'estado') \
    .withColumnRenamed('CNPJ', 'cnpj') \
    .withColumnRenamed('Nome da Empresa', 'nome_da_empresa') \
    .withColumnRenamed('Tam. Camiseta', 'tam_camiseta') \
    .withColumnRenamed('Filhos', 'filhos') \
    .withColumnRenamed('Quantos?', 'quantos') \
    .withColumnRenamed('Entrevistador RH', 'entrevistador_rh') \
    .withColumnRenamed('Entrevistador Técnico', 'entrevistador_tecnico') \
    .withColumnRenamed('Status', 'status') \
    .withColumnRenamed('Motivo do Status', 'motivo_do_status') \
    .withColumnRenamed('Motivo Saída', 'motivo_da_saida') \
    .withColumnRenamed('E-mail', 'email') \
    .withColumnRenamed('Função', 'funcao') \
    .withColumnRenamed('Foto', 'foto') \
    .withColumnRenamed('Gerente', 'gerente') \
    .withColumnRenamed('Age', 'age') \
    .withColumnRenamed('QuantidadeDias', 'tempo_de_empresa_em_dias') \
    .withColumnRenamed('Tempo de Empresa', 'tempo_de_empresa_text')


In [12]:
# Dropando colunas
df_spark = df_spark.drop('_c0', 
                         'Ações', 
                         'Complemento', 
                         'Crachá',
                         'Observação/Anotações',
                         'CPF_Mask',
                         'Ausências',
                         'Histórico 1:1',
                         'Soft Skills',
                         'Aniversário?',
                         'Histórico Coordenação',
                         'Histórico Áreas',
                         'Histórico de Ocorrências',
                         'Exceções de Apontamento',
                         'Weeklys',
                         'Histórico Funções',
                         'Aniversariantes do mês',
                         'Mês de aniversário')

In [13]:
df_spark.describe

<bound method DataFrame.describe of DataFrame[index: int, id: string, nome: string, coordenador: string, senioridade: string, classificacao: string, tipo_de_contratacao: string, area: string, indicado_por: string, genero: string, observacoes: string, telefone: string, telefone_2: bigint, data_de_nascimento: string, data_aceite: string, inicio_da_jornada: string, fim_da_jornada: string, idade: int, estado: string, cidade: string, bairro: string, endereco: string, numero: string, cep: string, rg: string, cpf: string, cnpj: string, nome_da_empresa: string, tam_camiseta: string, filhos: string, quantos: int, entrevistador_rh: string, entrevistador_tecnico: string, status: string, motivo_do_status: string, motivo_da_saida: string, email: string, funcao: string, cat_pag: string, foto: string, gerente: string, age: int, tempo_de_empresa_text: string, tempo_de_empresa_em_dias: int]>

In [14]:
# Criando uma engine com SQLAlchemy para BigQuery
engine = create_engine('bigquery://',credentials_path='keyfile.json')

Session = sessionmaker(engine)
session = Session()
metadata = MetaData()

# Definindo tabela e Schema
table_name = 'bronze_cadastro_funcionarios'
dataset_name = 'bronze'

#
cadastro_funcionarios = Table(
    table_name,
    metadata,
    Column('index', Integer),
    Column('id', String, primary_key=True),
    Column('nome', String),
    Column('coordenador', String),
    Column('senioridade', String),
    Column('classificacao', String),
    Column('tipo_de_contratacao', String),
    Column('area', String),
    Column('indicado_por', String),
    Column('genero', String),
    Column('observacoes', String),
    Column('telefone', String),
    Column('telefone_2', String),
    Column('data_de_nascimento', Date),
    Column('data_aceite', Date),
    Column('inicio_da_jornada', Date),
    Column('fim_da_jornada', Date),
    Column('idade', Integer),
    Column('estado', String),
    Column('cidade', String),
    Column('bairro', String),
    Column('endereco', String),
    Column('numero', String),
    Column('cep', String),
    Column('rg', String),
    Column('cpf', String),
    Column('cnpj', String),
    Column('nome_da_empresa', String),
    Column('tam_camiseta', String),
    Column('filhos', String),
    Column('quantos', Integer),
    Column('entrevistador_rh', String),
    Column('entrevistador_tecnico', String),
    Column('status', String),
    Column('motivo_do_status', String),
    Column('motivo_da_saida', String),
    Column('email', String),
    Column('funcao', String),
    Column('cat_pag', String),
    Column('foto', String),
    Column('gerente', String),
    Column('age', Integer),
    Column('tempo_de_empresa', String),
    Column('tempo_de_empresa_em_dias', Integer),
    schema=dataset_name
)

# Criando tabela
metadata.create_all(engine, checkfirst=True)



In [15]:
# Deletando dados (caso exista)
delete_dados = delete(cadastro_funcionarios).where(text("1=1"))
session.execute(delete_dados)
session.commit()
session.close()

In [16]:
df_pandas = df_spark.toPandas()

In [20]:
df_pandas.columns

Index(['index', 'id', 'nome', 'coordenador', 'senioridade', 'classificacao',
       'tipo_de_contratacao', 'area', 'indicado_por', 'genero', 'observacoes',
       'telefone', 'telefone_2', 'data_de_nascimento', 'data_aceite',
       'inicio_da_jornada', 'fim_da_jornada', 'idade', 'estado', 'cidade',
       'bairro', 'endereco', 'numero', 'cep', 'rg', 'cpf', 'cnpj',
       'nome_da_empresa', 'tam_camiseta', 'filhos', 'quantos',
       'entrevistador_rh', 'entrevistador_tecnico', 'status',
       'motivo_do_status', 'motivo_da_saida', 'email', 'funcao', 'cat_pag',
       'foto', 'gerente', 'age', 'tempo_de_empresa_text',
       'tempo_de_empresa_em_dias'],
      dtype='object')

In [23]:
df_pandas['quantos'].dtypes

dtype('float64')

In [17]:
spark_session.stop()

In [18]:
# Escrevendo o DataFrame Pandas para o PostgreSQL
df_pandas.to_sql(table_name, engine, schema="bronze", if_exists='append', index=False)

  df_pandas.to_sql(table_name, engine, schema="bronze", if_exists='append', index=False)


AttributeError: 'Engine' object has no attribute 'cursor'