In [3]:
import requests
import numpy as np
import pandas as pd
import geopandas as gpd
import scipy.sparse as sparse
from bs4 import BeautifulSoup
import multiprocessing
import sys, os
from multiprocessing import Pool, cpu_count
from concurrent.futures import ThreadPoolExecutor,as_completed
from functools import partial
import urllib.request
import fiona
import rasterio.mask
import rasterio
import requests
import gzip
import pprint
import logging
import click
import itertools
import glob
pp = pprint.PrettyPrinter(indent=2)
logging.basicConfig(filename='app.log', filemode='w', format='%(asctime)s — %(name)s — %(levelname)s — %(funcName)s:%(lineno)d — %(message)s',level=logging.ERROR)





In [4]:
def delete_all_downloaded_files(folderpath):
    '''
    Delete all files in given folder. Main usae is to erase all downloaded files
    folderpath: folder path
    Parameters
    ----------
    folderpath : Path to the directory to erase files from
    Returns
    -------
    '''
    files = glob.glob(folderpath+'*')
    for f in files:
        os.remove(f)
        

def proj_init():
    '''
    Init function for folders creation and determining useful number of threads to use
    Parameters
    ----------
    ''
    Returns
    -------
    DOWNLOADS_DIR: Folder where the CHIRPS files are downloaded in 
    MASKED_FILES_DIR: Folder where the masked files are saved in
    SATCKED_FILES_DIR: Folder where the stacked files are saved in
    THREADS: Number of thread that the multithreading functions will use
    '''
    # CREATE WORK DIRS 
    DOWNLOADS_DIR='/tmp/downloads/' # downloaded files
    if not os.path.exists(DOWNLOADS_DIR):
        os.makedirs(DOWNLOADS_DIR)
    MASKED_FILES_DIR='/tmp/masked/' # created masked files
    if not os.path.exists(MASKED_FILES_DIR): 
        os.makedirs(MASKED_FILES_DIR)
    SATCKED_FILES_DIR='/tmp/stacked/'
    if not os.path.exists(SATCKED_FILES_DIR): # created stacked files
        os.makedirs(SATCKED_FILES_DIR)
    # Nbr of thread to use for multiprocessing
    if (multiprocessing.cpu_count() > 2):
        THREADS=multiprocessing.cpu_count()-2
    else:
        THREADS=2 # At least 2 for multi threading
    return {'DOWNLOADS_DIR':DOWNLOADS_DIR,'MASKED_FILES_DIR':MASKED_FILES_DIR, 'SATCKED_FILES_DIR':SATCKED_FILES_DIR,'THREADS':THREADS }


def files_url_list(url,files,year):
    '''
    Build the files url list from the CHIRPS website. Use beautiful soup to extract urls from thewebsite html. 
    Parameters
    ----------
    url: url of the website to download the data from
    files: object to store the files url in
    year: year of selection
    Returns
    -------
    no return 
    '''
    page = requests.get(url).text
    soup = BeautifulSoup(page, 'html.parser')
    for node in soup.find_all('a'):
        try:
            if(node.get('href').endswith('tif') | node.get('href').endswith('gz')):
                if(node.get('href').split('.')[-4]==str(year) or node.get('href').split('.')[-5]==str(year)):
                    files.append( url + '/' + node.get('href'))
        except Exception as e:
            logging.exception("files_url_list: Exception caught during processing")


            
def concurrent_files_url_list(baseUrl, years):
    '''
    Concurrent Donwload of .tiff file urls in a list
    Parameters
    ----------
    baseUrl: url of the page to download the files from
    years: list of year(s) of selection
    Returns
    -------
    files: list or urls to download
    '''
    files={}
    # Concurent downloading of the data
    from concurrent.futures import ThreadPoolExecutor, as_completed
    append_data=[]
    result=[]
    # Concurences
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        for year in years:
            files[str(year)]=[]
            if(baseUrl.split('_')[-1].split('/')[0]=='daily'):# In case using daily rainfall data in the daily folder
                try:
                    append_data.append(executor.submit(files_url_list, baseUrl+str(year), files[str(year)],year))
                except Exception as e:
                    logging.exception("download_file_links: Exception caught during processing")
            elif(baseUrl.split('_')[-1].split('/')[0]=='monthly'):# In case using daily rainfall data in the monthly folder
                try:
                    append_data.append(executor.submit(files_url_list, baseUrl, files[str(year)],year))
                except Exception as e:
                    logging.exception("download_file_links: Exception caught during processing")

    # Retrieve values
    for task in as_completed(append_data):
        try:
            result.append(task.result())
        except Exception as e:
                logging.exception("download_file_links: Exception caught during processing")
    return files


