In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import geopandas as gpd
import rioxarray as rxr
import rasterio 
import xarray as xr
import earthpy as et
from multiprocessing import Pool
from parallel_xarray import sample_tif
from get_dynamic_by_date import get_date_df, get_SAVI_date_df
from istarmap import istarmap
import tqdm
import pandas as pd 
# import elevation
# import richdem as rd
from shapely import geometry
import datetime
from dask import dataframe as dd

In [8]:
# Build a regular point grid ("fishnet") with 250 m spacing over the Narok boundary.

# Load the boundary and reproject it to a projected CRS so the spacing is in map units
df = gpd.read_file('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok.shp').to_crs('EPSG:3857')

# Grid spacing and the bounding box the grid has to cover
spacing = 250
xmin, ymin, xmax, ymax = df.total_bounds

# Candidate x / y positions along each axis
xcoords = list(np.arange(xmin, xmax, spacing))
ycoords = list(np.arange(ymin, ymax, spacing))

# Every (x, y) combination as a 2D array like [[x1, y1], [x1, y2], ...]
mesh_x, mesh_y = np.meshgrid(xcoords, ycoords)
pointcoords = np.array([mesh_x, mesh_y]).T.reshape(-1, 2)
points = gpd.points_from_xy(x=pointcoords[:, 0], y=pointcoords[:, 1])
grid = gpd.GeoSeries(points, crs=df.crs)
grid.name = 'geometry'

# Inner spatial join keeps only the points that fall inside the boundary polygons
gridinside = gpd.sjoin(gpd.GeoDataFrame(grid), df[['geometry']], how="inner")

# Keep just the geometry column of the joined result
fishnet = gpd.GeoDataFrame(gridinside, columns=['geometry']).set_crs('EPSG:3857')
# fishnet.to_file('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_fishnet.shp')


  pd.Int64Index,


## Sample Tiff files with point centroids - Static Data
Change all crs projections to epsg:4326

### Point Centroids

In [2]:
# open points shapefiles 
# points shapefiles crs = epsg:3857
# tur_shp = gpd.read_file('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_fishnet.shp').to_crs(epsg = 4326)
# nar_shp = gpd.read_file('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_fishnet.shp').to_crs(epsg = 4326)

# get point centroids to put lat and lon in csv
# tur_shp['lon'] = tur_shp['geometry'].x
# tur_shp['lat'] = tur_shp['geometry'].y

# nar_shp['lon'] = nar_shp['geometry'].x
# nar_shp['lat'] = nar_shp['geometry'].y

# tur_shp.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_fishnet.csv')
# nar_shp.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_fishnet.csv')

##############################################################################

# Load the fishnet point tables and pull out latitude / longitude lists for sampling.

# Turkana
# tur_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//turkana_fishnet.csv'
tur_path = 'E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_fishnet.csv'
tur_pts = pd.read_csv(tur_path)
tur_lats, tur_lons = list(tur_pts['lat']), list(tur_pts['lon'])

# Narok
# nar_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//narok_fishnet.csv'
nar_path = 'E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_fishnet.csv'
nar_pts = pd.read_csv(nar_path)
nar_lats, nar_lons = list(nar_pts['lat']), list(nar_pts['lon'])

### LULC

In [15]:
# Path to the land use / land cover raster (author notes its CRS as epsg:4326)
lulc = "F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\Kenya_Sentinel2_LULC2016.tif"

In [16]:
# Sample the LULC raster at the Turkana points.
# sample_tif takes a single (raster path, lats, lons) tuple.
lulc_args_tur = (lulc, tur_lats, tur_lons)
lulc_tur = sample_tif(lulc_args_tur)

In [17]:
# Add the sampled LULC values to the Turkana point table.
# Element [1] of the sample_tif result is presumably the sampled values
# (element [0] is used as the raster path elsewhere in this notebook).
tur_pts['lulc'] = lulc_tur[1]

In [18]:
# Sample the LULC raster at the Narok points
lulc_args_nar = (lulc, nar_lats, nar_lons)
lulc_nar = sample_tif(lulc_args_nar)

In [19]:
# Add the sampled LULC values to the Narok point table
nar_pts['lulc'] = lulc_nar[1]

