In [1]:
import os
import pandas as pd 
import logging
from datetime import date

### Setup

In [36]:
logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO)

In [38]:
pd.set_option('display.max_columns', None)

In [39]:
for p in ['raw', 'cleaned', 'production', 'helpers']:
    if not os.path.exists(p):
       os.makedirs(p)

In [1]:
import hashlib
import logging
import pandas as pd
import glob
import emoji


logging.basicConfig(format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO)

REQ_COLS = [
    'year', 
    'state', 
    'commodity_name', 
    'acres_planted',
    'acres_harvested',
    'tons_harvested',
    'us_dollars_harvested',
    'percent_maturity'
]

ISSUES = []

def open_file(file_path):
    try: 
        df = pd.read_csv(file_path, index_col=0)
        logging.info(f"reading data...")
        yield df
    except pd.errors.ParserError as e:
        logging.logging.critical(f"file at {file_path} not formatted as .csv")
        raise e
    

def make_hash_id(df, *column):
    """
    Create unique identifier for each row of data to avoid dups in database.
    eg: make_hash_id(df, "permalink", "type", "uuid").
    
    """
    df['uid'] = df[list(column)].apply(lambda row: '_'.join(row.values.astype(str)), axis=1)
    df['uid'] = df['uid'].str.encode('utf-8').apply(lambda x: (hashlib.sha512(x).hexdigest().upper()))
    logging.info(f'uid hashes created for rows in dataset')
    return df

def remove_dups(df):
    """
    Drop column is hashed uid is duplicate. Must be run after "make_hash_id"
    """
    logging.info("Checking for and removing duplicates rows..")
    return df.drop_duplicates(subset='uid', keep="first")



def df_lowerize(df):
    logging.info("Converting strings to lower..")
    return df.applymap(lambda s: s.lower() if type(s) == str else s)


def create_schema(df):
    schema = df.dtypes.map(lambda x: x.name).to_dict()
    return schema


def validate_farm_data_schema(df, schema, required=REQ_COLS):
    df_schema = create_schema(df)
    
    if not schema == df_schema:
        #log validating schema pass
        return None
    else:
        return df
    


# def validate_schema(df, schema):
#     schema =  FARM_DATA_SCHEMA
    
#     if not schema:
#         logging.critical(f"Cannot find schema file used for validation")
#         return 

    
#     missing = []
    
# #     df_col_ls = df.columns.to_list()
    
# #     for required_c in required_col_ls:
# #         if required_c not in df_col_ls:
# #             missing.append(required_c)
    
#     if len(missing) > 0:
#         logging.critical(f"Cannot process file <id> missing required cols: {missing}")
#         ISSUES.append("Missing required fields, cannot perform calculations.")

#         return missing
    
#     return None

def drop_nan_rows_in_required_cols(df):
    logging.info("Dropping rows with nan/null in required cols.")
    return df.dropna(axis=0, subset=REQ_COLS)

def find_nan_columns(df):
    
    if len(nan_cols)>0:
        logging.info(f"Found null/nan values in columns: {nan_cols}")
        try: 
            logging.info("Processing nan columns...")
            drop_rows_nan_in_required_col(df)
            fillna_num_cols(df)

            logging.info("Processed all nan/null columns...")
        except: 
            logging.info(f"Found null/nan values in columns: {nan_cols}")
            ISSUES.append("Unprocessed nan/null values")
            
        return
            
    else:
        logging.info("No nan/null columns found. Process complete.")
        return 


def path_files_to_ls(path):
    file_ls = []
 
    for (root, dirs, file) in os.walk(path):
        for f in file:
            if '.csv' in f:
                file_ls.append(f)
    
    return file_ls
    
    