def download_file(url):
    '''
    Download the .tif or .gz files and uncompress the .gz file in memory
    Parameters
    ----------
    url: url of the file to download
    Returns
    -------
    no return. Downloads files into DOWNLOADS_DIR
    '''
    if (url.endswith('gz')):
        outFilePath = DOWNLOADS_DIR+url.split('/')[-1][:-3]
    else:
        outFilePath = DOWNLOADS_DIR+url.split('/')[-1]
    response = urllib.request.urlopen(url) 
    with open(outFilePath, 'wb') as outfile:
        if (url.endswith('gz')):
            outfile.write(gzip.decompress(response.read()))
            gzip.decompress(response.read())
        elif (url.endswith('tif')):
            outfile.write(response.read())
        else:
            pass


def concurrent_file_downloader(files):    
    '''
    Concurent downloading and extraction of the data
    Parameters
    ----------
    files: list of url to download
    Returns
    -------
    no return
    '''
    from concurrent.futures import ThreadPoolExecutor, as_completed
    append_data=[]
    result=[]
    # Concurences
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        for year in files:
            for url in files[year]:
                try:
                    append_data.append(executor.submit(download_file,url))
                except Exception as e:
                    logging.exception("concurrent_file_downloader: Exception caught during processing")

    # Retrieve values
    for task in as_completed(append_data):
        try:
            result.append(task.result())
        except Exception as e:
            logging.exception("concurrent_file_downloader: Exception caught during processing")


def aoi_shapefile_reader(aoishapefile):
    '''
    Download the shapefile of the area of interest (aoi)
    Parameters
    ----------
    aoishapefile: path to the shapefile to download
    Returns
    -------
    shapes: file containing the coordinates of the aoi's polygon
    '''
    # Read the AOI's shapefile
    with fiona.open(aoishapefile, "r") as shapefile:
        shapes = [feature["geometry"] for feature in shapefile]
    return shapes


def masking(file,shapes):
    '''Masking of .tif files by the provided shapefile
    Parameters
    ----------
    file: .tif file to be masked
    shapes: Coordinate of the aoi polygon
    Returns
    -------
    export files into MASKED_FILES_DIR directory
    '''
    if file[-4:]=='.tif':
        with rasterio.open(DOWNLOADS_DIR+file) as src:
            out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)
            out_meta = src.meta
        # use the updated spatial transform and raster height and width to write the masked raster to a new file.   
        out_meta.update({"driver": "GTiff",
                         "height": out_image.shape[1],
                         "width": out_image.shape[2],
                         "transform": out_transform})
        with rasterio.open(MASKED_FILES_DIR+file[:-4]+".masked.tif", "w", **out_meta) as dest:
            dest.write(out_image)
            

def concurrent_masking(shapes):
    '''
    Launch the concurent masking of the list of .tiff files by the aio provided
    Parameters
    ----------
    shapes: Coordinate of the aoi polygon
    Returns
    -------
    '''
    append_data=[]
    result=[]
    # Concurent masking
    with ThreadPoolExecutor(max_workers=10) as executor:
         for file in os.listdir(DOWNLOADS_DIR):
            try:
                append_data.append(executor.submit(masking,file,aoishapes))
            except Exception as e:
                logging.exception("concurrent_file_downloader: Exception caught during processing")

    # Retrieve values
    for task in as_completed(append_data):
        try:
            result.append(task.result())
        except Exception as e:
            logging.exception("concurrent_file_downloader: Exception caught during processing")
            