### SoilGrids

In [20]:
# SoilGrids rasters (author notes their CRS as epsg:7030)
sg_path = "F:\\Tara_Fall_2019\\Kenya_Tanzania_LCC\\LCC_Project\\Soilgrids_Data\\soil_textural_tifs"
sg_files = os.listdir(sg_path)
# Keep only the full-profile (0-200 cm) rasters, which the author describes as a
# weighted average of the soil horizons
full_profile_names = [name for name in sg_files if "0_200cm" in name]
sg_paths = [os.path.join(sg_path, name) for name in full_profile_names]

In [21]:
# One (raster path, lats, lons) task per SoilGrids raster - Turkana points
sg_args_tur = [(raster, tur_lats, tur_lons) for raster in sg_paths]
arguments = sg_args_tur

# Sample all rasters in parallel with a pool of 14 worker processes
if __name__ == '__main__':
    print ("in main.")
    with Pool(processes=14) as pool:
        print ("in pool.")
        sg_result_t = pool.map(sample_tif, arguments)
        pool.close()

in main.
in pool.


In [22]:
# Build the SoilGrids dataframe for Turkana: one column per sampled raster.
# Each result pair is used as (raster path, sampled values).  The column name is
# the raster file name without its extension; deriving it with os.path replaces a
# hard-coded `path[83:-4]` slice that only worked for one exact directory length
# and a four-character extension.
tur_sg_cols = [os.path.splitext(os.path.basename(pair[0]))[0] for pair in sg_result_t]
tur_sg_list = [pair[1] for pair in sg_result_t]
# Rows of the raw frame are rasters, so transpose to get one row per point
tur_sg_df = pd.DataFrame(tur_sg_list).transpose()
tur_sg_df.columns = tur_sg_cols

In [23]:
# One (raster path, lats, lons) task per SoilGrids raster - Narok points
sg_args_nar = [(raster, nar_lats, nar_lons) for raster in sg_paths]
arguments = sg_args_nar

# Sample all rasters in parallel with a pool of 14 worker processes
if __name__ == '__main__':
    print ("in main.")
    with Pool(processes=14) as pool:
        print ("in pool.")
        sg_result_n = pool.map(sample_tif, arguments)
        pool.close()

in main.
in pool.


In [24]:
# Build the SoilGrids dataframe for Narok: one column per sampled raster.
# Each result pair is used as (raster path, sampled values).  The column name is
# the raster file name without its extension; deriving it with os.path replaces a
# hard-coded `path[83:-4]` slice that only worked for one exact directory length
# and a four-character extension.
nar_sg_cols = [os.path.splitext(os.path.basename(pair[0]))[0] for pair in sg_result_n]
nar_sg_list = [pair[1] for pair in sg_result_n]
# Rows of the raw frame are rasters, so transpose to get one row per point
nar_sg_df = pd.DataFrame(nar_sg_list).transpose()
nar_sg_df.columns = nar_sg_cols

### Slope

In [25]:
# Path to the slope raster (author notes its CRS as epsg:32636)
slope = "F:\\DEM\\Kenya_slope.tif"

In [26]:
# Sample the slope raster at the Turkana points
slope_args_tur = (slope, tur_lats, tur_lons)
slope_tur = sample_tif(slope_args_tur)

In [27]:
# Add the sampled slope values to the Turkana point table
tur_pts['slope'] = slope_tur[1]

In [28]:
# Sample the slope raster at the Narok points
slope_args_nar = (slope, nar_lats, nar_lons)
slope_nar = sample_tif(slope_args_nar)

In [29]:
# Add the sampled slope values to the Narok point table
nar_pts['slope'] = slope_nar[1]

### Elevation

In [30]:
# Path to the elevation (DEM) raster (author notes its CRS as epsg:32636)
elev = "F:\\DEM\\Kenya_DEM.tif"

In [31]:
# Sample the elevation raster at the Turkana points
elev_args_tur = (elev, tur_lats, tur_lons)
elev_tur = sample_tif(elev_args_tur)

In [32]:
# Add the sampled elevation values to the Turkana point table
tur_pts['elevation'] = elev_tur[1]

