# Ingest NCEP GFS 0.25 Degree Data for 6 hour forecasts. 

#### 1.) Conda package installations to environment and importing appropriate libraries. 

In [None]:
# conda install -c conda-forge gdal
# conda install -c conda-forge geopandas
# conda install -c conda-forge earthpy
# conda install -c conda-forge cloudpathlib
# conda install -c conda-forge pyhdf
# conda install -c anaconda basemap

#conda install -c conda-forge ipywidgets
#conda install -c conda-forge cartopy

## For IO dependencies in xarray 
#conda install -c conda-forge xarray dask netCDF4 bottleneck
#conda install -c conda-forge cfgrib
#conda install -c conda-forge pygrib
#conda install -c yt87 pywgrib2_xr

## For writing / reading parquet files 
#conda install -c conda-forge pyarrow

#For s3 
#conda install -c conda-forge boto3

#For timing 
#conda install -c conda-forge profilehooks

In [1]:
!python --version

Python 3.9.10


In [1]:
#Import Packages. 
import sys
import os
import requests
import warnings
import glob
import time
import re
from functools import reduce

import matplotlib.pyplot as plt
import seaborn as sns
import numpy.ma as ma
import numpy as np
#from shapely.geometry import mapping, box
import geopandas as gpd
import earthpy as et
import earthpy.spatial as es
import earthpy.plot as ep
from osgeo import gdal
import pandas as pd

#GFS data
import xarray # used for reading the data.
import xarray_extras.csv # used for writing data to csv format. 
import pygrib
import xarray # used for reading the data.
import ipywidgets as widgets
import matplotlib.pyplot as plt # used to plot the data.
import ipywidgets as widgets # For ease in selecting variables.
import cartopy.crs as ccrs # Used to georeference data.

#For writing to s3
import boto3
from botocore import UNSIGNED
from botocore.config import Config
import io
import pickle

#For timing function
from profilehooks import profile

#For multiprocessing of function
import multiprocessing
from multiprocessing import Pool


# #from cloudpathlib import S3Path, S3Client
# from pyhdf.SD import SD, SDC

warnings.simplefilter('ignore')



In [3]:
#Get number of CPUs
print("Number of cpu : ", multiprocessing.cpu_count())

Number of cpu :  4


In [2]:
s3 = boto3.client('s3')

#### Process grid_metadata.csv to get updated lat, lon bounds for a 5km x 5km labeled grid. Pull in grid ID for each. 

In [3]:
df_grids = pd.read_csv('../capstone/grid_metadata.csv')

final_grid_ids = []
final_min_lats = []
final_max_lats = []
final_min_lons = []
final_max_lons = []

#Note order is lon, lat for each of the comma separated values
for index, row in df_grids.iterrows(): 
    lons = []
    lats = []
    grid_id = row['grid_id']
    nums = row['wkt'][10:-2]
    nums = nums.replace(',','')
    pairs = nums.split(' ')
    for i in range(10): 
        if i % 2 == 0: 
            lons.append(pairs[i])
        else: 
            lats.append(pairs[i])
    
    # Adding +/- 0.125 guarantees we get at least 1, 0.25 degree forecast in the appropriate area (given GFS scale is larger than Sat. scale). 
    min_lat = float(min(lats)) - 0.15
    max_lat = float(max(lats)) + 0.15
    # Match our 0 to 360 longitudes in GFS data, vs the -180 to +180 longitudes here. 
    min_lon = float(min(lons)) + 180.00 - 0.15
    max_lon = float(max(lons)) + 180.00 + 0.15
    
    #Now append the appropriate scraped data to our lists to put into a dataframe. 
    final_grid_ids.append(grid_id)
    final_min_lats.append(min_lat)
    final_max_lats.append(max_lat)
    final_min_lons.append(min_lon)
    final_max_lons.append(max_lon) 
                        
    
    
df_grids_clean = pd.DataFrame(columns = ['grid_id', 'min_lat', 'max_lat', 'min_lon', 'max_lon'])


df_grids_clean['grid_id'] = final_grid_ids 
df_grids_clean['min_lat'] = final_min_lats
df_grids_clean['max_lat'] = final_max_lats
df_grids_clean['min_lon'] = final_min_lons
df_grids_clean['max_lon'] = final_max_lons

    
df_grids_clean
        

Unnamed: 0,grid_id,min_lat,max_lat,min_lon,max_lon
0,1X116,24.827661,25.168369,301.330849,301.675764
1,1Z2W7,28.396645,28.736092,257.109616,257.454532
2,3S31A,33.645584,33.982902,61.916175,62.171259
3,6EIL6,28.396645,28.736092,256.885037,257.229953
4,7334C,28.396645,28.736092,256.929953,257.274869
5,78V83,28.396645,28.736092,256.75029,257.095206
6,7F1D1,28.436092,28.775526,256.929953,257.274869
7,8KNI6,28.317704,28.657182,257.109616,257.454532
8,90BZ1,24.868369,25.209064,301.375764,301.72068
9,90S79,28.475526,28.814944,257.019784,257.3647


