# Data Load for Corteva Code Challenge
## This code section is in response to problem #2 - Ingestion
Data ingestion is implemented in Python 3.9 in a Jupyter Notebook.  This approach was chosen because it allows documentation to live alongside the code.  The notebook was written in Microsoft Visual Studio Code, whose Jupyter extension provided linting and code formatting.

Data is loaded from the flat files into the database's stage tables.  Once the data has been processed into the final tables, the stage data is deleted.  For a more robust solution, a set of archive tables might make sense.

Duplicates are handled by leveraging MySQL unique key constraints and the ON DUPLICATE KEY UPDATE clause, which produces a result similar to the MERGE statement used by other RDBMSs.

Logging uses the Python logging module and writes its output to a log directory. See the setup requirements for details on creating the directories this code needs.

## Import Section
This section contains the imports used for the rest of the code base.  

In [12]:
import mysql.connector
import os
import pandas as pd
import configparser
import logging

## Configuration Section
This section sets up the configuration for the database, logging, and the configuration file.

The database username and password are retrieved from the machine's environment variables (DBUSER and DBPASS).  This is just one way to keep private details out of the code.

Next, the configuration file is read and the log directory is taken from it.

The database connection is defined using a combination of environment and configuration settings.

Finally, all the SQL statements are defined in this section.  They are kept here so they are easy to review, extend, or modify.

In [13]:
# Database credentials come from environment variables so they are not stored
# in the notebook or in the .ini file.
db_user = os.getenv('DBUSER')
db_pass = os.getenv('DBPASS')

config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Fail fast with a clear message instead of a
# later, confusing KeyError on a missing section or option.
if not config.read('corteva.ini'):
    raise FileNotFoundError("Configuration file 'corteva.ini' was not found or could not be read")

# Configured directory values may be wrapped in single quotes; strip them.
log_directory = config['DEFAULT']['log_directory'].strip("'")
# os.path.join works whether or not the configured directory ends with a separator.
logging.basicConfig(
    level=logging.DEBUG,
    filename=os.path.join(log_directory, 'corteva.log'),
    filemode='w',  # each run starts a fresh log file
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
)

mydb = mysql.connector.connect(
    host=config['DATABASE']['host'],
    user=db_user,
    password=db_pass,
    database=config['DATABASE']['database'],
)

# Single module-level cursor shared by all of the load functions.
mycursor = mydb.cursor()

# Parameterized insert of one yield row (file name, year, amount) into the
# yield stage table. colName1/dataType1 are the column names and dtypes passed
# to pandas.read_csv when reading a yield file.
sql1 = ("INSERT INTO CCC_STG_YIELD_DATA (YIELD_FILE_NAME, YEAR_OF_YIELD, YIELD_AMOUNT )"
        "VALUES (%s, %s, %s)")
colName1 = ['year', 'amount']
dataType1 ={'year': str,'amount': int}

# Parameterized insert of one weather row (file name, date, max temp, min temp,
# precipitation) into the weather stage table. colName2/dataType2 are the
# read_csv column names and dtypes for a weather file.
sql2 = ("INSERT INTO CCC_STG_WEATHER_STATION_DATA (FILE_NAME, DATE_OF_WEATHER, MAX_DAILY_TEMP,"
        " MIN_DAILY_TEMP, DAILY_PRECIPITATION) "
        "VALUES (%s, %s, %s, %s, %s)")
colName2 = ['date', 'max_temp', 'min_temp','precip']
dataType2 ={'date': str,'max_temp': int, 'min_temp': int, 'precip': int}

# Add each distinct staged year to the year dimension. YEAR_KEY is inserted as
# NULL (presumably an AUTO_INCREMENT key - confirm against the DDL).
# "UPDATE YEAR_KEY=YEAR_KEY" is a no-op, so years that already exist are left
# untouched; this relies on a unique key on the year column (not visible here).
sqlDimYear = ("INSERT INTO CCC_DIM_YEAR(YEAR_KEY, THE_YEAR, YYYY) "
              "(SELECT DISTINCT NULL, YEAR_OF_YIELD, YEAR_OF_YIELD FROM CCC_STG_YIELD_DATA yd) "
              "ON DUPLICATE KEY UPDATE YEAR_KEY=YEAR_KEY")

# Upsert staged yields into the fact table, keyed by the year dimension's
# surrogate key. On a duplicate key the stored amount is replaced with the
# staged one.
sqlFactYld = ("INSERT INTO CCC_FACT_CROP_YIELD(YEAR_KEY, CROP_YIELD_AMOUNT) "
              "(SELECT yd.YEAR_KEY, sy.YIELD_AMOUNT "
              "FROM CCC_DIM_YEAR yd "
              "JOIN CCC_STG_YIELD_DATA sy ON sy.YEAR_OF_YIELD = yd.THE_YEAR) "
              "ON DUPLICATE KEY UPDATE CROP_YIELD_AMOUNT = sy.YIELD_AMOUNT")