def run_cleaning_pipeline(files: list, pipeline: str):
    """
    We don't want to exit the program if a file cannot be processed. 
    Instead, we log the error, move the file to the "errors" folder and continue.
    """
    f_ct = len(files)
    f_fail = []
    
    for f in files:
        try:
            f.apply(pipeline)
        except: 
            f_fail.append(f)
            logging.critical(f'Could not process file: {f} moving to dir: "errors/"')
            #remove file from path and relocate
            #save ISSUES to text files in some location
    
    fail_ct = len(f_fail)
    
    if fail_ct > 0:
        logging.critical(f'{emoji.emojize(":zipper-mouth_face:")}: {fail_ct} of {f_ct} files could not be processed. Review new files at "errors/" and try again.')
    
    logging.info(f'All files processed! {emoji.emojize(":grinning_face_with_big_eyes:")}')
    return

In [3]:
# %%writefile helpers/calculations.py


def calc_tons_never_harvested(df):
    logging.info(f'Calculating tons never harvested for <filename>"')

    df['tons_never_harvested'] = df['acres_unharvested'] * df['yield_tons_per_acre'] * df['percent_maturity']
    
    return df

def calc_tons_never_harvested_causes(df):
    logging.info(f'Calculating tons never by cause.')
#     causes_db = pd.read_csv('farm_not_harvested_causes.csv')
    #merge dfs with summing
    pass
    



# Part 1: Load and Process Initial Data

1. Load farm_part_1.csv
2. Convert all strings to lowercase
3. Check to make sure data is complete and there are no duplicates or other potential problems
4. Save a v1 version of the 'clean' raw dataset.
5. Calculate tons_never_harvested for each item (each row) and append to dataset (See appendix 1 for
how to calculate tons_never_harvested)
6. Load farm_not_harvested_causes.csv
7. Calculate amount of food not harvested (tons_never_harvested) for each 'cause' (the reason for why
that food was not harvest) for each item (row) and append to dataset (see appendix 1 for calculation)
8. Save a v1 'production' dataset in the format of your choice

### Data Exploration

In [5]:
import hashlib

farm_data = pd.read_csv("/Users/margiehenry/Downloads/wetransfer_farm_not_harvested_causes-csv_2022-11-16_1428/farm_part_1.csv", index_col=0)

bad_df = pd.read_csv("/Users/margiehenry/Downloads/wetransfer_farm_not_harvested_causes-csv_2022-11-16_1428/farm_not_harvested_causes.csv")
#create schema on first run

FARM_DATA_SCHEMA = create_schema(farm_data)

 
# #1 validate schema
print(validate_farm_data_schema(farm_data))


# #2 to lower 
df_lower = df_lowerize(farm_data)


# # 3 check for and handle missing values in required rows 
req_no_nan = drop_nan_rows_in_required_cols(df_lower)
req_no_nan


# # # 4 hash for dups
df_hashed = make_hash_id(req_no_nan, 'year', 'state', 'commodity_name')


# # # 5 remove dups 
df_no_dups = remove_dups(df_hashed)

# # fillin refed_food_department	refed_food_category with previous seen 
df_no_dups


21-Nov-22 20:12:14 - Converting strings to lower..
21-Nov-22 20:12:14 - Dropping rows with nan/null in required cols.
21-Nov-22 20:12:14 - uid hashes created for rows in dataset
21-Nov-22 20:12:14 - Checking for and removing duplicates rows..


      year           state commodity_name refed_food_department  \
2202  2016         Alabama         PECANS             Dry goods   
2203  2016         Alabama        PEANUTS             Dry goods   
2204  2016         Arizona         PECANS             Dry goods   
2205  2016        Arkansas        PEANUTS             Dry goods   
2206  2016      California         PECANS             Dry goods   
...    ...             ...            ...                   ...   
3124  2018         Indiana     WATERMELON               Produce   
3125  2018        Maryland     WATERMELON               Produce   
3126  2018  North Carolina     WATERMELON               Produce   
3127  2018  South Carolina     WATERMELON               Produce   
3128  2018           Texas     WATERMELON               Produce   

     refed_food_category  acres_planted  acres_harvested  \
2202      Nuts and seeds         8900.0           8900.0   
2203      Nuts and seeds       175000.0         172000.0   
2204      Nuts 

