In [1]:
import awswrangler as wr
import datetime
from datetime import date 

# Take the day that lies 60 days before today and express its month as a
# 'YYYYMM' string. `period` is interpolated as the upper `periodo` bound of the
# ao_cross3_tmp00 base query.
today = date.today()
reference_day = today - datetime.timedelta(days=60)
day = reference_day.strftime("%d")
month = reference_day.strftime("%m")
year = reference_day.strftime("%Y")
period = f"{year}{month}"
period

'202102'

In [2]:
# Pull one row per `documento` from Procesosda.base_dependientes (the row with
# the most recent `alta`), enriched with the score range looked up in
# base_dependientes_rangos, and load it into a DataFrame.
# NOTE(review): row_number() partitions by `documento` only, while the exposed
# key `cuc` is tipodocumento||'-'||documento; if the same number can occur
# under two document types only one of them survives -- confirm this is wanted.
q = """
with tmp as (
select tipodocumento||'-'||documento cuc, 
        scoreingreso, b.rango , alta, situacion, e_giro, tipotrabajador  ,
       row_number() over (partition by documento order by alta desc) num
  from  Procesosda.base_dependientes a
  left join Procesosda.base_dependientes_rangos b on a.scoreingreso = b.score
)
select cuc, scoreingreso, rango, alta, situacion, e_giro, tipotrabajador
from tmp where num = 1;
"""
df_dependientes = wr.athena.read_sql_query(
    sql=q,
    database="bdax_acselx",
)

In [5]:
# Write the dependents frame to S3 as a Parquet dataset, replacing whatever is
# already under the target prefix (mode='overwrite'), and expose it as
# coe_analytics_tmp.ao_crossde_dep. The call is the cell's last expression so
# the returned paths/partitions summary is displayed.
wr.s3.to_parquet(
    df=df_dependientes.reset_index(drop=True),
    path='s3://rimac-analytics-temporal/individuals/Dante/cross_prop_veh/data/interm/load/tbl_dependientes/',
    dataset=True,
    mode='overwrite',
    database="coe_analytics_tmp",
    table="ao_crossde_dep",
)

{'paths': ['s3://rimac-analytics-temporal/individuals/Dante/cross_prop_veh/data/interm/load/tbl_dependientes/92295f6c6b8b455e9d2be38dbaac4959.snappy.parquet'],
 'partitions_values': {}}

In [4]:
# Build coe_analytics_tmp.ao_cross3_tmp00: the base table of
# (periodo, cuc, id_producto, riesgo, canal, prima_cont) rows, as the union of
#   * bdrrvv_app_datman.per_clientes_detalle for periodo 201501..201712, and
#   * mis_clientes.dim_modelo_clientes for periodo 201801..{period}, restricted
#     to vigencia = 'VIGENTE', tipo_stock != 'Caduco' and excluding a fixed
#     list of product ids.
# NOTE(review): Presto/Athena `replace` is a literal substring replacement, so
# '[^0-9.]' is NOT applied as a regular expression here. If stripping
# non-numeric characters was intended, `regexp_replace` is needed -- but that
# pattern would also remove minus signs, so confirm with the data owner first.
# NOTE(review): the `tmp_canales` CTE is declared but never referenced.
q = f"""
CREATE TABLE coe_analytics_tmp.ao_cross3_tmp00
	with (
	 format = 'parquet',
	 parquet_compression = 'snappy',
	 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp00'
	)  as 
	   with tmp_canales as (
			select canal, id_canalnt
			from bdrrvv_app_datman.per_clientes_detalle
			group by canal, id_canalnt
		)
	  
	  select periodo, cuc , id_producto, riesgo , canal, 
	       cast( ( case when  REPLACE(prima_cont,'[^0-9.]', '') = '' then null
	                    else  REPLACE(prima_cont,'[^0-9.]', '') end )
	                as double) prima_cont
	  from bdrrvv_app_datman.per_clientes_detalle
	  where periodo >= '201501' and periodo <= '201712' 
	  --
	  union all
	  --
	  select  a.periodo, a.cuc, a.id_producto, a.riesgo,
		        a.canaldes canal, 
		        cast(a.prima_emitida as double) prima_cont
	  from mis_clientes.dim_modelo_clientes a
	  where periodo >= '201801'  and periodo <= '{period}' -- limitar perido cerrado
	    and id_producto not in ('4001','4075','9030','1304','1355','4062','1357','4020','4061','4028','1996','3101','4063','1403')
	    and vigencia ='VIGENTE' 
	    and tipo_stock != 'Caduco';
"""

