In [None]:
import argparse
import yaml
import os, sys
import time
import logging, logging.config

import pandas as pd
import geopandas as gpd
import rasterio
from rasterio.mask import mask
from rasterstats import zonal_stats
from shapely.geometry import mapping

import numpy as np
import stat
import math
import matplotlib

from tqdm import tqdm

import misc_fct

In [None]:
# Load the section of the configuration file dedicated to this analysis.
# (In a script, the key would be os.path.basename(__file__).)
with open('config.yaml') as config_file:
    cfg = yaml.load(config_file, Loader=yaml.FullLoader)['statistical_analysis.py']


# Definition of the constants
DEBUG_MODE = cfg['debug_mode']
CORRECT_BALANCE = cfg['correct_balance']
BANDS = range(1, 5)
COUNT_THRESHOLD = 50

PROCESSED = cfg['processed']
PROCESSED_FOLDER = PROCESSED['processed_folder']
FINAL_FOLDER = cfg['final_folder']

## Inputs
# The paths are built by plain string concatenation: the separators come from the configuration file.
ROADS = PROCESSED_FOLDER + PROCESSED['input_files']['roads']
TILES_DIR = PROCESSED_FOLDER + PROCESSED['input_files']['images']
TILES_INFO = PROCESSED_FOLDER + PROCESSED['input_files']['tiles']

# Names of the files written by the notebook, printed in the last cell
written_files = []


## Definition of functions

In [None]:
def get_pixel_values(polygons, tile, pixel_values, **kwargs):
    '''
    Extract the value of the raster pixels falling under the mask and save them in a dataframe.

    - polygons: geodataframe with the geometries of the zones where the pixels are extracted
    - tile: path to the raster image
    - pixel_values: dataframe to which the values for the pixels are going to be concatenated
    - kwargs: additional columns (constant values) to add to the rows of the extracted pixels

    Return the extended dataframe (columns 'pix_val', 'band_num' and the kwargs) and the no data value
    used for the raster (0 when the raster does not define one).
    '''

    # GeoJSON-like version of every geometry: all the polygons take part in the mask, not only the first one
    geoms = [mapping(geom) for geom in polygons.geometry.values]

    with rasterio.open(tile) as src:
        # no data value of the original raster, read while the dataset is still open
        no_data = src.nodata

        # extract the raster values within the polygons
        out_image = mask(src, geoms, crop=True)[0] if geoms else None

    if no_data is None:
        no_data = 0

    if out_image is None:
        # without any geometry, there is nothing to extract
        return pixel_values, no_data

    band_frames = [pixel_values]
    for band in BANDS:

        # extract the values of the masked array
        data = out_image[band-1]

        # keep the valid values only; a true pixel value equal to the no data value is dropped as well
        val = np.extract(data != no_data, data)

        band_frames.append(pd.DataFrame({'pix_val': val, 'band_num': band, **kwargs}))

    # one single concatenation instead of one per band
    return pd.concat(band_frames, ignore_index=True), no_data

def get_df_stats(dataframe, col, results_dict = None, to_df = False):
    '''
    Get the min, max, mean, median, std and count of a column in a dataframe and send back a dict or a dataframe

    - dataframe: dataframe from which the statistics will be calculated
    - col: string or list of strings indicating the column(s) from which the statistics will be calculated
    - results_dict: dictionary of lists for the results with the keys 'min', 'max', 'mean', 'median', 'std', and 'count'.
      The new values are appended in place; a new dictionary is created when None is passed.
    - to_df: if True, the results are converted from a dictionary to a dataframe

    Return the dictionary of results (the same object as results_dict when it was passed) or a dataframe made from it.
    '''

    stat_names = ('min', 'max', 'mean', 'median', 'std', 'count')

    if results_dict is None:
        results_dict = {stat_name: [] for stat_name in stat_names}

    values = dataframe[col]
    for stat_name in stat_names:
        # each statistic is a method of the same name on the pandas object
        results_dict[stat_name].append(getattr(values, stat_name)())

    if to_df:
        return pd.DataFrame(results_dict)

    return results_dict

# Main

## Import data

In [None]:
# Read the input files: the road geometries and the information about the image tiles.
# Both paths are built from the configuration file (see the constants ROADS and TILES_INFO).
roads=gpd.read_file(ROADS)
tiles_info = gpd.read_file(TILES_INFO)

In [None]:
# Quick look at the attributes available in both layers; the last expression displays the number of roads.
print(roads.columns)
print(tiles_info.columns)
roads.shape[0]


## Data treatment

