In [3]:
import pandas as pd
import numpy as np

import json
import psycopg2
import progressbar
from datetime import datetime

from sqlalchemy import create_engine

## Parametros

### Conf DB

In [4]:
## Las bases de datos deben estar creadas

dbmesh = 'epi_puma_meshes'
dbname = 'epi_puma_covid19_1'
dbuser = 'postgres'
dbpass = 'postgres'
dbport = 5432
dbhost = '127.0.0.1'

### Conf archivo

In [5]:
filename = 'covid-19_processed.csv' 
total_ocurrences = 7380367
times = 20

### Conf columnas

In [6]:
## Poner None en la columna donde el dataset no tenga malla
map_grid = {'state': 'state', 'mun':'mun', 'ageb': None}

columns_types = {}
columns_types['state'] = str
columns_types['mun'] = str

covar_attributes = ['CLASIFICACION_FINAL']
listed_attributes = ''
date_column = 'FECHA_SINTOMAS'.lower()

for attr in covar_attributes:
    listed_attributes += attr + ','

listed_attributes = listed_attributes[:-1]

### Conf Meshes

In [7]:
meshes = ['state', 'mun', 'ageb']

## Inicio

In [8]:
!ls

 201028COVID19MEXICO.csv       datos_abiertos_covid19.zip
 210504COVID19MEXICO.csv      'diccionario_datos_covid19(1)'
 210623COVID19MEXICO.csv      'diccionario_datos_covid19(1).zip'
 covid-19_processed.csv        epi_puma_id_cell.ipynb
 covid-19_processed.tar.gz     procesamiento_dataset.ipynb
 datos_abiertos_20210505.zip   procesamiento_dataset.py
 datos_abiertos_20210624.zip


In [9]:
conn_string = 'postgresql://{dbuser}:{dbpass}@{dbhost}:{dbport}/{dbname}'.format(dbuser=dbuser, 
                                                                                 dbpass=dbpass,
                                                                                 dbhost=dbhost,
                                                                                 dbport=dbport,
                                                                                 dbname=dbname)
for i in range(times):
    offset = int(i*total_ocurrences/times)
    until = int((i+1)*total_ocurrences/times) - int((i+1)/times)
    
    df = pd.read_csv(filename, dtype=columns_types, skiprows=lambda x: not (x in range(offset, until) or x == 0))
    
    l = df.columns.tolist()
    d = {x:x.lower() for x in l}
    df = df.rename(columns=d)
    
    for mesh in map_grid.keys():
        if map_grid[mesh] != None:
            df = df.rename(columns={map_grid[mesh]: 'gridid_' + mesh})
        else:
            df['gridid_' + mesh] = pd.Series(df.shape[0]*[''])
            
    df['date_occurrence'] = pd.to_datetime(df[date_column]) 
    
    print('batch: ', i, ' size: ', df.shape[0], ' from: ', offset, ' to: ', until-1)
    
    engine = create_engine(conn_string)
    df.to_sql('occurrence', engine, if_exists='append', index=False, chunksize=100000)


batch:  0  size:  369017  from:  0  to:  369017
batch:  1  size:  369018  from:  369018  to:  738035
batch:  2  size:  369019  from:  738036  to:  1107054
batch:  3  size:  369018  from:  1107055  to:  1476072
batch:  4  size:  369018  from:  1476073  to:  1845090
batch:  5  size:  369019  from:  1845091  to:  2214109
batch:  6  size:  369018  from:  2214110  to:  2583127
batch:  7  size:  369018  from:  2583128  to:  2952145
batch:  8  size:  369019  from:  2952146  to:  3321164
batch:  9  size:  369018  from:  3321165  to:  3690182
batch:  10  size:  369018  from:  3690183  to:  4059200
batch:  11  size:  369019  from:  4059201  to:  4428219
batch:  12  size:  369018  from:  4428220  to:  4797237
batch:  13  size:  369018  from:  4797238  to:  5166255
batch:  14  size:  369019  from:  5166256  to:  5535274
batch:  15  size:  369018  from:  5535275  to:  5904292
batch:  16  size:  369018  from:  5904293  to:  6273310
batch:  17  size:  369019  from:  6273311  to:  6642329
batch:  18  

In [10]:
with engine.connect() as conn, conn.begin():
    for mesh in meshes:
        sql = "CREATE INDEX idx_occurrence_gridid_{grid} ON occurrence(gridid_{grid})".format(grid=mesh)
        conn.execute(sql)
    
    sql = "CREATE INDEX idx_occurrence_date_occurrence ON occurrence(date_occurrence)".format(grid=mesh)
    conn.execute(sql)