In [4]:
#Define lat/lon bounds of our regions of interest. 
#Note: We must convert the original lon bounds of -180, 180 --> 0, 360 to match the GFS data format. 

#Los Angeles
la_min_lat = 30.01
la_max_lat = 40.00
la_min_lon = 49.46
la_max_lon = 76.06
la_bounds = [la_min_lat, la_max_lat, la_min_lon, la_max_lon]

#Tapei
tp_min_lat = 20.01
tp_max_lat = 30.00
tp_min_lon = 297.07
tp_max_lon = 318.55
tp_bounds = [tp_min_lat, tp_max_lat, tp_min_lon, tp_max_lon]

#Delhi
dl_min_lat = 20.01
dl_max_lat = 30.00
dl_min_lon = 243.85
dl_max_lon = 260.82
dl_bounds = [dl_min_lat, dl_max_lat, dl_min_lon, dl_max_lon]

In [5]:
#Filter by appropriate lat/lon bounds
def subset_dataset(dataset, min_lat, max_lat, min_lon, max_lon): 
    '''Takes a dataset and bounding coordinates and returns a filtered subset for the region of interest'''
    mask_lat = np.logical_and(dataset.coords['latitude'] >= min_lat, dataset.coords['latitude'] <= max_lat)
    mask_lon = np.logical_and(dataset.coords['longitude'] >= min_lon, dataset.coords['longitude'] <= max_lon)
    ds_filt = dataset.where(mask_lat & mask_lon, drop = True)
    return ds_filt

In [6]:
def dataset_to_df(dataset, level, var_name, grid_id, time): 
    '''Convert xarray Dataset to Dataframe, Drop Unecessary Columns, Add Grid ID, and Rename Columns'''
    df = dataset.to_dataframe(name = var_name)
    df = df.drop(columns = [level, 'time', 'step', 'valid_time'])
    df.insert(0, 'grid_id', grid_id)
    df = df.rename(columns = {"t" : "t_surface" + time, "hpbl" : "pbl_surface" + time, "landn" : "landn_surface" + time, "hindex" : "hindex_surface" + time, "gust" : "gust_surface" + time, 
                              "r" : "r_atmosphere" + time, "pwat": "pwat_atmosphere" + time, 
                              "u" : "u_pbl" + time, "v": "v_pbl" + time, "VRATE" : "vrate_pbl" + time})
    return df

#### 2.) Download data from NCAR servers. 

In [7]:
 ## First, we need to authenticate
try:
    import getpass
    input = getpass.getpass
except:
    try:
        input = raw_input
    except:
        pass

In [8]:
## Now, we need your password.
pswd = input('password: ')

password:  ···········


In [9]:
values = {'email' : 'jericojohns@berkeley.edu', 'passwd' : pswd, 'action' : 'login'}
login_url = 'https://rda.ucar.edu/cgi-bin/login'

In [10]:
ret = requests.post(login_url, data=values)
if ret.status_code != 200:
    print('Bad Authentication')
    print(ret.text)
    exit(1)

In [11]:
dspath = 'https://rda.ucar.edu/data/ds084.1/'
save_dir = '/local/train/GFS/'
filelist = []

In [29]:
#For identifying variable names and layers
file = '2019' + '/' + '2019' + '06' + '13' + '/gfs.0p25.' + '2019' + '06' + '13' + '00' + '.f006.grib2'
filename = dspath + file
outfile = save_dir + os.path.basename(filename) 
print('Downloading', file)
req = requests.get(filename, cookies = ret.cookies, allow_redirects=True)
open(outfile, 'wb').write(req.content)
filelist_arr = [save_dir + os.path.basename(file)]
selected_file = widgets.Dropdown(options=filelist_arr, description='data file')
display(selected_file)

#Now we use xarray to open the file by the type_of_level we are interested in 
type_of_level1 = 'surface' # for Temperature and Planetary Boundary Layer Height
ds_level_surface = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level1, 'stepType': 'instant'}, engine="cfgrib")
(ds_level_surface.data_vars)


Downloading 2019/20190613/gfs.0p25.2019061300.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019061300.f006.grib2',), value='/local/…

