# Codes cuando se trabajaba por chunks, o lista de dataframes. Ahora se utiliza la librería Dask en su lugar.

In [None]:
# Convertimos el dataset de vacunados en una lista de chunks o dfs
# (Para procesar información en bloques de 500 000) 
lst_vac = fn.df_into_chunks(df_vac)
del df_vac

[chunk.dropna(inplace=True) for chunk in lst_vac] # Drop na en caso de que existan

# Que cada 'chunk' de lst_vac sea solamente cuando dosis = 2, es decir tengan 2 dosis
lst_vac = [chunk.loc[chunk['dosis'] == '2'] for chunk in lst_vac]

# Convertir a formato fecha la columna 'fecha vacunacion' y obtener su año y semana epi
[fn.variable_fecha(chunk, 'fecha_vacunacion') for chunk in lst_vac]
[fn.date_to_epiweek(chunk, 'fecha_vacunacion') for chunk in lst_vac]

# Creamos columnas de 1 en cada chunk para contabilizar cada caso de vacunado
for chunk in lst_vac:                                        
    chunk['vacunado'] = 1
    chunk['vacunado'].apply(np.int8)
    del chunk['dosis']  # Borramos columna dosis llena de número 2
del chunk # Borramos el último chunk que queda al final

print(lst_vac[0].head())

In [None]:
def epiweeks_chunks(dfs_vac):
    """
    Devuelve un dataframe con el total de VACUNADOS por semana y año 
    epidemiológico (recibe una lista de dataframes o chunks)
    """
    var_holder = {}     # Diccionario para guardar nombres                                             
    lst_epi_vac = []    # Lista de dfs para cada sumatoria de chunks
                                         
    for i, chunk in enumerate(dfs_vac):
        var_holder['epi_vac_' + str(i)]= pd.crosstab(index=[chunk['epi_year'],
                                                            chunk['epi_week']],
                                                     columns=chunk['vacunado'])
        lst_epi_vac.append(var_holder['epi_vac_' + str(i)])
        
    # Unimos todos los dfs sumados en uno solo
    merged_epivac = pd.concat(lst_epi_vac, axis=1) 
    epi_vac = pd.DataFrame(merged_epivac.sum(numeric_only=True, axis=1))
    epi_vac.columns = ['vacunados']
    
    return epi_vac

In [None]:
def vac_dose(vac_url):
    """
    Función que toma la dirección del dataset de vacunados y devuelve el TOTAL DE DOSIS APLICADAS 
    (1ra,2nda y 3era dosis) por semana epidemiológica
    """
    vac_cols = ['fecha_vacunacion', 'dosis']                     
    df_vac = fn.read_largeCSV_file(vac_url, ',', vac_cols)    
    lst_vac = fn.df_into_chunks(df_vac)               
 
    for idx, chunk in enumerate(lst_vac):
        chunk = fn.variable_fecha(chunk, 'fecha_vacunacion')

    for idx, chunk in enumerate(lst_vac):
        chunk = fn.date_to_epiweek(chunk, "fecha_vacunacion")
        
    return lst_vac 

In [None]:
def vacDose_chunks(dfs_vac):
    """
    Devuelve un dataframe con el TOTAL DE VACUNADOS por DEPARTAMENTO 
    del PERÚ (recibe una lista de dataframes o chunks)
    """
    var_holder = {}     # Diccionario para guardar nombres                                             
    lst_epi_vac = []    # Lista de dfs para cada sumatoria de chunks
                                         
    for i, chunk in enumerate(dfs_vac):
        var_holder['epi_vac_' + str(i)]= pd.crosstab(index = [chunk['epi_year'], chunk['epi_week']],
                                                     columns = [chunk['dosis']])
        lst_epi_vac.append(var_holder['epi_vac_' + str(i)])

    merged_epivac = pd.concat(lst_epi_vac, axis=1)  # Merge all dfs
    merged_epivac = merged_epivac.fillna(0).astype(np.int64)
    merged_epivac = merged_epivac.groupby(level=0, axis=1).sum()

    return merged_epivac

# ============================================

# Codes ya no usados para dataset de vacunados

In [None]:
## 3. Procesamos el dataset de vacunados

### 3.1. Leemos el dataset

#Debido al tamaño del dataset y para preservar recursos, no se procesará la información usando **Pandas**. En su lugar se utilizará el paquete **Dask**.

vac_col = ['fecha_vacunacion','dosis']
df_vac = dd.read_csv(vac_url, sep = ",", usecols = vac_col, 
                     dtype = {'fecha_vacunacion':'int32', 'dosis': 'int8'})
del vac_col
df_vac.head()

In [None]:
# Seleccionamos como vacunados solamente a personas con las 2 dosis
df_vac = df_vac.loc[df_vac['dosis'] == 2]