In [None]:
# In debug mode, only a subset of the tiles is kept to shorten the run.
# NOTE(review): the positional slice starts at 1, so the first tile is left out — confirm that [0:500] was not intended.
if DEBUG_MODE:
    tiles_info=tiles_info[1:500]

In [None]:
# Stop right away if the input roads contain invalid geometries.
nbr_invalid_roads = (~roads.is_valid).sum()
if nbr_invalid_roads != 0:
    print(f"There are {nbr_invalid_roads} invalid geometries for the roads.")
    sys.exit(1)

# Attributes that are not needed for the statistical analysis
unused_road_attributes = [
    'ERSTELLUNG', 'ERSTELLU_1', 'HERKUNFT', 'HERKUNFT_J', 'HERKUNFT_M',
    'KUNSTBAUTE', 'WANDERWEGE', 'VERKEHRSBE', 'BEFAHRBARK', 'EROEFFNUNG',
    'STUFE', 'RICHTUNGSG', 'KREISEL', 'EIGENTUEME', 'VERKEHRS_1',
    'NAME', 'TLM_STRASS', 'STRASSENNA', 'SHAPE_Leng', 'Width',
]
simplified_roads = roads.drop(columns=unused_road_attributes)


# to_file(PROCESSED_FOLDER + '/shapefiles_gpkg/test_invalid_geom.shp')

In [None]:
# Bring both layers to the same coordinate system (EPSG:3857)
roads_reproj = simplified_roads.to_crs(epsg=3857)
tiles_info_reproj = tiles_info.to_crs(epsg=3857)

# Build the path of the image of each tile from its id.
# The id is split into x, y and z after removing the surrounding '(', ',' and ')'; the file is named '<z>_<x>_<y>.tif'.
fp_list = []
for tile_id in tiles_info_reproj['id']:
    x, y, z = tile_id.strip('(,)').split(',')
    fp_list.append(os.path.join(TILES_DIR, f'{z.lstrip()}_{x}_{y.lstrip()}.tif'))

tiles_info_reproj['filepath'] = fp_list

misc_fct.test_crs(roads_reproj.crs, tiles_info_reproj.crs)


In [None]:
# Always start from a copy of the reprojected roads: `corrected_roads` is used by the following cells
# and must exist even when all the geometries are valid.
corrected_roads = roads_reproj.copy()

invalid_geometries = ~corrected_roads.is_valid
if invalid_geometries.any():
    print(f"There are {invalid_geometries.sum()} invalid geometries for the road after the reprojection.")

    print("Correction of the roads presenting an invalid geometry with a buffer of 0 m...")
    corrected_roads.loc[invalid_geometries, 'geometry'] = corrected_roads.loc[invalid_geometries, 'geometry'].buffer(0)


In [None]:
# Clip the roads to each tile and tag the parts with the title of the tile.
# The parts are gathered in a list and concatenated once: concatenating inside the loop
# copies the whole table at each iteration.
roads_per_tile = []
for idx in tqdm(tiles_info_reproj.index, desc='Clipping roads'):

    roads_to_tile = gpd.clip(corrected_roads, tiles_info_reproj.loc[idx,'geometry']).explode(index_parts=False)
    roads_to_tile['tile'] = tiles_info_reproj.loc[idx, 'title']

    roads_per_tile.append(roads_to_tile)

# pd.concat refuses an empty list: fall back on an empty geodataframe when there is no tile
clipped_roads = pd.concat(roads_per_tile, ignore_index=True) if roads_per_tile else gpd.GeoDataFrame()


In [None]:
# Compare the size of the road table before and after the clipping to the tiles and show the first tiles.
print(corrected_roads.shape)
print(clipped_roads.shape)
print(tiles_info_reproj.head(5))


In [None]:
# dirpath=misc_fct.ensure_dir_exists(os.path.join(PROCESSED_FOLDER, 'shapefiles_gpkg'))

# clipped_roads.to_file(os.path.join(dirpath, 'test_clipped_geom.shp')

### Zonal statistics for the roads

1. With rasterstats.zonal_stats

Test

In [None]:
# Try zonal_stats on the first road of the tile with the index label 1, for the band 2 only.
is_on_test_tile = clipped_roads['tile'] == tiles_info_reproj.loc[1, 'title']
roads_on_tile = clipped_roads[is_on_test_tile]

# The path of the image was stored in the column 'filepath' when the tiles were reprojected
im_path = tiles_info_reproj.loc[1, 'filepath']

test = zonal_stats(
    roads_on_tile.iloc[0:1],
    im_path,
    stats=['min', 'max', 'mean', 'median','std','count'],
    band=2,
)

In [None]:
# Display the result of the zonal_stats test of the previous cell.
test

Implementation

