## Library Imports

In [1]:
import json
import logging
import os

import certifi
import pandas as pd
import sqlalchemy as sq
import urllib3

## Logger Setup

In [2]:
# Configure the root logger: DEBUG and above goes to ETL.log as
# "timestamp:level:message" lines, written as UTF-8.
logging.basicConfig(
    level=logging.DEBUG,
    encoding='utf-8',
    format='%(asctime)s:%(levelname)s:%(message)s',
    filename='ETL.log',
)
# Named logger handed to every pipeline function in this notebook.
logger = logging.getLogger("ETL Pipeline")

## Extract crop CSV 

In [9]:
# Relative path of the crop CSV that the extraction call below reads.
filename = 'fao_data_crops_data.csv'

def extract_from_csv(filename, logger):
    """Read a CSV file into a DataFrame and preview its first and last rows.

    Parameters
    ----------
    filename : str or path-like
        Location of the CSV file; passed unchanged to ``pd.read_csv``.
    logger : logging.Logger
        Receives the start, record-count and error messages.

    Returns
    -------
    pandas.DataFrame
        The parsed file. When reading fails the error is logged and an empty
        DataFrame is returned (the same failure value ``extract_from_http``
        uses), so callers always receive a DataFrame rather than ``None``.
    """
    logger.info(f"Starting extraction from {filename}")

    # Only the read itself is guarded: a failure here is an extraction error.
    try:
        agri_csv = pd.read_csv(filename)
    except Exception as exp:
        logger.error(f"{exp} occurred while extracting {filename}")
        return pd.DataFrame()

    # View head and tail of the extracted data.
    display(agri_csv.head(10))
    display(agri_csv.tail(10))
    logger.info(f"{len(agri_csv)} records extracted from {filename}")
    return agri_csv

# Run the CSV extraction; agri_csv holds whatever extract_from_csv returns.
agri_csv = extract_from_csv(filename, logger)

Unnamed: 0,country_or_area,element_code,element,year,unit,value,value_footnotes,category
0,Americas +,31,Area Harvested,2007.0,Ha,49404.0,A,agave_fibres_nes
1,Americas +,31,Area Harvested,2006.0,Ha,49404.0,A,agave_fibres_nes
2,Americas +,31,Area Harvested,2005.0,Ha,49404.0,A,agave_fibres_nes
3,Americas +,31,Area Harvested,2004.0,Ha,49113.0,A,agave_fibres_nes
4,Americas +,31,Area Harvested,2003.0,Ha,48559.0,A,agave_fibres_nes
5,Americas +,31,Area Harvested,2002.0,Ha,48506.0,A,agave_fibres_nes
6,Americas +,31,Area Harvested,2001.0,Ha,47767.0,A,agave_fibres_nes
7,Americas +,31,Area Harvested,2000.0,Ha,48747.0,A,agave_fibres_nes
8,Americas +,31,Area Harvested,1999.0,Ha,46978.0,A,agave_fibres_nes
9,Americas +,31,Area Harvested,1998.0,Ha,48571.0,A,agave_fibres_nes


Unnamed: 0,country_or_area,element_code,element,year,unit,value,value_footnotes,category
2255339,World +,51,Production Quantity,1964.0,tonnes,143203.0,A,yautia_cocoyam
2255340,World +,51,Production Quantity,1963.0,tonnes,142094.0,A,yautia_cocoyam
2255341,World +,51,Production Quantity,1962.0,tonnes,123840.0,A,yautia_cocoyam
2255342,World +,51,Production Quantity,1961.0,tonnes,117284.0,A,yautia_cocoyam
2255343,fnSeqID,Footnote,,,,,,yautia_cocoyam
2255344,Fc,Calculated Data,,,,,,yautia_cocoyam
2255345,A,"May include official, semi-official or estimat...",,,,,,yautia_cocoyam
2255346,NR,Not reported by country,,,,,,yautia_cocoyam
2255347,F,FAO Estimate,,,,,,yautia_cocoyam
2255348,*,Unofficial figure,,,,,,yautia_cocoyam


## Extract Cattle Data from API

In [4]:
# Configuration for the cattle extraction: API key, endpoint URL and the query
# parameters that are passed to extract_from_http below.