# delete_objects matches by key prefix. Without the trailing slash the prefix
# '.../ao_cross3_tmp00' also matches '.../ao_cross3_tmp001/...', so re-running
# this cell would wipe the data files of table ao_cross3_tmp001 as well.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp00/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_tmp00;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

'dbbe805f-6163-434b-bc79-aecb26d2b95b'

In [7]:
# Build coe_analytics_tmp.ao_cross3_tmp01: premium summed per client and
# period, labelled with the analytics group (GRUPO_RES) that
# ao_cross_gruposnew assigns to each id_producto.
# NOTE(review): the GROUP BY includes a.id_producto, which is not selected, so
# one (periodo, cuc, GRUPO_ANALYTICS) combination can come out as several rows
# (one per product). Confirm whether that is intended; downstream tables
# aggregate and window over these rows.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_tmp01 
with (
 format = 'parquet',
 parquet_compression = 'snappy',
 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp01'
)  as 
 SELECT a.periodo, a.cuc,  b.GRUPO_RES as GRUPO_ANALYTICS, 
       sum(prima_cont) PRIMA
 FROM coe_analytics_tmp.ao_cross3_tmp00 A
 LEFT JOIN coe_analytics_tmp.ao_cross_gruposnew B on a.id_producto = b.id_producto
GROUP BY periodo, cuc, a.id_producto, b.GRUPO_RES;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp01/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_tmp01;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

In [8]:
# Build coe_analytics_tmp.ao_cross3_tmp001: one row per (PERIODO, cuc) with the
# client's analytics groups joined by '/' and the total premium.
# array_agg gives no ordering guarantee, so the collected groups are passed
# through array_sort: the same set of groups always yields the same string
# instead of e.g. both 'A/B' and 'B/A'.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_tmp001  
	with (
	 format = 'parquet',
	 parquet_compression = 'snappy',
	 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp001'
	)  as 
	SELECT PERIODO, cuc, array_join(array_sort(array_agg(GRUPO_ANALYTICS)),'/') GRUPO_ANALYTICS_concat,
	     SUM(PRIMA) PRIMA
	FROM coe_analytics_tmp.ao_cross3_tmp01 
	GROUP BY PERIODO, cuc;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp001/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_tmp001;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

In [9]:
# Build coe_analytics_tmp.ao_cross3_cr: one row per (PERIODO, cuc) with the
# distinct RIESGO values of the base table joined by '/'.
# array_agg gives no ordering guarantee, so the values are passed through
# array_sort: the same combination of risks always yields the same
# COMBO_RIESGOS string.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_cr
with (
 format = 'parquet',
 parquet_compression = 'snappy',
 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_cr'
)  as 
SELECT PERIODO, cuc, array_join(array_sort(array_agg(RIESGO)),'/') COMBO_RIESGOS 
FROM (SELECT DISTINCT PERIODO, cuc, RIESGO 
      FROM coe_analytics_tmp.ao_cross3_tmp00) GROUP BY PERIODO, cuc;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_cr/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_cr;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

In [10]:
# Build coe_analytics_tmp.ao_cross3_cc: one row per (PERIODO, cuc) with the
# distinct CANAL values of the base table joined by '/'.
# array_agg gives no ordering guarantee, so the values are passed through
# array_sort: the same combination of channels always yields the same
# COMBO_CANALES string.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_cc  
with (
 format = 'parquet',
 parquet_compression = 'snappy',
 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_cc'
)  as 
SELECT PERIODO, cuc, array_join(array_sort(array_agg(CANAL)),'/') COMBO_CANALES
FROM (SELECT DISTINCT PERIODO, cuc, CANAL 
      FROM coe_analytics_tmp.ao_cross3_tmp00 ) GROUP BY PERIODO, cuc;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_cc/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_cc;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