In [None]:
# Zonal statistics of each clipped road on each band.
# The records are gathered in a list and turned into a dataframe once at the end.
zonal_statistics = ['min', 'max', 'mean', 'median', 'std', 'count']
stats_records = []

for tile_idx in tqdm(tiles_info_reproj.index, desc='Calculating zonal statistics'):

    # New table with a clean index instead of an in-place reset on a slice of clipped_roads
    roads_on_tile = clipped_roads[clipped_roads['tile']==tiles_info_reproj.loc[tile_idx,'title']].reset_index(drop=True)
    if roads_on_tile.empty:
        continue

    # Get the path and the id of the tile
    im_path = tiles_info_reproj.loc[tile_idx,'filepath']
    tile_id = tiles_info_reproj.loc[tile_idx,'id']

    # One call per band for all the roads of the tile: zonal_stats returns one dictionary per road, in the order of the table
    stats_per_band = {
        band_num: zonal_stats(roads_on_tile, im_path, stats=zonal_statistics, band=band_num, nodata=0)
        for band_num in BANDS
    }

    # Same order of the records as a calculation road by road, then band by band
    for road_idx in roads_on_tile.index:
        for band_num in BANDS:

            stats_dict = dict(stats_per_band[band_num][road_idx])
            stats_dict['band'] = band_num
            stats_dict['road_id'] = roads_on_tile.loc[road_idx,'OBJECTID']
            stats_dict['road_type'] = roads_on_tile.loc[road_idx,'BELAGSART']
            stats_dict['geometry'] = roads_on_tile.loc[road_idx,'geometry']
            stats_dict['tile_id'] = tile_id

            stats_records.append(stats_dict)

roads_stats = pd.DataFrame(stats_records)

# Nothing to round when no road was found on the tiles
if not roads_stats.empty:
    roads_stats['mean'] = roads_stats['mean'].round(1)
    roads_stats['std'] = roads_stats['std'].round(1)


In [None]:
# Display the statistics of the roads with the cover type 200 (relabelled 'natural' further down in the notebook).
roads_stats[roads_stats['road_type']==200]


2. With the statistics of the pixels

In [None]:
# Statistics of each road calculated from the values of its pixels, all tiles together.
from shapely.geometry.multipolygon import MultiPolygon

roads_stats={'cover':[], 'band':[], 'road_id': [], 'road_type': [], 'geometry': [], 'min':[], 'max':[], 'mean':[], 'median':[], 'std':[], 'count':[]}

# Ids of the roads already treated (a set makes the membership test cheap)
processed_road_ids = set()

for road_idx in tqdm(corrected_roads.index, desc='Extracting road statistics'):

    # A road made of several rows is treated once, with all its rows
    objectid = corrected_roads.loc[road_idx, 'OBJECTID']
    if objectid in processed_road_ids:
        continue
    processed_road_ids.add(objectid)

    # Get the characteristics of the road
    cover_type = corrected_roads.loc[road_idx, 'BELAGSART']
    road = corrected_roads.loc[corrected_roads['OBJECTID'] == objectid, ['OBJECTID', 'BELAGSART', 'geometry']].reset_index(drop=True)
    geometry = road.loc[0,'geometry'] if road.shape[0]==1 else MultiPolygon([road.loc[k,'geometry'] for k in road.index])

    # Get the corresponding tile(s)
    misc_fct.test_crs(road.crs, tiles_info_reproj.crs)
    intersected_tiles = gpd.overlay(tiles_info_reproj, road).drop_duplicates(subset=['id']).reset_index(drop=True)

    # Get the pixels for each tile
    pixel_values = pd.DataFrame()
    for im_path in intersected_tiles['filepath']:
        pixel_values, no_data = get_pixel_values(road, im_path, pixel_values, road_id=objectid, road_cover=cover_type)

    # No tile under the road: the table has no column and no statistics can be calculated
    if 'band_num' not in pixel_values.columns:
        continue

    for band in BANDS:
        pixels_subset = pixel_values[pixel_values['band_num']==band]

        roads_stats['cover'].append(cover_type)
        roads_stats['band'].append(band)
        roads_stats['road_id'].append(objectid)
        roads_stats['road_type'].append(cover_type)
        roads_stats['geometry'].append(geometry)

        # Statistics of the pixel values of the current band (not of the band numbers of all the pixels)
        roads_stats = get_df_stats(pixels_subset, 'pix_val', roads_stats)

roads_stats = pd.DataFrame(roads_stats)

In [None]:
# Keep one decimal for the mean and the standard deviation
roads_stats = roads_stats.round({'mean': 1, 'std': 1})