In [33]:
# Sample the elevation raster at the Narok points
elev_args_nar = (elev, nar_lats, nar_lons)
elev_nar = sample_tif(elev_args_nar)

In [34]:
# Add the sampled elevation values to the Narok point table
nar_pts['elevation'] = elev_nar[1]

### Concatenate Static Together

In [35]:
# Turkana: put the SoilGrids columns next to the point table and export the
# static predictors in a fixed column order
static_cols = [
    'FID', 'lat', 'lon', 'lulc', 'slope', 'elevation',
    'bdod0_200cm_mean', 'cfvo0_200cm_mean', 'clay0_200cm_mean',
    'sand0_200cm_mean', 'silt0_200cm_mean', 'soc0_200cm_mean',
]
final_df = pd.concat([tur_pts, tur_sg_df], axis=1)
out_df = final_df.loc[:, static_cols]
out_df.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_static_data.csv')

In [36]:
# Narok: put the SoilGrids columns next to the point table and export the
# static predictors in a fixed column order
static_cols = [
    'FID', 'lat', 'lon', 'lulc', 'slope', 'elevation',
    'bdod0_200cm_mean', 'cfvo0_200cm_mean', 'clay0_200cm_mean',
    'sand0_200cm_mean', 'silt0_200cm_mean', 'soc0_200cm_mean',
]
final_df = pd.concat([nar_pts, nar_sg_df], axis=1)
out_df = final_df.loc[:, static_cols]
out_df.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_static_data.csv')

## Sample Tiff files with point centroids - Dynamic Data
For each date, sample points and create a new dataframe 
For each date, pull last 6 months of climate data 

In [3]:
# TURKANA - list the SAVI z-score mosaics and read each acquisition date

# tur_vi_path = "//Volumes//Seagate Expansion Drive//bulk_download_USGS//Bulk_Order_Turkana//Landsat_8-9_OLI_TIRS_C2_L2//SAVI_zscore_mosaic"
tur_vi_path = "E:\\bulk_download_USGS\\Bulk_Order_Turkana\\Landsat_8-9_OLI_TIRS_C2_L2\\SAVI_zscore_mosaic"
tur_vi_files = os.listdir(tur_vi_path)

# Full path and date for every file; the date is the first 8 characters (YYYYMMDD)
# of the file name
tur_savi_files = []
tur_dates = []
for name in tur_vi_files:
    tur_savi_files.append(os.path.join(tur_vi_path, name))
    tur_dates.append(datetime.datetime.strptime(name[:8], '%Y%m%d'))

In [4]:
# NAROK - list the SAVI z-score rasters and read each acquisition date

# nar_vi_path = "//Volumes//Seagate Expansion Drive//bulk_download_USGS//Bulk_Order_Maasai_Mara//Landsat_8-9_OLI_TIRS_C2_L2//SAVI_zscore_new"
nar_vi_path = "E:\\bulk_download_USGS\\Bulk_Order_Maasai_Mara\\Landsat_8-9_OLI_TIRS_C2_L2\\SAVI_zscore_new"
nar_vi_files = os.listdir(nar_vi_path)

# Full path and date for every file; the date is characters 7-14 (YYYYMMDD)
# of the file name
nar_savi_files = []
nar_dates = []
for name in nar_vi_files:
    nar_savi_files.append(os.path.join(nar_vi_path, name))
    nar_dates.append(datetime.datetime.strptime(name[7:15], '%Y%m%d'))

### Get dataframe for each date

In [5]:
# Per the author, precipitation data starts at 03/01/2014, so only keep acquisition
# dates late enough to have a full record of climate data behind them.
# os.listdir() returns names in arbitrary order, so tur_dates is sorted before the
# reference date is picked by position; otherwise the cutoff could differ between
# machines or runs.
# NOTE(review): the original comment said "64 days" but the code adds 60 days -
# confirm which is intended (the longest climate window used later is 64 days).
cutoff = sorted(tur_dates)[9] + datetime.timedelta(days=60)
valid_dates_tur = [d for d in tur_dates if d > cutoff]
valid_dates_nar = [d for d in nar_dates if d > cutoff]

