In [1]:
import pandas as pd
import numpy as np
from sodapy import Socrata
from typing import Dict
import datetime
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from secop.pipelines.data_engineering.utilities import (
    schema_secop_2,
    schema_secop_int,
    _get_nits_to_extract,
    _remove_tildes,
    _clean_modalidad_contratacion,
    _clean_modalidad_contratacion_2,
    _clean_tipo_contrato,
    _to_int,
)
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf

# Dataset identifiers in the "xxxx-xxxx" form. CODE_SECOPII_CONT holds the same
# value as the `code` that is later passed to `client.get` against
# www.datos.gov.co.
# NOTE(review): judging by the names these are the SECOP Integrated, SECOP II
# (processes) and SECOP II (contracts) datasets - confirm against the portal.
CODE_INTEGRATED = "rpmr-utcd"
CODE_SECOPII = "p6dx-8zbt"
CODE_SECOPII_CONT = "jbjy-vk9h"

In [16]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, NumericType

In [17]:
# Spark schemas keyed by name. The field names of the selected schema are also
# used to build the `select` clause of the API requests, so they must match
# the column names exposed by the remote dataset.
#
# Every non-date field is declared as StringType: the rows are downloaded as
# text and nulls are replaced by "" before the Spark DataFrame is built, so a
# numeric Spark type would reject those values. `NumericType` in particular
# must not be used here - it is the abstract base class of Spark's numeric
# types, not a concrete column type, and cannot be used to create a DataFrame.
# Cast to a concrete numeric type downstream if numbers are needed.
dic_schemas = {
    "schema_secop_2": StructType(
        [
            StructField("nit_entidad", StringType(), True),
            StructField("entidad", StringType(), True),
            StructField("departamento_entidad", StringType(), True),
            StructField("ciudad_entidad", StringType(), True),
            StructField("ordenentidad", StringType(), True),
            StructField("id_del_proceso", StringType(), True),
            StructField("referencia_del_proceso", StringType(), True),
            StructField("nombre_del_procedimiento", StringType(), True),
            StructField("descripci_n_del_procedimiento", StringType(), True),
            StructField("fase", StringType(), True),
            StructField("precio_base", StringType(), True),
            StructField("modalidad_de_contratacion", StringType(), True),
            StructField("duracion", StringType(), True),
            StructField("unidad_de_duracion", StringType(), True),
            StructField("proveedores_invitados", StringType(), True),
            StructField("proveedores_con_invitacion", StringType(), True),
            StructField("visualizaciones_del", StringType(), True),
            StructField("proveedores_que_manifestaron", StringType(), True),
            StructField("respuestas_al_procedimiento", StringType(), True),
            StructField("respuestas_externas", StringType(), True),
            StructField("conteo_de_respuestas_a_ofertas", StringType(), True),
            StructField("proveedores_unicos_con", StringType(), True),
            StructField("estado_del_procedimiento", StringType(), True),
            StructField("adjudicado", StringType(), True),
            StructField("departamento_proveedor", StringType(), True),
            StructField("ciudad_proveedor", StringType(), True),
            StructField("valor_total_adjudicacion", StringType(), True),
            StructField("nombre_del_adjudicador", StringType(), True),
            StructField("nombre_del_proveedor", StringType(), True),
            StructField("nit_del_proveedor_adjudicado", StringType(), True),
            StructField("tipo_de_contrato", StringType(), True),
            StructField("subtipo_de_contrato", StringType(), True),
            StructField("fecha_de_publicacion_del", DateType(), True),
            StructField("fecha_de_ultima_publicaci", DateType(), True),
            StructField("fecha_de_publicacion_fase_3", DateType(), True),
            StructField("fecha_de_recepcion_de", DateType(), True),
            StructField("fecha_de_apertura_efectiva", DateType(), True),
        ]
    ),
    "schema_secop_int": StructType(
        [
            StructField("nivel_entidad", StringType(), True),
            StructField("nombre_de_la_entidad", StringType(), True),
            StructField("nit_de_la_entidad", StringType(), True),
            StructField("estado_del_proceso", StringType(), True),
            StructField("modalidad_de_contrataci_n", StringType(), True),
            StructField("objeto_a_contratar", StringType(), True),
            StructField("tipo_de_contrato", StringType(), True),
            StructField("valor_contrato", StringType(), True),
            StructField("nom_raz_social_contratista", StringType(), True),
            StructField("departamento_entidad", StringType(), True),
            StructField("municipio_entidad", StringType(), True),
            StructField("objeto_del_proceso", StringType(), True),
            StructField("fecha_de_firma_del_contrato", DateType(), True),
            StructField("fecha_inicio_ejecucion", DateType(), True),
            StructField("fecha_fin_ejecucion", DateType(), True),
        ]
    ),
    "schema_secop_2_cont": StructType(
        [
            StructField("nombre_entidad", StringType(), True),
            # Was NumericType(); kept as text like `nit_entidad` in "schema_secop_2".
            StructField("nit_entidad", StringType(), True),
            StructField("departamento", StringType(), True),
            StructField("ciudad", StringType(), True),
            StructField("orden", StringType(), True),
            StructField("proceso_de_compra", StringType(), True),
            StructField("referencia_del_contrato", StringType(), True),
            StructField("estado_contrato", StringType(), True),
            StructField("descripcion_del_proceso", StringType(), True),
            StructField("modalidad_de_contratacion", StringType(), True),
            StructField("fecha_de_firma", DateType(), True),
            StructField("fecha_de_inicio_del_contrato", DateType(), True),
            StructField("fecha_de_fin_del_contrato", DateType(), True),
            StructField("tipodocproveedor", StringType(), True),
            StructField("documento_proveedor", StringType(), True),
            StructField("proveedor_adjudicado", StringType(), True),
            # Was NumericType(); kept as text like `valor_contrato` in "schema_secop_int".
            StructField("valor_del_contrato", StringType(), True),
            StructField("objeto_del_contrato", StringType(), True),
            # Was NumericType(); see the note at the top of this dictionary.
            StructField("dias_adicionados", StringType(), True),
        ]
    ),
}