Data variables:
    vis      (latitude, longitude) float32 ...
    gust     (latitude, longitude) float32 ...
    hindex   (latitude, longitude) float32 ...
    sp       (latitude, longitude) float32 ...
    orog     (latitude, longitude) float32 ...
    t        (latitude, longitude) float32 ...
    sdwe     (latitude, longitude) float32 ...
    sde      (latitude, longitude) float32 ...
    pevpr    (latitude, longitude) float32 ...
    cpofp    (latitude, longitude) float32 ...
    cprat    (latitude, longitude) float32 ...
    prate    (latitude, longitude) float32 ...
    csnow    (latitude, longitude) float32 ...
    cicep    (latitude, longitude) float32 ...
    cfrzr    (latitude, longitude) float32 ...
    crain    (latitude, longitude) float32 ...
    wilt     (latitude, longitude) float32 ...
    fldcp    (latitude, longitude) float32 ...
    SUNSD    (latitude, longitude) float32 ...
    lftx     (latitude, longitude) float32 ...
    cape     (latitude, longitude) float32 .

In [19]:
xarray.set_options(display_expand_attrs = True, display_expand_data_vars = True, display_width = 200, display_max_rows = 200)
print(ds_level_surface.data_vars)

Data variables:
    u        (latitude, longitude) float32 ...
    v        (latitude, longitude) float32 ...
    VRATE    (latitude, longitude) float32 ...


### Figure out why data is corrupting. 

In [None]:
# year = '2019'
# month = '06'
# day = '13'
# time  = '00'
# final_df = pd.DataFrame()

# #Now we use xarray to open the file by the type_of_level we are interested in 
# type_of_level1 = 'surface' # for Temperature and Planetary Boundary Layer Height, land cover, haines index, max surface wind speed 
# type_of_level2 = 'atmosphereSingleLayer' # for Relative Humidity and preciptable water
# type_of_level3 = 'planetaryBoundaryLayer' # For U and V wind, and ventilation rate
# ds_level_surface = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level1, 'stepType': 'instant'}, engine="cfgrib")
# ds_level_atmosphere = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level2}, engine="cfgrib")
# ds_level_pbl = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level3}, engine="cfgrib")

# #Define surface variable names
# var_t = 't' #Temperature (K) 
# var_hpbl = 'hpbl' #Planetary Boundary Layer Height (m)
# #var_landn = 'landn' #Land cover (0=sea, 1=land) (Proportion)
# var_hindex = 'hindex' #Haines index (fire index) 
# var_gust = 'gust' #Wind speed (gust) (m s-1)

# #Define atmosphere single Layer variables
# var_r = 'r' #Relative Humidity %
# var_pwat = 'pwat' #Precipitable water (kg m-2)

# #Planetary Boundary Layer
# var_u = 'u' #u-component of wind (m s-1)
# var_v = 'v' #v-component of wind (m s-1)
# var_VRATE = 'VRATE' #Ventilation rate (m2 s-1)

# #Define filtered datasets (for each variable). 
# #Surface 
# ds_t = ds_level_surface[var_t] 
# ds_hpbl = ds_level_surface[var_hpbl]
# #ds_landn = ds_level_surface[var_landn]
# ds_hindex = ds_level_surface[var_hindex]
# ds_gust = ds_level_surface[var_gust]

# #Atmosphere
# ds_r = ds_level_atmosphere[var_r]
# ds_pwat = ds_level_atmosphere[var_pwat]

# #Planetary Boundary Layer
# ds_u = ds_level_pbl[var_u]
# ds_v = ds_level_pbl[var_v]
# ds_VRATE =ds_level_pbl[var_VRATE]


# #Initialize empty dataframe to append each regional dataframe to
# daily_df = pd.DataFrame()
# for index, row in df_grids_clean.iterrows(): 
#     grid_id = row['grid_id']
#     min_lat = row['min_lat']
#     max_lat = row['max_lat']
#     min_lon = row['min_lon']
#     max_lon = row['max_lon']

#     #Filter to bounds of 5x5km regions of interest. 
#     #Surface
#     ds_t_filt = subset_dataset(ds_t, min_lat, max_lat, min_lon, max_lon)
#     ds_hpbl_filt = subset_dataset(ds_hpbl, min_lat, max_lat, min_lon, max_lon)
#     # ds_landn_filt = subset_dataset(ds_landn, min_lat, max_lat, min_lon, max_lon)
#     ds_hindex_filt = subset_dataset(ds_hindex, min_lat, max_lat, min_lon, max_lon)
#     ds_gust_filt = subset_dataset(ds_gust, min_lat, max_lat, min_lon, max_lon)

#     #Atmosphere
#     ds_r_filt = subset_dataset(ds_r, min_lat, max_lat, min_lon, max_lon)
#     ds_pwat_filt = subset_dataset(ds_pwat, min_lat, max_lat, min_lon, max_lon)

