In [1]:
import pandas as pd
import numpy as np
import pyreadr
import time
import math
from sdv import Metadata
from sdv.tabular import GaussianCopula
from sdv.constraints import Unique, UniqueCombinations, GreaterThan, Positive, Rounding, ColumnFormula
import multiprocessing as mp

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Set seed
np.random.seed(1234)

In [4]:
# Flags
model_type = 'GC' # Options are: CTGAN
syn_level = 'strata' # Options are: country, strata or pcd
pop_level = 'sample' # Options are: population or sample
encoding = 'categorical' # Options are: 'categorical' or 'label_encoding'
field_dist = 'gaussian_kde' # Options are: 'gaussian' or 'parametric'

if syn_level != 'country':
    sampling_pcd = True
else:
    sampling_pcd = False

if pop_level == 'population':
    sampling_population = True
else:
    sampling_population = False

In [5]:
# Read constraints
%run ./synthetic_constraints.ipynb

In [6]:
# Number of surveys to synthesize
n_sim = 100
n_mse = 1

Define SDG function

In [7]:
def synthesize_survey(x):

    # Read 'original' survey
    dat = pyreadr.read_r('./data/midsave/syn_surveys/survey_' + str(x) + '.rds')[None]

    
    # Align data types
    dat = dat.astype({'pid': int,
                      "strata": str,
                      "H05A_TOTAL_HOMBRES": int,
                      "H05B_TOTAL_MUJERES": int,
                      "V09_CANT_DORMITORIOS": int,
                      "V10_CANT_APOSENTOS": int,
                      "V19_CANT_LINEAS_CELULAR": int,
                      "V01B3_RESIDENTES_HABITUALES": int,
                      "V02B3_CANT_HOGARES": int,
                      "P00_NUMERO_LINEA": float,
                      "P03_EDAD": int
                       })
    dat['strata'] = dat['strata'].astype(str)
    dat['PSUD'] = dat['PSUD'].astype(str)
    
    # Add a tiny bit of noise to the line number to get a better ordering later
    dat['P00_NUMERO_LINEA'] = dat['P00_NUMERO_LINEA'] + np.random.default_rng().normal(0, 0.1, dat.shape[0])

    # Exclude mostly ID variables not needed for SDG
    dat = dat.loc[:,dat.columns.str.startswith('pid') | 
                   dat.columns.str.startswith('strata') |  
                   dat.columns.str.startswith('ID_PCD') |  
                   dat.columns.str.startswith('PSUD') |  
                   dat.columns.str.startswith('ID_AE_CONSECU') |  
                   dat.columns.str.startswith('ID_VIVIENDA') |  
                   dat.columns.str.startswith('ID_HOGAR') |  
                   dat.columns.str.startswith('weight') |  
                   dat.columns.str.startswith('V') | 
                   dat.columns.str.startswith('H') | 
                   dat.columns.str.startswith('P0') | 
                   dat.columns.str.startswith('P1') | 
                   dat.columns.str.startswith('P2') | 
                   dat.columns.str.startswith('P3') | 
                   dat.columns.str.startswith('P4') | 
                   dat.columns.str.startswith('NBI')].copy()


    # Set up copula model
    if(model_type == 'GC'):
        model = GaussianCopula(primary_key='pid',
                               default_distribution=field_dist,
                               constraints=constraints,
                               categorical_transformer=encoding
                              )


    # Run conditional sampling
    if (sampling_pcd == True):
    
        c_syn = pd.DataFrame()

        dat_full = dat.copy()

        # For every stratum separately
        for y in dat_full.strata.unique().tolist():

            dat = dat_full[dat_full['strata'] == y].copy()
            
            model.fit(dat)
    
            for z in dat.PSUD.unique().tolist():
                if (sampling_population == True):
                    smpl = model.sample(round(dat[dat['PSUD'] == z].shape[0]*(dat.loc[dat['PSUD'] == z, 'weight'].reset_index(drop = True)[0])*n_mse),
                                        conditions = {'PSUD': z},
                                        max_retries = 10000).reset_index(drop = True)
                    sub_count = list(range(1,n_mse+1))*math.ceil(smpl.shape[0]/n_mse)
                    smpl['sub_sample'] = sub_count[0:smpl.shape[0]]
                else:
                    smpl = model.sample(dat[dat['PSUD'] == z].shape[0]*n_mse,
                                        conditions = {'PSUD': z},
                                        max_retries = 10000).reset_index(drop = True)
                    smpl['sub_sample'] = list(range(1,n_mse+1))*(dat[dat['PSUD'] == z].shape[0])

                # Combine to one survey
                c_syn = c_syn.append(smpl)   
                
    else:
        model.fit(dat)
        
        c_syn = pd.DataFrame()

        for z in dat.PSUD.unique().tolist():
            if (sampling_population == True):
                smpl = model.sample(round(dat[dat['PSUD'] == z].shape[0]*(dat.loc[dat['PSUD'] == z, 'weight'].reset_index(drop = True)[0])*n_mse),
                                    conditions = {'PSUD': z},
                                    max_retries = 10000).reset_index(drop = True)
                sub_count = list(range(1,n_mse+1))*math.ceil(smpl.shape[0]/n_mse)
                smpl['sub_sample'] = sub_count[0:smpl.shape[0]]
            else:
                smpl = model.sample(dat[dat['PSUD'] == z].shape[0]*n_mse,
                                    conditions = {'PSUD': z},
                                    max_retries = 10000).reset_index(drop = True)
                smpl['sub_sample'] = list(range(1,n_mse+1))*(dat[dat['PSUD'] == z].shape[0])

            # Combine to one survey
            c_syn = c_syn.append(smpl)   

    c_syn_post = c_syn.copy()

    # Re-create previously excluded ID variables
    c_syn_post['ID_PCD'] = c_syn_post['PSUD'].str.slice(0, 5)
    c_syn_post['strata'] = c_syn_post['PSUD'].str.slice(start = 5)
    c_syn_post['hhid'] = c_syn_post['ID_PCD'].astype(str) + c_syn_post['ID_AE_CONSECU'].astype(str) + c_syn_post['ID_VIVIENDA'].astype(str) + c_syn_post['ID_HOGAR'].astype(str)
    c_syn_post['hid'] = c_syn_post['ID_PCD'].astype(str) + c_syn_post['ID_AE_CONSECU'].astype(str) + c_syn_post['ID_VIVIENDA'].astype(str)
    c_syn_post['ID_PROVINCIA'] = c_syn_post['hhid'].str.slice(0, 1)
    c_syn_post['ID_PC'] = c_syn_post['hhid'].str.slice(0, 3)

    c_syn_post.shape


    # Some post-processing to ensure census logic
    c_syn_post['P05B_CODIGO_PC'] = np.where(c_syn_post.P05A_LUGAR_NACIMIENTO == 'En este mismo cantón', c_syn_post.ID_PC, c_syn_post.P05B_CODIGO_PC)

    c_syn_post['P08_PUEBLO_INDIGENA'] = np.where(c_syn_post.P07_INDIGENA == 'No', np.nan, c_syn_post.P08_PUEBLO_INDIGENA)
    c_syn_post['P09_HABLA_INDIGENA'] = np.where(c_syn_post.P07_INDIGENA == 'No', np.nan, c_syn_post.P09_HABLA_INDIGENA)
    c_syn_post['P10_AFRODESCENDIENTE'] = np.where(c_syn_post.P07_INDIGENA == 'Sí', np.nan, c_syn_post.P10_AFRODESCENDIENTE)

    c_syn_post['P12H_NINGUNA'] = np.where((c_syn_post.P12A_LIMITACION_VISUAL == 'No') & 
                                          (c_syn_post.P12B_LIMITACION_OIR == 'No') & 
                                          (c_syn_post.P12C_LIMITACION_HABLAR == 'No') & 
                                          (c_syn_post.P12D_LIMITACION_CAMINAR == 'No') & 
                                          (c_syn_post.P12E_LIMITACION_BRAZOS_MANOS == 'No') & 
                                          (c_syn_post.P12F_LIMITACION_INTELECTUAL == 'No') & 
                                          (c_syn_post.P12G_LIMITACION_MENTAL == 'No'), 'Sí (ninguna de las anteriores)', 'No (al menos una discapacidad)')

    c_syn_post['P14_TIPO_EDUCACION_CUIDO'] = np.where(c_syn_post.P13_ASISTE_EDUCACION_CUIDO == 'No asiste', np.nan, c_syn_post.P14_TIPO_EDUCACION_CUIDO)

    c_syn_post['P17_OBTUVO_TITULO'] = np.where(c_syn_post.P16_ULTIMO_GRADO_APROBADO.isna() == True, np.nan, c_syn_post.P17_OBTUVO_TITULO)

    c_syn_post['P19B_CODIGO_RESIDE'] = np.where(c_syn_post.P19A_LUGAR_RESIDE_HACE_5ANOS == 'En este mismo cantón', c_syn_post.ID_PC, c_syn_post.P19B_CODIGO_RESIDE)

    c_syn_post['P22_ACTIVIDAD_REALIZADA'] = np.where(c_syn_post.P21_TRABAJO_SEMANA_PASADA == 'Ninguna de las anteriores', c_syn_post.P22_ACTIVIDAD_REALIZADA, np.nan)
    c_syn_post['P23_CONDICION_ACTIVIDAD'] = np.where(c_syn_post.P21_TRABAJO_SEMANA_PASADA == 'Ninguna de las anteriores', c_syn_post.P23_CONDICION_ACTIVIDAD, np.nan)

    c_syn_post['P29B_CODIGO_UBICACION_TRABAJO'] = np.where((c_syn_post.P29A_UBICACION_TRABAJO == '...dentro o junto a esta vivienda') | (c_syn_post.P29A_UBICACION_TRABAJO == '...en este mismo cantón'), c_syn_post.ID_PC, c_syn_post.P29B_CODIGO_UBICACION_TRABAJO)

    c_syn_post['V08A_ESTADO_PAREDES'] = np.where((c_syn_post.V04_MATERIAL_PAREDES == '... material de desecho') & (c_syn_post.V08A_ESTADO_PAREDES == 'Bueno'), 'Regular', c_syn_post.V08A_ESTADO_PAREDES)

    c_syn_post['V08B_ESTADO_TECHO'] = np.where((c_syn_post.V05_MATERIAL_TECHO == '... material de desecho') & (c_syn_post.V08B_ESTADO_TECHO == 'Bueno'), 'Regular', c_syn_post.V08B_ESTADO_TECHO)

    c_syn_post['V08C_ESTADO_PISO'] = np.where((c_syn_post.V07_MATERIAL_PISO == 'Piso de tierra') & (c_syn_post.V08C_ESTADO_PISO == 'Bueno'), 'Regular', c_syn_post.V08C_ESTADO_PISO)

    c_syn_post['V15_COMBUSTIBLE_COCINAR'] = np.where((c_syn_post.V14_PROVENIENCIA_ELECTRICIDAD == 'No hay luz eléctrica') & (c_syn_post.V15_COMBUSTIBLE_COCINAR == '... electricidad'), '... leña o carbón', c_syn_post.V15_COMBUSTIBLE_COCINAR)




    # Save

    c_syn_post.to_pickle('./data/midsave/syn_surveys/syn_survey_' + str(model_type) +'_' + str(pop_level) +'_' + str(syn_level) +'_' + str(field_dist) +'_' + str(encoding) +'_' + str(n_sim) +'_' + str(n_mse) +'_' + str(x) +'.pkl')

    end = time.time()

    delta = (end - start)/60

    print('Processing', x, 'of', n_sim, 'surveys. Took %.2f minutes to fit and synthesize.' % delta)

In [None]:
if __name__ == '__main__':
    
    with mp.Pool(processes=7) as pool:
        
        pool.map(synthesize_survey, range(1, n_sim + 1))