# Empty the yield stage table once its rows have been loaded.
sqlStgYldDel = ("DELETE FROM CCC_STG_YIELD_DATA")

# Add each distinct staged weather date to the date dimension. DATE_OF_WEATHER
# is parsed as YYYYMMDD text, and the calendar attributes (day of year, week,
# month, quarter, year) are derived from it; the raw text is kept as DATE_SORT.
# Existing dates are left unchanged (no-op update).
# NOTE: this statement is executed without a params argument. If params are
# ever passed, each '%' in '%Y%m%d' must be escaped as '%%'.
sqlDimDate = ("INSERT INTO CCC_DIM_DATE(DATE_KEY, THE_DATE, DAY_OF_YEAR, WEEK_OF_YEAR, THE_MONTH, MONTH_NAME, "
              "THE_QUARTER, QUARTER_NAME, THE_YEAR, DATE_SORT) "
              "(SELECT DISTINCT NULL, STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d'), DAYOFYEAR(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')),"
              "WEEKOFYEAR(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')), MONTH(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')), "
              "MONTHNAME(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')), QUARTER(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')), "
              "CONCAT('Q', QUARTER(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d'))), YEAR(STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d')), "
              "DATE_OF_WEATHER FROM CCC_STG_WEATHER_STATION_DATA) ON DUPLICATE KEY UPDATE DATE_KEY=DATE_KEY")

# Add each distinct weather station to the station dimension. The station code
# (also used as its name) is the staged file name up to '.txt'. If a file name
# does not contain '.txt', LOCATE returns 0 and the derived code is an empty
# string, so only '.txt' files are supported. The string has no space between
# ")" and "FROM"; MySQL still parses it.
sqlDimWeatherStation = ("INSERT INTO CCC_DIM_WEATHER_STATION(WEATHER_STATION_KEY, WEATHER_STATION_CODE, WEATHER_STATION_NAME) "
                        "(SELECT DISTINCT NULL, SUBSTR(FILE_NAME, 1,  LOCATE('.txt', FILE_NAME,1)-1),SUBSTR(FILE_NAME, 1,  LOCATE('.txt', FILE_NAME,1)-1)"
                        "FROM CCC_STG_WEATHER_STATION_DATA) ON DUPLICATE KEY UPDATE WEATHER_STATION_KEY=WEATHER_STATION_KEY")

# Upsert staged weather readings into the fact table. Each row is joined to the
# date dimension (parsed date) and the station dimension (station code derived
# from the file name, same rule as above). On a duplicate key the measurements
# are replaced with the staged values.
sqlFactWeather = ("INSERT INTO CCC_FACT_WEATHER(DATE_KEY, WEATHER_STATION_KEY, MAX_DAILY_TEMP, MIN_DAILY_TEMP, DAILY_PRECIPITATION) "
                  "(SELECT dd.DATE_KEY, wd.WEATHER_STATION_KEY, wsd.MAX_DAILY_TEMP, wsd.MIN_DAILY_TEMP, wsd.DAILY_PRECIPITATION "
                  "FROM CCC_STG_WEATHER_STATION_DATA wsd "
                  "JOIN CCC_DIM_DATE dd ON dd.THE_DATE = STR_TO_DATE(DATE_OF_WEATHER, '%Y%m%d') "
                  "JOIN CCC_DIM_WEATHER_STATION wd ON wd.WEATHER_STATION_CODE = SUBSTR(FILE_NAME, 1,  LOCATE('.txt', FILE_NAME,1)-1)) "
                  "ON DUPLICATE KEY UPDATE MAX_DAILY_TEMP = wsd.MAX_DAILY_TEMP, MIN_DAILY_TEMP = wsd.MIN_DAILY_TEMP, "
                  "DAILY_PRECIPITATION = wsd.DAILY_PRECIPITATION")

# Empty the weather stage table once its rows have been loaded.
sqlStgWeatherDel = ("DELETE FROM CCC_STG_WEATHER_STATION_DATA")


## Functions
This section contains the functions used to load data into the database tables.  They are called, in the appropriate order, by the next section of code.

In [14]:
def loadYieldFileToDF(yd, yf):
    """Read one tab-separated yield file into a DataFrame.

    Parameters
    ----------
    yd : str
        Directory containing the file.
    yf : str
        File name within ``yd``.

    Returns
    -------
    pandas.DataFrame
        The file's rows, with columns named by ``colName1`` and typed by
        ``dataType1``. Because ``names`` is supplied, the file is read as
        having no header row.
    """
    logging.debug('loadYieldFileToDF start')
    # os.path.join works with or without a trailing separator on ``yd``;
    # plain string concatenation silently required one.
    yf_df = pd.read_csv(os.path.join(yd, yf), sep='\t', names=colName1, dtype=dataType1)
    logging.debug('loadYieldFileToDF end')
    return yf_df

