In [1]:
import pandas as pd
import sqlalchemy 
import psycopg2
import os
import numpy as np
import requests
import re
import copy
from pandas.api.types import is_numeric_dtype
from sklearn.linear_model import LinearRegression
from sqlalchemy import create_engine
import datetime
from datetime import datetime as dt
from sqlalchemy import inspect
import sys
import sqlalchemy.ext.declarative as sqld
import roman

In [2]:
sys.path.append('..')

In [3]:
from modules.transforms import *

In [4]:
DBname=os.environ['DB_NAME']
postgres_psswd=os.environ['POSTGRES_PASSWORD']
postgres_user=os.environ['POSTGRES_USER']
postgres_port=str(os.environ['POSTGRES_PORT'])

In [5]:
# A long string that contains the necessary Postgres login information
postgres_str = ('postgresql://'+postgres_user+':'+postgres_psswd+'@'+DBname+':'+postgres_port+'/superset')
# Create the connection
cnx = create_engine(postgres_str)

In [6]:
#guardar paso en csv
path='/data/ETLcache/'
now = dt.now()
timestamp = now.strftime("_%d%m%Y_%H%M%S")

In [7]:
inspector = inspect(cnx)
schemas = inspector.get_schema_names()

In [8]:
tables={}
for schema in schemas:
    #print("schema: %s" % schema)
    tables[schema]=[]
    for table_name in inspector.get_table_names(schema=schema):
        #print("table: %s" % table_name)
        #tables[schema][table_name]=list(inspector.get_columns(table_name, schema=schema)[0].keys())
        tables[schema].append(table_name)

In [9]:
#productos que se están trabajando
productos=pd.read_sql_table('min_ciencias_products', con=cnx,schema='tracking')['0'].to_list()

In [10]:
productos

['producto5',
 'producto3',
 'producto13',
 'producto25',
 'producto14',
 'producto19',
 'producto53',
 'producto54',
 'producto52',
 'producto48',
 'producto24']

In [11]:
tbs={}
for p in productos:
    tbs[p]=tables[p]
tables=tbs

In [12]:
#información comunal
INFO_COMUNAL=pd.read_sql_table('info_comunal@ivanMSC', con=cnx,schema='@ivanMSC') 
INFO_COMUNAL=INFO_COMUNAL.rename(columns={'Nombre':'Comuna'})
INFO_COMUNAL['Codigo region']=INFO_COMUNAL['Codigo region'].astype(int)

In [13]:
#códigos iso
iso=pd.read_sql_table('iso', con=cnx,schema='geo') 
iso=iso.loc[iso['Country Name']=='Chile',('Country Code','Subdivision Name Used','Code','Number',
                                      'Latitude','Longitude')]
iso['Number']=iso['Number'].apply(lambda x: roman.fromRoman(x))
iso=iso.rename(columns={'Number':'Codigo region','Subdivision Name Used':'Region'})
#población regional
popREG=pd.DataFrame(INFO_COMUNAL.groupby('Codigo region')['Poblacion'].sum().reset_index())
#apendizar población regional a códigos ISO
iso=iso.merge(popREG,on='Codigo region')
iso=iso.drop('Region',axis=1)

In [14]:
#defunciones deis comunal (confirmado y sospechoso)
x=pd.read_sql_table('producto50_DefuncionesDEIS_confirmadosPorComuna_std', con=cnx,schema='producto50')
x['Status']='Confirmado'
y=pd.read_sql_table('producto50_DefuncionesDEIS_sospechososPorComuna_std', con=cnx,schema='producto50')
y['Status']='Sospechoso'
x=pd.concat([x,y])
x=x.fillna(0)
x=x[x['Comuna']!='Total']

In [15]:
#agregar dummies a UCI
uci=pd.read_sql_table('producto52_Camas_UCI_std', con=cnx,schema='producto52')
dummies=pd.get_dummies(uci['Serie'])
for d in dummies.columns:
    dummies[d]=dummies[d]*uci['Casos']
dummies['Camas UCI Desocupadas']=dummies['Camas UCI habilitadas']-dummies['Camas UCI ocupadas COVID-19']-dummies['Camas UCI ocupadas no COVID-19']
uci=pd.concat([uci,dummies],axis=1)
cols=[]
for c in uci.columns:
    cols.append(c.replace('(','').replace(')',''))
uci=uci.drop('Camas base (2019)',axis=1)