In [11]:
# Build coe_analytics_tmp.ao_cross3_tmp04: the per-(PERIODO, cuc) summary from
# ao_cross3_tmp001 with the risk and channel combos attached by left join, plus
# `first_periodo`, the earliest periodo in which each cuc appears.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_tmp04  
with (
 format = 'parquet',
 parquet_compression = 'snappy',
 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp04'
)  as 
SELECT A.PERIODO, A.cuc, A.GRUPO_ANALYTICS_concat, A.PRIMA, C1.COMBO_RIESGOS, C2.COMBO_CANALES,
first_value(A.periodo) over (partition by A.cuc order by a.periodo ) first_periodo

FROM coe_analytics_tmp.ao_cross3_tmp001 A
LEFT JOIN coe_analytics_tmp.ao_cross3_cr C1 ON A.cuc = C1.cuc AND A.PERIODO = C1.PERIODO
LEFT JOIN coe_analytics_tmp.ao_cross3_cc C2 ON A.cuc = C2.cuc AND A.PERIODO = C2.PERIODO;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp04/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_tmp04;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

In [13]:
# Build coe_analytics_tmp.ao_cross3_tmp03: for every row of ao_cross3_tmp01
# (client, period, analytics group) derive two flags.
#   * new_client_producto = 1 when the client has no earlier row for that group
#     or the previous one is more than 2 months back; forced to 0 for periodo
#     '201501', the first period loaded into the base table.
#   * new_cross_producto = 1 when new_client_producto = 1 and either the
#     client's previous row (any group) is at most 1 month back, or the
#     client's next row (any group) falls in the same month.
# NOTE(review): last_apearance2 / next_apearance2 use lag/lead ordered only by
# periodo within a cuc. When a client has several rows in the same periodo
# their relative order is not defined, so these values (and the flag) can
# differ between runs -- confirm whether a tie-breaker is needed.
q = """
CREATE TABLE coe_analytics_tmp.ao_cross3_tmp03
with (
 format = 'parquet',
 parquet_compression = 'snappy',
 external_location = 's3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp03'
)  as 
with tmp2 as (
   with tmp as (
      select cuc, periodo, GRUPO_ANALYTICS,  
          first_value(periodo) over (partition by cuc, GRUPO_ANALYTICS order by periodo ) first_apearance,
          lag(periodo, 1) over (partition by cuc, GRUPO_ANALYTICS order by periodo ) last_apearance
      from  coe_analytics_tmp.ao_cross3_tmp01
    )
  select *, 
     lag(periodo, 1) over (partition by cuc order by periodo ) last_apearance2 ,
     lead(periodo, 1) over (partition by cuc order by periodo ) next_apearance2 ,
     (case when periodo = '201501' then 0
           when last_apearance is null then 1 
           when date_diff('month', date_parse(last_apearance, '%Y%m'), date_parse(periodo, '%Y%m')) > 2 then 1
           else 0 end ) new_client_producto
     from tmp 
  )
select *,
   (case when new_client_producto = 1 and 
              date_diff('month', date_parse(last_apearance2, '%Y%m'), date_parse(periodo, '%Y%m')) <= 1 then 1
         when new_client_producto = 1 and
               date_diff('month', date_parse(next_apearance2, '%Y%m'), date_parse(periodo, '%Y%m')) = 0 then 1 
         else 0 end ) new_cross_producto
from tmp2;
"""

# delete_objects matches by key prefix; the trailing slash limits the deletion
# to this table's own folder instead of every key that starts with the same text.
wr.s3.delete_objects('s3://rimac-analytics-temporal/individuals/Antonio Ordonez/data/base_cross/ao_cross3_tmp03/')

# IF EXISTS keeps the cell runnable when the table has not been created yet
# (wait_query raises if the statement fails).
query_id = wr.athena.start_query_execution('DROP TABLE IF EXISTS coe_analytics_tmp.ao_cross3_tmp03;', database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)

# Run the CTAS and block until Athena reports completion.
query_id = wr.athena.start_query_execution(q, database='coe_analytics_tmp')
res = wr.athena.wait_query(query_execution_id=query_id)