#     #PBL
#     ds_u_filt = subset_dataset(ds_u, min_lat, max_lat, min_lon, max_lon)
#     ds_v_filt = subset_dataset(ds_v, min_lat, max_lat, min_lon, max_lon)
#     ds_VRATE_filt = subset_dataset(ds_VRATE, min_lat, max_lat, min_lon, max_lon)


#     #Make sure we preserve the type of level (atmospheric) of the observation to preserve metadata within the variable names
#     ##Surface
#     df_t = dataset_to_df(ds_t_filt, 'surface', var_t, grid_id, time)
#     df_hpbl = dataset_to_df(ds_hpbl_filt, 'surface', var_hpbl, grid_id, time)
#     # df_landn = dataset_to_df(ds_landn_filt, 'surface', var_landn, grid_id, time)
#     df_hindex = dataset_to_df(ds_hindex_filt, 'surface', var_hindex, grid_id, time)
#     df_gust = dataset_to_df(ds_gust_filt, 'surface', var_gust, grid_id, time)

#     #Atmosphere
#     df_r = dataset_to_df(ds_r_filt, 'atmosphereSingleLayer', var_r, grid_id, time)
#     df_pwat = dataset_to_df(ds_pwat_filt, 'atmosphereSingleLayer', var_pwat, grid_id, time)

#     #PBL
#     df_u = dataset_to_df(ds_u_filt, 'planetaryBoundaryLayer', var_u, grid_id, time)
#     df_v = dataset_to_df(ds_v_filt, 'planetaryBoundaryLayer', var_v, grid_id, time)
#     df_VRATE = dataset_to_df(ds_VRATE_filt, 'planetaryBoundaryLayer', var_VRATE, grid_id, time)


#     #Now join all fields into same df
#     data_frames = [df_t, df_hpbl, df_hindex, df_gust, df_r, df_pwat, df_u, df_v, df_VRATE]

#     joined_df_current = reduce(lambda left,right: pd.merge(left,right, on=["latitude", "longitude", "grid_id"],
#                         how = 'left'), data_frames)

#     # joined_df_current = pd.merge(df_t, df_pbl, on = ["latitude", "longitude", "grid_id"], how = "left")
#     # joined_df_current = pd.merge(joined_df_current, df_r, on = ["latitude", "longitude", "grid_id"], how = "left")

#     #Now concatenate current dataframe into final dataframe
#     if daily_df.empty: 
#         daily_df = joined_df_current
#     else: 
#         daily_df = pd.concat([daily_df, joined_df_current], axis = 0)

#      #Now we delete the .grib2 file so as to save memory. Otherwise, we'd be storing ~1tb of data. 
#     if os.path.exists(outfile):
#         os.remove(outfile)
#     if os.path.exists(outfile + '.923a8.idx'):
#         os.remove(outfile + '.923a8.idx')

#     #Join the different forecast time dataframes together so that we have one column per forecast time. 
#     if final_df.empty: 
#         final_df = daily_df
#     else: 
#         final_df = pd.merge(final_df, daily_df, on = ["latitude", "longitude", "grid_id"], how = "left")

# #final_df = final_df.groupby(by = 'grid_id').mean()
# final_df.insert(0, 'date', year + '-' + month + '-' + day)
# #final_df.reset_index(drop=True, inplace=True)


# #Convert to final_df to parquet, with the appropriate metadata in file name (will extract as field names later). 
# out_parquet =  'gfs.0p25.' + year + month + day + '.f006.parquet'
# final_df
# #For now just upload to s3
# # filepath = '../train/GFS/parquet/' + out_parquet
# # final_df.to_parquet(path = filepath, engine = 'pyarrow')