In [16]:
derived_tables={}
derived_tables['producto50']={}
derived_tables['producto50']['producto50_DefuncionesDEIS_consolidadoPorComuna_std']=x
derived_tables['producto52']={}
derived_tables['producto52']['producto52_Camas_UCI_std']=uci
for s in tables.keys():
    for n in tables[s]:
        #print(n)
        if 'cumulativo' in n.lower():
            print('reading table '+n+'from schema '+s)
            table=pd.read_sql_table(n, con=cnx,schema=s) 
            print('generating diff table for cumulative table: '+n)
            nn=n.lower().replace('cumulativo','_diff_')
            if s not in derived_tables.keys():
                derived_tables[s]={}
            print('new name: '+nn)
            if 'std' not in nn:
                derived_tables[s][nn]=to_diff(table)
            else:
                print('standard form with linear regression')
                derived_tables[s][nn]=to_diff_std(table)
                derived_tables[s][nn]['Fecha']=derived_tables[s][nn]['Fecha'].dt.to_pydatetime()
        
        elif 'producto19_CasosActivosPorComuna_std' in n:
            print('reading table '+n+' from schema '+s)
            table=pd.read_sql_table(n, con=cnx,schema=s) 
            if s not in derived_tables.keys():
                derived_tables[s]={}
            nn=n
            derived_tables[s][nn]=table
            print('new name: '+nn)
            derived_tables[s][nn]=derived_tables[s][nn].merge(INFO_COMUNAL[['Region','Comuna','Superficie','Lat','Lon','CUT','Provincia']],
           on=['Region','Comuna'])
            #derived_tables[s][nn]=derived_tables[s][nn].drop('level_0',axis=1)
        
        elif 'producto53' in n.lower():
            print('reading table '+n+' from schema '+s)
            table=pd.read_sql_table(n, con=cnx,schema=s) 
            if s not in derived_tables.keys():
                derived_tables[s]={}
                
            print('adding descriptive stats to table: '+n)
            derived_tables[s][n]=casos_nuevos_desc(table,numeric_col_string='confirmados',group_col='Region')
            
            nn=n.lower()+'_casos_activos'
            #nn=n
            
            if ('producto53_confirmados_regionale' in n.lower()) & ('_casos_activos' not in n.lower()):
                print('calculando casos activos con rolling avg de 14 días')
                print('new name: '+nn)
                derived_tables[s][n]=derived_tables[s][n].merge(iso,on='Codigo region',how='left')
                derived_tables[s][nn]=casos_activos(table,numeric_col_string='confirmados',group_col='Region',window=14)
                derived_tables[s][nn]=derived_tables[s][nn].merge(iso,on='Codigo region',how='left')
                
            #elif ('producto53_confirmados_nacional' in n.lower()) & ('_casos_activos' not in n.lower()):
            #    print('calculando casos activos con rolling avg de 14 días')
            #    print('new name: '+nn)
            #    derived_tables[s][nn]=casos_activos(table,numeric_col_string='confirmados',group_col=None,window=14)
            #elif ('confirmados_provinciale' in n.lower()) & ('_casos_activos' not in n.lower()):
            #    print('calculando casos activos con rolling avg de 14 días')
            #    print('new name: '+nn)
            #    derived_tables[s][nn]=casos_activos(table,numeric_col_string='confirmados',group_col='Provincia',window=14)  
            #elif ('producto53_confirmados_' in n.lower()) & ('_casos_activos' not in n.lower()):
            #    print('calculando casos activos con rolling avg de 14 días')
            #    print('new name: '+nn)
            #    derived_tables[s][nn]=casos_activos(table,numeric_col_string='confirmados',group_col='servicio.salud',window=14)            

reading table producto3_CasosTotalesCumulativofrom schema producto3
generating diff table for cumulative table: producto3_CasosTotalesCumulativo
new name: producto3_casostotales_diff_
reading table producto3_CasosTotalesCumulativo_Tfrom schema producto3
generating diff table for cumulative table: producto3_CasosTotalesCumulativo_T
new name: producto3_casostotales_diff__t
reading table producto3_CasosTotalesCumulativo_stdfrom schema producto3
generating diff table for cumulative table: producto3_CasosTotalesCumulativo_std
new name: producto3_casostotales_diff__std
standard form with linear regression


  result = getattr(ufunc, method)(*inputs, **kwargs)


reading table producto13_CasosNuevosCumulativofrom schema producto13
generating diff table for cumulative table: producto13_CasosNuevosCumulativo
new name: producto13_casosnuevos_diff_
reading table producto13_CasosNuevosCumulativo_Tfrom schema producto13
generating diff table for cumulative table: producto13_CasosNuevosCumulativo_T
new name: producto13_casosnuevos_diff__t
reading table producto13_CasosNuevosCumulativo_stdfrom schema producto13
generating diff table for cumulative table: producto13_CasosNuevosCumulativo_std
new name: producto13_casosnuevos_diff__std
standard form with linear regression


  result = getattr(ufunc, method)(*inputs, **kwargs)


reading table producto14_FallecidosCumulativofrom schema producto14
generating diff table for cumulative table: producto14_FallecidosCumulativo
new name: producto14_fallecidos_diff_
reading table producto14_FallecidosCumulativo_Tfrom schema producto14
generating diff table for cumulative table: producto14_FallecidosCumulativo_T
new name: producto14_fallecidos_diff__t
reading table producto14_FallecidosCumulativo_stdfrom schema producto14
generating diff table for cumulative table: producto14_FallecidosCumulativo_std
new name: producto14_fallecidos_diff__std
standard form with linear regression


  result = getattr(ufunc, method)(*inputs, **kwargs)