In [6]:
# %%time
# Build one (date, point table, region name) task per valid Turkana date.
# The sampled SAVI values are merged into the full dataframe later on.
args_tur = [(date, tur_pts, "Turkana") for date in valid_dates_tur]

# Disabled parallel version of the SAVI sampling (a sequential loop is used instead):
# arguments = args_tur
# # PARALLEL 
# if __name__ == '__main__':
#     print ("in main.")
#     with Pool(10) as pool:
#         print ("in pool.")
#         date_result_t = pool.map(get_SAVI_date_df, arguments)
#         pool.close()

In [None]:
# Sample SAVI for Turkana one date at a time, reporting progress after each date
date_result_t = []
for arg in args_tur:
    sampled = get_SAVI_date_df(arg)
    date_result_t.append(sampled)
    print (arg[0], " done.")

2014-05-08 00:00:00  done.
2014-05-15 00:00:00  done.
2014-05-17 00:00:00  done.
2014-05-24 00:00:00  done.


In [27]:
# Stack the per-date SAVI results for Turkana into a single dataframe
tur_dynamic_SAVI = pd.concat(date_result_t)

In [28]:
# Wrap the Turkana SAVI table in a dask dataframe to handle its size
from dask import dataframe as dd
tur_dyn_dd = dd.from_pandas(tur_dynamic_SAVI, npartitions=2022)


# File name for each parquet part; dask passes in the partition number
def name_function(x):
    return f"tur_dyn_SAVI-{x}.parquet"


# Send to parquet file storage
tur_dyn_dd.to_parquet('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_parquet', name_function=name_function)

(None,)

In [1]:
# %%time
# ### Get SAVI values sampled for each date - then merged to the full dataframe eventually 
# # datewise arguments for narok
# args_nar = [(date, nar_pts, "Narok") for date in valid_dates_nar]

# arguments = args_nar
# # PARALLEL 
# if __name__ == '__main__':
#     print ("in main.")
#     with Pool(14) as pool:
#         print ("in pool.")
#         date_result_n = pool.map(get_SAVI_date_df, arguments)
#         pool.close()

In [9]:
# Sample SAVI for Narok one date at a time, reporting progress after each date
args_nar = [(date, nar_pts, "Narok") for date in valid_dates_nar]
date_result_n = []
for arg in args_nar:
    sampled = get_SAVI_date_df(arg)
    date_result_n.append(sampled)
    print (arg[0], " done.")

2014-05-17 00:00:00  done.
2014-06-02 00:00:00  done.
2014-06-18 00:00:00  done.
2014-07-04 00:00:00  done.
2014-07-20 00:00:00  done.
2014-08-05 00:00:00  done.
2014-08-21 00:00:00  done.
2014-10-08 00:00:00  done.
2014-10-24 00:00:00  done.
2014-11-09 00:00:00  done.
2014-11-25 00:00:00  done.
2014-12-11 00:00:00  done.
2014-12-27 00:00:00  done.
2015-01-28 00:00:00  done.
2015-02-13 00:00:00  done.
2015-03-01 00:00:00  done.
2015-03-17 00:00:00  done.
2015-04-02 00:00:00  done.
2015-05-04 00:00:00  done.
2015-06-21 00:00:00  done.
2015-07-07 00:00:00  done.
2015-08-24 00:00:00  done.
2015-09-09 00:00:00  done.
2015-10-11 00:00:00  done.
2015-11-28 00:00:00  done.
2015-12-30 00:00:00  done.
2016-02-16 00:00:00  done.
2016-03-03 00:00:00  done.
2016-03-19 00:00:00  done.
2016-04-20 00:00:00  done.
2016-05-22 00:00:00  done.
2016-06-07 00:00:00  done.
2016-07-09 00:00:00  done.
2016-07-25 00:00:00  done.
2016-08-26 00:00:00  done.
2016-09-11 00:00:00  done.
2016-09-27 00:00:00  done.
2

In [10]:
# Stack the per-date SAVI results for Narok into a single dataframe
nar_dynamic_SAVI = pd.concat(date_result_n)

In [12]:
# Wrap the Narok SAVI table in a dask dataframe to handle its size
from dask import dataframe as dd
nar_dyn_dd = dd.from_pandas(nar_dynamic_SAVI, npartitions=133)


