In [1]:
import pandas_gbq
import sys
import os

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os

import time

from projectutils import read_data 
import sys

#import sys, importlib
#importlib.reload(sys.modules['group_tam_id'])
from group_tam_id import (
    load_to_gbq,
    main_data_treat_muni,
)


In [2]:

def vm_load_grouping_part(input_date, cod_munis):
    """Load and stack the per-municipality grouping parquet files.

    Args:
        input_date (str): Date string; its first seven characters, with the
            dash removed, select the ``data/temp_dir_<yyyymm>`` directory.
        cod_munis (iterable): Municipality codes; one
            ``processed_group_<cod_muni>.parquet`` file is read per code.

    Returns:
        pd.DataFrame: The concatenated files, with the string form of
        ``cod_muni`` appended to ``agrupamento_nome_1``.
    """
    t0 = time.time()
    year_month = input_date[:7].replace('-', '')
    parts_dir = fr'data/temp_dir_{year_month}'

    frames = []
    for cod_muni in cod_munis:
        frames.append(pd.read_parquet(f"{parts_dir}/processed_group_{cod_muni}.parquet"))
    grouped = pd.concat(frames)

    elapsed = time.time() - t0
    print(f'Loading data in {elapsed:.2f} seconds.')

    # Suffix the group name with the municipality code.
    grouped['agrupamento_nome_1'] = grouped['agrupamento_nome_1'] + grouped['cod_muni'].astype(str)
    return grouped


In [3]:
def add_grouped_part(df, input_date, cod_munis):
    """Inner-join ``df`` with the grouping data loaded for ``cod_munis``.

    Args:
        df (pd.DataFrame): Frame holding the join keys ``inicio``,
            ``cod_muni`` and ``nome_master``.
        input_date (str): Forwarded to ``vm_load_grouping_part``.
        cod_munis (iterable): Municipality codes whose grouping files are loaded.

    Returns:
        pd.DataFrame: Rows of ``df`` that have a match in the grouping data.
    """
    base = df.copy()
    print('Tamanho do DataFrame:', base.shape)
    grouping = vm_load_grouping_part(input_date, cod_munis)
    join_keys = ['inicio', 'cod_muni', 'nome_master']
    return base.merge(grouping, on=join_keys, how='inner')

In [4]:

#OUTROS = ['Outros', 'Outros_Pags', 'Outros_SumUp', 'Outros_Stone']


In [5]:
import pandas as pd
from google.cloud import bigquery
import datetime

def upload_partitioned_table_to_bigquery(df, destination_table):
    """
    Uploads a DataFrame to BigQuery with day partitioning on 'ingestion_date'.

    The 'ingestion_date' column is stamped with the current local time
    (``datetime.datetime.now()``) on a copy of ``df``; the caller's DataFrame
    is not modified. The load job uses ``WRITE_APPEND``, so rows are appended
    when the table already exists.

    Args:
        df (pd.DataFrame): DataFrame to upload.
        destination_table (str): Destination table in the format 'project.dataset.table'.

    Returns:
        None
    """
    # Use assign() rather than item assignment so the caller's frame does not
    # silently gain (or have overwritten) an 'ingestion_date' column.
    stamped = df.assign(ingestion_date=datetime.datetime.now())

    # Create BigQuery client
    client = bigquery.Client()

    # Define job configuration
    job_config = bigquery.LoadJobConfig(
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="ingestion_date"
        ),
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND  # Append data if the table exists
    )

    # Load DataFrame to BigQuery
    job = client.load_table_from_dataframe(stamped, destination_table, job_config=job_config)

    # Block until the load job finishes; result() raises if the job failed.
    job.result()
    print("Data successfully loaded into BigQuery")
    
    
import os
import pandas as pd
import json