# API key and endpoint URL.
# The key is read from the NASS_API_KEY environment variable so that the secret
# is not stored in the notebook. If the variable is unset the key is empty and
# the request is expected to be rejected, which extract_from_http logs as an
# error code.
api_key = os.environ.get('NASS_API_KEY', '')
url = 'https://quickstats.nass.usda.gov/api/api_GET'

# API parameters to retrieve cattle data.
# NOTE(review): the rows displayed after extraction carry statisticcat_desc
# values such as SALES and SLAUGHTERED, so the 'statisticcat' entry does not
# appear to narrow the result - confirm the intended parameter name and value
# against the Quick Stats API documentation.
parameters = {'key': api_key,
    'source_desc': 'SURVEY',
    'agg_level_desc': 'NATIONAL',
    'commodity_desc': 'CATTLE',
    'unit_desc': 'HEAD',
    'freq_desc': 'ANNUAL',
    'statisticcat':'COWS, INCL CALVES - INVENTORY'
}

def extract_from_http(url, parameters, logger, timeout=60.0):
    """Fetch JSON records from an HTTP endpoint and flatten them into a DataFrame.

    Parameters
    ----------
    url : str
        Endpoint that receives the GET request.
    parameters : dict
        Query fields sent with the request.
    logger : logging.Logger
        Receives the start, record-count and error messages.
    timeout : float, optional
        Seconds passed to urllib3 as the request timeout, so an unresponsive
        server cannot block the notebook indefinitely.

    Returns
    -------
    pandas.DataFrame
        The normalized contents of the response's ``data`` entry when the
        status is 200. An empty DataFrame is returned for any other status and
        for any error raised while requesting or parsing; both are logged.
    """
    logger.info(f"Starting extraction from {url}")
    agri_data_api = pd.DataFrame()
    try:
        # Verify the server certificate against certifi's CA bundle.
        with urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where()) as http:
            response = http.request('GET', url, fields=parameters, timeout=timeout)
            display(response.status)

            if response.status == 200:
                # Decode the JSON body and flatten its records into a frame.
                read_response = json.loads(response.data.decode('utf-8'))
                agri_data_api = pd.json_normalize(read_response['data'])
                logger.info(f"{len(agri_data_api)} records extracted from {url}")
            else:
                logger.error(f"Error code {response.status}")
    except Exception as exp:
        # Connection failures, timeouts, malformed JSON or a missing 'data'
        # entry are logged the same way extract_from_csv logs its errors.
        logger.error(f"{exp} occurred while extracting {url}")

    display(agri_data_api)
    return agri_data_api
    
# Run the API extraction; agri_data_api holds whatever extract_from_http returns.
agri_data_api = extract_from_http(url, parameters, logger)

200