# File name for each parquet part; dask passes in the partition number
def name_function(x):
    return f"nar_dyn_SAVI-{x}.parquet"


# Send to parquet file storage
nar_dyn_dd.to_parquet('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_parquet', name_function=name_function)

(None,)

## Get zscore + climate dataframes

In [None]:
# One (date, point table, region name) task per valid Turkana date, used to pull
# the z-score and climate values
args_tur = [(date, tur_pts, "Turkana") for date in valid_dates_tur]
arguments = args_tur

# Process the dates in parallel with a pool of 14 worker processes
if __name__ == '__main__':
    print ("in main.")
    with Pool(processes=14) as pool:
        print ("in pool.")
        date_result_t = pool.map(get_date_df, arguments)
        pool.close()

in main.
in pool.


In [None]:
# Stack the per-date z-score + climate results for Turkana into a single dataframe
tur_dynamic = pd.concat(date_result_t)

In [None]:
# Wrap the Turkana z-score + climate table in a dask dataframe to handle its size
from dask import dataframe as dd
tur_dyn_dd = dd.from_pandas(tur_dynamic, npartitions=2022)


# File name for each parquet part; dask passes in the partition number
def name_function(x):
    return f"tur_dyn_new-{x}.parquet"


# Send to parquet file storage
tur_dyn_dd.to_parquet('E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_parquet', name_function=name_function)

In [8]:
# save output 
# tur_dynamic.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_dynamic_data.csv')

In [None]:
# One (date, point table, region name) task per valid Narok date, used to pull
# the z-score and climate values
args_nar = [(date, nar_pts, "Narok") for date in valid_dates_nar]
arguments = args_nar

# Process the dates in parallel with a pool of 14 worker processes
if __name__ == '__main__':
    print ("in main.")
    with Pool(processes=14) as pool:
        print ("in pool.")
        date_result_n = pool.map(get_date_df, arguments)
        pool.close()

In [None]:
# Stack the per-date z-score + climate results for Narok into a single dataframe
nar_dynamic = pd.concat(date_result_n)

In [None]:
# Wrap the Narok z-score + climate table in a dask dataframe to handle its size
from dask import dataframe as dd
nar_dyn_dd = dd.from_pandas(nar_dynamic, npartitions=133)


# File name for each parquet part; dask passes in the partition number
def name_function(x):
    return f"nar_dyn_new-{x}.parquet"


# Send to parquet file storage
nar_dyn_dd.to_parquet('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_parquet', name_function=name_function)

In [11]:
# save output 
# nar_dynamic.to_csv('F:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_dynamic_data.csv')

### Merge Static With Dynamics using Dask 

In [4]:
# Static data, read with dask. The "Unnamed: 0" column is dropped from each table
# (presumably the index column written when the CSVs were exported).

# Turkana
tur_stat_raw = dd.read_csv('//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//turkana_static_data.csv')
tur_stat = tur_stat_raw.drop(columns=["Unnamed: 0"])
# tur_stat = tur_stat.set_index('FID')

# Narok
nar_stat_raw = dd.read_csv('//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//narok_static_data.csv')
nar_stat = nar_stat_raw.drop(columns=["Unnamed: 0"])
# nar_stat = nar_stat.set_index('FID')

In [5]:
# Dynamic data: read the parquet parts written earlier and drop incomplete rows.


def _parquet_parts(folder, keep):
    """Return full paths of the entries in `folder` whose name passes `keep`."""
    return [os.path.join(folder, entry) for entry in os.listdir(folder) if keep(entry)]


def _is_dynamic_part(entry):
    """True for '_dyn' parts that are not SAVI parts."""
    return '_dyn' in entry and '_SAVI' not in entry


def _is_savi_part(entry):
    """True for SAVI parts."""
    return '_SAVI' in entry


# Turkana files
tur_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//turkana_parquet'
tur_files = _parquet_parts(tur_path, _is_dynamic_part)
tur_df = dd.read_parquet(tur_files)
tur_clean = tur_df.dropna(how='any')