In [None]:
# Number of rows divided by 4; presumably the number of roads, as one row is added per road for each of the 4 bands.
print(roads_stats.shape[0]/4)

Finish

In [None]:
# The geometries come from corrected_roads: attach its coordinate system, otherwise the geodataframe
# has no CRS and an export would not be georeferenced.
roads_stats_gdf = gpd.GeoDataFrame(roads_stats, geometry='geometry', crs=corrected_roads.crs)

dirpath = misc_fct.ensure_dir_exists(os.path.join(PROCESSED_FOLDER, 'shapefiles_gpkg'))

# roads_stats_gdf.to_file(os.path.join(dirpath, 'roads_stats.shp'))
# written_files.append('processed/shapefiles_gpkg/roads_stats.shp')

In [None]:
# Save the statistics of the roads as a table, without the geometries
roads_stats_df = roads_stats.drop(columns=['geometry'])

print(roads_stats_df.tail(8))

dirpath = misc_fct.ensure_dir_exists(os.path.join(PROCESSED_FOLDER,'tables'))

roads_stats_df.to_csv(os.path.join(dirpath, 'stats_roads.csv'), index=False)
# The recorded name has to match the name of the file actually written
written_files.append('processed/tables/stats_roads.csv')

In [None]:
# Histograms of the column 'count' (number of pixels in the statistics of a road), one per road type
roads_stats_df.plot.hist(column=['count'], by='road_type', bins=50, title = 'Pixel count for each road')


In [None]:
# Keep the statistics calculated on more than COUNT_THRESHOLD pixels.
# .copy(): new column values are assigned to this table further down; an independent table avoids writing on a slice of roads_stats_df.
roads_stats_filtered = roads_stats_df[roads_stats_df['count']>COUNT_THRESHOLD].copy()

nbr_dropped = roads_stats_df.shape[0] - roads_stats_filtered.shape[0]
print(f"{nbr_dropped} on {roads_stats_df.shape[0]} were dropped because they contained {COUNT_THRESHOLD} pixels or less.")


### Statistics per type of road cover

In [None]:
# Create a table with the values of pixels on a road
# cf https://gis.stackexchange.com/questions/260304/extract-raster-values-within-shapefile-with-pygeoprocessing-or-gdal

pixel_values = pd.DataFrame()

for tile_idx in tqdm(tiles_info_reproj.index, desc='Getting pixel values'):

    tile = tiles_info_reproj.loc[tile_idx, 'filepath']

    is_on_tile = clipped_roads['tile'] == tiles_info_reproj.loc[tile_idx, 'title']
    roads_on_tile = clipped_roads[is_on_tile]

    # The pixels are extracted separately for each type of road cover present on the tile
    cover_types_on_tile = roads_on_tile['BELAGSART'].unique().tolist()
    for cover_type in cover_types_on_tile:

        has_cover_type = roads_on_tile['BELAGSART'] == cover_type
        road_shapes = roads_on_tile[has_cover_type]

        pixel_values, no_data = get_pixel_values(road_shapes, tile, pixel_values, road_type=cover_type)


In [None]:
# Create a new table with a column per band (just reformatting the table)
pixels_per_band={'road_type':[], 'band1':[], 'band2':[], 'band3':[], 'band4':[]}
band_columns = [f'band{band}' for band in BANDS]

for cover_type in pixel_values['road_type'].unique().tolist():

    pixels_of_cover = pixel_values[pixel_values['road_type']==cover_type]

    for band, band_column in zip(BANDS, band_columns):
        pixels_list = pixels_of_cover.loc[pixels_of_cover['band_num']==band, 'pix_val'].to_list()
        pixels_per_band[band_column].extend(pixels_list)

    # Following part to change. Probably, better handling of the no data would avoid this mistake
    # The bands can have different numbers of values because the values equal to the no data value are
    # dropped band by band: the shorter columns are padded with the no data value.
    max_pixels = max(len(pixels_per_band[band_column]) for band_column in band_columns)

    for band, band_column in zip(BANDS, band_columns):
        len_pixels_serie = len(pixels_per_band[band_column])

        if len_pixels_serie!=max_pixels:

            pixels_per_band[band_column].extend([no_data]*(max_pixels-len_pixels_serie))

            print(f'{max_pixels-len_pixels_serie} pixels were missing on the band {band} for the road cover {cover_type}. There were replaced with the value used of no data ({no_data})')

    # Label all the rows added for this cover type (padding included), so that every column
    # has the same length when the dictionary is converted to a dataframe
    nbr_new_rows = max_pixels - len(pixels_per_band['road_type'])
    pixels_per_band['road_type'].extend([cover_type]*nbr_new_rows)