In [31]:
def run_gfs_pipeline(years, months, days, times): 
    '''Pipeline the GFS data for specified times and specified variables, at the specified levels.
       Output will be a parquet file with 1 row per unique lat/lon combination (within regions of interest). 
       Forecast times will be added column-wise so that there are 4 forecasts per variable per day per row.'''
    for year in years: 
        for month in months: 
            for day in days: 
                final_df = pd.DataFrame()
                for time in times:
                    #In case file doesn't exist for a given date / time, continue to next day/time. 
                    try: 
                        #Download .glib2 file (temporarily) to scrape the desired fields. We will delete after use. 
                        file = year + '/' + year + month + day + '/gfs.0p25.' + year + month + day + time + '.f006.grib2'
                        filename = dspath + file
                        outfile = save_dir + os.path.basename(filename) 
                        print('Downloading', file)
                        req = requests.get(filename, cookies = ret.cookies, allow_redirects=True)
                        open(outfile, 'wb').write(req.content)
                        filelist_arr = [save_dir + os.path.basename(file)]
                        selected_file = widgets.Dropdown(options=filelist_arr, description='data file')
                        display(selected_file)

                        #Now we use xarray to open the file by the type_of_level we are interested in 
                        type_of_level1 = 'surface' # for Temperature and Planetary Boundary Layer Height, land cover, haines index, max surface wind speed 
                        type_of_level2 = 'atmosphereSingleLayer' # for Relative Humidity and preciptable water
                        type_of_level3 = 'planetaryBoundaryLayer' # For U and V wind, and ventilation rate
                        ds_level_surface = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level1, 'stepType': 'instant'}, engine="cfgrib")
                        ds_level_atmosphere = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level2}, engine="cfgrib")
                        ds_level_pbl = xarray.open_dataset(selected_file.value, filter_by_keys={'typeOfLevel': type_of_level3}, engine="cfgrib")

                        #Define surface variable names
                        var_t = 't' #Temperature (K) 
                        var_hpbl = 'hpbl' #Planetary Boundary Layer Height (m)
                        # var_landn = 'landn' #Land cover (0=sea, 1=land) (Proportion)
                        var_hindex = 'hindex' #Haines index (fire index) 
                        var_gust = 'gust' #Wind speed (gust) (m s-1)

                        #Define atmosphere single Layer variables
                        var_r = 'r' #Relative Humidity %
                        var_pwat = 'pwat' #Precipitable water (kg m-2)

                        #Planetary Boundary Layer
                        var_u = 'u' #u-component of wind (m s-1)
                        var_v = 'v' #v-component of wind (m s-1)
                        var_VRATE = 'VRATE' #Ventilation rate (m2 s-1)

                        #Define filtered datasets (for each variable). 
                        #Surface 
                        ds_t = ds_level_surface[var_t] 
                        ds_hpbl = ds_level_surface[var_hpbl]
                        # ds_landn = ds_level_surface[var_landn]
                        ds_hindex = ds_level_surface[var_hindex]
                        ds_gust = ds_level_surface[var_gust]

                        #Atmosphere
                        ds_r = ds_level_atmosphere[var_r]
                        ds_pwat = ds_level_atmosphere[var_pwat]

                        #Planetary Boundary Layer
                        ds_u = ds_level_pbl[var_u]
                        ds_v = ds_level_pbl[var_v]
                        ds_VRATE =ds_level_pbl[var_VRATE]


                        #Initialize empty dataframe to append each regional dataframe to
                        daily_df = pd.DataFrame()
                        for index, row in df_grids_clean.iterrows(): 
                            grid_id = row['grid_id']
                            min_lat = row['min_lat']
                            max_lat = row['max_lat']
                            min_lon = row['min_lon']
                            max_lon = row['max_lon']

                            #Filter to bounds of 5x5km regions of interest. 
                            #Surface
                            ds_t_filt = subset_dataset(ds_t, min_lat, max_lat, min_lon, max_lon)
                            ds_hpbl_filt = subset_dataset(ds_hpbl, min_lat, max_lat, min_lon, max_lon)
                            # ds_landn_filt = subset_dataset(ds_landn, min_lat, max_lat, min_lon, max_lon)
                            ds_hindex_filt = subset_dataset(ds_hindex, min_lat, max_lat, min_lon, max_lon)
                            ds_gust_filt = subset_dataset(ds_gust, min_lat, max_lat, min_lon, max_lon)

                            #Atmosphere
                            ds_r_filt = subset_dataset(ds_r, min_lat, max_lat, min_lon, max_lon)
                            ds_pwat_filt = subset_dataset(ds_pwat, min_lat, max_lat, min_lon, max_lon)

                            #PBL
                            ds_u_filt = subset_dataset(ds_u, min_lat, max_lat, min_lon, max_lon)
                            ds_v_filt = subset_dataset(ds_v, min_lat, max_lat, min_lon, max_lon)
                            ds_VRATE_filt = subset_dataset(ds_VRATE, min_lat, max_lat, min_lon, max_lon)


                            #Make sure we preserve the type of level (atmospheric) of the observation to preserve metadata within the variable names
                            ##Surface
                            df_t = dataset_to_df(ds_t_filt, 'surface', var_t, grid_id, time)
                            df_hpbl = dataset_to_df(ds_hpbl_filt, 'surface', var_hpbl, grid_id, time)
                            # df_landn = dataset_to_df(ds_landn_filt, 'surface', var_landn, grid_id, time)
                            df_hindex = dataset_to_df(ds_hindex_filt, 'surface', var_hindex, grid_id, time)
                            df_gust = dataset_to_df(ds_gust_filt, 'surface', var_gust, grid_id, time)

                            #Atmosphere
                            df_r = dataset_to_df(ds_r_filt, 'atmosphereSingleLayer', var_r, grid_id, time)
                            df_pwat = dataset_to_df(ds_pwat_filt, 'atmosphereSingleLayer', var_pwat, grid_id, time)

                            #PBL
                            df_u = dataset_to_df(ds_u_filt, 'planetaryBoundaryLayer', var_u, grid_id, time)
                            df_v = dataset_to_df(ds_v_filt, 'planetaryBoundaryLayer', var_v, grid_id, time)
                            df_VRATE = dataset_to_df(ds_VRATE_filt, 'planetaryBoundaryLayer', var_VRATE, grid_id, time)


                            #Now join all fields into same df
                            data_frames = [df_t, df_hpbl, df_hindex, df_gust, df_r, df_pwat, df_u, df_v, df_VRATE]

                            joined_df_current = reduce(lambda left,right: pd.merge(left,right, on=["latitude", "longitude", "grid_id"],
                                                how = 'left'), data_frames)

                            # joined_df_current = pd.merge(df_t, df_pbl, on = ["latitude", "longitude", "grid_id"], how = "left")
                            # joined_df_current = pd.merge(joined_df_current, df_r, on = ["latitude", "longitude", "grid_id"], how = "left")

                            #Now concatenate current dataframe into final dataframe
                            if daily_df.empty: 
                                daily_df = joined_df_current
                            else: 
                                daily_df = pd.concat([daily_df, joined_df_current], axis = 0)

                         #Now we delete the .grib2 file so as to save memory. Otherwise, we'd be storing ~1tb of data. 
                        if os.path.exists(outfile):
                            os.remove(outfile)
                        if os.path.exists(outfile + '.923a8.idx'):
                            os.remove(outfile + '.923a8.idx')

                        #Join the different forecast time dataframes together so that we have one column per forecast time. 
                        if final_df.empty: 
                            final_df = daily_df
                        else: 
                            final_df = pd.merge(final_df, daily_df, on = ["latitude", "longitude", "grid_id"], how = "left")
                    except: 
                        continue
                try: 
                    #final_df = final_df.groupby(by = 'grid_id').mean()
                    final_df.insert(0, 'date', year + '-' + month + '-' + day)
                    #final_df.reset_index(drop=True, inplace=True)


                    #Convert to final_df to parquet, with the appropriate metadata in file name (will extract as field names later). 
                    out_parquet =  'gfs.0p25.' + year + month + day + '.f006.parquet'

                    #For now just upload to s3
                    filepath = '../train/GFS/parquet/' + out_parquet
                    final_df.to_parquet(path = filepath, engine = 'pyarrow')

                    #Put file in read mode so we can upload to s3 / Databricks storage bucket. 
                    with open(filepath, 'rb') as data:
                        s3.upload_fileobj(data, 'capstone-particulate-storage', out_parquet)
                        
                except: 
                    continue

                        
    print("Pipeline complete.")

