In [None]:
import pandas as pd
import numpy as np
import json
import requests
import mariadb
import os
from datetime import datetime
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import urllib3

In [None]:
def cleanTypes(df,targetCols,renameCols=False):
    """
    Objetivo: Función que permite retirar registros no numéricos en la columnas especificadas
    Entradas:
        *type df: pandas.DataFrame
        *df:Dataframe a limpiar
        
        *type targetCols: list(string)
        *targetCols: Columnas a retirar registros no numéricos
        
        *type renameCols: list(string)
        *renameCols: Permite renombrar las columnas que se limpiaron
    
    Salidas:
        *type nanRows: pandas.DataFrame
        *nanRows: Dataframe con registros no numéricos

        *type dfCleaned: pandas.DataFrame
        *dfCleaned: Dataframe sin los registros numéricos
    """
    newDf=df.copy()
    dummyPrefix="dummy_"
    for col in targetCols:
        newDf[dummyPrefix+col]=pd.to_numeric(df[col],errors="coerce")
    dfNulls=newDf.isnull()
    dummyNames=[dummyPrefix+k for k in targetCols]
    nanFilters=' | '.join(['{}=={}'.format(k, True) for k in dummyNames ])
    nanRows=newDf[dfNulls.eval(nanFilters)].drop(columns=dummyNames)
    nanRows=nanRows.assign(status= lambda x:"error")
    nanRows=nanRows.assign(message= lambda x:"Data type error")
    goodFilters=' | '.join(['{}!={}'.format(k, True) for k in dummyNames ])
    dfCleaned=newDf[dfNulls.eval(goodFilters)].drop(columns=dummyNames)
    if renameCols!=False:
        newNames={k:v for k,v in zip(targetCols,renameCols) }
        nanRows.rename(columns=newNames,inplace=True)
        dfCleaned.rename(columns=newNames,inplace=True)

    return dfCleaned,nanRows

In [None]:
def createJsonGeo(df,newCol,rootJson="root",limit=1):
    """
    Objetivo: Función que genera un Json de Geolocations para consumir la api 
                https://api.postcodes.io/postcodes
    Entradas:
        *type df: pandas.DataFrame
        *df: Dataframe que contiene los registros a transformar en Json

        *type newCol: string
        *newCol: Nombre de la nueva columna a crear

        *type rootJson: string
        *rootJson: Raíz del json nuevo a crear

        *type limit: int
        *limit: Escalar para la nueva columna que se creará 
    Salidas:
        *type Json: string (JSON)
        *dfJson: Json que contiene la estructura necesaria para consumir la API
    """
    newDf=df.copy()
    newDf[newCol]=(np.ones((newDf.shape[0],1),dtype=int))*limit
    dictRows=newDf.to_dict(orient="records")
    dfJson=json.dumps({rootJson:dictRows})
    return dfJson