pixels_per_band = pd.DataFrame(pixels_per_band)

In [None]:
# Number of rows in the table with one column per band
len(pixels_per_band['road_type'])

In [None]:
# Calculate the statistics of the pixel by band and by type of road cover

cover_stats = {
    stat_name: []
    for stat_name in ('cover', 'band', 'min', 'max', 'mean', 'median', 'std', 'iq25', 'iq75', 'count')
}

for cover_type in pixel_values['road_type'].unique().tolist():

    has_cover_type = pixel_values['road_type'] == cover_type

    for band in BANDS:
        pixels_subset = pixel_values[(pixel_values['band_num'] == band) & has_cover_type]

        cover_stats['cover'].append(cover_type)
        cover_stats['band'].append(band)

        # min, max, mean, median, std and count are appended in place by get_df_stats
        get_df_stats(pixels_subset, 'pix_val', cover_stats)

        # first and third quartiles
        first_quartile, third_quartile = pixels_subset['pix_val'].quantile([.25, .75])
        cover_stats['iq25'].append(first_quartile)
        cover_stats['iq75'].append(third_quartile)



In [None]:
# Debug output: the maxima shifted by 256, i.e. the values that the comment of the next cell reports
# seeing after the conversion to a dataframe.
print([x-256 for x in cover_stats['max']])


In [None]:
# Otherwise, the values get transformed to x-256 when converted in dataframe
cover_stats['max'] = list(map(int, cover_stats['max']))

# One decimal is enough for the mean and the standard deviation
cover_stats_df = pd.DataFrame(cover_stats).round({'mean': 1, 'std': 1})

print(cover_stats_df)

dirpath = misc_fct.ensure_dir_exists(os.path.join(FINAL_FOLDER, 'tables'))

cover_stats_path = os.path.join(dirpath, 'statistics_roads_by_type.csv')
cover_stats_df.to_csv(cover_stats_path, index=False)
written_files.append('final/tables/statistics_roads_by_type.csv')


In [None]:
# Optionally balance the dataset: as many artificial (100) as natural (200) pixels and roads.
if CORRECT_BALANCE:
    print('Taking only a subset of the artifical roads and pixels to have a balanced dataset.')

    natural_pixels = pixels_per_band[pixels_per_band['road_type']==200]
    natural_stats = roads_stats_filtered[roads_stats_filtered['road_type']==200]

    artificial_pixels = pixels_per_band[pixels_per_band['road_type']==100].reset_index(drop=True)
    artificial_stats = roads_stats_filtered[roads_stats_filtered['road_type']==100].reset_index(drop=True)

    # Draw as many artificial rows as there are natural ones. The number is capped by the number of
    # artificial rows: a fraction above 1 is refused by sample() and an empty artificial class
    # would lead to a division by zero.
    nbr_pixels_to_draw = min(natural_pixels.shape[0], artificial_pixels.shape[0])
    nbr_stats_to_draw = min(natural_stats.shape[0], artificial_stats.shape[0])

    artificial_pixels_subset = artificial_pixels.sample(n=nbr_pixels_to_draw, random_state=1)
    artificial_stats_subset = artificial_stats.sample(n=nbr_stats_to_draw, random_state=9)

    pixels_per_band = pd.concat([artificial_pixels_subset, natural_pixels], ignore_index=True)
    roads_stats_filtered = pd.concat([artificial_stats_subset, natural_stats], ignore_index=True)

    # suffix for the names of the files produced from the balanced dataset
    balance = '_balanced'

else:
    balance = ''

In [None]:
## Change the format to reader-friendly
# NOTE: BANDS is rebound from the band numbers to the band names used in the column 'band' of the statistics.
# The cells above need the band numbers: run the cell of the constants again before re-running them.
BANDS=['NIR','R','G','B']
pixels_per_band = pixels_per_band.rename(columns={'band1': 'NIR', 'band2': 'Red', 'band3': 'Green', 'band4': 'Blue'})

# Work on an independent table (the filtered statistics can be a slice of roads_stats_df)
roads_stats_filtered = roads_stats_filtered.copy()
# Assign the column 'band'; .loc['band'] would add a row labelled 'band' instead
roads_stats_filtered['band'] = roads_stats_filtered['band'].replace({1: 'NIR', 2: 'R', 3: 'G', 4: 'B'})

pixels_per_band['road_type'] = pixels_per_band['road_type'].replace({100: 'artificial', 200: 'natural'})
roads_stats_filtered['road_type'] = roads_stats_filtered['road_type'].replace({100: 'artificial', 200: 'natural'})

### Boxplots

In [None]:
print('Calculating boxplots...')