def load_available_data(directory, destination_table, final_data_treat_func, track_file='uploaded_files.json', restart_track_file=True):
    """
    Upload every parquet file found in ``directory`` to a BigQuery table.

    The names of uploaded files are written to ``track_file`` after each
    successful upload so that a later run can skip them.

    Args:
        directory (str): The directory containing parquet files.
        destination_table (str): The BigQuery table to upload data to.
        final_data_treat_func (callable): Applied to each DataFrame right
            before it is uploaded; must return the DataFrame to send.
        track_file (str): The path to the JSON file used to track uploaded files.
        restart_track_file (bool): When True (the default) any existing
            ``track_file`` is ignored and the tracking list starts empty, so
            every file is uploaded again.

    Returns:
        None
    """
    # Start from the saved tracking list only when asked to resume.
    already_uploaded = []
    if not restart_track_file and os.path.exists(track_file):
        with open(track_file, 'r') as handle:
            already_uploaded = json.load(handle)

    parquet_names = [name for name in os.listdir(directory) if name.endswith('parquet')]

    for name in parquet_names:
        if name in already_uploaded:
            print(f'{name} has already been uploaded. Skipping.')
            continue

        print(f'Opening {name}...')
        frame = pd.read_parquet(os.path.join(directory, name))
        frame = final_data_treat_func(frame)

        print(f'Sending {name} to {destination_table}')
        upload_partitioned_table_to_bigquery(frame, destination_table)
        print(f'{name} sent to {destination_table} successfully!')

        # Persist progress after every file so an interrupted run can resume.
        already_uploaded.append(name)
        with open(track_file, 'w') as handle:
            json.dump(already_uploaded, handle)


def final_data_treat(df):
    """Tidy merge artefacts before a DataFrame is uploaded.

    Drops the ``_merge`` column when present (otherwise prints a notice), and
    when ``reference_month_x`` exists copies it into ``reference_month`` and
    deletes both ``reference_month_x`` and ``reference_month_y``.

    Args:
        df (pd.DataFrame): Frame to clean. When ``_merge`` is absent the
            column changes are applied to this same object.

    Returns:
        pd.DataFrame: The cleaned frame.
    """
    has_merge_flag = '_merge' in df.columns
    if not has_merge_flag:
        print('_merge not in df')
    else:
        df = df.drop(columns=['_merge'])

    if 'reference_month_x' in df.columns:
        df['reference_month'] = df['reference_month_x']
        for suffixed in ('reference_month_x', 'reference_month_y'):
            del df[suffixed]

    return df

                 


In [6]:

def inter_data_treat_muni(df, file_idx, input_date=None):
    """Attach the grouping data to ``df`` for the municipalities it contains.

    Args:
        df (pd.DataFrame): Frame with a ``cod_muni`` column.
        file_idx: Identifier of the part being processed; used only in the
            timing message.
        input_date (str): Reference date forwarded to ``add_grouped_part``.
            It defaults to None only to keep the two-argument signature
            importable; a value is required.

    Returns:
        pd.DataFrame: Result of ``add_grouped_part`` for this frame.

    Raises:
        TypeError: If ``input_date`` is not supplied. The previous version
            always failed with a TypeError because it called
            ``add_grouped_part`` without its ``input_date`` argument.
    """
    if input_date is None:
        raise TypeError(
            "inter_data_treat_muni() requires 'input_date' to load the grouping files"
        )

    cod_munis = list(df['cod_muni'].unique())

    start = time.time()
    # Print the full code list only when it is short enough to read.
    if len(cod_munis) > 3:
        print(f'Starting inter data treat for qtd munis {len(cod_munis)}, with size {len(df)}')
    else:
        print(f'Starting inter data treat for munis {cod_munis}, with size {len(df)}')

    df = add_grouped_part(df, input_date, cod_munis)

    print(f'Inter data treat Execution time file {file_idx}: {time.time() - start:.2f} seconds')
    return df



def save_tratados(file_name, input_date):
    """Run the main treatment on one intermediary file and save the result.

    Reads ``data_new/intermediary/<input_date>/<file_name>``, passes it through
    ``main_data_treat_muni`` and writes the output under
    ``data_new/tratados/<input_date>/`` with the same file name.
    """
    source_path = f'data_new/intermediary/{input_date}/{file_name}'
    target_path = f'data_new/tratados/{input_date}/{file_name}'

    treated = main_data_treat_muni(pd.read_parquet(source_path), file_name)
    treated.to_parquet(target_path)
    print(f'File {file_name} Saved in Tratados')
              
