# This script defines functions for geocoding
We have used three different approaches for solving this issue:
1. RUIAN API from czech bureau (payed for commercial purpose)
2. Google API (payed always)
3. Joining method - uses file containing all ddresses in the Cyech Republic and using join tries to find GPS coordinates (free but slow)

All of the methods presents different advantages and disadvantages and therefore, all of them are implemented.

Structure of the code:
1. geoRuian -  geocode class fro RUIAN
2. geoGoogle - geocode class for Google API
3. geoJOIN - join geocode method
4. generateAddressString - given separated elements of an address creates a complete string
5. geoWrapper - a wrapper for all above described classes/approaches



In [1]:
import requests 
from bs4 import BeautifulSoup
from IPython.core.debugger import Tracer
import ast
import pandas as pd
import numpy as np
import tqdm

In [2]:
#load api key for ggoogle api
import apicka as api

In [3]:
API_KEY = api.api

# 1. class for RUIAN geocode API

In [4]:
class geoRUIAN:
    '''
    a class for geocode using RUIAN's API
    '''
    def __init__(self):
        #define the API's link
        self.ruianlink = 'http://ags.cuzk.cz/arcgis/rest/services/RUIAN/Vyhledavaci_sluzba_nad_daty_RUIAN/MapServer/exts/GeocodeSOE/findAddressCandidates'
        
    def testConnection(self):
        '''
        a method that tests connection to the API
        '''
        try:
            _ = requests.get(self.ruianlink)
            print('Connection was succesfull.')
            return True
        except requests.ConnectionError:
            print("No internet connection available or another issues.")
        return False
    
    def sendRequest(self,adresaString):
        '''
        
        '''
        self.adresaString = adresaString
        pars = {'SingleLine':adresaString,
               'outSR':      '',
               'maxLocations': 100,
               'searchExtent': '',
               'f':'html'}
        
        self.r = requests.get(self.ruianlink, params=pars)
        self.r.encoding = 'utf8'
        
        
        self.soup = BeautifulSoup(self.r.text,'lxml')
         #pridat blbuvzdornost

    def parseOutput(self):
        
        #locate the table
        output_raw = self.soup.findAll('table','formTable')
        
        #there are two formTables on the page. the second one contains the resuls
        results_raw = output_raw[1]
        
        #our element is 'pre'
        allResults = results_raw.find('pre')
        
        #convert the unicode format into something human readible. i.e. a dictionary
        allResultsString = ast.literal_eval(allResults.text)
        
        #slice only 'candidates' key of the dictionary. The output is a list.
        if allResultsString.get('candidates'):
            self.candidatesAll = allResultsString['candidates']
        else:
            self.candidatesAll = False
        
    
    def testAddressOutput(self):
        '''
        The geocoding service might not be able to return the searched address GPS. This method tests whether the API detected the address correctly.
        '''
        #get all 'Type' of the dictionary
        if self.candidatesAll: 
            #detect whether at least one of the candidates is 'AdresniMisto'
            self.types = [d['attributes']['Type'] for d in self.candidatesAll]
            if 'AdresniMisto' in self.types:
                self.AdresniMistoFound = True
                return True
            else:
                self.AdresniMistoFound = False
                self.somethingFound    = True
        else:
            self.AdresniMistoFound = False
            self.somethingFound    = False
            self.notFoundAtAll = True

    
    def selectAddressCandidate(self, method='First', printOutput=True):
        '''
        In some cases RUIAN might return multiple outputs. This method selects one of them.
        method muze byt first nebo centroid
        '''
        if self.AdresniMistoFound:
                #subset only AdresniMista
                AdresniMistoFlag = [ d for d in self.candidatesAll if d['attributes']['Type']=='AdresniMisto' ]
                
                self.adresniMistaAll = AdresniMistoFlag
                
                #if htere is only one candidate, we are done. Otherwise sort candidates according to score...
                #in case multiple candidates have the same score, create an average of GPS and create a flag
                if len(self.adresniMistaAll) == 1:
                    matchedAddress = self.adresniMistaAll[0]['address']
                    xMean     = np.mean([self.adresniMistaAll[0]['attributes']['Xmax'],
                                        self.adresniMistaAll[0]['attributes']['Xmin']])
                    
                    yMean     = np.mean([self.adresniMistaAll[0]['attributes']['Ymax'],
                                        self.adresniMistaAll[0]['attributes']['Ymin']])
                    
                    #we create a flag to indicate how certain we are about the returned result
                    flagConfidence = 1
                    
                
                elif method == 'First':
                    #using this method we select the first offered adresniMisto by RUIAN
                    matchedAddress = self.adresniMistaAll[0]['address']
                    xMean     = np.mean([self.adresniMistaAll[0]['attributes']['Xmax'],
                                        self.adresniMistaAll[0]['attributes']['Xmin']])
                    
                    yMean     = np.mean([self.adresniMistaAll[0]['attributes']['Ymax'],
                                        self.adresniMistaAll[0]['attributes']['Ymin']])
                    
                    #we create a flag to indicate how certain we are about the returned result
                    flagConfidence = 2


                elif method == 'Centroid':
                    #using this method as GOS coordinates we return an average of the given addresses
                    matchedAddress = self.adresniMistaAll[0]['address']
                    xMean     = np.mean([d['attributes']['Xmax'] for d in self.adresniMistaAll] +
                                        [d['attributes']['Xmin'] for d in self.adresniMistaAll])
                    
                    yMean     = np.mean([d['attributes']['Ymax'] for d in self.adresniMistaAll] +
                                        [d['attributes']['Ymin'] for d in self.adresniMistaAll])
                    
                    #we create a flag to indicate how certain we are about the returned result
                    flagConfidence = 3
                    #list of output variables
                    self.geocodedAddress = {'originalAddress':self.adresaString, 
                                            'matchedAddress':matchedAddress,
                                            'lon': xMean,
                                            'lat': yMean, 
                                            'confidenceFlag': flagConfidence}
                    
                else:
                    print('method is not defined!')
        
        #create flag if something was return by RUIAN but it is not an address point
        elif self.somethingFound:
            matchedAddress = ''
            xMean = ''
            yMean = ''
            flagConfidence = 4          
        
        else:
            matchedAddress = ''
            xMean = ''
            yMean = ''
            flagConfidence = 5
            #print('address not found.')

        #list of output variables
        self.geocodedAddress = {'originalAddress':self.adresaString, 
                                'matchedAddress':matchedAddress,
                                'lon': xMean,
                                'lat': yMean, 
                                'confidenceFlag': flagConfidence}            
            
        #optionally print the output
        if printOutput:
            return self.geocodedAddress
        
        #explanation of flagConfidence
        # 1 - string matched as address, only one result from RUIAN
        # 2 - string matched as address, multiple results from RUIAN
        # 3 - not define
        # 4 - something was found, but it was not defined as address
        # 5 - nothing found at all
    
    
    
    def auto(self, add, method='First',printOutput=True):
        '''
        Simple wrapper for calling all steps in one. if you trust the function, you should use it.
        '''
        self.sendRequest(add)
        self.parseOutput()
        self.testAddressOutput()
        return self.selectAddressCandidate()
        
        