# Folder for all the images produced from here on
dirpath_images=misc_fct.ensure_dir_exists(os.path.join(FINAL_FOLDER, 'images'))

# The green bar in the boxplot is the median (cf. https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.plot.box.html)

Boxplot of the pixel values

In [None]:
# Boxplots of the pixel values of each band, grouped by road type
bp_pixel_bands = pixels_per_band.plot.box(
    by='road_type',
    title='Repartition of the values for the pixels',
    figsize=(15,8),
    grid=True,
)

# All the boxplots share one figure: it is reached through the first element
fig = bp_pixel_bands[0].get_figure()
boxplot_pixels_path = os.path.join(dirpath_images, 'boxplot_pixel_in_bands.jpg')
fig.savefig(boxplot_pixels_path)
written_files.append('final/images/boxplot_pixel_in_bands.jpg')

In [None]:
# pixels_subset.plot.box(column='pix_val', by=['road_type','band_num'], figsize=(10,8))

Boxplots of the statistics

In [None]:
# One figure per band with the boxplots of the statistics of the roads, grouped by road type
for band in BANDS:
    is_current_band = roads_stats_filtered['band'] == band
    roads_stats_subset = roads_stats_filtered[is_current_band].drop(columns=['count', 'band', 'road_id'])

    roads_stats_plot = roads_stats_subset.plot.box(
        by='road_type',
        figsize=(30,8),
        title=f'Boxplot of the statistics for the band {band}',
        grid=True,
    )

    boxplot_name = f'boxplot_stats_band_{band}.jpg'

    fig = roads_stats_plot[0].get_figure()
    fig.savefig(os.path.join(dirpath_images, boxplot_name))
    written_files.append(f'final/images/{boxplot_name}')

### PCA

Do the parameters (bands and stats) successfully explain/distinguish the type of road cover?
 
-> Are the clusters well defined?

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import plotly.express as px

# Reference for the PCA with scikit-learn:
# cf. https://towardsdatascience.com/pca-using-python-scikit-learn-e653f8989e60
print('Calculating PCAs...')



In [None]:
def evplot(ev):
    '''
    Implementation of Kaiser's rule and the Broken stick model (MacArthur, 1957) to determine the number of components to keep in the PCA.
    https://www.mohanwugupta.com/post/broken_stick/ -> adapted for Python

    - ev: eigenvalues

    Return the values of the broken stick model (expected percentage of variance for each component,
    in decreasing order) and the figure with the two graphs.
    '''

    ev = np.asarray(ev, dtype=float)
    n = len(ev)

    # Broken stick model (MacArthur 1957): cumulated sums of 1/n, 1/(n-1), ..., 1, turned into percentages
    # and sorted in decreasing order
    j = np.arange(n)+1
    bsm = [1/n]
    for k in range(n-1):
        bsm.append(bsm[k] + 1/(n-1-k))
    bsm = [100*x/n for x in bsm]
    bsm.reverse()

    avg_ev = ev.sum()/n

    # The broken stick values are percentages of variance: the eigenvalues are converted to the same unit
    ev_percent = 100*ev/ev.sum()

    # Plot figures
    fig = plt.figure(figsize = (8,8))

    ax = fig.add_subplot(2,1,1)
    bx = fig.add_subplot(2,1,2)

    ## Kaiser rule: eigenvalues compared with their average
    ax.bar(j, ev)
    ax.axhline(y=avg_ev, color='r', linestyle='-')
    ax.set_ylabel('Eigenvalue')

    ## Broken stick model: observed against expected percentage of variance
    bx.bar(j-0.25, ev_percent, color='y', width=0.5, label='% of variance')
    bx.bar(j+0.25, bsm, color='r', width=0.5, label='Broken stick model')
    bx.set_ylabel('% of variance')
    bx.legend()

    return bsm, fig
    