tur_files_savi = _parquet_parts(tur_path, _is_savi_part)
tur_df_savi = dd.read_parquet(tur_files_savi)
tur_clean_savi = tur_df_savi.dropna(how='any')

# Narok files
nar_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//narok_parquet'
nar_files = _parquet_parts(nar_path, _is_dynamic_part)
nar_df = dd.read_parquet(nar_files)
nar_clean = nar_df.dropna(how='any')

nar_files_savi = _parquet_parts(nar_path, _is_savi_part)
# -255 is replaced with NaN so those rows are dropped too.
# NOTE(review): this masking is applied to Narok only - confirm Turkana does not need it.
nar_df_savi = dd.read_parquet(nar_files_savi).replace(-255., np.nan)
nar_clean_savi = nar_df_savi.dropna(how='any')


In [23]:
# Index the cleaned SAVI tables by date, using the sorted valid dates as the dask
# divisions (the author's intent: each partition holds one date).
# valid_dates_tur / valid_dates_nar come from the date-cutoff cell earlier in the notebook.
tur_by_date = tur_clean_savi.set_index('date', divisions=sorted(valid_dates_tur), compute = True)
nar_by_date = nar_clean_savi.set_index('date', divisions=sorted(valid_dates_nar), compute = True)

In [38]:
# Same date indexing for the cleaned z-score + climate tables
tur_z_by_date = tur_clean.set_index('date', divisions=sorted(valid_dates_tur), compute = True)
nar_z_by_date = nar_clean.set_index('date', divisions=sorted(valid_dates_nar), compute = True)

In [47]:
# Join keys: a point on a date, and a point on its own
point_date_keys = ['lat', 'lon', 'date']
point_keys = ['lat', 'lon']
# LULC classes of interest; every other class is dropped
lulc_keep = [1, 2, 3, 4]

# Merge SAVI onto the z-score + climate table, point by point and date by date
tur_dyn = dd.merge(tur_by_date, tur_z_by_date, left_on=point_date_keys, right_on=point_date_keys)
nar_dyn = dd.merge(nar_by_date, nar_z_by_date, left_on=point_date_keys, right_on=point_date_keys)

# Reset the index so the date information is not lost
tur_dyn_reset = tur_dyn.reset_index()
nar_dyn_reset = nar_dyn.reset_index()

# Attach the static predictors by matching coordinates
tur_merged = dd.merge(tur_dyn_reset, tur_stat, left_on=point_keys, right_on=point_keys)
nar_merged = dd.merge(nar_dyn_reset, nar_stat, left_on=point_keys, right_on=point_keys)

# Keep only the LULC classes of interest
tur_all = tur_merged[tur_merged.lulc.isin(lulc_keep)]
nar_all = nar_merged[nar_merged.lulc.isin(lulc_keep)]

In [51]:
# Preview the first rows of one partition of the merged Narok table as a sanity check
nar_all.partitions[5].head()

Unnamed: 0,date,lat,lon,SAVI,SAVI_zscore,16_day_sum_ppt,32_day_sum_ppt,48_day_sum_ppt,64_day_sum_ppt,16_day_mean_ppt,...,FID,lulc,slope,elevation,bdod0_200cm_mean,cfvo0_200cm_mean,clay0_200cm_mean,sand0_200cm_mean,silt0_200cm_mean,soc0_200cm_mean
0,2014-08-05,-1.830582,35.798562,0.243137,0.081756,0.380693,0.705438,0.829949,2.276737,0.023793,...,186472,2.0,12.147817,2100.0,119.95,71.925,334.6,430.575,234.825,260.475
1,2014-08-05,-1.828337,35.798562,0.298676,0.085019,0.380693,0.705438,0.829949,2.276737,0.023793,...,186473,2.0,25.365385,2053.0,119.95,56.425,342.625,427.1,230.25,264.8
2,2014-08-05,-1.826092,35.798562,0.354735,0.071692,0.380693,0.705438,0.829949,2.276737,0.023793,...,186474,1.0,30.14433,2051.0,120.725,61.55,337.575,436.225,225.7,265.75
3,2014-08-05,-1.823848,35.798562,0.359277,0.395288,0.380693,0.705438,0.829949,2.276737,0.023793,...,186475,1.0,21.44923,2065.0,120.6,74.5,321.725,446.875,231.9,264.225
4,2014-08-05,-1.821603,35.798562,0.363497,0.625477,0.380693,0.705438,0.829949,2.276737,0.023793,...,186476,1.0,7.28869,2069.0,121.05,78.825,325.325,451.875,222.975,276.85


