In [1]:
import pandas as pd
import numpy as np
import os
from datetime import datetime
import time
import shutil

from urllib.request import urlopen
import requests
import json
import zipfile

from netCDF4 import Dataset
import reverse_geocoder as rg

In [2]:
from tqdm.auto import tqdm

In [3]:
# Choose where the data lives. A 'google' entry in PATH is taken as the sign of running on
# Google Colab, where data is kept on the mounted Drive; otherwise use a local relative folder.
# PATH may be unset, so default to '' rather than None (`'google' in None` raises TypeError).
GET_ENV = os.environ.get('PATH', '')
if 'google' in GET_ENV:
    GLOBAL_PATH = '/content/drive/My Drive/Colab Notebooks/datas_sentinel5/'
else:
    GLOBAL_PATH = 'datas_sentinel5/'

In [4]:
# Pour téléchargez les données, veuillez vous enregistrer sur le site ONDA: https://www.onda-dias.eu/cms/
url='https://auth.creodias.eu/auth/realms/DIAS/protocol/openid-connect/token'
user = '' #Email
password = '' #Password

# Paramètres de téléchargements
_Update_API=False #True: Requête vers l'API de ONDA + enregistrement dans un CSV | False: Lecture des CSV enregistrés
_Download=True #True: télécharge 1 fichier (i=x) de ONDA. 
_Tracking=True #True: write in tracking_files.csv

In [5]:
def f_poly(x):
    """Split a MULTIPOLYGON WKT string into nested lists of coordinate pairs.

    The fixed slice ``x[14:-3]`` assumes the text starts with ``'MULTIPOLYGON ((('``
    (slicing from 14 keeps the ``'(('`` of the first polygon) and ends with ``')))'``.
    Coordinates stay as strings, in the order they appear in the text.

    Returns a one-element list wrapping one list per polygon, each polygon being a
    list of ``[first, second]`` coordinate string pairs.
    """
    polygons = []
    for chunk in x[14:-3].split(")),"):
        ring_text = chunk.replace("((", "")
        polygons.append([point.split(" ") for point in ring_text.split(",")])
    return [polygons]

#Distances
def distance(origin, destination):
    """Haversine great-circle distance in km between two (lat, lon) points in degrees.

    Each coordinate may be a scalar or an array/Series; with arrays the result is
    computed element-wise. The Earth radius used is 6373 km.
    """
    (lat_a, lon_a), (lat_b, lon_b) = origin, destination
    earth_radius_km = 6373

    sin_half_dlat = np.sin(np.radians(lat_b - lat_a) / 2)
    sin_half_dlon = np.sin(np.radians(lon_b - lon_a) / 2)
    hav = (sin_half_dlat * sin_half_dlat
           + np.cos(np.radians(lat_a)) * np.cos(np.radians(lat_b)) * sin_half_dlon * sin_half_dlon)
    central_angle = 2 * np.arctan2(np.sqrt(hav), np.sqrt(1 - hav))
    return earth_radius_km * central_angle

def logger(msg=None, alert=None, log_dir=None):
    """Append a timestamped message to the notebook's log files.

    Every message is appended to ``logs.txt``. Messages whose ``alert`` is 'WARNING',
    'ERROR' or 'ERRORS' are also appended to ``logs_errors.txt``. Each file is created
    on first use with an initialisation line.

    Parameters
    ----------
    msg : any
        Message to log, converted with ``str`` (exceptions can be passed directly).
    alert : str or None
        Tag written between brackets after the timestamp, e.g. 'SUCCESS' or 'ERROR';
        ``None`` is written as '[None]'.
    log_dir : str or None
        Directory holding the log files; defaults to ``GLOBAL_PATH + 'logs/'``.
        Created if missing.
    """
    if log_dir is None:
        log_dir = GLOBAL_PATH + 'logs/'
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    timer = '[' + str(datetime.now()) + ']'

    all_path = os.path.join(log_dir, "logs.txt")
    if not os.path.exists(all_path):
        with open(all_path, "w", encoding="utf-8") as file:
            file.write(timer + ' ' + "Logs initialisation")

    timer = timer + '[' + str(alert) + ']'
    # Each entry starts with a newline so files never end with a trailing blank line.
    line = "\n" + timer + ' ' + str(msg)

    if alert in ['WARNING', 'ERROR', 'ERRORS']:
        err_path = os.path.join(log_dir, "logs_errors.txt")
        if not os.path.exists(err_path):
            with open(err_path, "w", encoding="utf-8") as file:
                file.write(timer + ' ' + "Logs errors initialisation")
        with open(err_path, "a", encoding="utf-8") as file:
            file.write(line)

    with open(all_path, "a", encoding="utf-8") as file:
        file.write(line)


