# Notebook Setup

In [3]:
# Detect whether this notebook is running on Google Colab.
IN_COLLAB = 'google.colab' in str(get_ipython())

# TODO: CHANGE THIS BASED ON YOUR OWN LOCAL SETTINGS
# Absolute path of the project checkout. Instead of editing this cell, the path can
# also be supplied through the MY_HOME_ABS_PATH environment variable.
import os
MY_HOME_ABS_PATH = os.environ.get(
    "MY_HOME_ABS_PATH", "/content/drive/MyDrive/W210/co2-flux-hourly-gpp-modeling")

In [4]:
# On Colab, mount Google Drive so the project folder under /content/drive is reachable.
if IN_COLLAB:
  from google.colab import drive
  drive.mount(mountpoint='/content/drive/')

Mounted at /content/drive/


## Import Modules

In [5]:
# install required modules quietly
required_packages = ['geopandas', 'pyspark', 'azure-storage-blob']

for p in required_packages: 
  try:
      __import__(p)
  except ImportError:
      %pip install {p} --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m28.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m110.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.0/16.0 MB[0m [31m84.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m96.6/96.6 KB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
flask 1.1.4 requires click<8.0,>=5.1, but you have click 8.1.3 which is incompatible.[0m[31m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m 

In [6]:
# Imports, project module path setup and display options for the whole notebook.
# NOTE(review): math, json, monthrange, datetime, plt, px and go are not used in the
# cells visible here — confirm before removing them.
import os
# pyspark.pandas warns at import time when this is unset, so it must precede that import.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import math
import json

from pyspark.sql.functions import col
# NOTE(review): `pd` is bound to pyspark.pandas here, not plain pandas. The star import
# from data_pipeline_lib below may rebind it — confirm which module later cells rely on.
import pyspark.pandas as pd
from calendar import monthrange
from datetime import datetime
from io import BytesIO

import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go

# Load local custom modules from ./code/src/tools
import sys
if IN_COLLAB:
  # Run from the project root so relative paths resolve; put the tools dir first on
  # sys.path so it takes precedence over installed packages.
  os.chdir(MY_HOME_ABS_PATH)
  sys.path.insert(0,os.path.abspath("./code/src/tools"))
else:
  # Assumes the kernel's working directory is already the project root — TODO confirm.
  sys.path.append(os.path.abspath("./code/src/tools"))

from CloudIO.AzStorageClient import AzStorageClient
# NOTE(review): star import; presumably provides data_cleanup, merge_site_metadata and
# check_and_drop_na used later. Prefer importing those names explicitly.
from data_pipeline_lib import *

# Display options: up to 500 rows, all columns, floats printed with 5 decimals.
# NOTE(review): pyspark.pandas does not appear to define 'display.max_columns' or
# 'display.float_format'. If these calls succeed, `pd` is probably plain pandas by now — confirm.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', lambda x: '%.5f' % x)

In [7]:
# Local Spark session on all cores, with the hadoop-azure / azure-storage jars that the
# wasbs:// writes at the end of the notebook depend on.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-azure:3.3.1,com.microsoft.azure:azure-storage:8.6.6")
    .getOrCreate()
)
# Display session details.
spark

# Define Local File System Constants

In [8]:
# Local file-system layout, rooted at the project checkout.
root_dir = MY_HOME_ABS_PATH
tmp_dir = os.sep.join([root_dir, '.tmp'])      # local cache for downloaded checkpoints
data_dir = os.sep.join([root_dir, 'data'])
cred_dir = os.sep.join([root_dir, '.cred'])
az_cred_file = os.sep.join([cred_dir, 'azblobcred.json'])

# Raw per-site data: a shared Drive folder on Colab, otherwise the local tmp dir.
raw_data_dir = (
    "/content/drive/MyDrive/CO2_flux_gpp_modeling/DS_capstone_23Spring_CO2/Data/half_hourly_data"
    if IN_COLLAB else tmp_dir
)

site_metadata_filename = os.sep.join([data_dir, 'site-metadata.csv'])

# Azure blob container and blob naming for this pipeline's outputs.
container = "gold-samples-data"
ext = "parquet"
ver = "0"
blob_name_base = f"baseline_all_v_{ver}"
train_blob_name_base = f"baseline-train-v-{ver}"
test_blob_name_base = f"baseline-test-v-{ver}"

In [9]:
# "Golden" Sites
tier1_sites = ["US-MMS", "US-Vcp", "FR-Pue", "CH-Lae", "US-Var", "US-Ne2", "ES-LJu", "US-Ton"]
tier2_sites = ["US-UMB", "US-Me2", "FI-Hyy", "US-NR1", "IT-Lav", "US-Wkg", "US-ARM", "US-SRM"]

train_sites = tier1_sites + tier2_sites

# Selected held-out test sites, annotated with IGBP class and climate group.
test_sites = ["US-GLE", # ENF, Cold
              "US-AR1", # GRA, Temperate
              "US-Seg", # GRA, Arid
              "US-FR2", # WSA, Temperate
              "ES-LM2", # WSA, Arid
              "CA-Cbo", # DBF, Cold
              "FR-Lam", # CRO, Temperate
              "IT-Cpz", # EBF, Temperate
              "CN-Cha", # MF Cold
              "IT-Lsn", # OSH, Temperate
              ]

# The train/test split is done by site, so a site in both lists (or listed twice) would
# leak data across the split. Fail fast if the lists are edited inconsistently.
assert len(set(train_sites)) == len(train_sites), "duplicate site in train_sites"
assert len(set(test_sites)) == len(test_sites), "duplicate site in test_sites"
assert not set(train_sites) & set(test_sites), "site appears in both train_sites and test_sites"

In [10]:
# Columns kept from the raw site data by the cleanup step.
included_features = [
    # *_ERA meteorological drivers
    'TA_ERA', 'SW_IN_ERA', 'LW_IN_ERA', 'VPD_ERA', 'P_ERA', 'PA_ERA',
    # timestamp and its components
    'datetime', 'year', 'month', 'day', 'hour', 'date',
    # vegetation indices and bands b1..b7
    'EVI', 'NDVI', 'NIRv',
    'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'b7',
    # categorical site descriptors
    'IGBP', 'koppen',
]

# Regression target, and the QC column handed to data_cleanup alongside it.
target_variable = 'GPP_NT_VUT_REF'
target_variable_qc = 'NEE_VUT_REF_QC'

# Get Gold Sample Site Data

In [None]:
# Site-level attributes to read from the metadata table.
included_site_features = ['site_id', 'filename', 'elevation', 'lat', 'long',
                          'koppen_sub', 'koppen_main', 'koppen_name',
                          'c3c4', 'c4_percent']
site_metadata_df = pd.read_csv(site_metadata_filename, usecols=included_site_features)

# Keep only the train and test sites, then renumber the rows from 0.
is_target_site = site_metadata_df['site_id'].isin(train_sites + test_sites)
site_metadata_df = site_metadata_df.loc[is_target_site]
print(f"size:{site_metadata_df.shape}")
site_metadata_df = site_metadata_df.reset_index(drop=True)
site_metadata_df

# Stage 1: Clean Raw Site Data and Merge Site Metadata

In [None]:
# Columns available in the raw per-site data. This is a reference list only; the cells
# shown here select columns via included_features instead.
all_features = [
    'TIMESTAMP_START', 'TIMESTAMP_END',
    'TA_F', 'TA_F_QC', 'TA_ERA',
    'SW_IN_POT', 'SW_IN_F', 'SW_IN_F_QC', 'SW_IN_ERA',
    'LW_IN_F', 'LW_IN_F_QC', 'LW_IN_ERA',
    'VPD_F', 'VPD_F_QC', 'VPD_ERA',
    'P_F', 'P_F_QC', 'P_ERA',
    'PA_F', 'PA_F_QC', 'PA_ERA',
    'NETRAD', 'PPFD_IN',
    'G_F_MDS', 'G_F_MDS_QC',
    'LE_F_MDS', 'LE_F_MDS_QC', 'LE_CORR',
    'H_F_MDS', 'H_F_MDS_QC', 'H_CORR',
    'NEE_VUT_REF', 'NEE_VUT_REF_QC', 'NEE_CUT_REF', 'NEE_CUT_REF_QC',
    'GPP_NT_VUT_REF', 'GPP_DT_VUT_REF', 'GPP_NT_CUT_REF', 'GPP_DT_CUT_REF',
    'RECO_NT_VUT_REF', 'RECO_DT_VUT_REF', 'RECO_NT_CUT_REF', 'RECO_DT_CUT_REF',
    'datetime', 'year', 'month', 'day', 'hour',
    'SITE_ID', 'date', 'NEE_VUT_REF_qa', 'SW_DIF',
    'EVI', 'NDVI', 'NIRv',
    'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'b7',
    'IGBP', 'koppen',
]

In [None]:
# Initial data clean-up and feature selection from the raw per-site files.
data_df = data_cleanup(raw_data_dir, site_metadata_df[['site_id','filename']],
                  target_variable, target_variable_qc,
                  included_features)
print(f"Data size after cleanup: {data_df.shape}")

# Attach static site attributes; 'filename' and 'koppen_name' are not carried over.
data_df = merge_site_metadata(data_df, site_metadata_df.drop(['filename', 'koppen_name'], axis=1))
print(f"Data size after merged with site metadata: {data_df.shape}")

# Drop rows with NA. check_and_drop_na appears to modify data_df in place (its result
# is not reassigned) — confirm against data_pipeline_lib.
check_and_drop_na(data_df)
print(f"Data size after final drop: {data_df.shape}")

# Requested sites can disappear silently, e.g. through missing hourly files or rows whose
# site_id is NA after the merge and are then dropped. List any site with no rows left.
missing_sites = sorted(set(train_sites + test_sites) - set(data_df['site_id'].dropna().unique()))
if missing_sites:
  print(f"WARNING: no rows left for sites: {missing_sites}")

display(data_df.head())

CN-Cha: (16228, 27)
FR-Pue: (117200, 27)
IT-Cpz: (59175, 27)
US-GLE: (54687, 27)
US-NR1: (98652, 27)

ERROR: US-Ne2 is mssing hourly data.
US-SRM: (95419, 27)
US-Ton: (113031, 27)
US-Var: (119950, 27)
US-Wkg: (93319, 27)
CA-Cbo: (79273, 27)
US-AR1: (28956, 27)
US-ARM: (125756, 27)
US-FR2: (30426, 27)

ERROR: US-MMS is mssing hourly data.
US-Me2: (99780, 27)
US-Seg: (91884, 27)
US-UMB: (70639, 27)
US-Vcp: (78491, 27)
CH-Lae: (112718, 27)
ES-LJu: (112724, 27)
ES-LM2: (58806, 27)
FI-Hyy: (127362, 27)
FR-Lam: (115812, 27)
IT-Lav: (120885, 27)
IT-Lsn: (40182, 27)
Data size after cleanup: (2061355, 27)
Data size after after merged with site metadata: (2061355, 34)
Data has NA.


Unnamed: 0,TA_ERA,SW_IN_ERA,LW_IN_ERA,VPD_ERA,P_ERA,PA_ERA,GPP_NT_VUT_REF,datetime,year,month,day,hour,date,EVI,NDVI,NIRv,b1,b2,b3,b4,b5,b6,b7,IGBP,koppen,minute,site_id,elevation,lat,long,koppen_sub,koppen_main,c3c4,c4_percent
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,16228,0,0,0,0,0,0


Data size after after final drop: (2045127, 34)


Unnamed: 0,TA_ERA,SW_IN_ERA,LW_IN_ERA,VPD_ERA,P_ERA,PA_ERA,GPP_NT_VUT_REF,datetime,year,month,day,hour,date,EVI,NDVI,NIRv,b1,b2,b3,b4,b5,b6,b7,IGBP,koppen,minute,site_id,elevation,lat,long,koppen_sub,koppen_main,c3c4,c4_percent
16228,5.311,25.016,272.218,1.708,0.0,97.939,-0.53574,2001-01-01 08:30:00,2001,1,1,8,2001-01-01,0.24998,0.73349,0.10592,0.0222,0.1444,0.0074,0.0267,0.1486,0.0977,0.0,EBF,Temperate,30,FR-Pue,270.0,43.7413,3.5957,8,3,C3,6.59
16229,5.744,59.734,272.218,1.738,0.0,97.939,0.86438,2001-01-01 09:00:00,2001,1,1,9,2001-01-01,0.24998,0.73349,0.10592,0.0222,0.1444,0.0074,0.0267,0.1486,0.0977,0.0,EBF,Temperate,0,FR-Pue,270.0,43.7413,3.5957,8,3,C3,6.59
16230,6.176,91.235,272.218,1.767,0.0,97.939,-0.02627,2001-01-01 09:30:00,2001,1,1,9,2001-01-01,0.24998,0.73349,0.10592,0.0222,0.1444,0.0074,0.0267,0.1486,0.0977,0.0,EBF,Temperate,30,FR-Pue,270.0,43.7413,3.5957,8,3,C3,6.59
16231,6.608,79.264,333.933,1.797,0.05,97.939,-0.17229,2001-01-01 10:00:00,2001,1,1,10,2001-01-01,0.24998,0.73349,0.10592,0.0222,0.1444,0.0074,0.0267,0.1486,0.0977,0.0,EBF,Temperate,0,FR-Pue,270.0,43.7413,3.5957,8,3,C3,6.59
16232,7.043,94.929,333.933,1.817,0.0,97.923,1.20865,2001-01-01 10:30:00,2001,1,1,10,2001-01-01,0.24998,0.73349,0.10592,0.0222,0.1444,0.0074,0.0267,0.1486,0.0977,0.0,EBF,Temperate,30,FR-Pue,270.0,43.7413,3.5957,8,3,C3,6.59


In [None]:
# Summary statistics of the cleaned, merged data (pandas reports numeric columns only).
data_df.describe()

Unnamed: 0,TA_ERA,SW_IN_ERA,LW_IN_ERA,VPD_ERA,P_ERA,PA_ERA,GPP_NT_VUT_REF,year,month,day,hour,EVI,NDVI,NIRv,b1,b2,b3,b4,b5,b6,b7,minute,elevation,lat,long,koppen_sub,koppen_main,c4_percent
count,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0,2045127.0
mean,15.18393,376.25804,319.49078,10.11018,0.03958,91.62118,5.87229,2010.1065,6.55024,15.7513,11.923,0.29796,0.52726,0.13143,0.08435,0.2483,0.04657,0.07608,0.26551,0.2077,0.12646,15.00078,901.20832,41.38344,-58.39359,15.86035,3.24598,7.39929
std,9.17084,267.25693,49.3058,9.61124,0.19043,9.25367,7.61167,4.65177,3.00549,8.81239,3.75671,0.13328,0.22261,0.07261,0.06102,0.06284,0.04531,0.04748,0.06925,0.09472,0.08461,15.0,906.36713,7.03857,57.53619,8.51506,0.71931,14.29274
min,-30.687,0.001,107.639,0.0,0.0,66.031,-49.7372,2001.0,1.0,1.0,3.0,-0.11958,-0.18252,-0.01764,0.0054,0.0305,0.0,0.0,0.0,0.0132,0.0,0.0,1.0,29.9495,-121.5574,5.0,2.0,0.0
25%,8.974,138.391,285.186,3.23,0.0,85.196,0.44478,2007.0,4.0,8.0,9.0,0.20223,0.3397,0.07913,0.0367,0.2025,0.0211,0.0444,0.2095,0.13,0.0559,0.0,180.0,36.6058,-109.9419,8.0,3.0,0.0
50%,15.317,343.341,321.129,6.989,0.0,97.162,3.14854,2010.0,7.0,16.0,12.0,0.28931,0.55534,0.11931,0.0675,0.2389,0.0357,0.0664,0.2708,0.1953,0.1088,30.0,314.0,40.0329,-97.4888,14.0,3.0,0.35
75%,21.721,585.408,354.958,13.697,0.0,99.039,9.46059,2014.0,9.0,23.0,15.0,0.36828,0.70229,0.16489,0.1191,0.2878,0.0561,0.0946,0.3201,0.2826,0.1836,30.0,1531.0,44.4523,3.5957,26.0,4.0,6.59
max,44.087,1094.341,473.085,80.091,15.493,103.921,85.0309,2020.0,12.0,31.0,23.0,2.38835,0.93551,0.42385,0.812,0.7762,0.7689,0.8053,0.4666,0.4792,0.4347,30.0,3197.0,61.84741,24.29477,27.0,4.0,55.39


In [None]:
# Sites that still have rows after cleanup, merge and NA drop.
data_df['site_id'].unique()

array(['FR-Pue', 'IT-Cpz', 'US-GLE', 'US-NR1', 'US-SRM', 'US-Ton',
       'US-Var', 'US-Wkg', 'CA-Cbo', 'US-AR1', 'US-ARM', 'US-FR2',
       'US-Me2', 'US-Seg', 'US-UMB', 'US-Vcp', 'CH-Lae', 'ES-LJu',
       'ES-LM2', 'FI-Hyy', 'FR-Lam', 'IT-Lav', 'IT-Lsn'], dtype=object)

# Checkpoint: Upload Cleaned Data to Azure Blob Storage as Parquet

In [11]:
# Optionally persist the cleaned Stage 1 frame to Azure Blob Storage as a single parquet
# blob, so later runs can reload it instead of redoing the cleanup.
# ref: https://stackoverflow.com/a/54666079
data_cleanup_checkpoint = False
tag = "raw"
blob_name = "{}_{}.{}".format(blob_name_base, tag, ext)

if data_cleanup_checkpoint:
  # Serialize into memory, then rewind so the upload reads from the first byte.
  parquet_file = BytesIO()
  data_df.to_parquet(parquet_file, engine='pyarrow')
  parquet_file.seek(0)
  azStorageClient = AzStorageClient(az_cred_file)
  azStorageClient.uploadBlob(container, blob_name, parquet_file, overwrite=True)

# Stage 2: Data Transform - Convert to Model Ready Data
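- Encode categorical features (StringIndexer + one-hot encoding)
- Split into train and test sets by site
- Assemble features into a single vector column with VectorAssembler
- Normalize features with min-max scaling (fit on the train set only)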

In [12]:
# Reload the Stage 1 checkpoint into Spark. The parquet blob is cached under tmp_dir and
# is only downloaded from Azure when no local copy exists yet.
load_data_from_previous_checkpoint = True

if load_data_from_previous_checkpoint:
  data_df = None  # release the previous frame before loading
  local_parquet_path = tmp_dir + os.sep + blob_name
  print(f"loading {local_parquet_path}...")
  if not os.path.exists(local_parquet_path):
    # makedirs also creates missing parent directories and is a no-op if tmp_dir exists
    # (os.mkdir failed in both of those cases).
    os.makedirs(tmp_dir, exist_ok=True)
    azStorageClient = AzStorageClient(az_cred_file)
    file_stream = azStorageClient.downloadBlob2Stream(container, blob_name)
    data_df = pd.read_parquet(file_stream, engine='pyarrow')
    data_df.to_parquet(local_parquet_path)

  data_df = spark.read.parquet(local_parquet_path)
  # Drop the pandas index column, if pyarrow stored one when the parquet was written.
  data_df = data_df.drop(*['__index_level_0__'])
  print(f"Data loaded: {data_df.count()} rows x {len(data_df.columns)} columns.")


loading /content/drive/MyDrive/W210/co2-flux-hourly-gpp-modeling/.tmp/baseline_all_v_0_raw.parquet...
Data loaded: 2045127 rows x 34 columns.


In [13]:
# Move the target to the first column; every remaining column is a candidate feature.
features = list(data_df.columns)
if target_variable in features:
  features.remove(target_variable)
data_df = data_df.select(target_variable, *features)

# Categorical site descriptors, one-hot encoded in the next cell.
categorical_cols = ['IGBP', 'c3c4', 'koppen_sub', 'koppen_main']

# Drop 'datetime', 'date' and 'koppen', which are already represented by other columns
# (e.g. year/month/day/hour/minute for the timestamps).
data_df = data_df.drop('datetime', 'date', 'koppen')

In [14]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler

# Categorical columns -> integer indices -> one-hot vectors. Both stages are fit on the
# full data (train and test sites) before the split, so every test-site category is known.
string_indexer = StringIndexer(
    inputCols=categorical_cols,
    outputCols=[name + "_Index" for name in categorical_cols],
)
data_df = string_indexer.fit(data_df).transform(data_df)

one_hot_encoder = OneHotEncoder(
    inputCols=string_indexer.getOutputCols(),
    outputCols=[name + "_OHE" for name in categorical_cols],
)
data_df = one_hot_encoder.fit(data_df).transform(data_df)

# Keep only the one-hot vectors: drop the raw categorical and intermediate index columns.
data_df = data_df.drop(*categorical_cols, *string_indexer.getOutputCols())

print(f"Data size after encoding: {data_df.count()} rows x {len(data_df.columns)} columns.")
data_df.show()

Data size after encoding: 2045127 rows x 31 columns.
+--------------+------+---------+---------+-------+-----+------+----+-----+---+----+------------------+------------------+------------------+------+------+------+------+------+------+---+------+-------+---------+-------+------+----------+-------------+-------------+--------------+---------------+
|GPP_NT_VUT_REF|TA_ERA|SW_IN_ERA|LW_IN_ERA|VPD_ERA|P_ERA|PA_ERA|year|month|day|hour|               EVI|              NDVI|              NIRv|    b1|    b2|    b3|    b4|    b5|    b6| b7|minute|site_id|elevation|    lat|  long|c4_percent|     IGBP_OHE|     c3c4_OHE|koppen_sub_OHE|koppen_main_OHE|
+--------------+------+---------+---------+-------+-----+------+----+-----+---+----+------------------+------------------+------------------+------+------+------+------+------+------+---+------+-------+---------+-------+------+----------+-------------+-------------+--------------+---------------+
|     -0.535742| 5.311|   25.016|  272.218|  1.708|  

In [15]:
# Site-level split: each row goes to train or test according to its site_id, which is
# then dropped because it is not a model feature.
train_df, test_df = (
    data_df.filter(col('site_id').isin(sites)).drop('site_id')
    for sites in (train_sites, test_sites)
)

# Feature columns = everything except the target.
features = list(train_df.columns)
if target_variable in features:
  features.remove(target_variable)

print(f"Train data size: {train_df.count()} rows x {len(train_df.columns)} columns.")
print(f"Test data size: {test_df.count()} rows x {len(test_df.columns)} columns.")
print(f"Features: {features}")

del data_df

Train data size: 1485926 rows x 30 columns.
Test data size: 559201 rows x 30 columns.
Features: ['TA_ERA', 'SW_IN_ERA', 'LW_IN_ERA', 'VPD_ERA', 'P_ERA', 'PA_ERA', 'year', 'month', 'day', 'hour', 'EVI', 'NDVI', 'NIRv', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'b7', 'minute', 'elevation', 'lat', 'long', 'c4_percent', 'IGBP_OHE', 'c3c4_OHE', 'koppen_sub_OHE', 'koppen_main_OHE']


In [16]:
# Pack all feature columns into a single vector column for Spark ML.
assembler = VectorAssembler(inputCols=features, outputCol="vectorized_features")
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Preview a few untruncated rows of each split ("peek" was misspelled "peak").
print("Train data peek:")
train_df.show(5, False)
print("Test data peek:")
test_df.show(5, False)

Train data peak:
+--------------+------+---------+---------+-------+-----+------+----+-----+---+----+------------------+------------------+------------------+------+------+------+------+------+------+---+------+---------+-------+------+----------+-------------+-------------+--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|GPP_NT_VUT_REF|TA_ERA|SW_IN_ERA|LW_IN_ERA|VPD_ERA|P_ERA|PA_ERA|year|month|day|hour|EVI               |NDVI              |NIRv              |b1    |b2    |b3    |b4    |b5    |b6    |b7 |minute|elevation|lat    |long  |c4_percent|IGBP_OHE     |c3c4_OHE     |koppen_sub_OHE|koppen_main_OHE|vectorized_features                                                                                                        

In [17]:
# Min-max scale the feature vector. The scaler is fit on the train split only and then
# applied to both splits, so test-site statistics do not leak into training.
scaler = MinMaxScaler(inputCol='vectorized_features', outputCol='features')
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)
test_df = scaler_model.transform(test_df)

# The unscaled vector is no longer needed.
train_df = train_df.drop(*['vectorized_features'])
test_df = test_df.drop(*['vectorized_features'])

# Preview a few untruncated rows of each split ("peek" was misspelled "peak").
print("Train data peek:")
train_df.show(5, False)
print("Test data peek:")
test_df.show(5, False)

Train data peak:
+--------------+------+---------+---------+-------+-----+------+----+-----+---+----+------------------+------------------+------------------+------+------+------+------+------+------+---+------+---------+-------+------+----------+-------------+-------------+--------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|GPP_NT_VUT_REF|TA_ERA|SW_IN_ERA|LW_IN_ERA|VPD_ERA|P_ERA|PA_ERA|year|month|day|hour|EVI               |NDVI              |NIRv              |b1    |b2    |b3    |b4    |b5    |b6    |b7 |minute|elevation|la

# Checkpoint: Upload train and test to Azure Blob Storage

In [None]:
# Write the model-ready train/test sets to Azure Blob Storage as parquet (overwriting).
final_checkpoint = True

if final_checkpoint:
  model_data_container = "baseline-data"
  azStorageClient = AzStorageClient(az_cred_file)
  # NOTE(review): appears to be (spark conf key, conf value, storage account name), since
  # the third item is used as the blob host prefix — confirm against AzStorageClient.
  sessionkeys = azStorageClient.getSparkSessionKeys()
  spark.conf.set(sessionkeys[0], sessionkeys[1])
  blob_root = f"wasbs://{model_data_container}@{sessionkeys[2]}.blob.core.windows.net"

  # Upload train dataset
  train_blob_path = f"{blob_root}/{train_blob_name_base}"
  print(f"Uploading train dataset to {train_blob_path}...")
  train_df.write.mode("overwrite").parquet(train_blob_path)

  # Upload test dataset
  test_blob_path = f"{blob_root}/{test_blob_name_base}"
  print(f"Uploading test dataset to {test_blob_path}...")
  test_df.write.mode("overwrite").parquet(test_blob_path)

Uploading train dataset to wasbs://baseline-data@mids23spring.blob.core.windows.net/baseline-train-v-0...