In [9]:
# Load the `nb_black` IPython extension for this kernel.
# NOTE(review): presumably this auto-formats cells with Black - confirm it is
# installed in the environment, otherwise this cell fails.
%load_ext nb_black

In [6]:
# Read the `num_nits_to_extract` parameter; it is passed to
# `_get_nits_to_extract` when choosing the NITs for this run.
# NOTE(review): `catalog` is not defined anywhere in this notebook - presumably
# it is injected by the project's Jupyter integration; confirm before running
# on a plain kernel.
num_nits_to_extract = catalog.load('params:num_nits_to_extract')

In [2]:
# Extraction log loaded from the catalog. It is passed to
# `_get_nits_to_extract` and later indexed by NIT (`secop_2_cont_log[n][...]`).
secop_2_cont_log = catalog.load('secop_2_cont_log_in')

In [18]:
# Schema used for this extraction: its field names build the `select` clause
# of the requests and it is the schema of the resulting Spark DataFrame.
schema = dic_schemas["schema_secop_2_cont"]

In [19]:
# Identifier of the dataset queried below. Reuse the constant declared with
# the other dataset ids instead of repeating the literal "jbjy-vk9h", so the
# id is maintained in one place.
code = CODE_SECOPII_CONT

In [26]:
# Column the requests filter on: it is placed in the `where` clause as
# `<col_part> in ("nit1","nit2",...)`.
col_part = "nit_entidad"

In [7]:
# Reuse the active Spark session if one exists, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# SQLContext built on the session's SparkContext; used later to create the
# Spark DataFrame from the downloaded rows.
sql_ctx = SQLContext(spark.sparkContext)
# NITs to extract in this run. Per the original note: if all NITs have already
# been extracted, the oldest extraction is refreshed (that logic lives in
# `_get_nits_to_extract`, which is not visible in this notebook).
nits_to_extract = _get_nits_to_extract(secop_2_cont_log, num_nits_to_extract)
# Client for the www.datos.gov.co portal; the second positional argument is
# passed as None (NOTE(review): presumably the app token, i.e. anonymous access).
client = Socrata("www.datos.gov.co", None)
# Page size used for every request.
lim = 4000
# Paging offset starts at `lim` because the first page is requested separately
# without an offset; the paging loop continues from the second page.
offset = lim

In [20]:
# First page of the extraction: no `offset` is sent, so rows start at 0
# (`offset` already points at the second page, hence `offset-lim` in the log).
print(f"req - {offset-lim} - {datetime.datetime.now()}")

# Columns to fetch: exactly the fields declared in the schema.
select_clause = ", ".join(schema.fieldNames())
# Filter: <col_part> in ("nit1","nit2",...)
where_clause = col_part + ' in ("' + '","'.join(nits_to_extract) + '")'

request = client.get(code, limit=lim, select=select_clause, where=where_clause)

