<h1>Script to Extract, Transform, and Load Error Check Calibration Survey 123 Data on AGOL to the ROMN Water Quality Database</h1>

<h3>This code processes the ROMN Error Check Calibration data collected in Survey 123, moving it from the defined Feature Layer on ArcGIS Online to the defined Water Quality Access Database:<br><br>
    1) Downloads the Error Check Calibration Feature Layer (i.e. the Survey 123 EC/Cal data table) via the ArcGIS Python API<br><br>
    2) Preprocesses fields in the Feature Layer to match the expected schema of the 'tbl_Form0_WQ_EC_Cal_Master' table<br><br>
    3) Appends records to the 'tbl_Form0_WQ_EC_Cal_Master' table in the defined Water Quality Access Database</h3><br>
         
<h5>Dependencies:<br>
Python Version 3.8<br>
Packages: pandas, NumPy, shutil, sys, os<br>
Anaconda Environment: py38<br>
sqlalchemy-access - used for the pandas dataframe '.to_sql' functionality; install via: 'pip install sqlalchemy-access'<br><br>
An ArcGIS Online Application Client ID must be created in order to connect to AGOL via the Python API. This application client ID must be OAuth 2.0 registered, which allows for two-factor authentication.</h5>
 


<h5>Notebook created by Kirk Sherrill - Data Manager, Rocky Mountain Network - I&M National Park Service<br>
    Date - May 26th, 2022</h5>

In [None]:
#Import Required Libraries
import pandas as pd
import numpy as np
import sys
from datetime import date
import shutil
import os

from zipfile import ZipFile 
import traceback

#import pyodbc - not being used

In [None]:
import arcgis
from arcgis.gis import GIS

In [None]:
import sqlalchemy as sa

<h3>Define Input Parameters</h3>

In [None]:
#Define Cell Width to be Dynamic
from IPython.display import display, HTML

display(HTML(data="""
<style>
    div#notebook-container    { width: 95%; }
    div#menubar-container     { width: 65%; }
    div#maintoolbar-container { width: 99%; }
</style>
"""))

In [None]:
#Set the maximum number of rows to display to 1000
pd.set_option('display.max_rows', 1000)

In [None]:
AGOL_ID = 'fad0563cc2a4401ba94a6131a7dc5a31'  #The Service ItemID (i.e. the AGOL Identification number) for the Error Check Calibration Survey 123 Feature Layer.

pythonApp_ID = 'VFfN107sG4W47jXo'  #AGOL Application Client Id. In order to download from AGOL the python application must be OAuth 2.0 registered.
downloadFormat = 'CSV'  #Desired format of downloaded Feature ('CSV'|'Excel')

#Water Quality Access Database
WQDB = r'C:\ROMN\Monitoring\Loggers\Data\Certified\WaterQuality_ROMN_AllYears_MASTER_20221026v5.accdb'    #ROMN Water Quality Access DB - contains the 'tbl_Form0_WQ_EC_Cal_Master' table to be appended to.
WQDBTable = 'tbl_Form0_WQ_EC_Cal_Master'   #Table in 'WQDB' to be appended to (i.e. 'tbl_Form0_WQ_EC_Cal_Master')

#Output Name and Directory Paths
outZipName = "EC_Cal_Survey123_2022"    #Name given to zip file during export from AGOL
outRawTable = "Form0_0"     #Name of layer being exported from AGOL
workspace = r'C:\ROMN\Monitoring\Loggers\DataGathering\WaterQuality\EC_Cal_Survey123\2022\2022_Download_AGOL_EC_Cal'  #Workspace Output Directory
logFileName = workspace + "\\" + outZipName + ".LogFile.txt"  #Name of the .txt script logfile which is saved in the workspace directory

In [None]:
#################################
# Checking for working directories and Log File
##################################
if os.path.exists(workspace):
    pass
else:
    os.makedirs(workspace)

# Check if logFile exists
if os.path.exists(logFileName):
    pass
else:
    logFile = open(logFileName, "w")  # Creating index file if it doesn't exist
    logFile.close()

In [None]:
#Function to Get the Date/Time
def timeFun():
    from datetime import datetime
    b=datetime.now()
    messageTime = b.isoformat()
    return messageTime

<h3>Download AGOL Feature Layer/Survey 123 table via ArcGIS.API</h3>

In [None]:
#gis = GIS(None,verify_cert=True)
#gis = GIS('https://nps.maps.arcgis.com',verify_cert=False)
#gis = GIS('https://nps.maps.arcgis.com',"NPS\\ksherrill")

In [None]:
#Open Connection to AGOL via the ArcGIS API - ArcGIS.GIS module, using a Python application client_id connection for authentication
gis = GIS('https://nps.maps.arcgis.com', client_id=pythonApp_ID)  #Use the client ID defined in the input parameters rather than a hard-coded copy

In [None]:
#Get List of AGOL Content by owner or Directly define the AGOL Item ID
#owner = 'ksherrill@nps.gov_nps'
##owner ='kzybko@nps.gov_nps'
#items = gis.content.search('owner:{0}'.format(owner))

In [None]:
#Pull the desired AGOL content via the AGOL ID
item=gis.content.get(AGOL_ID)

In [None]:
item

In [None]:
#Download the imported AGOL content
#result = item.export(item.title, downloadFormat, wait=True)
result = item.export(outZipName, downloadFormat, wait=True)
result.download(workspace)

zipFull = workspace + "\\" + outZipName + ".zip"

In [None]:
messageTime = timeFun()
scriptMsg = "Successfully Exported Item: " + AGOL_ID + " from AGOL to: " + zipFull + " - " + messageTime
print(scriptMsg)
logFile = open(logFileName, "a")
logFile.write(scriptMsg + "\n")
logFile.close()
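
The print-then-append logging pattern in the cell above recurs throughout the notebook. A minimal sketch of how it could be wrapped in one helper follows; `logMessage` and the temporary demo log path are hypothetical names introduced for illustration and are not part of the original notebook.

```python
import os
import tempfile
from datetime import datetime


def logMessage(logPath, message):
    """Print a timestamped message and append it to the log file (hypothetical helper)."""
    stamped = message + " - " + datetime.now().isoformat()
    print(stamped)
    # 'with' closes the file even if the write raises
    with open(logPath, "a") as logFile:
        logFile.write(stamped + "\n")
    return stamped


# Usage against a throwaway log file rather than the real workspace
demoLog = os.path.join(tempfile.mkdtemp(), "demo.LogFile.txt")
logMessage(demoLog, "Successfully Exported Item: <item id> from AGOL")
logMessage(demoLog, "Successfully Unzipped: <zip name>")
```

With a helper like this, each status cell would reduce to a single call that takes `logFileName` and the message text.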

In [None]:
#Unzip and export to the workspace
with ZipFile(zipFull, 'r') as zipArchive:  #Named 'zipArchive' to avoid shadowing the built-in 'zip'
    print ('Unzipping files')
    zipArchive.extractall(path=workspace)

outFullTable = workspace + "\\" + outRawTable + "." + downloadFormat   
messageTime = timeFun()
scriptMsg = "Successfully Unzipped: " + outZipName + ".zip to: " + outFullTable + " - " + messageTime
print(scriptMsg)
logFile = open(logFileName, "a")
logFile.write(scriptMsg + "\n")
logFile.close()
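
The cell above builds the extracted table path from `outRawTable` and `downloadFormat`, which assumes the archive member is named exactly that way; this would likely not hold for an 'Excel' export, where the file extension differs from the format name. A hedged sketch of an alternative is to read the member names from the archive itself. `extractTables` and the demo archive are hypothetical and only illustrate the idea with the standard-library `zipfile` module.

```python
import os
import tempfile
from zipfile import ZipFile


def extractTables(zipPath, outDir, extension):
    """Extract a zip archive and return the full paths of members with the given extension."""
    with ZipFile(zipPath, "r") as archive:
        archive.extractall(path=outDir)
        members = archive.namelist()
    suffix = "." + extension.lower()
    return [os.path.join(outDir, m) for m in members if m.lower().endswith(suffix)]


# Demonstration with a throwaway archive standing in for the AGOL export
demoDir = tempfile.mkdtemp()
demoZip = os.path.join(demoDir, "demo_export.zip")
with ZipFile(demoZip, "w") as archive:
    archive.writestr("Form0_0.csv", "ObjectID,GlobalID\n1,abc\n")
    archive.writestr("readme.txt", "not a table")

tables = extractTables(demoZip, demoDir, "CSV")
print(tables)
```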

<h3>Begin Processing to Upload the AGOL export to Access Database</h3>

In [None]:
outFullTable = workspace + "\\" + outRawTable + "." + downloadFormat   
#Import excel or csv file to Pandas Dataframe
if downloadFormat.lower() == "csv":
    df = pd.read_csv(outFullTable)
else:
    df = pd.read_excel(outFullTable)

In [None]:
#Add 'DeviceSN' field
df.insert(9,"DeviceSN","")

In [None]:
# Redefine the SN fields to Integer and then to String (missing values become '0')
for snField in ['SmarTroll_SN', 'AquaTroll600_SN', 'Aquatroll400_SN', 'Oakton_SN']:
    df[snField] = df[snField].fillna(0).astype(int).astype('str')

In [None]:
##Processing being done with function 'processDeviceSerialNumber'
# df['DeviceSN'] = np.where((df['Sonde'] == 'smartroll'), df['SmarTroll_SN'], df['DeviceSN'])
# df['DeviceSN'] = np.where((df['Sonde'] == 'aquatroll_600'), df['AquaTroll600_SN'], df['DeviceSN'])
# df['DeviceSN'] = np.where((df['Sonde'] == 'aquatroll_400'), df['Aquatroll400_SN'], df['DeviceSN'])
# # df['DeviceSN'] = np.where((df['Sonde'] == 'oakton_pc_450'), df['Oakton_SN'], df['DeviceSN'])
# df['DeviceSN'] = np.where((df['Sonde'] == 'other'), df['Other_Sonde_Type_Serial'], df['DeviceSN'])

<h3>Replace 'Other' Values in Crew and SiteName fields with matching 'Other' fields</h3><br>

In [None]:
#Function to work through the defined fields checking for 'other' values in 'inField'; where there is an 'other' value, populate it from the defined 'ifOtherField'.
#Input Parameters:
#'inDf' - data frame being evaluated
#'inField' - field being checked for an 'other' value
#'ifOtherField' - field used to populate 'inField' when 'inField' is 'other'
#Returns a tuple: (status message, processed data frame or None on failure)

def replaceOtherField(inDf, inField, ifOtherField):
    try:

        #Use 'inDf' (not the global 'df') so the function works on whichever data frame is passed in
        inDf[inField] = np.where((inDf[inField] == 'other'), inDf[ifOtherField], inDf[inField])

        return "Success Function", inDf

    except Exception:

        messageTime = timeFun()
        print("Error on 'replaceOtherField' Function - " + messageTime)
        traceback.print_exc(file=sys.stdout)
        return "Failed function - 'replaceOtherField'", None
        

In [None]:
#Run function for 'Crew' field with 'Other' field
outVal = replaceOtherField(df,"Crew", "Specify other." )
if outVal[0].lower() != "success function":
    print ("WARNING - Function 'replaceOtherField' - Failed for 'Crew' field - Exiting Script")
else:

    print ("Success - Function 'replaceOtherField - Crew'")
    #Assign the df to the processed df
    df = outVal[1] 

In [None]:
#Run function for 'SiteName' field with 'Other' field
outVal = replaceOtherField(df,"SiteName", "Specify other..1" )
if outVal[0].lower() != "success function":
    print ("WARNING - Function 'replaceOtherField' - Failed for 'SiteName' field - Exiting Script")
else:

    print ("Success - Function 'replaceOtherField - SiteName'")
    #Assign the df to the processed df
    df = outVal[1] 
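
The effect of the `np.where` replacement used by 'replaceOtherField' can be seen on a small example. This is a sketch only: the three sample records are hypothetical, and just the column names 'Crew' and 'Specify other.' come from the notebook.

```python
import numpy as np
import pandas as pd

# Hypothetical three-record sample; only the column names come from the notebook
demoDf = pd.DataFrame({
    "Crew": ["crew_a", "other", "crew_b"],
    "Specify other.": [None, "Visiting Crew", None],
})

# Where 'Crew' is 'other', take the free-text value; otherwise keep 'Crew'
demoDf["Crew"] = np.where(demoDf["Crew"] == "other", demoDf["Specify other."], demoDf["Crew"])
crewValues = demoDf["Crew"].tolist()
print(crewValues)
```

Note that the comparison is case-sensitive, so a value such as 'Other' would not be replaced.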

<h3>Process Device Serial Number Fields</h3><br>
    

In [None]:
#Function to populate the 'DeviceSN' field from the serial number field matching the 'Sonde' type
#Input Parameters:
#'inDf' - data frame being evaluated
#Returns a tuple: (status message, processed data frame or None on failure)

def processDeviceSerialNumber(inDf):
    try:

        inDf['DeviceSN'] = np.where((inDf['Sonde'] == 'smartroll'), inDf['SmarTroll_SN'], inDf['DeviceSN'])
        inDf['DeviceSN'] = np.where((inDf['Sonde'] == 'aquatroll_400'), inDf['Aquatroll400_SN'], inDf['DeviceSN'])
        inDf['DeviceSN'] = np.where((inDf['Sonde'] == 'aquatroll_600'), inDf['AquaTroll600_SN'], inDf['DeviceSN'])
        inDf['DeviceSN'] = np.where((inDf['Sonde'] == 'oakton_pc_450'), inDf['Oakton_SN'], inDf['DeviceSN'])
        inDf['DeviceSN'] = np.where((inDf['Sonde'] == 'other'), inDf['Other_Sonde_Type_Serial'], inDf['DeviceSN'])

        return "Success Function", inDf

    except Exception:

        messageTime = timeFun()
        print("Error on 'processDeviceSerialNumber' Function - " + messageTime)
        traceback.print_exc(file=sys.stdout)
        return "Failed function - 'processDeviceSerialNumber'", None
        

In [None]:
#Run function to populate the 'DeviceSN' field
outVal = processDeviceSerialNumber(df)
if outVal[0].lower() != "success function":
    print ("WARNING - Function 'processDeviceSerialNumber' - Failed - Exiting Script")
else:

    print ("Success - Function 'processDeviceSerialNumber'")
    #Assign the df to the processed df
    df = outVal[1] 

In [None]:
#Drop Fields Not in 'tbl_Form0_WQ_EC_Cal_Master'
df.drop(['ObjectID','DateSimple','Specify other.','Specify other..1','SmarTroll_SN','AquaTroll600_SN','Aquatroll400_SN','Oakton_SN','Other_Sonde_Type_Serial'], axis=1, inplace=True)


<h3>Connect with the 'WQDB' to push the new Error Check/Calibration records</h3><br>

In [None]:
#Connect to the Master Water Quality Table via sqlAlchemy-access connection
#connStr_pyodbc = (r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=" + waterQualityDB  +";")   #PYODBC Connection
connStr = (r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=" + WQDB  +";ExtendedAnsiSQL=1;")  #sqlAlchemy-access connection
#cnxn = pyodbc.connect(connStr)  #PYODBC Connection
cnxn = sa.engine.URL.create("access+pyodbc", query={"odbc_connect": connStr})
engine = sa.create_engine(cnxn)

In [None]:
#Define new index field order and create new dataframe
newIndex = ['GlobalID','Protocol','SiteName','EventName_ECCal','Date','Type','Crew','Sonde','DeviceSN','Notes',
            'Temp_Sensor_Check','Temp_EC1_C','Temp_Sonde1_C','Temp_EC2_C','Temp_sonde2_C','Temp_Mean_Difference','Temp_EC_CommentAction',
            'pH_EC_PreCal_Temp','pH_EC_PreCal_Standard','pH_EC_PreCalibrated_Reading','pH_EC_PreCalibrated_dif_EC_Standard','pH_EC_PreCal_Allowable','pH_EC_PreCal_CommentAction',
            'pH_Cal1_Temp','pH_Cal1_Standard','pH_Cal1_Reading','pH_Cal2_Temp','pH_Cal2_Standard','pH_Cal2_Reading','pH_Cal1_Cal2_CommentAction',
            'pH_EC_PostCal_Temp','pH_EC_PostCal_Standard','pH_EC_PostCal_Reading','pH_EC_PostCal_dif_EC_Standard','pH_EC_PostCal_dif_EC_Standard_PercError','pH_EC_PostCal_Allowable','pH_EC_PostCal_CommentAction',
            'SC_EC_PreCal_Temp','SC_EC_PreCal_Standard','SC_EC_PreCalibrated_Reading','SC_EC_PreCalibrated_dif_EC_Standard','SC_EC_PreCalibrated_dif_EC_PercError','SC_EC_PreCal_Allowable','SC_EC_PreCal_CommentAction',
            'SC_Cal1_Temp','SC_Cal1_Standard','SC_Cal1_Reading','SC_Cal1_CommentAction',
            'SC_EC_PostCal_Temp','SC_EC_PostCal_Standard','SC_EC_PostCal_Reading','SC_EC_PostCal_dif_EC_Standard','SC_EC_PostCal_dif_EC_Standard_PercError','SC_EC_PostCal_Allowable','SC_EC_PostCal_CommentAction',
            'BP_mmHg',
            'DO_EC_PreCal_Temp','DO_EC_PreCal_Standard','DO_EC_PreCal_Expected_mgl','DO_EC_PreCalibrated_Reading_mgl','DO_EC_PreCalibrated_Reading_perc','DO_EC_PreCalibrated_dif_EC_Standard','DO_EC_PreCal_dif_EC_Standard_PercError','DO_EC_PreCal_Allowable','DO_EC_PreCal_CommentAction',
            'DO_1_or_2_pt_cal','DO_Cal1_Temp','DO_Cal1_Standard','DO_Cal1_Reading_mgl','DO_Cal1_Reading_perc','DO_Cal2_Temp','DO_Cal2_Standard','DO_Cal2_Reading_mgl','DO_Cal2_Reading_perc','DO_Cal1_Cal2_CommentAction',
            'DO_EC_PostCal_Temp','DO_EC_PostCal_Standard','DO_EC_PostCal_Expected_mgl','DO_EC_PostCal_Reading_mgl','DO_EC_PostCal_Reading_perc','DO_EC_PostCal_dif_EC_Standard','DO_EC_PostCal_dif_EC_Standard_PercError','DO_EC_PostCal_Allowable','DO_EC_PostCal_CommentAction',
            'Turb_EC_PreCal_Temp','Turb_EC_PreCal_Standard','Turb_EC_PreCalibrated_Reading','Turb_EC_PreCalibrated_dif_EC_Standard','Turb_EC_PreCalibrated_dif_EC_Standard_PercError','Turb_EC_PreCal_Allowable','Turb_EC_PreCal_CommentAction',
            'Turb_Cal1_Temp','Turb_Cal1_Standard','Turb_Cal1_Reading','Turb_Cal2_Temp','Turb_Cal2_Standard','Turb_Cal2_Reading','Turb_Cal1_Cal2_CommentAction',
            'Turb_EC_PostCal_Temp','Turb_EC_PostCal_Standard','Turb_EC_PostCal_Reading','Turb_EC_PostCal_dif_EC_Standard','Turb_EC_PostCal_dif_EC_Standard_PercError','Turb_EC_PostCal_Allowable','Turb_EC_PostCal_CommentAction',
            'CreationDate','Creator','EditDate','Editor','x','y']

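#Apply new field index order to new dataframe; '.copy()' makes 'df2' independent of 'df' so later inserts do not raise a SettingWithCopyWarning
df2 = df[newIndex].copy()
#df.reindex(newIndex)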


In [None]:
#Pull from 'WQDBTable' table to get a count of existing records to be used to define the index
query = "SELECT " + WQDBTable + ".* FROM " + WQDBTable +";"
query_Df = pd.read_sql(query, con=engine)

In [None]:
#Define number of rows
x=query_Df.shape
lenRows = x[0]

In [None]:
#Find the current maximum 'Form0_ID' value
column = query_Df['Form0_ID']
max_IdValue = column.max()

In [None]:
#Add 'Form0_ID' field as an incremental index starting at one plus the current maximum 'Form0_ID' value in 'tbl_Form0_WQ_EC_Cal_Master'
df2.insert(0,"Form0_ID",range(max_IdValue+1, max_IdValue + len(df2)+1))
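
The ID assignment above continues numbering from the current maximum 'Form0_ID'. If the target table were empty, the pandas maximum would be NaN and the `range` call would likely fail. The following sketch shows the same arithmetic in plain Python with an explicit guard; `nextIds` is a hypothetical helper, and starting at 1 for an empty table is an assumption rather than documented behavior of the database.

```python
def nextIds(existingIds, newCount):
    """Return a range of newCount consecutive IDs following the largest existing ID."""
    # Assumption: an empty table should start numbering at 1
    currentMax = max(existingIds) if len(existingIds) > 0 else 0
    start = int(currentMax) + 1
    return range(start, start + newCount)


idsAfterExisting = list(nextIds([1, 2, 7], 3))
idsForEmptyTable = list(nextIds([], 2))
print(idsAfterExisting)
print(idsForEmptyTable)
```

Using the maximum rather than the row count keeps new IDs unique even when earlier records have been deleted from the table.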

In [None]:
#Drop the dataframe index column and reassign to the 'Form0_ID' - necessary so you don't have an extra column
df2.set_index('Form0_ID', inplace = True)

In [None]:
#Create iteration range for records to be appended
shapeDf =df2.shape
lenRows = shapeDf[0]
rowRange = range(0, lenRows)

In [None]:
#Append New records one at a time due to length constraints
try:
    for row in rowRange:

        df3 = df2.iloc[row:row+1]
        #Push to Series to get ID value
        globalIDSeries = df3.iloc[0]
        globalID = globalIDSeries.get('GlobalID') 


        #Append the schema-matching dataframe to 'WQDBTable' via pandas 'df.to_sql'
        x = df3.to_sql(WQDBTable, con=engine, if_exists='append')
        #print(x)
        messageTime = timeFun()        
        scriptMsg = "Successfully Appended 'GlobalID' - " + globalID + " - to DB: " + WQDB + " - " + messageTime
        print(scriptMsg)
        logFile = open(logFileName, "a")
        logFile.write(scriptMsg + "\n")
        logFile.close()

        del(df3)
 
    #Successfully Processed
    messageTime = timeFun()        
    scriptMsg = "Successfully Processed Files in EC/Cal Feature Layer on AGOL with ID value:" + AGOL_ID + " - to DB: " + WQDB + " - " + messageTime
    print(scriptMsg)
    logFile = open(logFileName, "a")
    logFile.write(scriptMsg + "\n")
    logFile.close()
   


except:

    messageTime = timeFun()
    print("Error processing ErrorCheck_Cal_ETL_Survey123_AGOL.ipynb - " + messageTime)
    traceback.print_exc(file=sys.stdout)
        

In [None]:
del (engine)

In [None]:
del (cnxn)

In [None]:
del (x)

<h3>Code Below Not Being Used</h3><br>

In [None]:
#Function to download AGOL data by user
def downloadUserItems(owner, downloadFormat):
    try:
        # Search items by username
        items = gis.content.search('owner:{0}'.format(owner))
        print(items)
        # Loop through each item and if equal to Feature service then download it
        for item in items:
            #if item.type == 'Feature Service':
            if item.type == 'Feature Layer Collection':
                result = item.export('sample {}'.format(item.title), downloadFormat)
                result.download(workspace)
                # Delete the item after it downloads to save on space
                result.delete()
            if item.type == 'Feature Service':
                result = item.export('sample {}'.format(item.title), downloadFormat)
                result.download(workspace)
                # Delete the item after it downloads to save on space
                result.delete()
    except Exception as e:
        print("Failed To Export File" + str(e))

In [None]:

#Function to work through the defined fields checking for Null values; where Null, populate from the defined 'ifNullField'.  - Not Being Used
#Input Parameters:
#'inDf' - data frame being evaluated
#'inField' - field being checked for Null values
#'ifNullField' - field used to populate 'inField' when 'inField' is Null
#Returns a tuple: (status message, processed data frame or None on failure)

######Function Not Being Used - KRS 20220525

def replaceNullField(inDf, inField, ifNullField):
    try:

        #df['Crew'] = df.apply(lambda row: row['Specify other.'] if pd.isnull(row['Crew']) else row['Crew'], axis=1)
        #Use 'inDf' (not the global 'df') so the function works on whichever data frame is passed in
        inDf[inField] = inDf.apply(lambda row: row[ifNullField] if pd.isnull(row[inField]) else row[inField], axis=1)

        return "Success Function", inDf

    except Exception:

        messageTime = timeFun()
        print("Error on 'replaceNullField' Function - " + messageTime)
        traceback.print_exc(file=sys.stdout)
        return "Failed function - 'replaceNullField'", None
        

In [None]:
# #Run function for 'SiteName' and 'Specify other..1' field  - Not Being Used
# outVal = replaceNullField(df,"SiteName", "Specify other..1" )
# if outVal[0].lower() != "success function":
#     print ("WARNING - Function 'replaceNullField' - Failed for 'SiteName' field - Exiting Script")
# else:
#
#     print ("Success - Function 'replaceNullField'")
#     #Assign the df to the processed df
#     df = outVal[1]