def save_all_tratados(input_date):
    """Apply ``save_tratados`` to every parquet file of an intermediary run.

    Args:
        input_date (str): Names the ``data_new/intermediary/<input_date>``
            source folder and the ``data_new/tratados/<input_date>`` target.
    """
    source_dir = f'data_new/intermediary/{input_date}'

    # The output folder must exist before any part is written to it.
    os.makedirs(f'data_new/tratados/{input_date}', exist_ok=True)

    for entry in os.listdir(source_dir):
        if entry.endswith('parquet'):
            save_tratados(entry, input_date)

        
        
    


In [7]:
import os
import pandas as pd
import pyarrow.parquet as pq

def generate_parquet_summary(directory):
    """Summarise the parquet files in ``directory`` by row count.

    Row counts come from each file's parquet metadata
    (``ParquetFile(...).metadata.num_rows``).

    Args:
        directory (str): Folder to scan for names ending in ``.parquet``.

    Returns:
        pd.DataFrame: One row per file with columns ``file_name`` and
        ``line_count``.
    """
    names = [name for name in os.listdir(directory) if name.endswith('.parquet')]
    row_counts = [
        pq.ParquetFile(os.path.join(directory, name)).metadata.num_rows
        for name in names
    ]
    return pd.DataFrame({'file_name': names, 'line_count': row_counts})



def calculate_group_ids(column, max_rows=1000000):
    """Assign consecutive batch ids so each batch sums to at most ``max_rows``.

    Values are visited in order. A new batch is started whenever adding the
    next value would push the running total above ``max_rows``.

    The earlier implementation also closed a batch whenever the running total
    was exactly 5. That looks like a leftover from testing with a smaller
    limit: it split batches far below the real limit. It has been removed,
    since a batch that lands exactly on the limit is already closed by the
    overflow test on the next positive value.

    Args:
        column (pd.Series): Sizes to batch (here, row counts per file).
        max_rows (int): Upper bound for the sum of one batch.

    Returns:
        pd.Series: Batch id for every element, indexed like ``column``.
    """
    batch_ids = []
    running_total = 0
    batch_id = 1

    for value in column:
        # Open a new batch before adding a value that would overflow this one.
        if running_total + value > max_rows:
            batch_id += 1
            running_total = 0

        running_total += value
        batch_ids.append(batch_id)

    return pd.Series(batch_ids, index=column.index)


def create_agg_nomes_agrupados(input_date):
    """Batch the grouping files of a month into lists of municipality codes.

    The files under ``data/temp_dir_<yyyymm>`` are sorted by row count, given
    batch ids by ``calculate_group_ids``, and the municipality code is taken
    from the third underscore-separated token of each file name.

    Args:
        input_date (str): Date string whose first seven characters give the month.

    Returns:
        dict: Batch id -> list of municipality codes (as strings).
    """
    year_month = input_date[:7].replace('-', '')
    summary = generate_parquet_summary(f'data/temp_dir_{year_month}')
    summary = summary.sort_values('line_count')

    summary['id'] = calculate_group_ids(summary['line_count'])
    summary['cod_muni'] = [
        name.split('_')[2].split('.')[0] for name in summary['file_name']
    ]

    return summary.groupby('id')['cod_muni'].apply(list).to_dict()

        

In [8]:
import pandas as pd
import pandas_gbq as pd_gbq
import jinja2
import os

import time

# function to get current date and time
def get_current_time():
    """Return the current local time as ``[YYYY-MM-DD HH:MM:SS]``."""
    from datetime import datetime

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return f"[{stamp}]"

def read_gbq_(query):
    """Run ``query`` through pandas-gbq and return its result.

    A timestamped progress message is printed before the query starts. The
    billing project is fixed inside this function.
    """
    print(f'{get_current_time()} Getting dataset from BQ...')
    gbq_options = {
        'progress_bar_type': 'tqdm',
        'use_bqstorage_api': True,
        'project_id': 'sfwthr2a4shdyuogrt3jjtygj160rs',  # ri-nonprod
    }
    return pd_gbq.read_gbq(query, **gbq_options)