# Keep the page itself (its length drives the paging loop) and start the
# accumulated result from an independent copy of it.
request_df = pd.DataFrame.from_records(request)
results_df = request_df.copy()

req - 0 - 2022-08-12 16:30:15.030516


In [27]:
# Fetch the remaining pages until the API returns an empty page.
#
# Pages are collected in a list and concatenated once at the end: calling
# `pd.concat` inside the loop re-copies every row gathered so far on each
# iteration, which is quadratic in the number of pages.
#
# NOTE(review): the query sets no `order`; Socrata's paging guidance recommends
# a stable ordering (e.g. `:id`) so that pages cannot overlap or skip rows.
# If added, it must be added to the first-page request as well.
pages = [results_df]
while len(request_df) > 0:
    print(f"req - {offset} - {datetime.datetime.now()}")
    request = client.get(
        code,
        limit=lim,
        offset=offset,
        select=", ".join(schema.fieldNames()),
        where=col_part + ' in ("' + '","'.join(nits_to_extract) + '")',
    )
    request_df = pd.DataFrame.from_records(request)
    # The empty page only signals the end of the data; it adds no rows.
    if len(request_df) > 0:
        pages.append(request_df)
    offset += lim
results_df = pd.concat(pages, ignore_index=True)

req - 4000 - 2022-08-12 16:32:48.191921
req - 8000 - 2022-08-12 16:32:49.200347
req - 12000 - 2022-08-12 16:32:49.911431
req - 16000 - 2022-08-12 16:32:50.654863
req - 20000 - 2022-08-12 16:32:51.535306
req - 24000 - 2022-08-12 16:32:52.388442
req - 28000 - 2022-08-12 16:32:53.229956
req - 32000 - 2022-08-12 16:32:53.956854
req - 36000 - 2022-08-12 16:32:54.693469
req - 40000 - 2022-08-12 16:32:55.482638
req - 44000 - 2022-08-12 16:32:56.318025
req - 48000 - 2022-08-12 16:32:57.202863
req - 52000 - 2022-08-12 16:32:59.562572
req - 56000 - 2022-08-12 16:33:00.377692
req - 60000 - 2022-08-12 16:33:01.118024
req - 64000 - 2022-08-12 16:33:01.857909
req - 68000 - 2022-08-12 16:33:02.567798
req - 72000 - 2022-08-12 16:33:03.347210
req - 76000 - 2022-08-12 16:33:04.037247
req - 80000 - 2022-08-12 16:33:04.760177
req - 84000 - 2022-08-12 16:33:05.641720
req - 88000 - 2022-08-12 16:33:06.566558
req - 92000 - 2022-08-12 16:33:07.396083
req - 96000 - 2022-08-12 16:33:08.141584
req - 100000 - 202

In [None]:

# Post-process the downloaded rows, update the extraction log and build the
# Spark DataFrame that follows `schema`.

# Replace nulls with empty strings (assigning the result keeps the cell
# re-runnable and avoids in-place mutation).
results_df = results_df.fillna("")
# Add the schema columns that were not received in the response.
for c in set(schema.fieldNames()).difference(results_df.columns):
    results_df[c] = ""

# Record that a request was made for each NIT. The log loaded in this notebook
# is `secop_2_cont_log`; the previous `secop_2_log` name is not defined here
# and raised NameError on a fresh kernel.
for n in nits_to_extract:
    secop_2_cont_log[n]["req"] = 1
    secop_2_cont_log[n]["date"] = str(datetime.datetime.now())
success = 1

# Parse the date columns. Date fields are detected with isinstance rather than
# by comparing str(dataType) to "DateType", because that text representation
# is not stable across PySpark versions.
date_columns = [f.name for f in schema.fields if isinstance(f.dataType, DateType)]
for c in set(results_df.columns).intersection(date_columns):
    results_df[c] = pd.to_datetime(
        results_df[c].replace("", pd.NaT), errors="coerce"
    )

# Order the columns as declared in the schema used for this extraction (the
# previous code used `schema_secop_2`, which describes a different dataset).
results_df = results_df[schema.fieldNames()]

try:
    result_spark = sql_ctx.createDataFrame(results_df, schema=schema)
except IndexError:
    # Fall back to an empty DataFrame with the expected schema and flag the
    # extraction as failed in the log.
    result_spark = sql_ctx.createDataFrame([], schema)
    success = 0
for n in nits_to_extract:
    secop_2_cont_log[n]["success"] = success