# Transformamos a formato fecha
df_vac = df_vac.assign(fecha_vacunacion = dd.to_datetime(df_vac["fecha_vacunacion"], format = "%Y%m%d", 
                                                         errors="coerce"))
df_vac.head()

In [None]:
# Obtenemos el año y semana epidemiológica en una sola columna
df_vac['epi_date'] = df_vac['fecha_vacunacion'].map(lambda date : Week.fromdate(date).isoformat())
del df_vac['fecha_vacunacion'], df_vac['dosis']

# Se separa la columna obtenida en 2, una para el año y otra para la semana epidemiológica
df_vac[['year','epi_week']] = df_vac['epi_date'].str.split("W", 1, expand=True)
del df_vac['epi_date']
df_vac['vacunados'] = 1
df_vac.head()

In [None]:
df_vac['year'] = df_vac['year'].astype('int16')
df_vac['epi_week'] = df_vac['epi_week'].astype('int8')
df_vac['vacunados'] = df_vac['vacunados'].astype('int8')
df_vac

In [None]:
def crosstab4dask(ddf):

    lst = [] # Lista para almacenar la sumatoria de cada particion

    for partition in range(0, ddf.npartitions):
        lst.append(ddf.partitions[partition].compute().groupby(['year', 'epi_week']).count())

    total_epivac = pd.concat(lst, axis=1).sum(numeric_only=True, axis=1).apply(np.int64)
    del lst
    total_epivac = pd.DataFrame(total_epivac)
    total_epivac.columns = ['vacunados']
    
    return total_epivac

In [None]:
epi_vac = crosstab4dask(df_vac)
del df_vac
epi_vac.head()

In [None]:
merged_epiweeks = pd.concat([epi_fal, epi_vac], axis=1)         
merged_epiweeks = merged_epiweeks.fillna(value = 0) # Rellenamos vacíos o Nan values con 0
del epi_fal, epi_vac

# Cambiamos a int ya que existen Nan values y cambia a float64 automáticamente
merged_epiweeks = merged_epiweeks.astype('int64')
                                    
merged_epiweeks

# ============================================

In [None]:
def filtering_data_dep(falxdep_df):
    """Function to fix the indexes of the data of deceased by department of Peru.
    IMPORTANT: There are more efficient ways to modify indexes using 'loc' and 
    'iloc' but this method at least 'works'"""

    time = falxdep_df[["fallecido", "Unnamed: 1"]]   # Get the col of epidemiological weeks
    time = time.rename(columns=time.iloc[1])     # Put the first row (epi_week) as header
    time = time.drop([0,1, len(time)-1],axis=0)  # Drop the first and last row (header, nan and total)
    time = time.reset_index(drop=True)           # Reset index

    departments = falxdep_df.drop(["Unnamed: 1", 'fallecido'], axis=1)    # Drop cols that are not departments
    departments = departments.rename(columns=departments.iloc[0])     # Put the first row (epi_week) as header
    departments = departments.drop([0,1, len(departments)-1],axis=0)  # Drop the first and last row (header, nan and total)
    departments = departments.reset_index(drop=True)                  # Reset index

    falxdep_df = pd.concat([time, departments], axis=1)
    return falxdep_df

In [None]:
ct_falxdep_fix = filtering_data_dep(ct_falxdep)
print(ct_falxdep_fix.head())

# ===============================

In [None]:
def vac_department(vac_url):
    """
    Función que toma la dirección del dataset de vacunados y devuelve el número de VACUNADOS 
    por los 24 departamentos del Perú
    """
    vac_col = ['id_centro_vacunacion', 'dosis']                     
    df_vac = fn.read_largeCSV_file(vac_url, ',', vac_col)    
    lst_vac = fn.df_into_chunks(df_vac)               
    
    # Que cada 'chunk' de lst_vac sea solamente cuando dosis = 2, es decir tengan 2 dosis
    lst_vac = [chunk.loc[chunk.loc[:, 'dosis'] == 2] for chunk in lst_vac]

    # Creamos columnas de 1 en cada chunk para contabilizar cada caso de vacunado
    for chunk in lst_vac:                                        
        chunk['vacunado'] = 1
        chunk['vacunado'].apply(np.int8)
        del chunk['dosis']  # Borramos columna dosis llena de número 2
    del chunk # Borramos el último chunk que queda al final
    
    return lst_vac

