#TMS

In [None]:
# Install the third-party packages used by this notebook.
# NOTE: the leading "!" is notebook shell syntax, so this cell only runs inside
# a Jupyter/Colab environment, not as a plain Python script.
!pip install requests #for http API calls
!pip install numpy  #for data manipulations
!pip install pandas #for data manipulations
!pip install -U googlemaps  #for google API calls
!pip install XlsxWriter  #for excel formatting

In [None]:
import io
import pandas as pd
import googlemaps
import numpy as np
import json
import xlsxwriter
import requests
import asyncio
import nest_asyncio
import re
import time
from google.colab import files

In [None]:
# Upload the text file that holds the API keys, so the keys never have to be
# written into the notebook itself. The initialization cell reads this file
# line by line and looks for entries named GOOGLE_API_KEY and MAIL_API_KEY in
# NAME=value form.
apikeyfile = files.upload()

In [None]:
# Upload the spreadsheet to process. It is later loaded with pd.read_excel
# (openpyxl engine); the addresses to validate are read from its
# "Full Address" column.
uploaded = files.upload()

In [None]:
#initializations
# Patch asyncio so that run_until_complete() can be called while the
# notebook's own event loop is already running.
nest_asyncio.apply()

# Columns read from the uploaded spreadsheet.
fullAddrColName  = "Full Address"
emailAddrColName = "Email Address"  # only referenced by the disabled email check

# Columns added to the output by the verification step.
googleSuggestedAddrColName = "Suggested address"
confirmItemsColName = "ConfirmItems"  # filled in, then dropped before export
unConfirmItemsColName = "UnconfirmItems"
googleCorrectionInfoColName = "Google correction"
uspsConfirmationColName = "DPV Confirmation"
uspsFootNoteColName = "DPV Foot Note"
emailValidationColName = "Email validation"  # currently unused (email check is disabled)
commentColName = "Comment"

#for UspsData
#Ref:https://developers.google.com/maps/documentation/address-validation/reference/rest/v1/TopLevel/validateAddress
# Human-readable text for the one-character DPV confirmation codes found in
# uspsData.dpvConfirmation. The phrases "failed" and "Address was DPV confirmed
# for the primary number only" are matched by the conditional formatting of the
# exported sheet, so keep them intact when editing these descriptions.
dpvConfirmationDict = {
  'Y': 'Address was DPV confirmed for primary and any secondary numbers',
  'N': 'Primary and any secondary number information failed to DPV confirm',
  'S': 'Address was DPV confirmed for the primary number only, and the secondary number information was present but not confirmed',
  'D': 'Address was DPV confirmed for the primary number only, and the secondary number information was missing',
}

# Human-readable text for the two-character DPV footnote codes found in
# uspsData.dpvFootnote (several codes may be concatenated in one string).
dpvFootnoteDict = {
  'AA': 'Input address matched to the ZIP+4 file',
  'A1': 'Input address was not matched to the ZIP+4 file',
  'BB': 'Matched to DPV (all components)',
  'CC': 'Secondary number not matched (present but invalid)',
  'N1': 'High-rise address missing secondary number',
  'M1': 'Primary number missing',
  'M3': 'Primary number invalid',
  'P1': 'Input address RR or HC box number missing',
  'P3': 'Input address PO, RR, or HC Box number invalid',
  'F1': 'Input address matched to a military address',
  'G1': 'Input address matched to a general delivery address',
  'U1': 'Input address matched to a unique ZIP code',
  'PB': 'Input address matched to PBSA record',
  'RR': 'DPV confirmed address with PMB information',
  'R1': 'DPV confirmed address without PMB information',
  'R7': 'Carrier Route R777 or R779 record'
}

# Texts written to the "Comment" column, chosen per row from the API verdict.
comment1 = "Google has corrected some components of the input address. Please check the suggested address."
comment2 = "The input address looks ok and Google has not done any correction but there are some unconfirmed address component."
comment3 = "Google has not done any correction but there are some unconfirmed address component."

# Placeholders only: the real values are read from the uploaded key file.
GOOGLE_API_KEY = ""
MAIL_API_KEY = ""

# Name of the generated workbook (timestamped so repeated runs do not collide)
# and of the sheet the dataframe is written to.
timestr = time.strftime("%Y%m%d-%H%M%S")
output_filename = f"tms-validated-{timestr}.xlsx"
sheetname = "withAddrVerification"

#get the API keys from the file
# Each relevant line has the form NAME=value. Both sides are stripped so that
# the trailing newline (and any stray spaces) never end up inside the key.
# "utf-8-sig" also tolerates a byte-order mark at the start of the file, which
# would otherwise stop the first name from matching.
apikeyfilename = next(iter(apikeyfile))
with open(apikeyfilename, encoding="utf-8-sig") as file:
    for line in file:
        head, sep, tail = line.partition("=")
        head = head.strip()
        tail = tail.strip()
        if not sep or not tail:
            continue  # blank line, no "=" at all, or an empty value
        if head == "GOOGLE_API_KEY":
            GOOGLE_API_KEY = tail
        elif head == "MAIL_API_KEY":
            MAIL_API_KEY = tail