def determine_pc_num(ev, bsm):
    '''
    Determine the number of pc to keep

    - ev: eigenvalues of the principal components
    - bsm: values of the broken stick model in percentage of variance, as returned by evplot

    The number of components to keep is the minimum between the number of eigenvalues above their
    average (Kaiser's rule) and the number of components explaining a larger percentage of variance
    than expected by the broken stick model. It is limited to the range 1-10.

    Return the number of components to keep and the number of components to plot (at least 2).
    '''

    ev = np.asarray(ev, dtype=float)

    pc_to_keep_kaiser = int((ev > ev.sum()/len(ev)).sum())

    # The broken stick model is expressed in % of variance: the eigenvalues are converted before the comparison.
    # Without variance at all, no component passes the test.
    total_variance = ev.sum()
    ev_percent = 100*ev/total_variance if total_variance else np.zeros_like(ev)
    pc_to_keep_bsm = sum(1 for observed, expected in zip(ev_percent, bsm) if observed > expected)

    pc_to_keep = min(pc_to_keep_kaiser, pc_to_keep_bsm)

    if pc_to_keep<2:
        print(f'The number of components to keep was {pc_to_keep}. The number of components to keep is set to 1 and the number of components to plot is set to 2.')
        pc_to_keep = 1
        pc_to_plot = 2
    elif pc_to_keep>10:
        print(f'The number of components to keep and plot was {pc_to_keep}. It is set to a maximum limit of 10')
        pc_to_keep = 10
        pc_to_plot = 10
    else:
        pc_to_plot = pc_to_keep
        print(f'The number of components to keep and plot is {pc_to_keep}.')

    return pc_to_keep, pc_to_plot
        
def calculate_pca(dataset, features, to_describe,
                dirpath_tables='tables',  dirpath_images='images',
                file_pca_values='PCA_values.csv', file_pc_to_keep='PC_to_keep_evplot.jpg',
                file_graph_ind='PCA_PC1{pc}_individuals.jpg', file_graph_feat='PCA_PC1{pc}_features.jpeg'):
    '''
    Calculate a PCA, determine the number of components to keep, plot the individuals and the variables along those components.
    The results are saved as files.

    Variables:
    - dataset: dataset from which the PCA will be calculated (left untouched)
    - features: descriptive variables of the dataset (must be numerical only)
    - to_describe: explanatory variable or the variable to describe with the PCA (FOR NOW, ONLY ONE EXPLANATORY VARIABLE CAN BE PASSED)
    - dirpath_tables: directory for the tables
    - dirpath_images: directory for the images
    - file_pca_values: csv file where the coordinates of the individuals after the PCA are saved
    - file_pc_to_keep: image file where the graphs for the determination of the number of principal components to keep are saved
    - file_graph_ind: image file where the graph for the individuals is saved. '{pc}' is replaced by the number of the
      component plotted against the first one.
    - file_graph_feat: image file where the graph for the features is saved. '{pc}' is replaced like for file_graph_ind.
      A copy with the extension .webp is saved under the same name.

    Return the list of the names of the written files.
    '''

    written_files=[]

    # 1. Define the variables and scale
    # New table with a clean index, aligned with the coordinates of the PCA; the dataframe of the caller is not modified
    dataset = dataset.reset_index(drop=True)
    x = StandardScaler().fit_transform(dataset.loc[:,features].values)

    # 2. Calculate the PCA
    pca = PCA(n_components=len(features))

    coor_PC = pca.fit_transform(x)

    coor_PC_df = pd.DataFrame(data = coor_PC, columns = [f"PC{k}" for k in range(1,len(features)+1)])
    results_PCA = pd.concat([coor_PC_df, dataset[to_describe]], axis = 1)

    results_PCA.round(3).to_csv(os.path.join(dirpath_tables, file_pca_values), index=False)
    written_files.append(file_pca_values)


    # 3. Get the number of components to keep
    eigenvalues = pca.explained_variance_
    bsm, fig_pc_num = evplot(eigenvalues)

    pc_to_keep, pc_to_plot = determine_pc_num(eigenvalues, bsm)
    # There cannot be more components to plot than features
    pc_to_plot = min(pc_to_plot, len(features))

    fig_pc_num.savefig(os.path.join(dirpath_images, file_pc_to_keep))
    plt.close(fig_pc_num)
    written_files.append(file_pc_to_keep)


    # Elements shared by the graphs of all the components
    expl_var_ratio = [round(ratio*100,2) for ratio in pca.explained_variance_ratio_.tolist()]
    labels_column = [f'Principal component {k+1} ({expl_var_ratio[k]}%)' for k in range(len(features))]
    loadings = pca.components_.T * np.sqrt(pca.explained_variance_)

    targets = dataset[to_describe].unique().tolist()
    # Names of the tableau colors without their prefix 'tab:'
    colors = [key[4:] for key in mcolors.TABLEAU_COLORS.keys()]

    for pc in range(2,pc_to_plot+1):
        # Names of the files for this component. The templates are filled with str.format, no code is evaluated.
        graph_ind_name = file_graph_ind.format(pc=pc)
        graph_feat_name = file_graph_feat.format(pc=pc)
        graph_feat_webp = os.path.splitext(graph_feat_name)[0] + '.webp'

        # 4. Plot the graph of the individuals
        fig_ind = plt.figure(figsize = (8,8))

        ax = fig_ind.add_subplot(1,1,1)
        ax.set_xlabel(f'Principal Component 1 ({expl_var_ratio[0]}%)', fontsize = 15)
        ax.set_ylabel(f'Principal Component {pc} ({expl_var_ratio[pc-1]}%)', fontsize = 15)
        ax.set_title('PCA for the values of the pixels on each band', fontsize = 20)

        for target_idx, target in enumerate(targets):
            indicesToKeep = results_PCA[to_describe] == target
            # one color per target; the colors are reused when there are more targets than colors
            ax.scatter(results_PCA.loc[indicesToKeep, 'PC1']
                    , results_PCA.loc[indicesToKeep, f'PC{pc}']
                    , color = colors[target_idx % len(colors)]
                    , s = 50)
        ax.legend(targets)
        ax.set_aspect(1)
        ax.grid()

        fig_ind.savefig(os.path.join(dirpath_images, graph_ind_name))
        # free the memory of the figure once it is saved
        plt.close(fig_ind)
        written_files.append(graph_ind_name)

        # 5. Plot the graph of the variables
        # Empty scatter plot used as a frame for the loadings of the features on PC1 and on the current component
        fig_feat = px.scatter(pd.DataFrame(columns=labels_column), x=labels_column[0], y=labels_column[pc-1])

        for i, feature in enumerate(features):
            fig_feat.add_shape(
                type='line',
                x0=0, y0=0,
                x1=loadings[i, 0],
                y1=loadings[i, pc-1]
            )

            fig_feat.add_annotation(
                x=loadings[i, 0],
                y=loadings[i, pc-1],
                ax=0, ay=0,
                xanchor="center",
                yanchor="bottom",
                text=feature,
            )

        fig_feat.update_yaxes(
            scaleanchor = "x",
            scaleratio = 1,
        )

        fig_feat.write_image(os.path.join(dirpath_images, graph_feat_name))
        written_files.append(graph_feat_name)

        # Copy in webp, whatever the extension of the first image
        if graph_feat_webp != graph_feat_name:
            fig_feat.write_image(os.path.join(dirpath_images, graph_feat_webp))
            written_files.append(graph_feat_webp)

    return written_files