In [None]:
def vacxdep_chunks(dfs_vac):
    """
    Devuelve un dataframe con el TOTAL DE VACUNADOS por DEPARTAMENTO 
    del PERÚ (recibe una lista de dataframes o chunks)
    """
    var_holder = {}     # Diccionario para guardar nombres                                             
    lst_epi_vac = []    # Lista de dfs para cada sumatoria de chunks
                                         
    for i, chunk in enumerate(dfs_vac):
        var_holder['epi_vac_' + str(i)]= pd.crosstab(index=[chunk['id_centro_vacunacion']],
                                                     columns=chunk['vacunado'])
        lst_epi_vac.append(var_holder['epi_vac_' + str(i)])
    
    merged_epivac = pd.concat(lst_epi_vac, axis=1)  # Merge all dfs
    epi_vac = pd.DataFrame(merged_epivac.sum(numeric_only=True, axis=1))
    epi_vac.columns = ['vacunados']
    epi_vac['vacunados'] = epi_vac['vacunados'].astype(np.int64)
    epi_vac.reset_index(level=0, inplace=True)

    return epi_vac

# =========================================================

In [None]:
def vac_department(vac_url):
    """
    Función que toma la dirección del dataset de vacunados y devuelve el número de VACUNADOS 
    por los 24 departamentos del Perú
    """
    vac_col = ['id_centro_vacunacion', 'dosis','fecha_vacunacion']                     
    df_vac = fn.read_largeCSV_file(vac_url, ',', vac_col)    
    lst_vac = fn.df_into_chunks(df_vac)               
    
    for df in lst_vac:                                           
        df = df.drop(df[df["dosis"] == 1].index,  inplace=True)     # Drop non fully vaccinated (1 dose)

    for df in lst_vac:
        df['vacunado'] = 1  # To count each case
        df['vacunado'] = df['vacunado'].apply(np.int8)
        del df['dosis']     # Dose var is no needed anymore

    return lst_vac

In [None]:
vac_url = "RawData/TB_VACUNACION_COVID19.csv"
vacxdep = vac_department(vac_url)
del vac_url

In [None]:
for i, chunk in enumerate(vacxdep):
    chunk = fn.variable_fecha_ymd(chunk, "fecha_vacunacion")
    print(vacxdep[i].head())

In [None]:
for i, chunk in enumerate(vacxdep):
    chunk = fn.date_to_epiweek(chunk, "fecha_vacunacion")

print(vacxdep[0].head())

In [None]:
ubigeo_url = 'RawData/TB_UBIGEOS.csv'
vaccenter_url = 'RawData/TB_CENTRO_VACUNACION.csv'

ubigeo = pd.read_csv(ubigeo_url, usecols = ['id_ubigeo', 'departamento'])
vaccenter = pd.read_csv(vaccenter_url, usecols= ['id_centro_vacunacion','id_ubigeo'])

del ubigeo_url, vaccenter_url

vaccenter = vaccenter.merge(ubigeo, on = 'id_ubigeo', how = 'left')
del vaccenter['id_ubigeo']

print("Head of the merged dataframe (vaccenter) with: 'id_centro_vacunacion' and 'departamento'") 
print(vaccenter.head(10))

In [None]:

df = vacxdep[0].merge(vaccenter, on = 'id_centro_vacunacion', how = 'left')
#del vacxdep_id['id_centro_vacunacion']

print(df.head())

In [None]:
for i, chunk in enumerate(vacxdep):
    chunk = chunk.merge(vaccenter, on = 'id_centro_vacunacion', how = 'left')
    vacxdep[i] = chunk

print(vacxdep[0].head())

In [None]:
def vacxdep_chunks(dfs_vac):
    """
    Devuelve un dataframe con el TOTAL DE VACUNADOS por DEPARTAMENTO 
    del PERÚ (recibe una lista de dataframes o chunks)
    """
    var_holder = {}     # Diccionario para guardar nombres                                             
    lst_epi_vac = []    # Lista de dfs para cada sumatoria de chunks
                                         
    for i, chunk in enumerate(dfs_vac):
        var_holder['epi_vac_' + str(i)]= pd.crosstab(index = [chunk['epi_year'], chunk['epi_week']],
                                                     columns = [chunk['departamento']])
        lst_epi_vac.append(var_holder['epi_vac_' + str(i)])

    merged_epivac = pd.concat(lst_epi_vac, axis=1)  # Merge all dfs
    merged_epivac = merged_epivac.fillna(0).astype(np.int64)
    merged_epivac = merged_epivac.groupby(level=0, axis=1).sum()
    #epi_vac = pd.DataFrame(merged_epivac.sum(numeric_only=True, axis=1))
    #epi_vac.columns = ['vacunados']
    #epi_vac['vacunados'] = epi_vac['vacunados'].astype(np.int64)
    #epi_vac.reset_index(level=0, inplace=True)

    return merged_epivac

In [None]:
vacxdepxsemEpi = vacxdep_chunks(vacxdep)
print(vacxdepxsemEpi.head(50))

In [None]:
vacxdepxsemEpi.to_csv('Data/vacunados_x_departamentos_x_semanaEpi.csv')