In [53]:
# File name for each parquet part of the merged Turkana table
def name_function(x):
    return f"tur_all-{x}.parquet"


# Send to parquet file storage
tur_all.to_parquet('//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//turkana_parquet', name_function=name_function)

In [54]:
# File name for each parquet part of the merged Narok table
def name_function(x):
    return f"nar_all-{x}.parquet"


# Send to parquet file storage
nar_all.to_parquet('//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//reference_spatial_files//narok_parquet', name_function=name_function)

### Get z score of climate variables 

In [None]:
# Load the merged ("_all") parquet parts into dask dataframes for easier processing
# lulc = [1,2,3,4]
# lulc_strings = ["tree", "shrub", "grass", "crop"]

# Turkana files
# tur_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//turk_moving_window_regr//turkana_parquet'
tur_path = "E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_parquet"
tur_all_names = [name for name in os.listdir(tur_path) if '_all' in name]
tur_files = [os.path.join(tur_path, name) for name in tur_all_names]
turkana = dd.read_parquet(tur_files)

# Narok files
# nar_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//nar_moving_window_regr//narok_parquet'
nar_path = "E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_parquet"
nar_all_names = [name for name in os.listdir(nar_path) if '_all' in name]
nar_files = [os.path.join(nar_path, name) for name in nar_all_names]
narok = dd.read_parquet(nar_files)

In [None]:
# Add calendar year and month columns, taken from each row's date, to both regions
for region_frame in (turkana, narok):
    region_frame["year"] = region_frame["date"].dt.year
    region_frame["month"] = region_frame["date"].dt.month

In [None]:
# Split each region into one dask dataframe per land use type (LULC codes 1-4)
lulc_codes = [1, 2, 3, 4]
turkana_lulc = [turkana[turkana.lulc == code] for code in lulc_codes]
narok_lulc = [narok[narok.lulc == code] for code in lulc_codes]

In [None]:
# Per-location (FID), per-calendar-month mean and standard deviation of the climate
# variables, computed separately for every land use subset.

# Climate columns that get summarised
clim_cols = ['16_day_sum_ppt', '32_day_sum_ppt', '48_day_sum_ppt', '64_day_sum_ppt',
             '16_day_mean_temp', '32_day_mean_temp', '48_day_mean_temp', '64_day_mean_temp']
# Output column order, kept the same as before: month, FID, then the climate columns
clim_out_cols = ['month', 'FID'] + clim_cols


def _monthly_climate_stat(frame, stat):
    """Aggregate the climate columns of `frame` by (FID, month).

    frame: dataframe with 'FID', 'month' and the columns listed in clim_cols.
    stat:  name of the groupby aggregation to apply ('mean' or 'std').

    The climate columns are selected BEFORE aggregating. Previously every column
    was aggregated (including the datetime 'date' column and all static columns)
    and the extras were thrown away afterwards, which wastes work and can fail on
    columns that do not support the aggregation.
    """
    grouped = frame.groupby(['FID', 'month'])[clim_cols]
    return getattr(grouped, stat)().reset_index()[clim_out_cols]


# each location's monthly mean of the climate variables
clim_month_mean_tur = [_monthly_climate_stat(df, 'mean') for df in turkana_lulc]
clim_month_mean_nar = [_monthly_climate_stat(df, 'mean') for df in narok_lulc]

# each location's monthly standard deviation of the climate variables
clim_month_stdv_tur = [_monthly_climate_stat(df, 'std') for df in turkana_lulc]
clim_month_stdv_nar = [_monthly_climate_stat(df, 'std') for df in narok_lulc]

In [None]:
# Attach the month- and FID-specific means, then the standard deviations, to every
# row so the climate z scores can be computed next.
# Merge order matters for the resulting column names: the first merge suffixes the
# clashing climate columns with _x (row value) and _y (mean); the second merge then
# brings in the standard deviations under the plain column names.
stat_keys = ["FID", "month"]