def get_token(url, user, password, force=False):
    """Return an access token for the download service, refreshing it only when needed.

    The latest token response is cached in the module-level DataFrame ``tokens``
    (one row holding the JSON fields plus 'creation_date' and 'expiration_date').
    Unless ``force`` is True, the cached token is reused while it remains valid for
    more than one minute; otherwise a new one is requested with the password grant.

    Parameters
    ----------
    url : str
        OpenID Connect token endpoint.
    user, password : str
        Account credentials.
    force : bool
        Skip the cache and always request a new token.

    Returns
    -------
    str
        The 'access_token' value.

    Raises
    ------
    requests.HTTPError
        If the token endpoint answers with an error status (e.g. wrong credentials).
    """
    # `global` applies to the whole function regardless of where it is written; declare it
    # up front so the refreshed token below updates the shared cache.
    global tokens

    if not force and 'tokens' in globals():
        remaining = tokens['expiration_date'].max() - pd.to_datetime('today')
        if remaining > pd.Timedelta('1 minute'):
            return tokens['access_token'].iloc[0]

    logger('Refreshing token')
    payload = dict(client_id='CLOUDFERRO_PUBLIC', username=user, password=password, grant_type='password')
    response = requests.post(url, data=payload, timeout=30)
    # Fail with the HTTP status instead of a KeyError on 'expires_in' further down.
    response.raise_for_status()
    res = response.json()

    now = pd.to_datetime('today')
    res['creation_date'] = now
    res['expiration_date'] = now + pd.Timedelta(seconds=float(res['expires_in']))

    tokens = pd.DataFrame.from_dict(res, orient='index').transpose()
    return res['access_token']


def get_startDate(csv_dir=None):
    """Build the ``&startDate=YYYY-MM-DD`` query fragment for the next API request.

    Looks at the listings saved by the API-update cell, named
    ``infos_<YYYY_MM_DD_HH_MM>_to_<YYYY_MM_DD_HH_MM>.csv``, and returns the day of the
    latest end timestamp. Returns '' when no such listing exists yet (first run).

    Parameters
    ----------
    csv_dir : str or None
        Directory holding the listings; defaults to ``GLOBAL_PATH + 'csv/'``.
    """
    if csv_dir is None:
        csv_dir = GLOBAL_PATH + "csv/"

    # dtype=object keeps the `.str` accessor usable when the directory is empty.
    names = pd.Series(os.listdir(csv_dir), dtype=object)
    # Match the full naming pattern: a bare '.csv' substring/regex test would also pick
    # unrelated files, whose names then break the date parsing below.
    end_stamps = names.str.extract(
        r'^infos_\d{4}(?:_\d{2}){4}_to_(\d{4}(?:_\d{2}){4})\.csv$'
    )[0].dropna()
    if end_stamps.empty:
        return ''

    max_date = pd.to_datetime(end_stamps, format='%Y_%m_%d_%H_%M').max()
    return "&startDate=" + max_date.strftime('%Y-%m-%d')

In [6]:
# Make sure the working directory tree exists before anything writes to it.
# makedirs also creates missing parents (e.g. the Drive folder on Colab) and is a
# no-op for directories that already exist.
REQUIRED_DIRS = ["", "archives", "csv", "temp", 'logs']
for elem in REQUIRED_DIRS:
    try:
        os.makedirs(GLOBAL_PATH + elem, exist_ok=True)
    except OSError as err:
        print("Creation of the directory failed: " + GLOBAL_PATH + elem + " (" + str(err) + ")")

In [7]:
# Refresh the local catalogue of Sentinel-5P L2 NO2 products from the CreoDIAS finder API.
# Each page (up to 2000 records, sorted by ascending publication date) is saved as
# csv/infos_<first published>_to_<last published>.csv. The next page starts from the day
# given by get_startDate(); the loop stops once a page ends on that same day or is empty.
if _Update_API:
    MAX_REQ_PER_MIN = 60  # API limit: 60 requests per minute
    keys = ['id', 'title', 'orbitNumber', 'processingLevel', 'productType', 'cloudCover', 'snowCover',
            'startDate', 'completionDate', 'published', 'services']
    req = 0
    q_limit = pd.to_datetime("today")
    logger('Update API')

    while True:
        # Throttle inside the loop: once MAX_REQ_PER_MIN requests were sent in the current
        # one-minute window, wait for the window to end before sending the next one.
        elapsed = (pd.to_datetime("today") - q_limit).total_seconds()
        if req >= MAX_REQ_PER_MIN:
            if elapsed < 60:
                time.sleep(60 - elapsed)
            q_limit = pd.to_datetime("today")
            req = 0
        elif elapsed > 60:
            q_limit = pd.to_datetime("today")
            req = 0

        start_date = get_startDate()
        url_api = "http://finder.creodias.eu/resto/api/collections/Sentinel5P/search.json?_pretty=true&productType=L2__NO2___&maxRecords=2000&sortParam=published&sortOrder=ascending" + start_date

        response = requests.get(url_api, timeout=60)
        req += 1
        response.raise_for_status()
        features = response.json()["features"]
        if not features:
            # Empty page: nothing to save (and no dates to name a file after).
            break

        infos = pd.DataFrame(
            [[elem.get(key, elem['properties'].get(key)) for key in keys] for elem in features],
            columns=keys,
        )
        # Flatten each record's 'services' dict into columns (this provides the download 'url').
        infos = infos.join(
            pd.concat([pd.DataFrame.from_dict(elem, orient='index') for elem in infos['services']]).reset_index(drop=True)
        ).drop(columns=['mimeType', 'services'])

        date_min = pd.to_datetime(infos.published.min()).strftime('%Y_%m_%d_%H_%M')
        date_max = pd.to_datetime(infos.published.max()).strftime('%Y_%m_%d_%H_%M')

        # Caught up: this page ends on the day the query started from.
        if start_date[-10:] == date_max[:10].replace('_', '-'):
            break

        infos.to_csv(GLOBAL_PATH + "csv/infos_" + date_min + "_to_" + date_max + ".csv")

    logger('Update API successfull', 'SUCCESS')