Unnamed: 0,county_ansi,zip_5,asd_desc,state_fips_code,agg_level_desc,location_desc,Value,statisticcat_desc,state_alpha,year,...,sector_desc,county_code,util_practice_desc,county_name,end_code,domaincat_desc,class_desc,CV (%),freq_desc,short_desc
0,,,,99,NATIONAL,US TOTAL,46915200,SALES,US,2022,...,ANIMALS & PRODUCTS,,(EXCL INTER-FARM IN-STATE),,00,NOT SPECIFIED,(EXCL CALVES),,ANNUAL,"CATTLE, (EXCL CALVES), (EXCL INTER-FARM IN-STA..."
1,,,,99,NATIONAL,US TOTAL,46760900,SALES,US,2021,...,ANIMALS & PRODUCTS,,(EXCL INTER-FARM IN-STATE),,00,NOT SPECIFIED,(EXCL CALVES),,ANNUAL,"CATTLE, (EXCL CALVES), (EXCL INTER-FARM IN-STA..."
2,,,,99,NATIONAL,US TOTAL,45058000,SALES,US,2020,...,ANIMALS & PRODUCTS,,(EXCL INTER-FARM IN-STATE),,00,NOT SPECIFIED,(EXCL CALVES),,ANNUAL,"CATTLE, (EXCL CALVES), (EXCL INTER-FARM IN-STA..."
3,,,,99,NATIONAL,US TOTAL,45013100,SALES,US,2019,...,ANIMALS & PRODUCTS,,(EXCL INTER-FARM IN-STATE),,00,NOT SPECIFIED,(EXCL CALVES),,ANNUAL,"CATTLE, (EXCL CALVES), (EXCL INTER-FARM IN-STA..."
4,,,,99,NATIONAL,US TOTAL,45031300,SALES,US,2018,...,ANIMALS & PRODUCTS,,(EXCL INTER-FARM IN-STATE),,00,NOT SPECIFIED,(EXCL CALVES),,ANNUAL,"CATTLE, (EXCL CALVES), (EXCL INTER-FARM IN-STA..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2071,,,,99,NATIONAL,US TOTAL,4482000,SLAUGHTERED,US,1927,...,ANIMALS & PRODUCTS,,"SLAUGHTER, COMMERCIAL, FI",,00,NOT SPECIFIED,STEERS,,ANNUAL,"CATTLE, STEERS, SLAUGHTER, COMMERCIAL, FI - SL..."
2072,,,,99,NATIONAL,US TOTAL,4769000,SLAUGHTERED,US,1926,...,ANIMALS & PRODUCTS,,"SLAUGHTER, COMMERCIAL, FI",,00,NOT SPECIFIED,STEERS,,ANNUAL,"CATTLE, STEERS, SLAUGHTER, COMMERCIAL, FI - SL..."
2073,,,,99,NATIONAL,US TOTAL,4466000,SLAUGHTERED,US,1925,...,ANIMALS & PRODUCTS,,"SLAUGHTER, COMMERCIAL, FI",,00,NOT SPECIFIED,STEERS,,ANNUAL,"CATTLE, STEERS, SLAUGHTER, COMMERCIAL, FI - SL..."
2074,,,,99,NATIONAL,US TOTAL,4456000,SLAUGHTERED,US,1924,...,ANIMALS & PRODUCTS,,"SLAUGHTER, COMMERCIAL, FI",,00,NOT SPECIFIED,STEERS,,ANNUAL,"CATTLE, STEERS, SLAUGHTER, COMMERCIAL, FI - SL..."


### Transforming Data 

In [5]:
def comma_separated_to_number(value):
    """Return ``value`` with every comma removed.

    The result is still a string (e.g. ``"1,234"`` becomes ``"1234"``);
    converting it to a numeric type is left to the caller.
    """
    return value.replace(',', '')

In [10]:
def transform_csv_data(csv_data, logger):
    """Reduce the crop CSV data to US production quantities in the target schema.

    Steps:
      1. Drop rows that have fewer than four populated fields.
      2. Keep rows whose ``element`` is "Production Quantity".
      3. Keep rows whose ``country_or_area`` is "United States of America".
      4. Keep only country_or_area, year, unit, value and category.
      5. Rename them to Country_Name, Year, Unit, Value and Product.
      6. Cast Year and Value to int and the remaining columns to string.

    Parameters
    ----------
    csv_data : pandas.DataFrame
        Extracted crop data. It is not modified; a new frame is returned.
    logger : logging.Logger
        Receives progress and error messages.

    Returns
    -------
    pandas.DataFrame
        The transformed frame. If step 1 fails, ``csv_data`` is returned
        unchanged; if a later step fails, the frame produced by step 1 is
        returned. Either failure is logged.
    """
    logger.info("Starting csv transformation")

    ## 1: Drop sparse rows without touching the caller's frame.
    try:
        cleaned = csv_data.dropna(axis="index", thresh=4)
    except Exception as e:
        logger.error(f"{e} while dropping na values")
        return csv_data

    try:
        ## 2 and 3: production quantities for the United States only.
        is_production = cleaned["element"] == "Production Quantity"
        is_united_states = cleaned["country_or_area"] == "United States of America"

        ## 4 and 5: source column -> target column, in output order.
        column_names = {
            "country_or_area": "Country_Name",
            "year": "Year",
            "unit": "Unit",
            "value": "Value",
            "category": "Product",
        }

        ## 6: target data types.
        convert_to = {
            "Year": "int",
            "Value": "int",
            "Country_Name": "string",
            "Product": "string",
            "Unit": "string",
        }

        transformed = (
            cleaned.loc[is_production & is_united_states, list(column_names)]
            .rename(columns=column_names)
            # A missing year or value cannot be cast to int, so drop such rows.
            .dropna(subset=["Year", "Value"])
            .astype(convert_to)
        )
        logger.info("Csv data transformation successful")
    except Exception as e:
        logger.error(f"{e} occurred while transforming data")
        return cleaned

    ## 7: Return data
    return transformed