merged_tur = [
    lulc_frame.merge(mean_frame, how="left", on=stat_keys).merge(stdv_frame, how="left", on=stat_keys)
    for lulc_frame, mean_frame, stdv_frame in zip(turkana_lulc, clim_month_mean_tur, clim_month_stdv_tur)
]
merged_nar = [
    lulc_frame.merge(mean_frame, how="left", on=stat_keys).merge(stdv_frame, how="left", on=stat_keys)
    for lulc_frame, mean_frame, stdv_frame in zip(narok_lulc, clim_month_mean_nar, clim_month_stdv_nar)
]

In [None]:
# Create climate z-score columns for every point, relative to that point's own
# monthly mean and standard deviation.

# Climate variables that get a "<name>_Z" column
clim_vars = ['16_day_sum_ppt', '32_day_sum_ppt', '48_day_sum_ppt', '64_day_sum_ppt',
             '16_day_mean_temp', '32_day_mean_temp', '48_day_mean_temp', '64_day_mean_temp']


def _add_climate_zscores(frame):
    """Return `frame` with one z-score column per climate variable.

    Expects the column layout produced by the two merges in the previous cell:
      "<var>_x" - the row's own value
      "<var>_y" - the monthly mean for that FID
      "<var>"   - the monthly standard deviation for that FID
    Adds "<var>_Z" = (value - mean) / std, then drops the mean and std helper
    columns. The input frame is not modified; a new frame is returned.
    """
    zscores = {
        f"{var}_Z": (frame[f"{var}_x"] - frame[f"{var}_y"]) / frame[var]
        for var in clim_vars
    }
    helper_cols = [f"{var}_y" for var in clim_vars] + clim_vars
    return frame.assign(**zscores).drop(helper_cols, axis=1)


z_clim_tur = [_add_climate_zscores(df) for df in merged_tur]
z_clim_nar = [_add_climate_zscores(df) for df in merged_nar]

In [None]:
# Write the Turkana z-score tables to parquet, one file set per land use type.
# z_clim_tur is ordered like the LULC codes [1, 2, 3, 4], labelled here as
# tree, shrub, grass, crop.
lulc = ["tree", "shrub", "grass", "crop"]

# specify parquet path
#     parquet_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//turk_ppt_strat_regr//turkana_parquet'
parquet_path = "E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\turkana_parquet"

# zip pairs every table with its own label. The previous version advanced a counter
# with `i=+1`, which assigns +1 instead of incrementing, so every table after the
# first was written under the "shrub" name and grass/crop files were never produced.
for lulc_name, df in zip(lulc, z_clim_tur):
    # File name for each parquet part; the prefix is bound per iteration
    def name_function(x, prefix=f"tur_{lulc_name}"):
        return f"{prefix}-{x}.parquet"

    # map to new parquet files
    df.to_parquet(parquet_path, name_function=name_function)
    print ("Parquet output comlete.")

In [None]:
# Write the Narok z-score tables to parquet, one file set per land use type.
# z_clim_nar is ordered like the LULC codes [1, 2, 3, 4], labelled here as
# tree, shrub, grass, crop.
lulc = ["tree", "shrub", "grass", "crop"]

# specify parquet path
#     parquet_path = '//Users//taraippolito//Desktop//Tara_Fall_2019//Kenya_Drought//nar_ppt_strat_regr//narok_parquet'
parquet_path = "E:\\Tara_Fall_2019\\Kenya_Drought\\reference_spatial_files\\narok_parquet"

# zip pairs every table with its own label. The previous version advanced a counter
# with `i=+1`, which assigns +1 instead of incrementing, so every table after the
# first was written under the "shrub" name and grass/crop files were never produced.
for lulc_name, df in zip(lulc, z_clim_nar):
    # File name for each parquet part; the prefix is bound per iteration
    def name_function(x, prefix=f"nar_{lulc_name}"):
        return f"{prefix}-{x}.parquet"

    # map to new parquet files
    df.to_parquet(parquet_path, name_function=name_function)
    print ("Parquet output comlete.")