In [11]:
conn = psycopg2.connect(conn_string)

cur = conn.cursor()
cur.execute("SELECT DISTINCT {0} FROM occurrence".format(listed_attributes))
results = cur.fetchall()

cur.close()
conn.close()

print(len(results), ' covars')

7  covars


In [12]:
N_attributes = len(covar_attributes)
records = {}

index = 0
for r in results:
    aux = {}
    for i in range(N_attributes):
        aux[covar_attributes[i]] = r[i]
    records[index] = aux
    index += 1
    
df = pd.read_json(json.dumps(records), orient='index')
l = df.columns.tolist()
d = {x:x.lower() for x in l}
df = df.rename(columns=d)
df = df.fillna('')
df = df.dropna()
df

Unnamed: 0,clasificacion_final
0,NEGATIVO A SARS-COV-2 POR LABORATORIO
1,CASO DE SARS-COV-2 CONFIRMADO POR LABORATORIO
2,CASO DE COVID-19 CONFIRMADO POR ASOCIACIÓN CLÍ...
3,NO REALIZADO POR LABORATORIO
4,CASO DE COVID-19 CONFIRMADO POR COMITÉ DE DIC...
5,INVÁLIDO POR LABORATORIO
6,CASO SOSPECHOSO


In [13]:
engine = create_engine(conn_string)
df.to_sql('covariable', engine, if_exists='replace', index=False, chunksize=100000)

with engine.connect() as conn, conn.begin():
    sql = """ALTER TABLE covariable ADD COLUMN id serial"""
    conn.execute(sql)
    
    for attr in covar_attributes:
        sql = "CREATE INDEX idx_covariable_{attr} ON covariable({attr})".format(attr=attr)
        conn.execute(sql)
        
        sql = "CREATE INDEX idx_occurrence_{attr} ON occurrence({attr})".format(attr=attr)
        conn.execute(sql)

In [14]:
with engine.connect() as conn, conn.begin():
    sql = """ALTER TABLE occurrence ADD COLUMN covariable_id integer"""
    conn.execute(sql)
    
    sql = "UPDATE occurrence SET covariable_id = covariable.id FROM covariable WHERE "
    for attr in covar_attributes:
        sql += "occurrence." + attr + " = " + "covariable." + attr + " AND "
    sql += "TRUE"
    conn.execute(sql)

In [15]:
with engine.connect() as conn, conn.begin():
    sql = "CREATE INDEX idx_occurrence_id_covariable ON occurrence(covariable_id)"
    conn.execute(sql)
    
    for mesh in meshes:
        sql = "ALTER TABLE covariable ADD COLUMN cells_{mesh} varchar(10)[]".format(mesh=mesh)
        conn.execute(sql)
        
        sql = "CREATE INDEX idx_covariable_cells_{mesh} ON covariable USING GIN(cells_{mesh})".format(mesh=mesh)
        conn.execute(sql)

In [16]:
conn = psycopg2.connect(conn_string)

cur = conn.cursor()
cur.execute("SELECT id FROM covariable ORDER BY id DESC")
N_covar = cur.fetchone()[0]


for mesh in meshes:
    print(mesh)
    for i in range(1, N_covar+1):
        cur = conn.cursor()
        cur.execute("UPDATE covariable SET cells_{mesh} = array(SELECT DISTINCT gridid_{mesh} FROM occurrence WHERE not gridid_{mesh} is null AND covariable_id =  {id_covariable})::varchar(10)[] WHERE covariable.id = {id_covariable}". format(mesh=mesh, id_covariable=i))
        conn.commit()
        
cur.close()
conn.close()

state
mun
ageb


In [17]:
with engine.connect() as conn, conn.begin():
    sql ="""ALTER TABLE occurrence ADD COLUMN id serial"""
    conn.execute(sql)

In [18]:
with engine.connect() as conn, conn.begin():
    sql ="""update occurrence set gridid_ageb = 9999 where gridid_ageb is null"""
    conn.execute(sql)
    
    sql ="""update occurrence set gridid_mun = 9999 where gridid_mun is null;"""
    conn.execute(sql)
    
    sql ="""update occurrence set gridid_state = 9999 where gridid_state is null"""
    conn.execute(sql)

In [19]:
with engine.connect() as conn, conn.begin():
    sql ="""CREATE FUNCTION array_intersection(anyarray, anyarray)
  RETURNS anyarray
  language sql
as $FUNCTION$
    SELECT ARRAY(
        SELECT UNNEST($1)
        INTERSECT
        SELECT UNNEST($2)
    );
$FUNCTION$;"""
    conn.execute(sql)