In [4]:
import pandas as pd
import os
from typing import List
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import datetime

# using a little hack to allow the corteva_app module to be seen in this notebook
import sys
sys.path.insert(0, "..")

from corteva_app.database.models import Weather, Yield, WeatherStats

In [5]:
# create postgres engine that we will reuse a few times for writing dataframes
# this should really be an environment variable
engine = create_engine("postgresql://postgres:password@localhost:5432/corteva-db")


## Ingest weather data

In [6]:
# change active directory to wx_data for easy file path construction
os.chdir("../wx_data")

# list all files in the wx_data directory where our weather data is held
os.listdir()


['USC00137161.txt',
 'USC00336600.txt',
 'USC00253175.txt',
 'USC00121873.txt',
 'USC00258395.txt',
 'USC00121030.txt',
 'USC00255080.txt',
 'USC00112140.txt',
 'USC00134894.txt',
 'USC00336118.txt',
 'USC00120676.txt',
 'USC00258480.txt',
 'USC00124837.txt',
 'USC00254900.txt',
 'USC00254440.txt',
 'USC00130600.txt',
 'USC00331072.txt',
 'USC00114442.txt',
 'USC00338769.txt',
 'USC00338313.txt',
 'USC00335041.txt',
 'USC00123513.txt',
 'USC00112193.txt',
 'USC00127125.txt',
 'USC00250640.txt',
 'USC00253615.txt',
 'USC00119241.txt',
 'USC00132977.txt',
 'USC00116738.txt',
 'USC00258133.txt',
 'USC00132789.txt',
 'USC00115833.txt',
 'USC00123527.txt',
 'USC00115079.txt',
 'USC00116579.txt',
 'USC00113879.txt',
 'USC00134142.txt',
 'USC00120177.txt',
 'USC00126705.txt',
 'USC00110072.txt',
 'USC00138296.txt',
 'USC00336781.txt',
 'USC00127875.txt',
 'USC00132864.txt',
 'USC00117551.txt',
 'USC00258915.txt',
 'USC00123418.txt',
 'USC00253185.txt',
 'USC00110187.txt',
 'USC00252205.txt',


In [7]:
# add dataframes into a giant list so we can use .concat to join them all into a single dataframe at once
# this could be memory intensive, but significantly faster than using .append in a for loop

dfs: List[pd.DataFrame] = []

for file in os.listdir():
    df_weather_one_station = pd.read_csv(file, sep="\t", header=None)
    # rename columns so concat can match them up
    df_weather_one_station.columns = ["date", "max_temp_tenth_c", "min_temp_tenth_c", "precip_tenth_mm"]
    # station id is not contained within files so we will label it manually
    station_id = file.split(".")[0]
    df_weather_one_station["station_id"] = station_id
    dfs.append(df_weather_one_station)

df_weather = pd.concat(dfs)
df_weather


Unnamed: 0,date,max_temp_tenth_c,min_temp_tenth_c,precip_tenth_mm,station_id
0,19850101,-111,-156,0,USC00137161
1,19850102,-44,-189,0,USC00137161
2,19850103,6,-167,0,USC00137161
3,19850104,44,-100,0,USC00137161
4,19850105,56,-78,0,USC00137161
...,...,...,...,...,...
10679,20141227,6,-67,5,USC00258465
10680,20141228,-6,-111,0,USC00258465
10681,20141229,50,-100,0,USC00258465
10682,20141230,-33,-172,0,USC00258465


In [8]:
# transform temps into degrees C and precip into centimeters following our data model
df_weather["max_temp_c"] = df_weather["max_temp_tenth_c"] / 10
df_weather["min_temp_c"] = df_weather["min_temp_tenth_c"] / 10
df_weather["precip_cm"] = df_weather["precip_tenth_mm"] / 100

# interpret date string as a datetime object
df_weather["date"] = pd.to_datetime(df_weather["date"], format="%Y%m%d")

df_weather


Unnamed: 0,date,max_temp_tenth_c,min_temp_tenth_c,precip_tenth_mm,station_id,max_temp_c,min_temp_c,precip_cm
0,1985-01-01,-111,-156,0,USC00137161,-11.1,-15.6,0.00
1,1985-01-02,-44,-189,0,USC00137161,-4.4,-18.9,0.00
2,1985-01-03,6,-167,0,USC00137161,0.6,-16.7,0.00
3,1985-01-04,44,-100,0,USC00137161,4.4,-10.0,0.00
4,1985-01-05,56,-78,0,USC00137161,5.6,-7.8,0.00
...,...,...,...,...,...,...,...,...
10679,2014-12-27,6,-67,5,USC00258465,0.6,-6.7,0.05
10680,2014-12-28,-6,-111,0,USC00258465,-0.6,-11.1,0.00
10681,2014-12-29,50,-100,0,USC00258465,5.0,-10.0,0.00
10682,2014-12-30,-33,-172,0,USC00258465,-3.3,-17.2,0.00


In [9]:
# drop extra columns
df_weather.drop(columns=["max_temp_tenth_c", "min_temp_tenth_c", "precip_tenth_mm"], inplace=True)
# rename cleaned columns to match our database column names
df_weather.rename(columns={
    "max_temp_c": "max_temp",
    "min_temp_c": "min_temp",
    "precip_cm": "precip"
}, inplace=True)

df_weather

Unnamed: 0,date,station_id,max_temp,min_temp,precip
0,1985-01-01,USC00137161,-11.1,-15.6,0.00
1,1985-01-02,USC00137161,-4.4,-18.9,0.00
2,1985-01-03,USC00137161,0.6,-16.7,0.00
3,1985-01-04,USC00137161,4.4,-10.0,0.00
4,1985-01-05,USC00137161,5.6,-7.8,0.00
...,...,...,...,...,...
10679,2014-12-27,USC00258465,0.6,-6.7,0.05
10680,2014-12-28,USC00258465,-0.6,-11.1,0.00
10681,2014-12-29,USC00258465,5.0,-10.0,0.00
10682,2014-12-30,USC00258465,-3.3,-17.2,0.00


In [11]:
# write cleaned data into our database
# note - this will error if a single duplicate is detected... I don't know of a way to use INSERT IGNORE instead of just INSERT with the pandas API

db_session = scoped_session(sessionmaker(autocommit=False,
                                         autoflush=False,
                                         bind=engine))

start = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Weather ingestion start: {start}")

df_weather.to_sql(name="weather", con=engine, if_exists="append", index=False)

end = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Weather ingestion end: {end}")

num_rows = db_session.query(Weather).count()
print(f"Inserted {num_rows} rows")


Weather ingestion start: 2023-01-11 15:41:08


IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "weather_pkey"
DETAIL:  Key (station_id, date)=(USC00137161, 1985-01-01) already exists.

[SQL: INSERT INTO weather (date, station_id, max_temp, min_temp, precip) VALUES (%(date)s, %(station_id)s, %(max_temp)s, %(min_temp)s, %(precip)s)]
[parameters: ({'date': datetime.datetime(1985, 1, 1, 0, 0), 'station_id': 'USC00137161', 'max_temp': -11.1, 'min_temp': -15.6, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 2, 0, 0), 'station_id': 'USC00137161', 'max_temp': -4.4, 'min_temp': -18.9, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 3, 0, 0), 'station_id': 'USC00137161', 'max_temp': 0.6, 'min_temp': -16.7, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 4, 0, 0), 'station_id': 'USC00137161', 'max_temp': 4.4, 'min_temp': -10.0, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 5, 0, 0), 'station_id': 'USC00137161', 'max_temp': 5.6, 'min_temp': -7.8, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 6, 0, 0), 'station_id': 'USC00137161', 'max_temp': 6.7, 'min_temp': -2.2, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 7, 0, 0), 'station_id': 'USC00137161', 'max_temp': 2.8, 'min_temp': -4.4, 'precip': 0.0}, {'date': datetime.datetime(1985, 1, 8, 0, 0), 'station_id': 'USC00137161', 'max_temp': 1.1, 'min_temp': -11.1, 'precip': 0.0}  ... displaying 10 of 1729957 total bound parameter sets ...  {'date': datetime.datetime(2014, 12, 30, 0, 0), 'station_id': 'USC00258465', 'max_temp': -3.3, 'min_temp': -17.2, 'precip': 0.0}, {'date': datetime.datetime(2014, 12, 31, 0, 0), 'station_id': 'USC00258465', 'max_temp': -12.2, 'min_temp': -17.8, 'precip': 0.0})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

## Ingest yield data

In [13]:
# move active directory away from weather data to yield data
os.chdir("../yld_data")
os.listdir()

['US_corn_grain_yield.txt']

In [19]:
df_yield = pd.read_csv(os.listdir()[0], sep="\t", header=None)

df_yield.columns = ["year", "total_grain_yield"]

df_yield

Unnamed: 0,year,total_grain_yield
0,1985,225447
1,1986,208944
2,1987,181143
3,1988,125194
4,1989,191320
5,1990,201534
6,1991,189868
7,1992,240719
8,1993,160986
9,1994,255295


In [20]:
# write into postgres table, don't need to transform anything

start = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Yield ingestion start: {start}")

df_yield.to_sql(name="yield", con=engine, if_exists="append", index=False)

end = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Yield ingestion end: {end}")

num_rows = db_session.query(Yield).count()
print(f"Inserted {num_rows} rows")

Yield ingestion start: 2023-01-11 16:00:02
Yield ingestion end: 2023-01-11 16:00:02
Inserted 30 rows