def read_gbq_from_template(template_query, dict_query):
    """Render a Jinja2 query template and run it with ``read_gbq_``.

    Args:
        template_query (str): Query text; may use Jinja2 templating.
        dict_query (dict): Template parameters. When empty or None the
            template text is sent unrendered.

    Returns:
        Whatever ``read_gbq_`` returns for the final query.
    """
    if not dict_query:
        return read_gbq_(template_query)

    from jinja2 import Template

    rendered = Template(template_query).render(dict_query)
    return read_gbq_(rendered)


In [9]:
def remove_files(input_date='2024-04-30', base_dir='data/intermediary'):
    """Delete every ``.parquet`` file from ``<base_dir>/<input_date>``.

    The defaults reproduce the previously hard-coded target
    (``data/intermediary/2024-04-30``). Pass the run date and base folder to
    clean another run, for example ``base_dir='data_new/intermediary'``.

    Args:
        input_date (str): Name of the dated sub-folder to clean.
        base_dir (str): Folder that contains the dated sub-folders.

    Returns:
        None
    """
    directory = f'{base_dir}/{input_date}'
    files = [x for x in os.listdir(directory) if x.endswith('.parquet')]
    for file in files:
        os.remove(f'{directory}/{file}')
        print(f'File {file} removed from {directory}')

In [10]:
# Jinja2 template read by accumulate_and_apply: {{add_filter}} is replaced with
# an extra "AND ..." condition, and rows whose inicio is 'SYMPLA' are excluded.
query = """
select * from  `dataplatform-prd.master_contact.aux_tam_final_nomes` 
where 1=1 {{add_filter}}
and inicio<>'SYMPLA'
"""
 
def accumulate_and_apply(input_date):
    """Split the month's source table into batches and save each joined part.

    Steps:
      1. Build the batch -> municipality-code mapping with
         ``create_agg_nomes_agrupados``.
      2. Read the rows for ``reference_month = input_date`` from BigQuery
         using the module-level ``query`` template.
      3. For every batch, join its rows with the grouping data and write
         ``data_new/intermediary/<input_date>/part_<batch id>.parquet``.

    Args:
        input_date (str): Reference date; also names the output folder.

    Returns:
        None
    """

    def save_func(df, idx, cod_munis, input_date):
        # Join one batch with its grouping data and persist it as a part file.
        print(f'Starting inter data treat for file {idx}')
        start = time.time()
        df = add_grouped_part(df, input_date, cod_munis)
        df.to_parquet(f'data_new/intermediary/{input_date}/part_{idx}.parquet')
        print(f'Inter data treat Execution time: {time.time() - start:.2f} seconds')
        print(f'File part_{idx}.parquet Saved')

    # Make sure the output folder exists before any part is written.
    os.makedirs(f'data_new/intermediary/{input_date}', exist_ok=True)

    dict_files = create_agg_nomes_agrupados(input_date)
    # The codes are parsed from file names as strings; cast them to int
    # before they are matched against the cod_muni column.
    dict_files = {k: [int(x) for x in v] for k, v in dict_files.items()}

    df = read_gbq_from_template(query, {'add_filter': f'AND reference_month = "{input_date}"'})
    print(f'File Read: {input_date}')

    # Plain loop: save_func is called for its side effects only.
    for idx, cod_munis in dict_files.items():
        save_func(df[df['cod_muni'].isin(cod_munis)], idx, cod_munis, input_date)
    


In [11]:
# Reference date for this run; the cells below pass it to every pipeline step.
date_str = '2024-10-31'

In [12]:
#from main_agrupamento_nomes import main_agrupamento_nomes
#main_agrupamento_nomes(date_str)

In [13]:
# Step 1: read the source table and write the joined part files for date_str.
accumulate_and_apply(date_str)