In [5]:
#test
sample = {'fulladdress': ["Jugoslávských partyzánů 1580/3, Praha 6",
                           "Duškova 7, Praha 5",
                           "Opatovická 160/18, Praha ",
                           "nám. W. Churchilla 4, Praha 3",
                           "Slezská 68, Praha 3 - Vinohrady"],
        'obec': ['Praha', 'Praha', 'Praha', 'Praha', 'Praha'],
        'castobce': ['Praha 6', 'Praha 5', None, 'Praha 3', 'Vinohrady'],
        'ulice': ['Jugoslávských partyzánů','Duškova', 'Opatovická','nám. W. Churchilla', 'Slezská'],
        'cp': [1580, 7, 160, 4, 68],
        'co': [3,None,18,None, None],
        'psc': [None,None,None,None,None]}

sampleadres = pd.DataFrame.from_dict(sample)

In [6]:
a = geoRUIAN()

In [7]:
testoutput = {}
for i in sampleadres['fulladdress']:
    testoutput[i] = a.auto(i)
    
pd.DataFrame.from_dict(testoutput, orient='index')

Unnamed: 0,originalAddress,matchedAddress,lon,lat,confidenceFlag
"Duškova 7, Praha 5","Duškova 7, Praha 5","Duškova 1094/7, Smíchov, 15000 Praha 5",14.395358,50.07154,1
"Jugoslávských partyzánů 1580/3, Praha 6","Jugoslávských partyzánů 1580/3, Praha 6","Jugoslávských partyzánů 1580/3, Dejvice, 16000...",14.394598,50.10397,1
"Opatovická 160/18, Praha","Opatovická 160/18, Praha","Opatovická 160/18, Nové Město, 11000 Praha 1",14.418277,50.079652,1
"Slezská 68, Praha 3 - Vinohrady","Slezská 68, Praha 3 - Vinohrady","Slezská 1486/68, Vinohrady, 13000 Praha 3",14.44959,50.076193,1
"nám. W. Churchilla 4, Praha 3","nám. W. Churchilla 4, Praha 3","náměstí Winstona Churchilla 1938/4, Žižkov, 13...",14.441162,50.08427,1