In [None]:
# Check the table used for the PCA of the pixel values
print(pixels_per_band.head(5))

#### PCA of the pixel values


In [None]:
# PCA of the pixel values: the bands are the features and the road type is the variable to describe

features = ['NIR', 'Red', 'Green', 'Blue']
to_describe = 'road_type'

dirpath_tables = misc_fct.ensure_dir_exists(os.path.join(FINAL_FOLDER, 'tables'))

# Names of the output files; '{pc}' is left as a placeholder filled by calculate_pca
pixel_pca_filenames = {
    'file_pca_values': f'PCA_pixel_values{balance}.csv',
    'file_pc_to_keep': f'PCA_pixels_PC_to_keep_evplot{balance}.jpg',
    'file_graph_ind': f'PCA_pixels_PC1{{pc}}_individuals{balance}.jpg',
    'file_graph_feat': f'PCA_pixels_PC1{{pc}}_features{balance}.jpg',
}

written_files_pca_pixels = calculate_pca(
    pixels_per_band, features, to_describe,
    dirpath_tables, dirpath_images,
    **pixel_pca_filenames,
)

written_files.extend(written_files_pca_pixels)

#### PCA of the road stats

In [None]:
# One PCA per band: the statistics of the roads are the features and the road type is the variable to describe
for band in tqdm(BANDS, desc='Processing bands'):
    is_current_band = roads_stats_filtered['band'] == band
    roads_stats_filtered_subset = roads_stats_filtered[is_current_band].reset_index(drop=True)

    features = ['min', 'max', 'mean', 'std','median']
    to_describe = 'road_type'

    # '{pc}' is left as a placeholder filled by calculate_pca
    stats_pca_filenames = {
        'file_pca_values': f'PCA_stats_band_{band}_values{balance}.csv',
        'file_pc_to_keep': f'PCA_stats_band_{band}_PC_to_keep_evplot{balance}.jpg',
        'file_graph_ind': f'PCA_stats_PC1{{pc}}_band_{band}_individuals{balance}.jpg',
        'file_graph_feat': f'PCA_stats_PC1{{pc}}_band_{band}_features{balance}.jpg',
    }

    written_files_pca_stats = calculate_pca(
        roads_stats_filtered_subset, features, to_describe,
        dirpath_tables, dirpath_images,
        **stats_pca_filenames,
    )

    written_files.extend(written_files_pca_stats)


In [None]:
# Summary of the names recorded in written_files along the notebook
print(f'Checkout the written files: {written_files}')