In [11]:

def transform_api_data(api_data, logger):
    """Reduce the API cattle data to average-inventory rows in the target schema.

    Rows with fewer than four populated fields are dropped, only rows whose
    ``statisticcat_desc`` is "INVENTORY, AVG" are kept, the country name
    "UNITED STATES" is rewritten to "United States of America", commas are
    stripped from ``Value``, and the columns are renamed to Country_Name,
    Value, Unit, Year and Product with Year/Value cast to int and the rest to
    string.

    Parameters
    ----------
    api_data : pandas.DataFrame
        Extracted API data. It is not modified; a new frame is returned.
    logger : logging.Logger
        Receives progress and error messages.

    Returns
    -------
    pandas.DataFrame
        The transformed frame. If dropping sparse rows fails, ``api_data`` is
        returned unchanged; if a later step fails, the frame with sparse rows
        dropped is returned. Either failure is logged.
    """
    logger.info("Starting api data transformation")
    try:
        cleaned = api_data.dropna(axis="index", thresh=4)
    except Exception as e:
        logger.error(f"{e} while dropping na values")
        return api_data

    try:
        is_avg_inventory = cleaned['statisticcat_desc'] == 'INVENTORY, AVG'
        kept_columns = ['country_name', 'Value', 'unit_desc', 'year', 'commodity_desc']
        # Explicit copy so the assignments below never write into a view of the input.
        selected = cleaned.loc[is_avg_inventory, kept_columns].copy()

        selected['country_name'] = selected['country_name'].replace(
            'UNITED STATES', 'United States of America')
        # Values arrive as text such as "9,386,000"; strip the separators before casting.
        selected['Value'] = selected['Value'].str.replace(',', '', regex=False)

        api_data_rename = {'country_name': 'Country_Name', 'year': 'Year', 'unit_desc': 'Unit', 'commodity_desc': 'Product'}
        convert_to = {'Year': 'int', 'Value': 'int', 'Country_Name': 'string', 'Product': 'string', 'Unit': 'string'}
        transformed = selected.rename(columns=api_data_rename).astype(convert_to)
        logger.info("Api data transformation successful")
    except Exception as e:
        logger.error(f"{e} occurred while transforming data")
        return cleaned
    return transformed


### Running the Workflow 

In [12]:
# Transform the extracted crop data and preview the first rows of the result.
csv_data_transformed = transform_csv_data(agri_csv, logger)
csv_data_transformed.head()

Unnamed: 0,Country_Name,Year,Unit,Value,Product
10406,United States of America,2007,tonnes,1043266,almonds_with_shell
10407,United States of America,2006,tonnes,846131,almonds_with_shell
10408,United States of America,2005,tonnes,703431,almonds_with_shell
10409,United States of America,2004,tonnes,785985,almonds_with_shell
10410,United States of America,2003,tonnes,786262,almonds_with_shell


In [13]:
# Transform the extracted API data and preview the first rows of the result.
api_data_transformed = transform_api_data(agri_data_api, logger)
api_data_transformed.head()

Unnamed: 0,Country_Name,Value,Unit,Year,Product
1202,United States of America,9386000,HEAD,2023,CATTLE
1203,United States of America,9400000,HEAD,2022,CATTLE
1204,United States of America,9449000,HEAD,2021,CATTLE
1205,United States of America,9396000,HEAD,2020,CATTLE
1206,United States of America,9335000,HEAD,2019,CATTLE


In [None]:
def get_connection_string():
    """Build the SQLAlchemy URL for the MySQL database on datasciencedb.ucalgary.ca.

    The credentials are read from the ``DB_USERNAME`` and ``DB_PASSWORD``
    environment variables so they are never stored in the notebook. The
    database name in the URL is the user name.

    Returns
    -------
    str
        ``mysql+mysqlconnector://<user>:<password>@datasciencedb.ucalgary.ca/<user>``
        with the user and password percent-encoded for the credentials part.

    Raises
    ------
    RuntimeError
        If either environment variable is missing or empty.
    """
    import os
    from urllib.parse import quote_plus

    user_name = os.environ.get('DB_USERNAME')
    password = os.environ.get('DB_PASSWORD')
    if not user_name or not password:
        raise RuntimeError(
            "Set the DB_USERNAME and DB_PASSWORD environment variables before connecting"
        )

    # Characters such as '@' or '/' in a credential would otherwise be read as
    # URL delimiters, so escape them.
    credentials = f'{quote_plus(user_name)}:{quote_plus(password)}'
    return f'mysql+mysqlconnector://{credentials}@datasciencedb.ucalgary.ca/{user_name}'