def loadYieldFileToStage(yd, yf):
    """Insert every row of one yield file into the CCC_STG_YIELD_DATA stage table.

    Each staged row stores the source file name ``yf`` along with the year
    and amount read from the file. All rows of the file are committed together.

    Parameters
    ----------
    yd : str
        Directory containing the file.
    yf : str
        File name within ``yd``; stored as YIELD_FILE_NAME.

    Raises
    ------
    Exception
        Any error raised while inserting is re-raised after this file's
        uncommitted rows have been rolled back.
    """
    cnt = 0
    logging.debug('loadYieldFileToStage start')
    yld_df = loadYieldFileToDF(yd, yf)
    try:
        for _, row in yld_df.iterrows():
            mycursor.execute(sql1, (yf, row['year'], row['amount']))
            cnt += mycursor.rowcount
    except Exception:
        # Discard this file's partial rows so a later commit on the shared
        # connection cannot persist them, then propagate the error.
        mydb.rollback()
        raise
    # One commit per file instead of one per row avoids a commit round trip
    # for every inserted row.
    mydb.commit()
    # len() is the row count; DataFrame.size would be rows * columns.
    logging.debug('loadYieldFileToStage data frame rows: %s', len(yld_df))
    logging.debug('loadYieldFileToStage rows inserted to table: %s', cnt)
    logging.debug('loadYieldFileToStage end')

def loadDimYearFromYield():
    """Add staged yield years that are missing from CCC_DIM_YEAR.

    Executes ``sqlDimYear``, logs the affected-row count reported by the
    cursor, and commits.
    """
    logging.debug('loadDimYearFromYield start')
    mycursor.execute(sqlDimYear)
    logging.debug('loadDimYearFromYield rows: %s', mycursor.rowcount)
    mydb.commit()
    logging.debug('loadDimYearFromYield end')

def loadFactYield():
    """Upsert staged yields into CCC_FACT_CROP_YIELD, then empty the stage table.

    Runs ``sqlFactYld`` and commits, then logs the affected-row count reported
    by the cursor. Next it runs ``sqlStgYldDel`` and commits again.
    """
    logging.debug('loadFactYield start')

    # Step 1: move the staged rows into the fact table.
    mycursor.execute(sqlFactYld)
    mydb.commit()
    upserted = mycursor.rowcount
    logging.debug('loadFactYield rows: %s', upserted)

    # Step 2: clear the stage table now that its contents are loaded.
    mycursor.execute(sqlStgYldDel)
    mydb.commit()

    logging.debug('loadFactYield end')

def loadWeatherFileToDF(wd, wf):
    """Read one tab-separated weather file into a DataFrame.

    Parameters
    ----------
    wd : str
        Directory containing the file.
    wf : str
        File name within ``wd``.

    Returns
    -------
    pandas.DataFrame
        The file's rows, with columns named by ``colName2`` and typed by
        ``dataType2``. Because ``names`` is supplied, the file is read as
        having no header row.
    """
    logging.debug('loadWeatherFileToDF start')
    # os.path.join works with or without a trailing separator on ``wd``;
    # plain string concatenation silently required one.
    wx_df = pd.read_csv(os.path.join(wd, wf), sep='\t', names=colName2, dtype=dataType2)
    logging.debug('loadWeatherFileToDF end')
    return wx_df

def loadWeatherFileToStage(wd, wf):
    """Insert every row of one weather file into CCC_STG_WEATHER_STATION_DATA.

    Each staged row stores the source file name ``wf`` (as FILE_NAME) along
    with the date, max/min temperature and precipitation read from the file.
    All rows of the file are committed together.

    Parameters
    ----------
    wd : str
        Directory containing the file.
    wf : str
        File name within ``wd``.

    Raises
    ------
    Exception
        Any error raised while inserting is re-raised after this file's
        uncommitted rows have been rolled back.
    """
    cnt = 0
    logging.debug('loadWeatherFileToStage start')
    wx_df = loadWeatherFileToDF(wd, wf)
    try:
        for _, row in wx_df.iterrows():
            val2 = (wf, row['date'], row['max_temp'], row['min_temp'], row['precip'])
            mycursor.execute(sql2, val2)
            cnt += mycursor.rowcount
    except Exception:
        # Discard this file's partial rows so a later commit on the shared
        # connection cannot persist them, then propagate the error.
        mydb.rollback()
        raise
    # One commit per file instead of one per row avoids a commit round trip
    # for every inserted row.
    mydb.commit()
    logging.debug('loadWeatherFileToStage data frame rows: %s', len(wx_df))
    logging.debug('loadWeatherFileToStage rows: %s', cnt)
    logging.debug('loadWeatherFileToStage end')