In [None]:
%%time
#Iterate through all file names. 
# for year in ['2018', '2019', '2020']: 
#     for month in ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']: 
#         for day in ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31']: 
#             for time in ['00', '06', '12', '18']: 


years = ['2019']
months = ['07','08', '09', '10', '11', '12']
days = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31']
times = ['00', '06', '12', '18']

run_gfs_pipeline(years, months, days, times)

Downloading 2019/20190701/gfs.0p25.2019070100.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070100.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070100.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070100.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190701/gfs.0p25.2019070106.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070106.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070106.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070106.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190701/gfs.0p25.2019070112.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070112.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070112.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070112.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190701/gfs.0p25.2019070118.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070118.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070118.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070118.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190702/gfs.0p25.2019070200.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070200.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070200.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070200.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190702/gfs.0p25.2019070206.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070206.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070206.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070206.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190702/gfs.0p25.2019070212.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070212.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070212.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070212.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190702/gfs.0p25.2019070218.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070218.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070218.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070218.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190703/gfs.0p25.2019070300.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070300.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070300.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070300.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190703/gfs.0p25.2019070306.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070306.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070306.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070306.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190703/gfs.0p25.2019070312.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070312.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070312.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070312.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190703/gfs.0p25.2019070318.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070318.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070318.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070318.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190704/gfs.0p25.2019070400.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070400.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070400.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070400.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190704/gfs.0p25.2019070406.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070406.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070406.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070406.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190704/gfs.0p25.2019070412.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070412.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070412.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070412.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190704/gfs.0p25.2019070418.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070418.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070418.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070418.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190705/gfs.0p25.2019070500.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070500.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070500.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070500.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190705/gfs.0p25.2019070506.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070506.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070506.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070506.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190705/gfs.0p25.2019070512.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070512.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070512.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070512.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190705/gfs.0p25.2019070518.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070518.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070518.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070518.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190706/gfs.0p25.2019070600.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070600.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070600.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070600.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190706/gfs.0p25.2019070606.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070606.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070606.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070606.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190706/gfs.0p25.2019070612.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070612.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070612.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070612.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190706/gfs.0p25.2019070618.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070618.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070618.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070618.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190707/gfs.0p25.2019070700.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070700.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070700.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070700.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190707/gfs.0p25.2019070706.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070706.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070706.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070706.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190707/gfs.0p25.2019070712.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070712.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070712.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070712.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190707/gfs.0p25.2019070718.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070718.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070718.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070718.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190708/gfs.0p25.2019070800.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070800.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070800.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070800.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190708/gfs.0p25.2019070806.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070806.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070806.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070806.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190708/gfs.0p25.2019070812.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070812.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070812.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070812.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190708/gfs.0p25.2019070818.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070818.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070818.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070818.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190709/gfs.0p25.2019070900.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070900.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070900.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070900.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190709/gfs.0p25.2019070906.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070906.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070906.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070906.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190709/gfs.0p25.2019070912.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070912.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070912.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070912.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190709/gfs.0p25.2019070918.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019070918.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019070918.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019070918.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190710/gfs.0p25.2019071000.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071000.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071000.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071000.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190710/gfs.0p25.2019071006.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071006.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071006.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071006.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190710/gfs.0p25.2019071012.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071012.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071012.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071012.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190710/gfs.0p25.2019071018.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071018.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071018.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071018.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190711/gfs.0p25.2019071100.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071100.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071100.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071100.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190711/gfs.0p25.2019071106.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071106.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071106.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071106.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190711/gfs.0p25.2019071112.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071112.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071112.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071112.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190711/gfs.0p25.2019071118.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071118.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071118.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071118.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190712/gfs.0p25.2019071200.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071200.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071200.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071200.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190712/gfs.0p25.2019071206.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071206.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071206.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071206.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190712/gfs.0p25.2019071212.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071212.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071212.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071212.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190712/gfs.0p25.2019071218.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071218.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071218.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071218.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190713/gfs.0p25.2019071300.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071300.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071300.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071300.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190713/gfs.0p25.2019071306.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071306.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071306.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071306.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190713/gfs.0p25.2019071312.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071312.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071312.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071312.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190713/gfs.0p25.2019071318.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071318.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071318.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071318.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190714/gfs.0p25.2019071400.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071400.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071400.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071400.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190714/gfs.0p25.2019071406.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071406.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071406.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071406.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190714/gfs.0p25.2019071412.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071412.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071412.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071412.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190714/gfs.0p25.2019071418.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071418.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071418.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071418.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190715/gfs.0p25.2019071500.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071500.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071500.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071500.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190715/gfs.0p25.2019071506.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071506.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071506.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071506.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190715/gfs.0p25.2019071512.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071512.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071512.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071512.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190715/gfs.0p25.2019071518.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071518.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071518.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071518.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190716/gfs.0p25.2019071600.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071600.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071600.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071600.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190716/gfs.0p25.2019071606.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071606.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071606.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071606.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190716/gfs.0p25.2019071612.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071612.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071612.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071612.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190716/gfs.0p25.2019071618.f006.grib2