def calculate_rainy_days(years):
    '''
    Calculate the number of rainy dates in month over a year
    Parameters
    ----------
    years: List of years selected
    Returns
    -------
    OrderedDict of month - rainy days
    '''
    MONTHS_DICT={1:'Jan', 2: 'Feb', 3:'Mar',4:'Apr', 5:'May',6:'Jun',7:'Jul',8:'Aug',9:'Sep',10:'Oct',11:'Nov',12:'Dec'}
    data_array=[]
    Mat = pd.DataFrame()
    table=pd.DataFrame(index=np.arange(0,1))
    for file in os.listdir(MASKED_FILES_DIR):
        if file[-4:]=='.tif':
            dataset = rasterio.open(MASKED_FILES_DIR+file)
            widht=dataset.profile.get('width')
            height=dataset.profile.get('height')
            data_array=dataset.read(1)
            data_array_sparse=sparse.coo_matrix(data_array,shape=(height,widht))
            if(baseUrl.split('_')[-1].split('/')[0]=='daily'):
                data = file[12:-11]
            elif(baseUrl.split('_')[-1].split('/')[0]=='monthly'):
                data = file[12:-14]
            Mat[data]=data_array_sparse.toarray().tolist()

    # Calculate the precipitaion per day dataframe
    raindatadf = pd.DataFrame(Mat.applymap(lambda x: sum([t for t in x])).sum()).T #sum the array in each dataframe cells
    rainy_days={}
    for year in years:
        number_of_days={}
        for i in range(1,13): # for 12 months
            number_of_days[MONTHS_DICT[i]]=raindatadf[raindatadf.columns[raindatadf.columns.str.slice(0,7).str.endswith(f'{year}.{i:02}')]].gt(0.0).sum(axis=1)[0]
        rainy_days[year]=number_of_days
    return collections.OrderedDict(sorted(rainy_days.items())) 


def order_masked_files_per_month():
    '''
    Order masked .tiff files from MASKED_FILES_DIR  per month.
    Parameters
    ----------
    years: List of years selected
    Returns
    -------
    rasterfiles: dictionnary of list of masked .tiff files ordered by month
    '''
    rasterfiles={}
    key = lambda x: x[17:19] # Month
    masked_raster=os.listdir(MASKED_FILES_DIR)
    masked_raster=sorted(masked_raster, key=key)
    myfileslist=[]

    for key, group in itertools.groupby(masked_raster, key):
        myfileslist.append(list(group))
    # Month selection as key
    for l in myfileslist:
        rasterfiles[str(l[0][17:19])]=l 
    return rasterfiles


def stack_rasters(filelist,month):
    '''
    Stack list of .tif files by month
    Parameters
    ----------
    filelist: List of masked tiff images
    month: month selected in number ('01', '02'...'12')
    Returns
    -------
    no return. Copy the stacked .tiff images in SATCKED_FILES_DIR
    '''
    with rasterio.open(MASKED_FILES_DIR+filelist[0]) as src0:
        meta = src0.meta

    # Update meta to reflect the number of layers
    meta.update(count = len(filelist))

    # Read each layer and write it to stack
    yearslist='_'.join(map(str, [years[-1], years[0]]))
    with rasterio.open(f'{SATCKED_FILES_DIR}stacked_{yearslist}.{month}.tif', 'w', **meta) as dst:
        for id, layer in enumerate(filelist, start=1):
            with rasterio.open(MASKED_FILES_DIR+layer) as src1:
                dst.write_band(id, src1.read(1))


                    
def concurrent_stack_rasters():
    '''
    Launch the concurent stacking of the tiff images
    Parameters
    ----------
    No parameters
    Returns
    -------
    No return. Copy the stacked .tiff images in SATCKED_FILES_DIR
    '''
    appenddata=[]
    result=[]
    # Concurent masking
    with ThreadPoolExecutor(max_workers=THREADS) as executor:
         for month in rasterfiles:
            try:
                #int(month[:3]) # trick to remove .DS_STORE file
                filelist=rasterfiles[month] 
                pp.pprint(month)
                appenddata.append(executor.submit(stack_rasters,filelist,month))
            except Exception as e:
                pass
    # Retrieve values
    for task in as_completed(appenddata):
        try:
            result.append(task.result())
        except Exception as e:
            pass

In [None]:

if __name__ == "__main__":
    # Init and folder creation. Threads number determination
    myenv=proj_init()
    DOWNLOADS_DIR=myenv['DOWNLOADS_DIR']
    MASKED_FILES_DIR=myenv['MASKED_FILES_DIR']
    SATCKED_FILES_DIR=myenv['SATCKED_FILES_DIR']
    THREADS=myenv['THREADS']
    # Selection of the dataset base URL 
    BASE_URL='https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/tifs/p25/'
    # Selection of the year(s) of interest
    YEARS=[ 2020]#, 2014, 2013, 2012, 2011, 2010]
    # get all files ulrs from the CHIRPS dataset web page
    files=concurrent_files_url_list(BASE_URL, YEARS)
    # launch concurent dowload of all the .tiff files
    concurrent_file_downloader(files)
    # dowload aoi
    aoishapes=aoi_shapefile_reader("data/aoi.shp")
    # Masking
    concurrent_masking(aoishapes)