# 2. class for Google geocode API
API key is required!

In [8]:
class geoGoogle:
    '''
    a class for geocode using google's API
    Expects a string of an address upon initialization
    '''
    
    def __init__(self, API):
        self.apiLinkGen  = 'https://maps.googleapis.com/maps/api/geocode/json?address={}'
        self.apikey      = API
        self.apiLink     = 'https://maps.googleapis.com/maps/api/geocode/json?address={}' + '&key={}'.format(API)
        
        self.testAddress = 'opletalova 26, Nove Mesto'
        
    def testConnection(self):
        '''
        a method that serves to test the response from the API
        '''
        self.testLink = self.apiLink.format(self.testAddress)
        self.req = requests.get(self.testLink)
        
        #test wthether success
        if self.req.status_code == 200:
            self.status=True
            #print('Connection successfull')

        else:
            self.status=False
            print('Something happened. Code:' + str(self.req.status_code))
        
        return self.status
    
    
    def geocodeOne(self, address):
        '''
        a method that serves to geocode a single address
        '''
        #check whether connection was made
        stat = self.testConnection()
        if stat == True:
        
            #call the api
            getRes = self.apiLink.format(address)

            #pull data
            addReq = requests.get(getRes)

            #convert to json if somtehing returned
            self.apiOutput = addReq.json()

            if len(addReq.json()['results']) > 0:
                self.addResult = self.apiOutput['results'][0]
                nOfRes = 999
            else:
                nOfRes = 0

            #test whether some result was returned or not
            if nOfRes ==0:
                output = {
                    "confidenceFlag": 5,
                    "source_address" : address,
                    "formatted_address" : None,
                    "latitude": None,
                    "longitude": None
                    #"google_place_id": None,
                }
            else:
                #GPS coordinates
                lat = self.addResult['geometry']['location']['lat']
                lng = self.addResult['geometry']['location']['lng']

                #formatted address
                formAddress = self.addResult['formatted_address']

                #type of location identified
                typ = self.addResult['types']
                #accuracy
                acc = self.addResult['geometry']['location_type']

                #google place ID
                placeid = self.addResult['place_id']


                #explanation of flagConfidence
                # 1 - string matched as address, only one result from RUIAN
                # 2 - string matched, multiple results from Google, a centroid is returned
                # 3 - not define
                # 4 - something was found, but it was not defined as address
                # 5 - nothing found at all
                tp = self.addResult['geometry']['location_type']
                if tp == 'ROOFTOP':
                    conf = 1
                elif tp == 'RANGE_INTERPOLATED':
                    conf = 2
                elif tp == 'GEOMETRIC_CENTER':
                    conf = 4
                elif tp == 'APPROXIMATE':
                    conf = 5
                else:
                    conf='error'

                #assign determinated values to output list
                output={
                    "confidenceFlag" : conf,
                    "latitude": lat,
                    "longitude": lng,
                    "matchedAddress" : formAddress,
                    "originalAddress" : address,

                    "accuracy": acc,
                    "google_place_id": placeid,
                    "type": typ
                }

                #print crying emoji
                print('right now you have spent 0.01USD :(')

            return output
        
        else:
            print("A confirmation of connection is not provided. Try to run method testconnection()")