Dropdown(description='data file', options=('/local/train/GFS/gfs.0p25.2019071618.f006.grib2',), value='/local/…

Ignoring index file '/local/train/GFS/gfs.0p25.2019071618.f006.grib2.923a8.idx' incompatible with GRIB file
Ignoring index file '/local/train/GFS/gfs.0p25.2019071618.f006.grib2.923a8.idx' incompatible with GRIB file


Downloading 2019/20190717/gfs.0p25.2019071700.f006.grib2


In [None]:
# %%time
# #Now try to parellize the for loops 
# years = ['2018']
# months = ['02']
# days = ['01']
# times = ['00', '06', '12', '18']

# args = (years, months, days, times)

# def pool_handler(): 
#     p = Pool(4)
#     p.map(run_gfs_pipeline, args)
    
# if __name__ == '__main__':
#     pool_handler()


In [None]:
# %%time
# #Multiprocessing
# # #Now try to parellize the for loops 
# years = ['2018']
# months = ['02']
# days = ['01']
# times = ['00', '06', '12', '18']

# args = (years, months, days, times)

# task = ProcessWithLogAndControls(target=run_gfs_pipeline, args=(args), name="GFS Pipeline")
# task.run()

 #### Now to download the files

In [None]:
#TODOs: 
# Do this for each region and concatenate the 3 dataframes into one dataframe. (Do we want to add column with region labeled?). 
# Create strings for each possible filename (i.e. 01 through 31 for 01 through 12 months for 2018 to 2020 years). 
# Use Srishti's S3 bucket and add a test csv file to the bucket (so we don't have to store locally). 
# Pull file download, df creation, df to csv save to s3 (forecast time) and file deletion into one loop function (based on dates above). Quick exit if error bc date doesn't exist (i.e. 31).
# Make sure we can pass tuples or some combination for level and variable name into function so that we can quickly change variables included. 
# Add a timeit call to understand how long it takes to run end-to-end pipeline. 

In [34]:
out_parquet =  'gfs.0p25.' + '2019' + '06' + '13' + '.f006.parquet'
filepath = '../train/GFS/parquet/' + out_parquet
test_df = pd.read_parquet(filepath, engine='pyarrow')
test_df

Unnamed: 0_level_0,Unnamed: 1_level_0,date,grid_id,t_surface00,pbl_surface00,hindex_surface00,gust_surface00,r_atmosphere00,pwat_atmosphere00,u_pbl00,v_pbl00,...,vrate_pbl12,t_surface18,pbl_surface18,hindex_surface18,gust_surface18,r_atmosphere18,pwat_atmosphere18,u_pbl18,v_pbl18,vrate_pbl18
latitude,longitude,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
25.00,301.50,2019-06-13,1X116,300.215790,978.826843,,4.700011,44.0,40.700001,-4.9302,-0.262259,...,5000.0,300.282898,877.404663,,5.527215,36.0,35.700001,-5.750403,-0.1078,5000.0
28.50,257.25,2019-06-13,1Z2W7,297.115784,125.626816,5.0,3.400011,30.0,25.299999,-2.9302,1.937741,...,6000.0,310.082886,2283.164795,6.0,4.127215,27.0,23.900000,-1.150403,3.3922,8000.0
33.75,62.00,2019-06-13,3S31A,316.815796,616.106812,6.0,5.500010,8.0,6.000000,-1.2302,-5.662259,...,4000.0,290.482880,108.284676,5.0,7.627215,10.0,7.700000,-2.950403,-7.2078,1000.0
28.50,257.00,2019-06-13,6EIL6,299.215790,574.186829,5.0,10.300011,31.0,27.100000,-7.5302,7.537741,...,9000.0,312.482880,2738.524658,6.0,3.827215,26.0,24.299999,-1.650403,2.8922,9000.0
28.50,257.00,2019-06-13,7334C,299.215790,574.186829,5.0,10.300011,31.0,27.100000,-7.5302,7.537741,...,9000.0,312.482880,2738.524658,6.0,3.827215,26.0,24.299999,-1.650403,2.8922,9000.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
28.75,257.00,2019-06-13,ZF3ZW,300.615784,444.026825,5.0,3.700011,32.0,28.299999,-3.0302,0.937741,...,6000.0,314.382874,2922.044678,6.0,2.627215,23.0,22.700001,-1.550403,2.4922,9000.0
34.25,62.25,2019-06-13,ZP1FZ,319.215790,930.906799,6.0,2.900011,9.0,7.400000,-0.3302,-2.262259,...,0.0,288.482880,24.444679,6.0,3.027215,12.0,9.500000,-1.650403,-1.5078,0.0
34.00,62.25,2019-06-13,ZP1FZ,315.515778,624.026794,6.0,4.600011,8.0,6.100000,-2.2302,-4.062259,...,0.0,287.982880,31.164679,5.0,3.727215,10.0,7.200000,-2.050403,-2.9078,0.0
33.75,62.75,2019-06-13,ZZ8JF,315.215790,1910.426758,5.0,2.100011,7.0,4.200000,0.6698,2.137741,...,0.0,283.482880,24.604679,5.0,3.227215,8.0,5.200000,-1.350403,-2.5078,0.0


In [14]:
['t_surface12']

latitude  longitude
25.00     301.50       301.441498
28.50     257.25       317.941498
33.75     62.00        290.741516
28.50     257.00       319.741516
          257.00       319.741516
                          ...    
28.75     257.00       318.141510
34.25     62.25        291.941498
34.00     62.25        289.041504
33.75     62.75        284.841492
33.50     62.75        289.941498
Name: t_surface12, Length: 84, dtype: float32

In [None]:
test_df.shape

In [None]:
pd.set_option('display.max_rows', 100)
test_df.to_csv('test.csv')

In [None]:
my_array_data = io.BytesIO()
pickle.dump(test_df, my_array_data)
my_array_data.seek(0)
s3.upload_fileobj(my_array_data, 'particulate-articulate-capstone','gfs_test.pkl')

In [None]:
#Try to download 
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
s3.download_file('particulate-articulate-capstone', 'trial1maiac.pkl', 'trial1maiac.pkl')

In [None]:
!pwd

In [None]:
test_df = pd.read_pickle('../train/train/gfs.0p25.20180201.f006.parquet')
test_df