[2024-11-27 22:37:57] Getting dataset from BQ...
Downloading: 100%|[32m██████████[0m|
File Read: 2024-10-31
Starting inter data treat for file 1
Tamanho do DataFrame: (1095591, 13)
Loading data in 11.84 seconds.
Inter data treat Execution time: 20.78 seconds
File part_1.parquet Saved
Starting inter data treat for file 2
Tamanho do DataFrame: (1102404, 13)
Loading data in 4.55 seconds.
Inter data treat Execution time: 12.90 seconds
File part_2.parquet Saved
Starting inter data treat for file 3
Tamanho do DataFrame: (1109117, 13)
Loading data in 2.84 seconds.
Inter data treat Execution time: 10.84 seconds
File part_3.parquet Saved
Starting inter data treat for file 4
Tamanho do DataFrame: (1113670, 13)
Loading data in 2.13 seconds.
Inter data treat Execution time: 10.19 seconds
File part_4.parquet Saved
Starting inter data treat for file 5
Tamanho do DataFrame: (1119053, 13)
Loading data in 1.92 seconds.
Inter data treat Execution time: 9.78 seconds
File part_5.parquet Saved
Starting i

In [14]:

# Step 2: run the main treatment on every intermediary part file for date_str.
print(date_str)
save_all_tratados(date_str)

2024-10-31
Starting main data treat for file part_7.parquet, with size 1120246
Executing function prepare_data...
Function prepare_data executed successfully in 1.27 seconds.
Executing function init_group_id...
Function init_group_id executed successfully in 18.00 seconds.
Executing function deal_unmerged_places...
Function deal_unmerged_places executed successfully in 23.14 seconds.
Executing function deal_merged_places...
Function deal_merged_places executed successfully in 11.13 seconds.
Executing function deal_merged_docs...
Function deal_merged_docs executed successfully in 9.96 seconds.
Executing function pos_tratamento...
Function pos_tratamento executed successfully in 11.37 seconds.
Main data treat Execution time file part_7.parquet: 75.27 seconds
File part_7.parquet Saved in Tratados
Starting main data treat for file part_10.parquet, with size 1132607
Executing function prepare_data...
Function prepare_data executed successfully in 1.29 seconds.
Executing function init_group_

In [15]:
# Step 3: upload every treated part file to BigQuery. restart_track_file is left
# at its default (True), so any existing tracking file is ignored on this run.
load_available_data(directory=f'data_new/tratados/{date_str}', final_data_treat_func=final_data_treat, destination_table='dataplatform-prd.master_contact.aux_tam_python_agrupados')

Opening part_7.parquet...
_merge not in df
Sending part_7.parquet to dataplatform-prd.master_contact.aux_tam_python_agrupados
Data successfully loaded into BigQuery
part_7.parquet sent to dataplatform-prd.master_contact.aux_tam_python_agrupados successfully!
Opening part_10.parquet...
_merge not in df
Sending part_10.parquet to dataplatform-prd.master_contact.aux_tam_python_agrupados
Data successfully loaded into BigQuery
part_10.parquet sent to dataplatform-prd.master_contact.aux_tam_python_agrupados successfully!
Opening part_19.parquet...
_merge not in df
Sending part_19.parquet to dataplatform-prd.master_contact.aux_tam_python_agrupados
Data successfully loaded into BigQuery
part_19.parquet sent to dataplatform-prd.master_contact.aux_tam_python_agrupados successfully!
Opening part_6.parquet...
_merge not in df
Sending part_6.parquet to dataplatform-prd.master_contact.aux_tam_python_agrupados
Data successfully loaded into BigQuery
part_6.parquet sent to dataplatform-prd.master_conta

In [16]:
# Completion marker ("Fim Upload" is Portuguese for "upload finished").
print('Fim Upload')

Fim Upload


In [17]:
# Display the reference date the cells above ran with.
date_str

'2024-10-31'

In [None]:
def main(date):
    """Run the whole pipeline for one reference date.

    Writes the joined part files, applies the main treatment to each of them,
    uploads the treated files to BigQuery and prints a completion message.

    Args:
        date (str): Reference date; also names the data folders.

    Returns:
        None
    """
    tratados_dir = f'data_new/tratados/{date}'
    target_table = 'dataplatform-prd.master_contact.aux_tam_python_agrupados'

    accumulate_and_apply(date)
    save_all_tratados(date)
    load_available_data(
        directory=tratados_dir,
        final_data_treat_func=final_data_treat,
        destination_table=target_table,
    )
    print('Fim Upload', date)