# Column types follow the transformed data: text columns for the country,
# unit and product, integers for the year and the value.
def create_agriculture_table(logger):
    """Create the ``agriculture`` table if it does not exist yet.

    The table has the columns Country_Name, Year, Unit, Value and Product.
    Errors raised while connecting or executing the statement are logged, not
    raised, and the engine is disposed in every case.

    Parameters
    ----------
    logger : logging.Logger
        Receives progress and error messages.
    """
    logger.info('Creating agriculture table')
    # Value is BIGINT so large production quantities cannot overflow a 32-bit INT.
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS agriculture (
            Country_Name VARCHAR(100),
            Year INT,
            Unit VARCHAR(50),
            Value BIGINT,
            Product VARCHAR(100)
        );
    '''
    engine = sq.create_engine(get_connection_string())
    try:
        # begin() commits when the block finishes without an error.
        with engine.begin() as con:
            con.execute(sq.text(create_table_query))
        logger.info("Table created successfully")
    except Exception as e:
        logger.error(f"{e} occurred while creating agriculture table")
    finally:
        engine.dispose()

# Make sure the agriculture table exists before the load cells below run.
create_agriculture_table(logger)

def load_dataframe(dataframe, logger):
    """Append a DataFrame to the ``agriculture`` table and read the table back.

    Rows are appended (``if_exists='append'``), so calling this twice with the
    same frame inserts the rows twice.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Rows to append; the index is not written.
    logger : logging.Logger
        Receives progress and error messages.

    Returns
    -------
    pandas.DataFrame
        The full contents of the ``agriculture`` table after the append, or an
        empty DataFrame when the engine cannot be created or the table cannot
        be read. Every failure is logged rather than raised.
    """
    logger.info("Starting loading")
    data = pd.DataFrame()

    # Without an engine there is nothing to load into or dispose of, so stop here.
    try:
        engine = sq.create_engine(get_connection_string())
    except Exception as e:
        logger.error(f"{e} occurred while loading data")
        return data

    try:
        try:
            dataframe.to_sql('agriculture', con=engine, if_exists='append', index=False)
            logger.info("Data loaded successfully")
        except Exception as e:
            logger.error(f"{e} occurred while loading data")

        # The read-back is attempted even when the append failed.
        try:
            data = pd.read_sql_table("agriculture", engine)
        except Exception as e:
            logger.error(f"{e} occurred while reading data from database")
    finally:
        engine.dispose()
    return data

In [16]:

# Append the transformed crop data to the agriculture table and preview the table.
# NOTE(review): the execution counts show this cell (In [16]) ran after the API
# load cell (In [15]), which is why the preview lists CATTLE rows. load_dataframe
# appends, so re-running this cell inserts the same rows again.
df_agriculture = load_dataframe(csv_data_transformed ,logger)
df_agriculture.head()

Unnamed: 0,Country_Name,Year,Unit,Value,Product
0,United States of America,1989,HEAD,10046000,CATTLE
1,United States of America,1988,HEAD,10224000,CATTLE
2,United States of America,1987,HEAD,10327000,CATTLE
3,United States of America,1986,HEAD,10773000,CATTLE
4,United States of America,1985,HEAD,10981000,CATTLE


In [15]:

# Append the transformed API data to the agriculture table and preview the table.
# NOTE(review): load_dataframe appends, so re-running this cell inserts the same
# rows again.
df_agriculture = load_dataframe(api_data_transformed ,logger)
df_agriculture.head()

Unnamed: 0,Country_Name,Year,Unit,Value,Product
0,United States of America,1989,HEAD,10046000,CATTLE
1,United States of America,1988,HEAD,10224000,CATTLE
2,United States of America,1987,HEAD,10327000,CATTLE
3,United States of America,1986,HEAD,10773000,CATTLE
4,United States of America,1985,HEAD,10981000,CATTLE
