In [32]:
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import time
import numpy as np

def log_message(message, start_time=None):
    current_time = time.time()
    readable_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_time))
    elapsed_time = f'Elapsed time: {current_time - start_time:.2f} seconds' if start_time else ''
    print(f"[{readable_time}] {message} {elapsed_time}")

from dask import config
from dask.distributed import Client

# Initialize a Dask client
client = Client()

# Set configuration for optimization
config.set({
    'optimization.fuse.ave-width': 5
})


Perhaps you already have a cluster running?
Hosting the HTTP server on port 46423 instead


<dask.config.set at 0x7f4009e4e310>

In [33]:
pip install "dask[distributed]" --upgrade

Note: you may need to restart the kernel to use updated packages.


In [34]:
censo_DB_path = '/media/matias/Elements/suite/ext_CPV2010_basico_radio_pub'


In [35]:

log_message("Script started.")

# Load data
proy_pop = pd.read_csv('./../data/info/proy_pop200125.csv', encoding='utf-8')[['DPTO', '2010', '2023']]

radio_circ_ign = pd.read_csv('./../../CNE-INDEC-georef/info/radio_circ_IGN.csv')
radio_ref = pd.read_csv('./../data/info/radio_ref.csv').astype({'DPTO':int, 'PROV':int})
radio_ref['COD_2010'] = radio_ref['radio'].astype(str).str.zfill(9)

radio_circ_ign = radio_circ_ign.merge(radio_ref[['COD_2010', 'PROV_REF_ID', 'DPTO', 'RADIO_REF_ID']], how = 'left')[['RADIO_REF_ID', 'PROV_REF_ID', 'DPTO', 'circuito']].dropna()
radio_circ_ign['RADIO_REF_ID'] = radio_circ_ign['RADIO_REF_ID'].astype(int)


# Load datasets
VIVIENDA = dd.read_csv(censo_DB_path + '/VIVIENDA.csv', sep=';', usecols=['VIVIENDA_REF_ID', 'RADIO_REF_ID'])
HOGAR = dd.read_csv(censo_DB_path + '/HOGAR.csv', sep=';', usecols=['HOGAR_REF_ID', 'VIVIENDA_REF_ID', 'PROP'])
PERSONA = dd.read_csv(censo_DB_path + '/PERSONA.csv', sep=';', usecols=['PERSONA_REF_ID', 'HOGAR_REF_ID', 'CONDACT', 'P02', 'P03', 'P09'])

# Filter for age >= 16
PERSONA = PERSONA[PERSONA['P03'] >= 16]

start_time = time.time()

def compute_age_bins(partition, bins):
    labels = ['young', 'middle', 'old']
    age_bins = pd.cut(partition['P03'], bins=bins, labels=labels, right=False)
    return partition.assign(age_bin=age_bins)

# Calculate global age bins
# global_bins = PERSONA['P03'].quantile([0, .3333, .6666, 1]).compute().tolist()
global_bins = [16.0, 31.0, 51.0, 110.0]

# Apply the age bins without computing
PERSONA = PERSONA.map_partitions(compute_age_bins, bins=global_bins)

# Display the type of PERSONA and the value counts for the age bins
type_after_binning = type(PERSONA)
age_bin_counts = PERSONA['age_bin'].value_counts().compute()

# Display type of PERSONA after binning
print("Type of PERSONA after binning:", type(PERSONA))
log_message("Age binning completed.", start_time)

# Display the value counts for the age bins
print(PERSONA['age_bin'].value_counts())


# Merging
with ProgressBar():
    HOGAR = HOGAR.merge(VIVIENDA[['VIVIENDA_REF_ID', 'RADIO_REF_ID']], on='VIVIENDA_REF_ID')
    PERSONA = PERSONA.merge(HOGAR[['RADIO_REF_ID', 'VIVIENDA_REF_ID', 'HOGAR_REF_ID', 'PROP']], on='HOGAR_REF_ID')
    # PERSONA = PERSONA.merge(radio_ref[['RADIO_REF_ID', 'DPTO', 'PROV_REF_ID', 'DPTO', 'AGLOMERADO']], on='RADIO_REF_ID')
    PERSONA = PERSONA.merge(radio_circ_ign[['RADIO_REF_ID', 'PROV_REF_ID', 'DPTO', 'circuito']], on='RADIO_REF_ID')
log_message("Merging completed.", start_time)

start_time = time.time()


def count_for_single_column(df, col):
    """Count unique values for a single column."""
    return df.groupby(['PROV_REF_ID', 'DPTO', 'circuito', col]).size().reset_index(name=f'{col}_count')

columns_to_count = ['PROP', 'CONDACT', 'P02', 'age_bin', 'P09']

# Create a list to store the results for each column
result_frames = []


for column in columns_to_count:
    # Group by the desired columns and compute the counts for the current column
    partial_result = PERSONA.map_partitions(count_for_single_column, col=column)
    
    # Group by other geographical columns and compute the sum
    grouped_counts = partial_result.groupby(['PROV_REF_ID', 'DPTO', 'circuito', column]).sum().reset_index()
    
    # Join the population projections (if needed)
    # final_grouped = grouped_counts.merge(proy_pop, on='DPTO', how='left')
    
    # Compute the results and save to a unique CSV file for the current column
    out = grouped_counts.compute()
    out.to_csv(f'./../data/agg_circuitos_{column}.csv')

    log_message(f"Data processed and saved for column: {column}", start_time)

print("All columns processed and saved.")
log_message("Script completed.")

[2023-08-25 17:20:05] Script started. 
Type of PERSONA after binning: <class 'dask.dataframe.core.DataFrame'>
[2023-08-25 17:20:27] Age binning completed. Elapsed time: 21.51 seconds
Dask Series Structure:
npartitions=1
    int64
      ...
Name: count, dtype: int64
Dask Name: value-counts-agg, 8 graph layers
[2023-08-25 17:20:27] Merging completed. Elapsed time: 21.57 seconds
[2023-08-25 17:21:49] Data processed and saved for column: PROP Elapsed time: 81.65 seconds
[2023-08-25 17:23:16] Data processed and saved for column: CONDACT Elapsed time: 168.86 seconds
[2023-08-25 17:24:54] Data processed and saved for column: P02 Elapsed time: 267.23 seconds
[2023-08-25 17:26:38] Data processed and saved for column: age_bin Elapsed time: 371.46 seconds
[2023-08-25 17:28:14] Data processed and saved for column: P09 Elapsed time: 467.03 seconds
All columns processed and saved.
[2023-08-25 17:28:14] Script completed. 