def loadDimDateFromWeather():
    """Add staged weather dates that are missing from CCC_DIM_DATE.

    Executes ``sqlDimDate``, logs the affected-row count reported by the
    cursor, and commits.
    """
    logging.debug('loadDimDateFromWeather start')
    mycursor.execute(sqlDimDate)
    added = mycursor.rowcount
    logging.debug('loadDimDateFromWeather rows:%s', added)
    mydb.commit()
    logging.debug('loadDimDateFromWeather end')

def loadDimWeatherStation():
    """Add weather stations from the staged file names that are missing from CCC_DIM_WEATHER_STATION.

    Executes ``sqlDimWeatherStation``, logs the affected-row count reported
    by the cursor, and commits.
    """
    logging.debug('loadDimWeatherStation start')
    mycursor.execute(sqlDimWeatherStation)
    added = mycursor.rowcount
    logging.debug('loadDimWeatherStation rows: %s', added)
    mydb.commit()
    logging.debug('loadDimWeatherStation end')

def loadFactWeather():
    """Upsert staged weather readings into CCC_FACT_WEATHER, then empty the stage table.

    Runs ``sqlFactWeather``, logs the affected-row count reported by the
    cursor, and commits. Next it runs ``sqlStgWeatherDel`` and commits again.
    """
    logging.debug('loadFactWeather start')

    # Step 1: move the staged readings into the fact table.
    mycursor.execute(sqlFactWeather)
    upserted = mycursor.rowcount
    logging.debug('loadFactWeather rows: %s', upserted)
    mydb.commit()

    # Step 2: clear the stage table now that its contents are loaded.
    mycursor.execute(sqlStgWeatherDel)
    mydb.commit()

    logging.debug('loadFactWeather end')

## Main section
This is the main section of the code.  Here the input folders (as defined in the configuration file) are looped through, looking for appropriately named files.  The code is set up to look in specific folders for a specific type of file.  File naming could also have been used, but for this challenge the folder was sufficient.

### The flow is as follows:
    1) Load the yield data file(s) into the stage table.
    2) Validate that the year dimension table contains all the years in the stage table.  Add any that are missing.
    3) Load the yield fact table.  Utilize the logic and constraints to handle duplicates.  Update the detail data if the values change.

    4) Load the weather station data into the stage table
    5) Validate the date dimension table contains all the dates in the stage table.  Add any that are missing.
    6) Validate the weather station dimension table contains all the weather stations based on the file names provided.  Add any that are missing.
    7) Load the weather fact table.  Utilize the logic and constraints to handle duplicates.  Update the detail data if the values change.

In [15]:

logging.debug('Data Ingestion Main start')


def _stage_and_archive(source_dir, archive_dir, stage_file):
    """Stage every ``.txt`` file in *source_dir* with *stage_file*, then move it to *archive_dir*.

    Files are processed in sorted name order so runs are repeatable. Anything
    that is not a regular ``.txt`` file is skipped and logged.

    Parameters
    ----------
    source_dir : str
        Input directory to scan.
    archive_dir : str
        Directory that successfully staged files are moved into.
    stage_file : callable
        Loader called as ``stage_file(source_dir, filename)``.
    """
    for filename in sorted(os.listdir(source_dir)):
        source_path = os.path.join(source_dir, filename)
        # Only .txt is supported. The loaders parse the files as tab-separated,
        # and the weather-station SQL derives the station code by locating
        # '.txt' in the file name, so a .csv name would give an empty code.
        if not (filename.endswith('.txt') and os.path.isfile(source_path)):
            logging.info('Skipping unsupported file: %s', source_path)
            continue
        stage_file(source_dir, filename)
        os.rename(source_path, os.path.join(archive_dir, filename))


yield_directory = config['DEFAULT']['yield_directory'].strip("'")
archive_directory = config['DEFAULT']['archive_directory'].strip("'")

# Steps 1-3: stage the yield files, then load the year dimension and yield fact table.
_stage_and_archive(yield_directory, archive_directory, loadYieldFileToStage)
loadDimYearFromYield()
loadFactYield()

weather_directory = config['DEFAULT']['weather_directory'].strip("'")

# Steps 4-7: stage the weather files, then load the date and station dimensions and the weather fact table.
_stage_and_archive(weather_directory, archive_directory, loadWeatherFileToStage)
loadDimDateFromWeather()
loadDimWeatherStation()
loadFactWeather()

logging.debug('Data Ingestion Main end')