In [None]:
def getPostCodes(url,data,headers,rawData,maxRetry=3):
    """
    Objetivo: Función que consume una la api https://api.postcodes.io/postcodes y devuelve
    un dataframe con los coordenadas, el estado de la petición y su respectivo código postal
    Entradas:
        *type url: string
        *url: Url de la API a consumir

        *type data: string (JSON)
        *data:    Json con que se va a enviar a la API

        *type headers: dictionary
        *headers: Headers de la petición

        *type rawData: pandas.DataFrame
        *rawData: Dataframe que cotiene todas las coordenadas antes de ser procesadas transformadas en JSON

        *type maxRetry: int
        *maxRetry: Número máximo de reintentos 
    Salidas:
        *type dfresults: pandas.DataFrame
        *dfresults: Dataframe que contiene las coordenas y sus respectivos códigos postales. Solo se retorna si
                la petición tiene un estado de respuesta 200.
        
        *type arrayError: pandas.DataFrame
        *arrayError: Dataframe que contiene las coordenadas pero con status de error. Solo se retorna si la 
                petición tiene un estado de respuesta diferente a 200.      
    """
    http = requests.Session()  
    retry_strategy = Retry(
    connect=maxRetry,
    other=maxRetry,
    status=maxRetry,
    status_forcelist=[501,502,503],
    allowed_methods=["POST"],
    backoff_factor=0.3
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http.mount("https://",adapter)
    try:
        response=http.post(url=url,data=data,headers=headers)
    except urllib3.exceptions.MaxRetryError as e:
        raise Exception(e)
    
    responseJson=response.json()
    if responseJson["status"]==200:
        resultsArray=[]
        for apiResults in responseJson["result"]:
            resultDictionary={}
            if apiResults["result"]!=None:
                resultDictionary["longitude"]=apiResults["result"][0]["longitude"]
                resultDictionary["latitude"]=apiResults["result"][0]["latitude"]
                resultDictionary["status"]="ok"
                resultDictionary["message"]=apiResults["result"][0]["postcode"]
            else:
                resultDictionary["longitude"]=apiResults["query"]["longitude"]
                resultDictionary["latitude"]=apiResults["query"]["latitude"]
                resultDictionary["status"]="error"
                resultDictionary["message"]="Postcode not found"
            resultsArray.append(resultDictionary)
        dfresults=pd.DataFrame(resultsArray)
        success=True
        return dfresults,success
    else:
        arrayError=rawData.copy()
        arrayError=arrayError.assign(status= lambda x:"error")
        arrayError=arrayError.assign(message= lambda x:responseJson["error"])
        success=False
        return arrayError,success
    

In [None]:
def generateLogFile(df,path,filename,header=False):
    """
    Objetivo: Función que escribe en un archivo csv los registros de dataframe especificado
    Entradas:
        *type df: pandas.DataFrame
        *df: Dataframe que contiene los registros a escribir

        *type path: string
        *path: Ruta del directorio del archivo de salida
        
        *type filename: string
        *filename: Nombre del archivo de salida sin extensión
        
        *type header: boolean
        *header: Si es True escribe los titulos del dataframe en caso contrario no.     
    """
    output=open(os.path.join(path,filename),"a")
    output.write(df.to_csv(index=False,line_terminator='\n',header=header))
    output.close()

In [None]:
"""
Esta clase está diseñada para concetarse a la base de datos ukpostcodes
y con sus diferentes métodos poder extraer o insertar datos en la tabla
coordinates_postcodes
"""

class coordinatesPostcodes:
    conn=''
    def connectDb(self,user,password,uri,database,port=3306):
        """
        Objetivo: Conectase a la base de datos
        Entradas:
            *type self: coordinatesPostcodes
            *self: Atributos de la clase como 'conn'

            *type user: string 
            *user:     Usuario de la base de datos
            
            *type password: string
            *password: Contraseña de la base de datos
            
            *type uri: string
            *uri: Host donde esta la base de datos
            
            *type database: string
            *database: Nombre de la base de datos
            
            *type port: int
            *port:     Puerto de conexión a la base de datos
        Salida:            
            *type success: boolean
            *success:   Si la conexión fue exitosa retorna True en caso contrario False
            
            *type message: string
            *message:   Si la conexión fue fallida contiene el mensaje de error en caso contrario esta vacio
        """
        try:
            self.conn = mariadb.connect(
            user=user,
            password=password,
            host=uri,
            port=port,
            database=database
            )
            success=True
            message=""
        except mariadb.Error as e:
            self.conn=''
            success=False
            message=e
        return success,message
    
    def insertPostcode(self,values):

        """ 
        Objetivo: Insertar en la tabla coordenates_postcodes los nuevos postcocdes. Si ya existe postcode
        para la latitud y longitud especificada, se actualiza.
        Entradas:
            *type values: tuple
            *values: Tupla que contiene los valores de longitud, latitud y postcode a insertar en la base de 
            datos
        Salidas:
            *type success: boolean
            *success: Si la inserción fue exitosa retorna True en caso contrario retorna False.

            *type message: string
            *message: Si la inserción no fue exitosa retorna mensaje de error en caso contrario 
            retorna vacio.
        
        """
        cur=self.conn.cursor()
        try:
            cur.execute(
            "INSERT INTO coordenates_postcodes (latitude,longitude,postcode) VALUES (?,?,?) ON DUPLICATE KEY UPDATE postcode= ? "
            , values )
            success=True
            message=''
            return success,message
        except mariadb.Error as e:
            success=False
            message=e
            return success,message
            
    def closeConnection(self):
        """ 
        Objetivo: Cerrar la conexión de la base de datos
        """
        self.conn.close()

In [None]:
URL="https://api.postcodes.io/postcodes"
HEADERS={"Content-Type":"application/json"}
TARGET_COLUMNS=["lat","lon"]
COLUMNS_RENAME=["latitude","longitude"]
CHUNKSIZE=100
NEWCOL="limit"
ROOT_JSON="geolocations"
INPUT_DATA_FILE=r"..\data\postcodesgeo2.csv"
OUTPUT_LOG_DIR=r"..\logs"
OUTPUT_LOG_FILENAME="log.csv"
OUTPUT_INSERTED_RECORDS_DIR=r"..\logs"
OUTPUT_INSERTED_RECORDS_FILE="inserted_records.csv"
DB_USER="user"
DB_PASSWORD="password"
DB_NAME="ukpostcodes"
DB_URI="localhost"
INSERTED_FLAG=True
LOG_FLAG=True

In [None]:
currentTime=(datetime.now()).strftime("%Y_%m_%d-%H_%M_%S_%p")

if CHUNKSIZE>100:
    raise Exception("chunkSize no puede ser mayor a 100")

for chunk in pd.read_csv(INPUT_DATA_FILE,chunksize=CHUNKSIZE):
    dfCleaned,nanRows=cleanTypes(chunk,TARGET_COLUMNS,COLUMNS_RENAME)
    if dfCleaned.shape[0]>=1:
        dfJson=createJsonGeo(dfCleaned,NEWCOL,ROOT_JSON)
        dfProcesed,postCodesSuccess=getPostCodes(url=URL,data=dfJson,headers=HEADERS,rawData=dfCleaned)
        if postCodesSuccess==True:
            coordinatesPostcodesObject=coordinatesPostcodes()
            connSuccess,connMessage=coordinatesPostcodesObject.connectDb(DB_USER,DB_PASSWORD,DB_URI,DB_NAME)
            if connSuccess==False:
                raise Exception(connMessage)
            insertedRecordsArray=[]
            for record in dfProcesed.values:
                if record[2]!="error":
                    insertSuccess,insertMessage=coordinatesPostcodesObject.insertPostcode(
                        (record[1],record[0],record[3],record[3]))
                    if insertSuccess==False:
                        record[2]="error"
                        record[3]=insertMessage
                insertedRecordsArray.append(record)    
            coordinatesPostcodesObject.closeConnection()
            dfInsertedRecords=pd.DataFrame(data=insertedRecordsArray,columns=dfProcesed.columns.values)
            allProcesedRecords=pd.concat([dfInsertedRecords,nanRows],ignore_index=True)
            
            if INSERTED_FLAG==True:
                titlesInserted=True
                INSERTED_FLAG=False
            else:
                titlesInserted=False
            generateLogFile(dfInsertedRecords[dfInsertedRecords["status"]=="ok"],
            OUTPUT_INSERTED_RECORDS_DIR,
            filename=currentTime+"_"+OUTPUT_INSERTED_RECORDS_FILE,
            header=titlesInserted)
        else:
            allProcesedRecords=pd.concat([dfProcesed,nanRows],ignore_index=True)
    else:
        allProcesedRecords=nanRows.copy()
    if LOG_FLAG==True:
        titles=True
        LOG_FLAG=False
    else:
        titles=False
    generateLogFile(allProcesedRecords,
    OUTPUT_LOG_DIR,
    filename=currentTime+"_"+OUTPUT_LOG_FILENAME,
    header=titles)