# Every address lookup needs the Google key, so say so up front instead of
# leaving the user with failures on each row later.
if not GOOGLE_API_KEY:
    print("Warning: GOOGLE_API_KEY was not found in " + apikeyfilename)

#get the file and translate to dataframe
# files.upload() results are keyed by file name; the first (only expected)
# entry is the spreadsheet to process.
filename = next(iter(uploaded))
df_all = pd.read_excel(filename, engine="openpyxl")
# Work on a copy: the result columns are added to new_df_all while df_all
# keeps the data exactly as it was uploaded.
new_df_all = df_all.copy() #copy to manipulate the dataframe so that original df_all will be kept as it is.

#def removeSubstringAfter(originalString, removeChar):
#  head, sep, tail = originalString.partition(removeChar)

#Ref: https://developers.google.com/maps/documentation/address-validation/reference/rest/v1/TopLevel/validateAddress
def isValidAddress(_fulladdress):
  """Validate a US address with the Google Address Validation API.

  The address is expected to span at least two lines: the street line first,
  then the "city, state, zip" line. Blank lines are ignored and only the first
  two remaining lines are sent, with USPS CASS processing enabled.

  Args:
    _fulladdress: Multi-line address text. Anything that is not a string
      (for example the NaN pandas uses for an empty cell) is treated as
      "no address".

  Returns:
    The response of ``googlemaps.Client.addressvalidation``, or ``False`` when
    the input has fewer than two non-blank lines. No client is created and no
    request is made in the ``False`` case.
  """
  if not isinstance(_fulladdress, str):
    return False

  # strip() also removes the "\r" left behind by Windows-style line endings
  addrArr = [line.strip() for line in _fulladdress.split("\n")]
  addrArr = [line for line in addrArr if line]
  if len(addrArr) < 2:
    return False

  gmaps = googlemaps.Client(key=GOOGLE_API_KEY)
  return gmaps.addressvalidation(addressLines=addrArr[:2],
                                 regionCode='US',
                                 enableUspsCass=True)


async def addAddressEmailVerificationResultColumns(df_input):
  """Validate each row's address and write the findings into df_input.

  For every row the "Full Address" value is passed to isValidAddress() and the
  response is summarised into the result columns: suggested address,
  confirmed / unconfirmed components, Google corrections, DPV confirmation,
  DPV footnotes and a comment.

  Rows are handled independently. A row whose address cannot be validated
  (missing or incomplete address, API error, unexpected response) only gets an
  explanatory comment, and processing continues with the next row.

  Args:
    df_input: DataFrame holding the address column; it is modified in place.

  Returns:
    The same DataFrame object, with the result columns filled in.
  """
  # Per-component flags that mean Google altered or questioned the component.
  correctionFlags = ('inferred', 'spellCorrected', 'replaced', 'unexpected')

  for index, row in df_input.iterrows():
    try:
      addrResult = isValidAddress(row[fullAddrColName])
      if not addrResult:
        # isValidAddress() returns False when it has nothing usable to send
        df_input.loc[index, commentColName] = "Address is missing or incomplete; it was not validated."
        continue

      result = addrResult['result']
      verdict = result.get('verdict', {})
      address = result['address']

      confirmList = []
      unconfirmList = []
      googleCorrectedComponentList = []
      for componentItem in address.get('addressComponents', []):
        componentText = componentItem['componentName']['text']
        confirmationLevel = componentItem['confirmationLevel']
        if confirmationLevel == "CONFIRMED":
          confirmList.append({componentText: confirmationLevel})
        else:
          unconfirmList.append({componentText: confirmationLevel})

        componentLabel = str(componentItem['componentType']) + ":" + componentText
        for flag in correctionFlags:
          if componentItem.get(flag):
            googleCorrectedComponentList.append({componentLabel: flag})

      # The comment is rebuilt for every row so that a previous row's text
      # can never carry over to this one.
      comment = ""
      if unconfirmList:
        comment = comment2 if verdict.get('addressComplete', False) else comment3
      # A correction by Google takes precedence over the "unconfirmed" comments.
      if verdict.get('hasInferredComponents') or verdict.get('hasReplacedComponents'):
        comment = comment1

      uspsData = result.get('uspsData', {})
      # dpvConfirmation holds one-char codes, dpvFootnote holds two-char codes.
      # Codes missing from the lookup tables are kept as-is rather than
      # showing up as None in the sheet.
      dpvConfirmationList = [dpvConfirmationDict.get(code, code)
                             for code in uspsData.get('dpvConfirmation', '')]
      dpvFootNoteList = [dpvFootnoteDict.get(code, code)
                         for code in re.findall(r'..', uspsData.get('dpvFootnote', ''))]

      # NOTE: email validation is currently disabled, so the
      # "Email validation" column is not written here.
      df_input.loc[index, googleSuggestedAddrColName] = address.get('formattedAddress', "")
      df_input.loc[index, confirmItemsColName] = str(confirmList)
      df_input.loc[index, unConfirmItemsColName] = str(unconfirmList)
      df_input.loc[index, googleCorrectionInfoColName] = str(googleCorrectedComponentList)
      df_input.loc[index, uspsConfirmationColName] = str(dpvConfirmationList)
      df_input.loc[index, uspsFootNoteColName] = str(dpvFootNoteList)
      df_input.loc[index, commentColName] = comment
    except Exception as err:
      # Best effort: report the failing row and keep going, so one bad row
      # does not leave all following rows unprocessed.
      print("An exception occurred on row {}: {!r}".format(index, err))
      df_input.loc[index, commentColName] = "Address could not be validated: {}".format(err)

  return df_input