In [8]:
# Load every saved API listing (csv/*.csv) into one product catalogue `infos`,
# then work out which days still have products left to process (`tab_days`).
logger('Loading infos file')

csv_dir = GLOBAL_PATH + "csv/"
list_infos = sorted(name for name in os.listdir(csv_dir) if name.endswith('.csv'))

# Read all listings, then concatenate once (concatenating inside the loop is quadratic).
frames = [pd.read_csv(csv_dir + name, index_col=0) for name in list_infos]
infos = pd.concat(frames, axis=0) if frames else pd.DataFrame(columns=['id', 'startDate'])

# Successive API queries restart on the day of the previous listing's last record, so
# the same product can appear in two listings; keep a single row per product id.
infos = infos.drop_duplicates(subset='id').reset_index(drop=True)

# Days already processed (the processing loop appends one row per processed day).
tracking_path = GLOBAL_PATH + 'tracking_files.csv'
if os.path.exists(tracking_path):
    tracking_files = pd.read_csv(tracking_path)
    tracking_files['date'] = pd.to_datetime(tracking_files['date']).dt.date
else:
    tracking_files = pd.DataFrame(columns=['date', 'number'])

# A re-processed day has several tracking rows; keep the highest count so the merge
# below yields exactly one row per day.
owned = tracking_files.groupby('date', as_index=False)['number'].max()

# Acquisition day of each product
infos['date'] = pd.to_datetime(infos.startDate).dt.date

tab_days = infos.groupby('date')['id'].count().reset_index()
tab_days = tab_days.merge(owned, on='date', how='left').fillna(0)
tab_days.columns = ['date', 'available', 'owned']
tab_days['to_dl'] = tab_days['available'] - tab_days['owned']

# Days to run: more products listed than already processed
tab_days = tab_days[tab_days.to_dl > 0]

logger('Infos file loaded', 'SUCCESS')