reading table producto19_CasosActivosPorComuna_std from schema producto19
new name: producto19_CasosActivosPorComuna_std
reading table producto53_confirmados_regionale_casos_activos from schema producto53
adding descriptive stats to table: producto53_confirmados_regionale_casos_activos


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  nuevos['Promedio']=nuevos.mean(axis=1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  nuevos['Min']=nuevos.min(axis=1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  nuevos['Max']=nuevos.max(axis=1)


reading table producto53_confirmados_nacional from schema producto53
adding descriptive stats to table: producto53_confirmados_nacional
reading table producto53_confirmados_provinciale from schema producto53
adding descriptive stats to table: producto53_confirmados_provinciale
reading table producto53_confirmados_regionale from schema producto53
adding descriptive stats to table: producto53_confirmados_regionale
calculando casos activos con rolling avg de 14 días
new name: producto53_confirmados_regionale_casos_activos
reading table producto53_confirmados_ from schema producto53
adding descriptive stats to table: producto53_confirmados_


In [17]:
#fix dtypes derived tables producto 14 ('región metropolitana' mezclada con valores numéricos)
dates=[x for x in derived_tables['producto14']['producto14_fallecidos_diff_'].columns if x not in ['Region','Region_Metropolitana','index']]
derived_tables['producto14']['producto14_fallecidos_diff_']=derived_tables['producto14']['producto14_fallecidos_diff_'][dates].apply(pd.to_numeric, errors='coerce')

dates=[x for x in derived_tables['producto14']['producto14_fallecidos_diff__t'].columns if x not in ['Fecha','index']]
derived_tables['producto14']['producto14_fallecidos_diff__t']=derived_tables['producto14']['producto14_fallecidos_diff__t'][dates].apply(pd.to_numeric, errors='coerce')

In [18]:
dates=[x for x in derived_tables['producto3']['producto3_casostotales_diff_'].columns if x not in ['Fecha','index','Region_Metropolitana']]
derived_tables['producto3']['producto3_casostotales_diff_']=derived_tables['producto3']['producto3_casostotales_diff_'][dates].apply(pd.to_numeric, errors='coerce')

In [19]:
dates=[x for x in derived_tables['producto3']['producto3_casostotales_diff__t'].columns if x not in ['Fecha','index']]
derived_tables['producto3']['producto3_casostotales_diff__t']=derived_tables['producto3']['producto3_casostotales_diff__t'][dates].apply(pd.to_numeric, errors='coerce')

In [20]:
dates=[x for x in derived_tables['producto13']['producto13_casosnuevos_diff_'].columns if x not in ['Fecha','index','Region_Metropolitana']]
derived_tables['producto13']['producto13_casosnuevos_diff_']=derived_tables['producto13']['producto13_casosnuevos_diff_'][dates].apply(pd.to_numeric, errors='coerce')

In [21]:
dates=[x for x in derived_tables['producto13']['producto13_casosnuevos_diff__t'].columns if x not in ['Fecha','index']]
derived_tables['producto13']['producto13_casosnuevos_diff__t']=derived_tables['producto13']['producto13_casosnuevos_diff__t'][dates].apply(pd.to_numeric, errors='coerce')

In [22]:
for schema in derived_tables.keys():
    for name in derived_tables[schema].keys():
        try:
            df=derived_tables[schema][name]
            if 'level_0' in df.columns:
                df=df.drop('level_0',axis=1)
            name=name.replace('-','_')
            print("creating table "+name+' ,schema: '+schema)
            df.to_sql(name, schema=schema,con=cnx,if_exists='replace')
            print("saving table"+path+name+timestamp+'.csv in cache')
            df.to_csv(path+name+timestamp+'.csv',encoding='utf-8')
        except Exception as e: 
            print(str(e))
            pass

creating table producto50_DefuncionesDEIS_consolidadoPorComuna_std ,schema: producto50
saving table/data/ETLcache/producto50_DefuncionesDEIS_consolidadoPorComuna_std_28012021_052023.csv in cache
creating table producto52_Camas_UCI_std ,schema: producto52
saving table/data/ETLcache/producto52_Camas_UCI_std_28012021_052023.csv in cache
creating table producto3_casostotales_diff_ ,schema: producto3
saving table/data/ETLcache/producto3_casostotales_diff__28012021_052023.csv in cache
creating table producto3_casostotales_diff__t ,schema: producto3
saving table/data/ETLcache/producto3_casostotales_diff__t_28012021_052023.csv in cache
creating table producto3_casostotales_diff__std ,schema: producto3
saving table/data/ETLcache/producto3_casostotales_diff__std_28012021_052023.csv in cache
creating table producto13_casosnuevos_diff_ ,schema: producto13
saving table/data/ETLcache/producto13_casosnuevos_diff__28012021_052023.csv in cache
creating table producto13_casosnuevos_diff__t ,schema: prod

In [23]:
os.system('jupyter nbconvert --output /home/jovyan/work/ETLdocs/' + 'ETL_covid-chile-derived.html' + ' --to html ' + 
          '/home/jovyan/work/ETL/covid-chile-derived@min-ciencias.ipynb')

65280