Unnamed: 0,year,state,commodity_name,refed_food_department,refed_food_category,acres_planted,acres_harvested,us_dollars_harvested,tons_harvested,percent_maturity,uid
2202,2016,alabama,pecans,dry goods,nuts and seeds,8900.0,8900.0,4467000.0,1100.0,0.5,114D11AF85C5F71BA5F4200395075C725DF791EB68C6D0...
2203,2016,alabama,peanuts,dry goods,nuts and seeds,175000.0,172000.0,121982000.0,309600.0,0.5,2F5D972CA3E3D1DE3929211C39EF55D71E895378180E1D...
2204,2016,arizona,pecans,dry goods,nuts and seeds,15000.0,15000.0,67208000.0,12400.0,0.5,1167043B641F38317A6AAF4910F5641DB52D257582B8E3...
2205,2016,arkansas,peanuts,dry goods,nuts and seeds,24000.0,23000.0,20424000.0,55200.0,0.5,59B08A276DE56E1DE2EAF474E33A34566698C7DE8D7BBC...
2206,2016,california,pecans,dry goods,nuts and seeds,3500.0,3500.0,14656000.0,2885.0,0.5,2C8CF0CEBDF0FE536818C44B0B7D9A5D7EEBF805A051E1...
...,...,...,...,...,...,...,...,...,...,...,...
3124,2018,indiana,watermelon,produce,watermelons,6600.0,6500.0,27915000.0,152750.0,0.5,E9693DB3EF776B12A8075474FDB59E4A7707656843E802...
3125,2018,maryland,watermelon,produce,watermelons,3700.0,3500.0,10395000.0,52500.0,0.5,B66F4465E092CD8B8C57A4CFD44BCB026C22DF1F6F99B8...
3126,2018,north carolina,watermelon,produce,watermelons,7700.0,7300.0,22314000.0,91250.0,0.5,72E76C2308688BD79449C8076ED8E45F5737B21A9B1947...
3127,2018,south carolina,watermelon,produce,watermelons,4500.0,4300.0,17174000.0,49450.0,0.5,EC6CB3FCFD7773B50728D430A6BB1D3188DD39EF3C8C28...


In [8]:
# run as pipline as save

import os

FARM_DATA_SCHEMA = {'year': 'int64', 'state': 'object', 'commodity_name': 'object', 'refed_food_department': 'object', 'refed_food_category': 'object', 'acres_planted': 'float64', 'acres_harvested': 'float64', 'us_dollars_harvested': 'float64', 'tons_harvested': 'float64', 'percent_maturity': 'float64'}


class CleaningPipeline:
    def __init__(self, FarmDataPath, FARM_DATA_SCHEMA):
        self.path = FarmDataPath
        self.schema = FARM_DATA_SCHEMA
        self.filename = os.path.split(FarmDataPath)[1]
        self.df = None

        return None
       
    def open_csv(self):
        try: 
            self.df = pd.read_csv(self.path, index_col=0).apply(lambda x: x.astype(str).str.lower())
            logging.info(f"Reading data from {self.filename}.")
            return self.df
        
        except pd.errors.ParserError as e:
            logging.logging.critical(f"file at {self.filename} not formatted as .csv")
            raise e
    
    def validate_farm_data_schema(self, df, required=REQ_COLS):    
        df_schema = create_schema(df)
        
        if not self.schema == df_schema:
            logging.info(f"{self.filename} failed validation with schema {self.schema}")
#             return None
            return self.df
        else:
            logging.info(f"{self.filename} passed validation with schema {self.schema}")
            return df
        
    def make_hash_id(self, *column):
        df['uid'] = df[list(column)].apply(lambda row: '_'.join(row.values.astype(str)), axis=1)
#         df['uid'] = df['uid'].str.encode('utf-8').apply(lambda x: (hashlib.sha512(x).hexdigest().upper()))
        logging.info(f'uid hashes created for rows in dataset')
        return df

    
    

