In [1]:
import numpy as np
import numpy.ma as ma
import os
from datetime import datetime, timedelta
import json

In [2]:
class Patch:
    """
    Description of one patch: its tile, its (x, y) indices, the Sentinel-1 and
    Sentinel-2 acquisition dates available for it, its projection and its labels.

    Attributes set by the constructor:
        x, y        - patch indices (int)
        tile        - tile identifier
        s1_dates    - list of datetime of the Sentinel-1 patches (may be None)
        s2_dates    - list of datetime of the Sentinel-2 patches (may be None)
        projection  - projection of the patch
        labels      - list of labels of the patch
    """

    # Filename patterns produced by reconstruct_filename, keyed by data kind.
    _FILENAME_TEMPLATES = {
        's1': '{tile}_{date}_S1_{x}_{y}.tif',
        's2': '{tile}_{date}_S2_{x}_{y}.tif',
        'gr': '{tile}_GR_{x}_{y}.tif',
    }

    def __init__(self, *args, **kwargs):
        """
        Constructor of the class.
        Either load everything from a json file (keyword 'json'), or take the
        attributes directly from the keywords x, y, tile, s2_dates, s1_dates,
        projection and labels.
        @params:
            args            - Optional  : unused
            kwargs          - Optional  : 'json' (path of a '{tile}_{x}_{y}.json' file)
                                          or x, y (required), tile, s2_dates, s1_dates,
                                          projection, labels
        @raises:
            FileNotFoundError   : the 'json' path does not exist
            TypeError           : 'json' is not given and x or y is missing
        """
        if 'json' in kwargs:
            path_json = kwargs.get('json')
            if not os.path.exists(path_json):
                raise FileNotFoundError("Json path doesn't exist: {}".format(path_json))
            fields = Patch._read_json(path_json)
        else:
            missing = [key for key in ('x', 'y') if kwargs.get(key) is None]
            if missing:
                raise TypeError("Missing required argument(s): {}".format(', '.join(missing)))
            fields = {
                'x': int(kwargs.get('x')),
                'y': int(kwargs.get('y')),
                'tile': kwargs.get('tile', None),
                's2_dates': kwargs.get('s2_dates', None),
                's1_dates': kwargs.get('s1_dates', None),
                'projection': kwargs.get('projection', None),
                'labels': kwargs.get('labels', None),
            }

        self.x = fields['x']
        self.y = fields['y']
        self.tile = fields['tile']
        self.s2_dates = fields['s2_dates']
        self.s1_dates = fields['s1_dates']
        self.projection = fields['projection']
        self.labels = fields['labels']

    @staticmethod
    def _read_json(path_json):
        """
        Read one patch json file and return the attribute values as a dict.
        tile, x and y come from the file name ('{tile}_{x}_{y}.<ext>'), the rest
        from the json keys 'corresponding_s1', 'corresponding_s2', 'projection'
        and 'labels' (';' separated lists).
        @params:
            path_json       - Required  : path of the json file
        """
        # basename instead of split('/') so that OS specific separators work too
        name_parts = os.path.basename(path_json).split('_')

        with open(path_json) as f:
            data = json.load(f)

        return {
            'x': int(name_parts[1]),
            'y': int(name_parts[2].split('.')[0]),
            'tile': str(name_parts[0]),
            's1_dates': Patch.to_date(data['corresponding_s1'].split(';'), is_filename=True),
            's2_dates': Patch.to_date(data['corresponding_s2'].split(';'), is_filename=True),
            'projection': str(data['projection']),
            'labels': data['labels'].split(';'),
        }

    def _sat_dates(self, is_s2):
        """
        Return the list of Sentinel-2 (is_s2=True) or Sentinel-1 dates of the
        patch; a missing (None) list is treated as an empty one.
        """
        dates = self.s2_dates if is_s2 else self.s1_dates
        return dates if dates is not None else []

    def reconstruct_filename(self, data='gr', dates_to_keep=None, date_format='%Y%m%d'):
        """
        Reconstruct the filename(s) of ground reference (gr) patch, Sentinel-2 (s2) patch(es) or Sentinel-1 (s1) patch(es).
        Returns a single string for 'gr' and a list of strings for 's2' / 's1'.
        @params:
            data            - Optional  : data to reconstruct filenames ('gr' for ground reference, 's2' for Sentinel-2, 's1' for Sentinel-1)
            dates_to_keep   - Optional  : dates (strings or datetime) to keep to generate filenames (only used for s2 and s1, None keeps every date)
            date_format     - Optional  : format of the dates in dates_to_keep, also used to write the date in the filenames (default : %Y%m%d)
        @raises:
            ValueError      : data is not one of 'gr', 's2', 's1'
        """
        # Explicit check instead of assert: asserts disappear with python -O
        if data not in Patch._FILENAME_TEMPLATES:
            raise ValueError("data must be 'gr', 's2' or 's1', got {!r}".format(data))

        template = Patch._FILENAME_TEMPLATES[data]
        tile, x, y = str(self.tile), str(self.x), str(self.y)

        if data == 'gr':
            return template.format(tile=tile, x=x, y=y)

        available = self._sat_dates(is_s2=(data == 's2'))

        if dates_to_keep is None:
            selected = available
        else:
            # Keep the order of dates_to_keep; a date is kept when year, month and day match
            selected = []
            for d_keep in Patch.to_date(dates_to_keep, date_format=date_format):
                wanted_day = (d_keep.year, d_keep.month, d_keep.day)
                for d in available:
                    if (d.year, d.month, d.day) == wanted_day:
                        selected.append(d)

        return [template.format(tile=tile, date=d.strftime(date_format), x=x, y=y) for d in selected]

    def has_nb_dates(self, nb_dates, is_s2=True):
        """
        Check if the patch has at least nb_dates dates for S1 or S2.
        @params:
            nb_dates            - Required  : minimum number of dates
            is_s2               - Optional  : True to count S2 dates, False to count S1 dates
        """
        return len(self._sat_dates(is_s2)) >= nb_dates

    def has_matching_monthes(self, dates, nb_data_per_month=1, date_format='%Y%m', is_s2=True):
        """
        Check if the patch has at least nb_data_per_month dates in each of the months (dates) for S1 or S2.
        Only the year and the month of each date are compared.
        @params:
            dates               - Required  : list of months to look for (strings in date_format, or datetime)
            nb_data_per_month   - Optional  : minimum number of dates required in each month
            date_format         - Optional  : format of the strings in dates (default : %Y%m)
            is_s2               - Optional  : True to check the S2 dates, False to check the S1 dates
        """
        # Number of available dates per (year, month)
        per_month = {}
        for d in self._sat_dates(is_s2):
            key = (d.year, d.month)
            per_month[key] = per_month.get(key, 0) + 1

        for wanted in Patch.to_date(dates, date_format=date_format):
            count = per_month.get((wanted.year, wanted.month))
            if count is None or count < nb_data_per_month:
                return False

        return True

    @staticmethod
    def to_date(list_str_dates, date_format='%Y%m%d', is_filename=False):
        """
        Convert a list of string dates (in date_format) to a new list of datetime.
        Items that already are datetime are kept as they are. The input list is never modified.
        Possibility to extract the date from filenames (S1 or S2 patches filename):
        the date is the second '_' separated field, empty filenames are ignored.
        @params:
            list_str_dates      - Required  : list of dates
            date_format         - Optional  : format of the dates in list_str_dates
            is_filename         - Optional  : if list_str_dates contains filenames
        """
        if is_filename:
            # Empty entries come from an empty field or a trailing ';' in the json
            items = [name.split('_')[1] for name in list_str_dates if name]
        else:
            items = list(list_str_dates)

        return [
            item if isinstance(item, datetime) else datetime.strptime(item, date_format)
            for item in items
        ]

    @staticmethod
    def generate_list_patches(folder_json):
        """
        Create a list of patches objects from json files available in Sentinel-GE.
        Only regular files ending with '.json' are read; they are processed in
        alphabetical order.
        @params:
            folder_json      - Required  : folder containing json files
        """
        list_patches = []
        for json_file in sorted(os.listdir(folder_json)):
            path_json = os.path.join(folder_json, json_file)
            # Skip sub-folders and unrelated files (e.g. notebook checkpoints)
            if not (os.path.isfile(path_json) and json_file.lower().endswith('.json')):
                continue
            list_patches.append(Patch(json=path_json))

        return list_patches

In [3]:
# Build one Patch object per json file found in the labels folder
list_patches = Patch.generate_list_patches(folder_json='./labels')