In [9]:
#test the function
geo = geoGoogle(API_KEY)
geo.testConnection()

True

In [10]:
testoutput = {}
for i in sampleadres['fulladdress']:
    testoutput[i] = geo.geocodeOne(i)
    
pd.DataFrame.from_dict(testoutput, orient='index')

right now you have spent 0.01USD :(
right now you have spent 0.01USD :(
right now you have spent 0.01USD :(
right now you have spent 0.01USD :(
right now you have spent 0.01USD :(


Unnamed: 0,confidenceFlag,latitude,longitude,matchedAddress,originalAddress,accuracy,google_place_id,type
"Duškova 7, Praha 5",1,50.071506,14.395328,"Duškova 1094/7, 150 00 Praha 5-Smíchov, Czechia","Duškova 7, Praha 5",ROOFTOP,ChIJ4bI_G1WUC0cRgQHS5acMZEg,[street_address]
"Jugoslávských partyzánů 1580/3, Praha 6",1,50.104199,14.394595,"Jugoslávských partyzánů 1580/3, 160 00 Praha 6...","Jugoslávských partyzánů 1580/3, Praha 6",ROOFTOP,ChIJXWNcyzqVC0cRvwtfMhpqIo0,[street_address]
"Opatovická 160/18, Praha",1,50.079596,14.418309,"Opatovická 160/18, 110 00 Praha-Nové Město, Cz...","Opatovická 160/18, Praha",ROOFTOP,ChIJ4WGCjvGUC0cRWb40ZDOPq_w,[street_address]
"Slezská 68, Praha 3 - Vinohrady",1,50.076141,14.449604,"Slezská 1486/68, 130 00 Vinohrady, Czechia","Slezská 68, Praha 3 - Vinohrady",ROOFTOP,ChIJo4kvIoOUC0cRmVCZ7wMP4f8,[street_address]
"nám. W. Churchilla 4, Praha 3",1,50.083343,14.441611,"nám. Winstona Churchilla 1938/4, 120 00 Praha ...","nám. W. Churchilla 4, Praha 3",ROOFTOP,ChIJea5YS5eUC0cRgeOCTuNVLOM,[premise]


# 3. joining method for geocoding

In [11]:
from IPython.core.debugger import Tracer

class geoJoin:
    
    def __init__(self):
        #self.srctable_path = "/Users/vojtechnedved/Downloads/adresyAll.csv"
        self.srctable_path = "adresyAllMerge.csv"
        self.srccols = ['obec','castobce','ulice','cp','co', 'psc']
        
        #load all addresses and convert them to lowercase
        src = pd.read_csv(self.srctable_path).convert_objects(convert_numeric=True)
        self.srctable = src.applymap(lambda s:s.lower() if type(s) == str else s)
        
        #remap joining columns
        remap = {
            'Název obce' : 'obec',
            'Název části obce' : 'castobce', 
            'Název ulice': 'ulice',
            'Číslo domovní': 'cp',
            'Číslo orientační' : 'co',
            'PSČ': 'psc',
            'Souřadnice Y': 'lat',
            'Souřadnice X': 'lng'
        }
        #self.srctable =  self.srctable.rename(columns=remap, index=str)
        
        
    def validColumns(self, addr):
        '''
        a support method for finding non-null columns of the address. Is used inside joinOne() method. Required a pandas dataframe.
        '''
        
        #get index of non-null columns
        ind = addr.notnull()

        ind = ind.values[0]
        
        #subset those columns
        adSelected = addr.iloc[0, ind]
        
        #subset columns presented in both dataframes
        adSelected = [ x for x in adSelected.index.values.tolist() if x in self.srccols]
        
        self.colsMerge =  adSelected
        
        
    def joinOne(self, addresa):
        '''
        This method expects a list of one adress containing several attributes. Moreover, you should specify merging columns.
        '''
        
        #we expect a pandas series as input... then we convert it to pandas dataframe (for joining purposes; not elegant but working)
        #if we are provided with one row dataframe, no transofrmation is needed
        
        #pandas DF conversion
        self.addresaPD = pd.DataFrame(addresa).transpose()
        
        #get on what columns we will perform the join
        self.validColumns(self.addresaPD)
        
        #force conversion to numbers of the address
        self.addresaPD_beforeConversion = self.addresaPD.convert_objects(convert_numeric=True)
        
        #change joining columns of the input to lowercase
        self.addresaPD = self.addresaPD.applymap(lambda s:s.lower() if type(s) == str else s)
        
        #join the master address table wiht the current record
        self.joined = pd.merge(self.srctable, self.addresaPD ,on=self.colsMerge, how='inner')
     
        #check if unique result is returned. If not, input None... we have to geocode it using some of the APIs
        if self.joined.shape[0] > 1:
            self.lat = np.nan
            self.lng = np.nan
        #maybe we did not match anything at all
        elif self.joined.shape[0] == 0:
            self.lat = np.nan
            self.lng = np.nan
        #or we have aexactly one result
        else:
            #get longitude and latitude
            self.lat = self.joined['lat'][0] if self.joined['lat'][0] is not None else np.nan
            self.lng = self.joined['lng'][0] if self.joined['lng'][0] is not None else np.nan
            #print(self.lat, self.joined['lat'][0])
        
        return (self.lat, self.lng)


### test he join function

In [12]:
g = geoJoin()

For all other conversions use the data-type specific converters pd.to_datetime, pd.to_timedelta and pd.to_numeric.
  # This is added back by InteractiveShellApp.init_path()


In [13]:
testoutput = {}
for i,j in sampleadres.iterrows():
    testoutput[i] = g.joinOne(j)
    
pd.DataFrame.from_dict(testoutput, orient='index')

For all other conversions use the data-type specific converters pd.to_datetime, pd.to_timedelta and pd.to_numeric.


Unnamed: 0,0,1
0,,
1,,
2,743139.22,1043844.69
3,,
4,,


# 4. help function for joining address parts

In [14]:
def generateAddressString(pandasSer):
    '''
    Givne a pandas series object containing at least any of ['obec', 'castobce', 'ulice', 'cp', 'co', 'psc'] it returns a string of the address
    '''
    #join cislo popisne and orientacni
    cisla = (str(potentialData['cp']) if potentialData['cp'] else "") + ('/' + str(potentialData['co']) if potentialData['co'] else "")
    
    #join the rest
    addString = (str(potentialData['ulice']) if potentialData['ulice'] else "") +\
        (' '  + str(cisla) if cisla else "") +\
        (', ' + str(potentialData['castobce']) if potentialData['castobce'] else "") +\
        (', ' + str(potentialData['obec']) if potentialData['obec'] else "") +\
        (', ' + str(potentialData['psc']) if potentialData['psc'] else "")
    
    return addString


# 5. a wrapper class for all three geocode methods

In [15]:
class geoWrapper:
    '''
    this class serves as a wrapper for all geocode approaches. You have to pass a dataframe containing your address into the function with proper column names. 
    '''
    
    def __init__(self, dataFrame, API_KEY_Google=None):
        #initialize geo classes
        self.ruian = geoRUIAN()
        self.join  = geoJoin()
        
        #check if google Key was provided
        if API_KEY_Google is not None:
            self.ggl = geoGoogle(API_KEY_Google)
            self.gglAPI = True
        else:
            self.gglAPI = False
            
        #store data
        self.dataFrame = dataFrame
        
    def runJoin(self):
        '''
        applies joining method for geocode
        '''
        #iterate over the file and assign results
        for i,j in tqdm.tqdm_notebook(self.dataFrame.iterrows()):
            #geocode current row
            # print(j)
            out = self.join.joinOne(j)
            
            lat = out[0] or np.nan
            lng = out[1] or np.nan
            
            #print(fileToScore)
            #assign the value to df
            self.dataFrame.loc[i,'latitude'] = lat 
            self.dataFrame.loc[i,'longitude'] = lng
            self.dataFrame.loc[i,'methodUsed'] = 'join' if np.isfinite(lat) else -1
            self.dataFrame.loc[i,'confidence'] = 1      if np.isfinite(lat) else -1
            
    def runRuian(self):
        '''
        A wrapper for calling  RUIANs API
        '''
        
        #iterate over the file and assign results
        for i,j in tqdm.tqdm_notebook(self.dataFrame.iterrows()):

            #check whether it was not already scored by joinign method
            if j['methodUsed'] is None or j['methodUsed'] == np.nan or j['methodUsed'] == -1:
                #prepare data for call; if data are presented in separated way we have to merge them, Otherwise, use initially provided string
                self.j=j
                potentialData =  j[self.join.srccols]

                
                #generate address String
                if j['fulladdress']:
                    addString = j['fulladdress']
                    
                else:
                    addString = generateAddressString(potentialData)

                #call RUIAN
                res = self.ruian.auto(addString)

                #assign the value to df
                self.dataFrame.loc[i,'latitude'] = res['lat']
                self.dataFrame.loc[i,'longitude'] = res['lon']
                self.dataFrame.loc[i,'methodUsed'] = 'ruian'
                self.dataFrame.loc[i,'confidence'] = res['confidenceFlag']

                
                
                
    def runGoogle(self):
        '''
        A wrapper for calling Geocode class using google's API
        '''
        #as this service is paid, count non-matched records and estimate price
        nMiss = max(self.dataFrame['latitude'].isna().sum(),self.dataFrame['longitude'].isna().sum())
        estPrice = nMiss*0.1
        
        #raise a dialog box whether you want to proceed or not
        txt = input("Estimated price of the query is {} USD. Do you wanna proceed? yes/no".format(estPrice))
        
        
        if self.gglAPI and txt == 'yes':
        #iterate over the file and assign results
            for i,j in self.dataFrame.iterrows():

                #check whether it was not already scored by joinign method
                if j['methodUsed'] is None or j['methodUsed'] == np.nan or j['methodUsed'] == -1:
                    #prepare data for call; if data are presented in separated way we have to merge them, Otherwise, use initially provided string
                    self.j=j
                    potentialData =  j[self.join.srccols]
                    
                    
                    #generate address String
                    if j['fulladdress']:
                        addString = j['fulladdress']
                        
                    else:
                        addString = generateAddressString(potentialData)

                    #call ggl
                    res = self.ggl.geocodeOne(addString)

                    #assign the value to df
                    self.dataFrame.loc[i,'latitude'] = res['latitude']
                    self.dataFrame.loc[i,'longitude'] = res['longitude']
                    self.dataFrame.loc[i,'methodUsed'] = 'google'
                    self.dataFrame.loc[i,'confidence'] = res['confidenceFlag']
        elif txt != 'yes':
            print('you refused to pay. Function stops.')
        else:
            print('google API not provided!')
        
    def auto(self,ggl=False):
        '''
        A wrapper that runs joining and RUIAN method. optionally also google api. 
        '''
        # joining
        self.runJoin()
        
        #ruian
        self.runRuian()
        
        #google
        if ggl == True:
            self.runGoogle()


In [16]:
g = geoWrapper(sampleadres, API_KEY)

For all other conversions use the data-type specific converters pd.to_datetime, pd.to_timedelta and pd.to_numeric.
  # This is added back by InteractiveShellApp.init_path()


In [17]:
import numbers

In [18]:
g.auto()

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

For all other conversions use the data-type specific converters pd.to_datetime, pd.to_timedelta and pd.to_numeric.


nan
nan
743139.22
nan
nan



HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

wtf happened
join



In [19]:
g.dataFrame

Unnamed: 0,fulladdress,obec,castobce,ulice,cp,co,psc,latitude,longitude,methodUsed,confidence
0,"Jugoslávských partyzánů 1580/3, Praha 6",Praha,Praha 6,Jugoslávských partyzánů,1580,3.0,,50.10397,14.3946,ruian,1.0
1,"Duškova 7, Praha 5",Praha,Praha 5,Duškova,7,,,50.07154,14.39536,ruian,1.0
2,"Opatovická 160/18, Praha",Praha,,Opatovická,160,18.0,,743139.22,1043845.0,join,1.0
3,"nám. W. Churchilla 4, Praha 3",Praha,Praha 3,nám. W. Churchilla,4,,,50.08427,14.44116,ruian,1.0
4,"Slezská 68, Praha 3 - Vinohrady",Praha,Vinohrady,Slezská,68,,,50.076193,14.44959,ruian,1.0