pipeline = CleaningPipeline(
    "/Users/margiehenry/Downloads/wetransfer_farm_not_harvested_causes-csv_2022-11-16_1428/farm_part_1.csv",
    "/src/schema")


cleaned_data = pipeline.open_csv().pipe(pipeline.validate_farm_data_schema).pipe(pipeline.make_hash_id, 'year', 'state', 'commodity_name')\
                .pipe(drop_nan_rows_in_required_cols)\
                .pipe(make_hash_id, 'year', 'state', 'commodity_name')\
                .pipe(remove_dups)

cleaned_data


21-Nov-22 20:13:14 - Reading data from farm_part_1.csv.


NameError: name 'df' is not defined

In [97]:
def loadData(self, fileName= "pilot1.csv"):
    filePath = str(self.path + fileName) 
    self.df = pd.read_csv(filePath, skiprows=2, decimal=".") 
    return self.df

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 927 entries, 0 to 926
Data columns (total 11 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Unnamed: 0             927 non-null    int64  
 1   year                   927 non-null    int64  
 2   state                  927 non-null    object 
 3   commodity_name         927 non-null    object 
 4   refed_food_department  927 non-null    object 
 5   refed_food_category    927 non-null    object 
 6   acres_planted          927 non-null    float64
 7   acres_harvested        927 non-null    float64
 8   us_dollars_harvested   927 non-null    float64
 9   tons_harvested         927 non-null    float64
 10  percent_maturity       927 non-null    float64
dtypes: float64(5), int64(2), object(4)
memory usage: 79.8+ KB


AttributeError: 'NoneType' object has no attribute 'to_dict'

In [57]:
df.dtypes.map(lambda x: x.name).to_dict()

{'Unnamed: 0': 'int64',
 'year': 'int64',
 'state': 'object',
 'commodity_name': 'object',
 'refed_food_department': 'object',
 'refed_food_category': 'object',
 'acres_planted': 'float64',
 'acres_harvested': 'float64',
 'us_dollars_harvested': 'float64',
 'tons_harvested': 'float64',
 'percent_maturity': 'float64'}

In [40]:
dict2 = df2.dtypes.map(lambda x: x.name).to_dict()

In [41]:
if farm_data_schema == dict2:
    print(True)
else: 
    print(False)

True


In [7]:
import pandas as pd
ls = list(pd.read_csv("/Users/margiehenry/Downloads/wetransfer_farm_not_harvested_causes-csv_2022-11-16_1428/farm_update_part_3.csv").columns)
ls

['Unnamed: 0',
 'year',
 'state',
 'commodity_name',
 'refed_food_department',
 'refed_food_category',
 'acres_planted',
 'acres_harvested',
 'us_dollars_harvested',
 'tons_harvested',
 'percent_maturity']

In [7]:
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 483 entries, 0 to 482
Data columns (total 11 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Unnamed: 0             483 non-null    int64  
 1   year                   483 non-null    int64  
 2   state                  483 non-null    object 
 3   commodity_name         483 non-null    object 
 4   refed_food_department  483 non-null    object 
 5   refed_food_category    483 non-null    object 
 6   acres_planted          483 non-null    float64
 7   acres_harvested        483 non-null    float64
 8   us_dollars_harvested   483 non-null    float64
 9   tons_harvested         483 non-null    float64
 10  percent_maturity       483 non-null    float64
dtypes: float64(5), int64(2), object(4)
memory usage: 41.6+ KB
None


### Data Cleaning and Curation

In [7]:
#save to cleaned/v1{date} with timestamp

### Data Processing

In [None]:
#save to prod_staged/v1{date} with timestamp
#save to DB

In [9]:
causes_data = pd.read_csv("/Users/margiehenry/Downloads/wetransfer_farm_not_harvested_causes-csv_2022-11-16_1428/farm_not_harvested_causes.csv")
causes_data

Unnamed: 0,cause,rate
0,not_marketable,0.28
1,inedible,0.28
2,bad_weather,0.042
3,pests_disease,0.00063
4,market_dynamics,0.00013
5,other,0.39724


In [10]:
i = 0
for i in range(len(causes_data)):
    cause, rate = causes_data.iloc[i]
    df2[cause] = df2['acres_planted'].apply(lambda x: x*rate)
    i += 1

df2
#     print(cause, rate) 
    

Unnamed: 0.1,Unnamed: 0,year,state,commodity_name,refed_food_department,refed_food_category,acres_planted,acres_harvested,us_dollars_harvested,tons_harvested,percent_maturity,not_marketable,inedible,bad_weather,pests_disease,market_dynamics,other
0,2852,2018,Alabama,PECANS,Dry goods,Nuts and seeds,8000.0,8000.0,2.366000e+06,800.0,0.5,2240.0,2240.0,336.0,5.040,1.040,3177.920
1,2853,2018,Alabama,PEANUTS,Dry goods,Nuts and seeds,165000.0,161000.0,1.145660e+08,285775.0,0.5,46200.0,46200.0,6930.0,103.950,21.450,65544.600
2,2854,2018,Arizona,PECANS,Dry goods,Nuts and seeds,17000.0,17000.0,5.217300e+07,13950.0,0.5,4760.0,4760.0,714.0,10.710,2.210,6753.080
3,2855,2018,Arkansas,PEANUTS,Dry goods,Nuts and seeds,26000.0,23000.0,2.185000e+07,56350.0,0.5,7280.0,7280.0,1092.0,16.380,3.380,10328.240
4,2856,2018,California,ALMONDS,Dry goods,Nuts and seeds,1090000.0,1090000.0,5.602500e+09,1140000.0,0.5,305200.0,305200.0,45780.0,686.700,141.700,432991.600
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
478,3330,2019,Georgia,WATERMELON,Produce,Watermelons,22000.0,21100.0,1.012160e+08,411450.0,0.5,6160.0,6160.0,924.0,13.860,2.860,8739.280
479,3331,2019,Indiana,WATERMELON,Produce,Watermelons,6500.0,6100.0,3.529400e+07,115900.0,0.5,1820.0,1820.0,273.0,4.095,0.845,2582.060
480,3332,2019,North Carolina,WATERMELON,Produce,Watermelons,7800.0,7400.0,2.399800e+07,85100.0,0.5,2184.0,2184.0,327.6,4.914,1.014,3098.472
481,3333,2019,South Carolina,WATERMELON,Produce,Watermelons,5000.0,4500.0,1.602200e+07,72000.0,0.5,1400.0,1400.0,210.0,3.150,0.650,1986.200


In [33]:
# df2.iloc[1:2, -1]
df2.iloc[0]['other']

3177.92

In [None]:
df[x] = df['acres_planted'].apply(lambda x: x*rate)

# Part 2: Analysis

1. Calculate total tons_never_harvested by year and refed_food_department (hint: For each year in the data set, there should a U.S. total for tons_never_harvested for dry goods and a total for produce)
2. Plot this table
3. Calculate the U.S. total of tons_never_harvested for each 'cause', aggregated by year and refed_food_department
4. Plot this table

# Part 3: Updating the data and analysis

1. Load farm_update_part_3.csv
2. Update the relevant datasets and analysis to include 2019 (your new final output should include
data from 2016 - 2019)
3. Save a v2 production dataset

In [8]:
#save to prod_staged/v2{date} with timestamp
#insert new records to DB

# Part 4: Store Knowledge

In [None]:
# save column names for checking
#create dict: food_name: {
#     "category": "cat_name"
#     "states": ["List", "of", "states"],
# }

# create column verification 

#make CLI
#make MAKEFILE

In [265]:
from mergedeep import merge
dic = {'a' : {"keyA": 1},
    'b' : {"keyB": {"sub1": 10}},
    'c' : {"keyB": {"sub2": 20}}}

combo = pd.DataFrame.from_dict(merge(dic))
combo
# print(a)

Unnamed: 0,a,b,c
keyA,1.0,,
keyB,,{'sub1': 10},{'sub2': 20}