def autosize_excel_columns(worksheet, df):
  """Fit the worksheet column widths to df as it is written by to_excel.

  The index level(s) occupy the leftmost sheet columns, so they are sized
  first; the data columns follow, shifted right by the number of index levels.
  """
  index_as_frame = df.index.to_frame()
  autosize_excel_columns_df(worksheet, index_as_frame)

  first_data_column = df.index.nlevels
  autosize_excel_columns_df(worksheet, df, offset=first_data_column)

def autosize_excel_columns_df(worksheet, df, offset=0):
  """Set each sheet column's width to fit the widest text of a df column.

  The width is the longer of the column label and the longest cell value
  (both measured as str), plus one character of padding.

  Args:
    worksheet: Object with a ``set_column(first_col, last_col, width)`` method
      (an xlsxwriter worksheet).
    df: DataFrame whose columns are measured, in order.
    offset: Sheet column index that df's first column maps to.
  """
  # items() yields one (label, Series) pair per column, which also works when
  # two columns share the same label.
  for idx, (name, series) in enumerate(df.items()):
    cell_lengths = series.astype(str).map(len)
    # An empty column has no cells to measure; .max() would give NaN there
    # and turn the whole width into NaN.
    longest_cell = int(cell_lengths.max()) if len(cell_lengths) else 0
    max_len = max(longest_cell, len(str(name))) + 1
    worksheet.set_column(idx + offset, idx + offset, max_len)

async def main_func():
  """Run the address verification and export the result to an Excel file.

  Adds the (empty) result columns to the module-level new_df_all, fills them
  through addAddressEmailVerificationResultColumns(), drops the
  confirmed-items column and writes the rest to output_filename with
  auto-sized columns and colour highlighting of problem cells.

  Returns:
    The DataFrame that was written to the workbook.
  """
  #add new columns (email validation is currently disabled, so no column for it)
  for colName in (googleSuggestedAddrColName,
                  confirmItemsColName,
                  unConfirmItemsColName,
                  googleCorrectionInfoColName,
                  uspsConfirmationColName,
                  uspsFootNoteColName,
                  commentColName):
    new_df_all[colName] = ""

  # Already inside a coroutine: await directly instead of starting a nested
  # run_until_complete() on the running loop.
  final_df = await addAddressEmailVerificationResultColumns(new_df_all)
  final_df = final_df.drop(confirmItemsColName, axis=1) #remove column axis=1 for col, axis=0 for row

  #save dataframe into excel; leaving the with-block saves and closes the file
  with pd.ExcelWriter(output_filename, engine='xlsxwriter') as writer:
    final_df.to_excel(writer, sheet_name=sheetname, freeze_panes=(final_df.columns.nlevels, final_df.index.nlevels))
    worksheet = writer.sheets[sheetname]

    workbook = writer.book
    formatYellow = workbook.add_format()
    formatRed = workbook.add_format()
    formatYellow.set_bg_color('yellow')
    formatRed.set_bg_color('red')

    autosize_excel_columns(worksheet, final_df)

    # (column, text to look for, format) - applied in this order
    highlightRules = (
      (uspsConfirmationColName, 'failed', formatRed),
      (uspsConfirmationColName, 'Address was DPV confirmed for the primary number only', formatYellow),
      (googleCorrectionInfoColName, 'unexpected', formatRed),
      (googleCorrectionInfoColName, 'spellCorrected', formatYellow),
      (googleCorrectionInfoColName, 'replaced', formatYellow),
    )
    # Ranges are given as zero-based (row, col) numbers rather than "A1"
    # letters, so columns beyond "Z" are addressed correctly as well.
    indexColumnCount = final_df.index.nlevels  # index is written before the data columns
    lastRow = final_df.shape[0]  # row 0 is the header, data rows are 1..lastRow
    for colName, searchText, cellFormat in highlightRules:
      sheetCol = final_df.columns.get_loc(colName) + indexColumnCount
      worksheet.conditional_format(0, sheetCol, lastRow, sheetCol,
                                   {'type': 'text',
                                    'criteria': 'containing',
                                    'value': searchText,
                                    'format': cellFormat})

  return final_df

# Jupyter already has a running event loop, so the coroutine is driven through
# the current loop; this relies on nest_asyncio.apply() having been called in
# the initialization cell.
loop = asyncio.get_event_loop()
result_df = loop.run_until_complete(main_func())
print("Please check your downloads folder for the file name starts with:"+output_filename)
# Hand the generated workbook to the browser as a download.
files.download(output_filename)