In [None]:
#for day in tab_days:
for day in tab_days.loc[:,'date']:
    id_torun=infos[infos['date']==day].index
    nfiles=len(id_torun)
    
    if _Download:
        logger('Download day '+str(day))
        print('Download day '+str(day))

        for k in tqdm(id_torun):
            try:
                my_token = get_token(url, user, password)
            except Exception as E:
                logger(E, 'ERROR')

            dl_url = infos.loc[k,"url"]+'?token='+my_token
            r = requests.get(dl_url)
            logger(str(r.status_code)+' for k='+str(k))

            if r.status_code == 200 :
                try:
                    my_token = get_token(url, user, password)
            
                    with open(GLOBAL_PATH+'temp/'+infos.loc[k,"id"], 'wb') as f:
                        f.write(r.content)

                    with zipfile.ZipFile(GLOBAL_PATH+'temp/'+infos.loc[k,"id"], 'r') as zip_ref:
                        zip_ref.extractall(GLOBAL_PATH+'temp/')

                    #Supp ZIP
                    os.remove(GLOBAL_PATH+'temp/'+infos.loc[k,"id"])
                    !trash-empty
                except Exception as E:
                    logger(E, 'ERROR')
            else:
                logger('Code '+str(r.status_code)+' when downloading', 'ERROR')

    rootgrp=dict()
    df_sat=dict()

    logger('Cleaning part for day '+str(day))

    for k in tqdm(id_torun):
        logger('File '+str(k)+': cleaning...')

        try:
            rootgrp[k]=Dataset(GLOBAL_PATH+'temp/'+infos.loc[k,"title"]+"/"+infos.loc[k,"title"]+".nc", "r", format="NETCDF4")
        except:
            logger('No such file in directory','ERROR')
            continue

        product_data=['longitude','latitude','nitrogendioxide_tropospheric_column','qa_value']
        x1={elem:rootgrp[k].groups["PRODUCT"].variables[elem][0].data.flatten() for elem in product_data}
        df_sat[k]=pd.DataFrame.from_dict(x1)

        support_data=['eastward_wind','northward_wind','surface_pressure','snow_ice_flag','surface_classification','surface_altitude','surface_altitude_precision']
        x2=dict()
        for elem in support_data:
            try:
                x2[elem]=rootgrp[k].groups["PRODUCT"].groups['SUPPORT_DATA'].groups['INPUT_DATA'].variables[elem][0].data.flatten()
            except:
                x2[elem]=np.array([float('NaN')]*df_sat[k].shape[0])


        x2=pd.DataFrame.from_dict(x2)

        df_sat[k]=df_sat[k].join(x2)

        #Close file
        rootgrp[k].close()

        #Del folder
        shutil.rmtree(GLOBAL_PATH+'temp/'+infos.loc[k,"title"])

    logger('Cleaning successfull','SUCCESS')
    !trash-empty

    logger('Add coordinates...')
    #Get locations infos from latitude and longitude
    coordinates=dict()
    results=dict()
    id_torun = list(df_sat.keys())

    for k in tqdm(id_torun):
        logger('File '+str(k)+': coordinates...')
        coordinates[k] =list(zip(df_sat[k]["latitude"], df_sat[k]["longitude"]))

        results[k] =rg.search(coordinates[k])
        
        
        results[k]=pd.DataFrame.from_dict(results[k]).rename(columns={"lat":"cc_lat","lon":"cc_lon","name":"cc_ville","admin1":"cc_region","admin2":"cc_departement","cc":"cc_pays"})
        df_sat[k]=pd.concat([df_sat[k], results[k]], axis=1)

        df_sat[k].cc_lon=df_sat[k].cc_lon.astype(float)
        df_sat[k].cc_lat=df_sat[k].cc_lat.astype(float)

        #Compute distance
        df_sat[k]["dist"]=distance([df_sat[k].latitude, df_sat[k].longitude], [df_sat[k].cc_lat, df_sat[k].cc_lon])
        #Drop if dist > 30
        df_sat[k]=df_sat[k][df_sat[k].dist <= 30]

        #Remove columns generated
        df_sat[k]=df_sat[k].drop(['cc_lat', 'cc_lon', "dist"], axis=1)

        #Save to csv
        df_sat[k]['date']=pd.to_datetime(infos.loc[k,'startDate'])
        
        del(coordinates[k], results[k])

    logger('Coordinates successfull','SUCCESS')

    tosave=pd.concat([df_sat[elem] for elem in df_sat.keys()])
    del(df_sat)

    #Keep only QA > 0.5
    tosave = tosave[tosave.qa_value >= 0.5]

    cols_group = ['cc_pays','cc_departement','cc_region','cc_ville']

    #Dict of aggregate
    agg_dict=dict()
    for elem in ['longitude','latitude','nitrogendioxide_tropospheric_column','qa_value','eastward_wind','northward_wind','surface_pressure','surface_altitude','surface_altitude_precision']:
        agg_dict[elem]='mean'

    for elem in ['nb_obs']:
        agg_dict[elem]='sum'

    if tosave.shape[0] > 0:

        tosave['nb_obs']=1
        #Get max values_counts()
        snow_ice_flag_max=pd.crosstab([tosave[elem] for elem in cols_group], tosave['snow_ice_flag'])
        snow_ice_flag_max=snow_ice_flag_max.idxmax(axis=1).values

        surface_classification_max=pd.crosstab([tosave[elem] for elem in cols_group], tosave['surface_classification'])
        surface_classification_max=surface_classification_max.idxmax(axis=1).values

        #Groupby city
        tosave=tosave.groupby(cols_group, as_index=False).agg(agg_dict)

        tosave['snow_ice_flag_max']=snow_ice_flag_max
        tosave['surface_classification_max']=surface_classification_max
        tosave['Date']=str(day)

    filename="archived_"+str(day)+".csv"
    #Save files
    tosave.to_csv(GLOBAL_PATH+"archives/"+filename+'.gz', index=False, compression='gzip')

    if _Tracking:
        logger('Save date '+str(day)+' to csv')
        tracking_files=tracking_files.append({'date': str(day), 'number':nfiles}, ignore_index=True)
        tracking_files.to_csv(GLOBAL_PATH+'tracking_files.csv', index=False)

    logger('WORK SUCCESSFULL WITHOUT ERRORS FOR DAY '+str(day),'SUCCESS')
    print('Done')

Download day 2020-11-12


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=177.0), HTML(value='')))