# Reader preparation : Data exploration tool for reader creation 

This notebook guides you through creating a reader. It provides functions that display or check the way your datasets are imported. By the end of the notebook, you will have all the parameters and elements needed to define a new reader: pieces of code that should eventually be consolidated in `parser_template.py`.

This notebook uses a lightweight dataset as a sample. You may use it (make a copy) for exploring your own dataset, when preparing a new reader. 


Following the documentation on how to [add a new reader](https://disdrodb.readthedocs.io/en/latest/readers.html#adding-a-new-reader), we will proceed in 3 steps:

* Step 1 : Data, where we introduce the sample data.
* Step 2 : Analysis, where we dig into the data to set up the transformation parameters.
* Step 3 : Reader, where we create the reader.

## Step 1: Data in the right folder structure

You will find the sample data in the `data` folder of the GitHub repository. It corresponds to one measurement campaign composed of two stations (`ID_station_1` and `ID_station_2`) over two days.

```
📁 data/
  📁 DISDRODB/
  ├── 📁 Raw/
      ├── 📁 INSTITUTION_or_COUNTRY/
          ├── 📁 CAMPAIGN/
              ├── 📁 data
                  ├── 📁 ID_station_1/
                      ├── 📜 file60_20180817.dat.gz
                      ├── 📜 file60_20180818.dat.gz
                  ├── 📁 ID_station_2/
                      ├── 📜 file61_20180817.dat.gz
                      ├── 📜 file61_20180818.dat.gz
              ├── 📁 info
              ├── 📁 issue
                  ├── 📜 ID_station_1.yml
                  ├── 📜 ID_station_2.yml
              ├── 📁 metadata
                  ├── 📜 ID_station_1.yml
                  ├── 📜 ID_station_2.yml
```

This structure fulfills the requirements described in the documentation to [Add a new reader](https://disdrodb.readthedocs.io/en/latest/readers.html#adding-a-new-reader).
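
Before going further, it can be worth checking the layout programmatically. The sketch below is only an illustration: `missing_subdirs` and `stations_without_metadata` are hypothetical helpers (they are not part of disdrodb), and the list of expected sub-folders is simply read off the tree shown above.

```python
from pathlib import Path
import tempfile

# Hypothetical helpers (not part of disdrodb). The expected sub-folders
# are taken from the folder tree of the sample data.
EXPECTED_SUBDIRS = ("data", "info", "issue", "metadata")


def missing_subdirs(campaign_dir):
    """Return the expected sub-folders that do not exist in campaign_dir."""
    campaign_dir = Path(campaign_dir)
    return [name for name in EXPECTED_SUBDIRS if not (campaign_dir / name).is_dir()]


def stations_without_metadata(campaign_dir):
    """Return station folders under data/ that lack a metadata/<station>.yml file."""
    campaign_dir = Path(campaign_dir)
    stations = sorted(p.name for p in (campaign_dir / "data").iterdir() if p.is_dir())
    return [s for s in stations if not (campaign_dir / "metadata" / f"{s}.yml").is_file()]


# Build a throwaway copy of the layout and check it.
with tempfile.TemporaryDirectory() as tmp:
    campaign = Path(tmp) / "DISDRODB" / "Raw" / "INSTITUTION_or_COUNTRY" / "CAMPAIGN"
    for station in ("ID_station_1", "ID_station_2"):
        (campaign / "data" / station).mkdir(parents=True)
    for sub in ("info", "issue", "metadata"):
        (campaign / sub).mkdir()
    # Only the first station gets a metadata file in this toy example.
    (campaign / "metadata" / "ID_station_1.yml").touch()

    missing = missing_subdirs(campaign)
    no_metadata = stations_without_metadata(campaign)
    print(missing)      # sub-folders that are missing
    print(no_metadata)  # stations without a metadata file
```

To run the same checks on your own data, point the two helpers at your campaign directory (the folder that contains `data`, `info`, `issue` and `metadata`).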


## Step 2: Read and analyse the data

Once the data folders are correctly set up, we can now start analysing our dataset. 

The objective of step 2 is to define the raw reading and writing specifications. At the end, you should be able to generate Apache Parquet files from your raw input data. Step 2 breaks down the script `disdrodb\L0\readers\parser_template.py`.

We load modules and packages. *Nothing must be changed here*. 

In [2]:
import os
import sys
import logging
import pandas as pd

# Add project root folder into sys path
root_path = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
sys.path.insert(0,root_path)


# Directory
from disdrodb.L0.io import (
    check_directories,
    get_campaign_name,
    create_directory_structure,
)


# Tools to develop the parser
from disdrodb.L0.template_tools import (
    check_column_names,
    infer_df_str_column_names,
    print_df_first_n_rows,
    print_df_random_n_rows,
    print_df_column_names,
    print_valid_L0_column_names,
    get_df_columns_unique_values_dict,
    print_df_columns_unique_values,
    print_df_summary_stats,
)

# L0A processing
from disdrodb.L0.L0A_processing import (
    read_raw_data,
    get_file_list,
    read_L0A_raw_file_list,
    cast_column_dtypes,
    write_df_to_parquet,  # TODO: add code to write to parquet a single file in 8.3 ... to check it works
)

# Metadata
from disdrodb.L0.metadata import read_metadata

# Standards
from disdrodb.L0.check_standards import check_sensor_name, check_L0A_column_names

# Logger
from disdrodb.utils.logger import create_logger



**1. Define paths and running parameters**

In the following section, define the raw and processed folder paths. *This may be changed if you are using another folder*


In [3]:
raw_dir = os.path.join(root_path,"data","DISDRODB","Raw","INSTITUTION_or_COUNTRY","CAMPAIGN")  # Must end with campaign_name upper case
processed_dir = os.path.join(root_path,"data","DISDRODB","Processed","INSTITUTION_or_COUNTRY","CAMPAIGN") # Must end with campaign_name upper case

assert os.path.exists(raw_dir), "Raw directory does not exist"


Then we define the running parameters. Once the new reader is created, these parameters will be passed on the command line. Please have a look [at the documentation](https://disdrodb.readthedocs.io/en/latest/readers.html#runing-a-reader) for a full description.

In [4]:
force = True
lazy = True
verbose = True
debugging_mode = True
sensor_name = "Parsivel"

**2. Initialization**

We run some initial checks and retrieve some variables. *Nothing must be changed here.*

In [7]:
# Initial directory checks
raw_dir, processed_dir = check_directories(raw_dir, processed_dir, force=False)


# Retrieve campaign name
campaign_name = get_campaign_name(raw_dir)

# -------------------------------------------------------------------------.
# Define logging settings
create_logger(processed_dir, "parser_" + campaign_name)

# Retrieve logger
logger = logging.getLogger(campaign_name)
logger.info("### Script start ###")

# -------------------------------------------------------------------------.
# Create directory structure
create_directory_structure(raw_dir, processed_dir)

# -------------------------------------------------------------------------.
# List stations
list_stations_id = os.listdir(os.path.join(raw_dir, "data"))

**3. Selection of the station**

By default, only one station is read. *Feel free to change the station index (the current example has only two stations, so you can choose between 0 and 1).*

In [8]:
station_id = list_stations_id[0]

**4. Get the list of files to process**

We now list all the files of the selected station.

In [9]:
glob_pattern = os.path.join("data", station_id, "*.dat*")

file_list = get_file_list(
    raw_dir=raw_dir,
    glob_pattern=glob_pattern,
    verbose=verbose,
    debugging_mode=debugging_mode,
)

print(file_list)

 - 2 files to process in /Users/charlotteweil1/ENAC/ENAC-IT4R/disdrodb/data/DISDRODB/Raw/INSTITUTION_or_COUNTRY/CAMPAIGN
['/Users/charlotteweil1/ENAC/ENAC-IT4R/disdrodb/data/DISDRODB/Raw/INSTITUTION_or_COUNTRY/CAMPAIGN/data/ID_station_1/file60_20180817.dat.gz', '/Users/charlotteweil1/ENAC/ENAC-IT4R/disdrodb/data/DISDRODB/Raw/INSTITUTION_or_COUNTRY/CAMPAIGN/data/ID_station_1/file60_20180818.dat.gz']


Note that the `glob_pattern` variable depends on the file extensions of your dataset. The current dataset contains only gzip-compressed `*.dat.gz` files, which the `*.dat*` pattern matches.

> 🚨 The `glob_pattern` variable definition will be transferred  to the reader at the end of this notebook. 
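
A wildcard pattern can be tried out on a few file names before it is used on a real folder. The sketch below assumes that the pattern is interpreted with the usual shell-style wildcard rules, which is what Python's `fnmatch` implements. Apart from the first one, the file names are made up for the illustration.

```python
from fnmatch import fnmatch

pattern = "*.dat*"
candidates = [
    "file60_20180817.dat.gz",  # compressed raw file from the sample data
    "file60_20180817.dat",     # hypothetical uncompressed variant
    "file60_20180817.txt",     # hypothetical unrelated file
    "notes.data.md",           # hypothetical file that also contains ".dat"
]

# Keep the names that the pattern would select.
matches = [name for name in candidates if fnmatch(name, pattern)]
print(matches)
```

The last candidate shows that `*.dat*` is fairly permissive. If your station folders contain files other than raw data, a stricter pattern such as `*.dat.gz` may be safer.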


**5. Retrieve metadata from yml files**

We now load the metadata file of the station. 

In [14]:
# Retrieve metadata
attrs = read_metadata(raw_dir=raw_dir,
                      station_id=station_id)

# Retrieve sensor name
sensor_name = attrs["sensor_name"]
check_sensor_name(sensor_name)

If the sensor name is not valid, an error is raised.

**5. Load one file into a dataframe**

In the `reader_kwargs` dictionary, you may set [any arguments](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html) that need to be passed to pandas when reading the raw data into a dataframe.

In [15]:
reader_kwargs = {}
# - Define delimiter
reader_kwargs["delimiter"] = ","

# - Avoid first column to become df index !!!
reader_kwargs["index_col"] = False

# - Define behaviour when encountering bad lines
reader_kwargs["on_bad_lines"] = "skip"

# - Define parser engine
#   - C engine is faster
#   - Python engine is more feature-complete
reader_kwargs["engine"] = "python"

# - Define on-the-fly decompression of on-disk data
#   - Available: gzip, bz2, zip
reader_kwargs["compression"] = "infer"

# - Strings to recognize as NA/NaN and replace with standard NA flags
#   - Already included: ‘#N/A’, ‘#N/A N/A’, ‘#NA’, ‘-1.#IND’, ‘-1.#QNAN’,
#                       ‘-NaN’, ‘-nan’, ‘1.#IND’, ‘1.#QNAN’, ‘<NA>’, ‘N/A’,
#                       ‘NA’, ‘NULL’, ‘NaN’, ‘n/a’, ‘nan’, ‘null’
reader_kwargs["na_values"] = ["na", "", "error"]

# - Define max size of dask dataframe chunks (if lazy=True)
#   - If None: use a single block for each file
#   - Otherwise: "<max_file_size>MB" by which to cut up larger files
reader_kwargs["blocksize"] = None  # "50MB"

# - The raw files have no header row
reader_kwargs["header"] = None

filepath = file_list[0]
str_reader_kwargs = reader_kwargs.copy()
str_reader_kwargs["dtype"] = str  # or object


df_raw = read_raw_data(
    filepath, column_names=None, reader_kwargs=str_reader_kwargs, lazy=False
)


print(f'Dataframe for the file {os.path.basename(filepath)} :')
display(df_raw)


Dataframe for the file file60_20180817.dat.gz :


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,14,15,16,17,18,19,20,21,22,23
0,362511,4612.0301,00847.4977,01-08-2018 12:44:30,,OK,0000.000,0056.49,00,00,...,035,0.06,24.9,0,005.649,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
1,362512,4612.0301,00847.4978,01-08-2018 12:45:01,,OK,0000.000,0056.49,00,00,...,035,0.06,24.9,0,005.649,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
2,362513,4612.0301,00847.4985,01-08-2018 12:45:30,,OK,0000.000,0056.49,00,00,...,035,0.06,24.9,0,005.649,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
3,362514,4612.0305,00847.4990,01-08-2018 12:46:01,,OK,0000.000,0056.49,00,00,...,035,0.05,24.9,0,005.649,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
4,362515,4612.0303,00847.4992,01-08-2018 12:46:31,,OK,0000.000,0056.49,00,00,...,034,0.06,24.9,0,005.649,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4736,367249,4612.0313,00847.4956,03-08-2018 04:13:25,,OK,0000.000,0056.71,00,00,...,015,0.06,24.9,0,005.671,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
4737,367250,4612.0313,00847.4955,03-08-2018 04:13:56,,OK,0000.000,0056.71,00,00,...,015,0.06,24.9,0,005.671,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
4738,367251,4612.0313,00847.4955,03-08-2018 04:14:26,,OK,0000.000,0056.71,00,00,...,015,0.06,24.9,0,005.671,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
4739,367252,4612.0313,00847.4954,03-08-2018 04:14:55,,OK,0000.000,0056.71,00,00,...,015,0.06,24.9,0,005.671,000,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0


If the structure of the dataframe looks fine (header, index), we are on the right track!

Depending on the schema of your data, this `reader_kwargs` dictionary may be fairly different from the one above. 

> 🚨 The `reader_kwargs` dictionary will be transferred to the reader at the end of this notebook. 
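
To make the meaning of these options more concrete, here is a standard-library sketch that mimics a few of them by hand: on-the-fly decompression, a comma delimiter, no header row, and strings treated as missing values. It is not how pandas works internally, and the two records are invented for the illustration, loosely modelled on the sample file.

```python
import csv
import gzip
import io

# Two made-up records: no header row, comma-delimited,
# and one quoted field that itself contains commas.
raw_text = (
    '362511,01-08-2018 12:44:30,,OK,0000.000,"-9.999,-9.999,-9.999"\n'
    '362512,01-08-2018 12:45:01,,error,0000.000,"-9.999,-9.999,-9.999"\n'
)
compressed = gzip.compress(raw_text.encode("utf-8"))

# Same strings as in reader_kwargs["na_values"].
na_values = {"na", "", "error"}

# Decompress on the fly and split each line on the delimiter.
with gzip.open(io.BytesIO(compressed), mode="rt", encoding="utf-8", newline="") as f:
    rows = [
        [None if field in na_values else field for field in record]
        for record in csv.reader(f, delimiter=",")
    ]

# Every record should have the same number of fields.
n_columns = {len(row) for row in rows}
print(n_columns)
print(rows[0])
```

The quoted field stays in one piece even though it contains the delimiter, which is why a whole spectrum can end up in a single dataframe column.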


**6. Data exploration**

The data loading settings are now ready. We can load one file and analyse its content to look for errors or inconsistencies.

Here are some instructions : 

* Do not assign column names to the columns yet
* Do not assign a dtype to the columns yet
* Possibly look at multiple files ;)


We print the content of the first 3 rows:
 (*Feel free to change the value of n to see more/fewer rows*)

In [28]:
print_df_first_n_rows(df_raw, n=3, column_names=False)

 - Column 0 :
      ['362511' '362512' '362513']
 - Column 1 :
      ['4612.0301' '4612.0301' '4612.0301']
 - Column 2 :
      ['00847.4977' '00847.4978' '00847.4985']
 - Column 3 :
      ['01-08-2018 12:44:30' '01-08-2018 12:45:01' '01-08-2018 12:45:30']
 - Column 4 :
      [nan nan nan]
 - Column 5 :
      ['OK' 'OK' 'OK']
 - Column 6 :
      ['0000.000' '0000.000' '0000.000']
 - Column 7 :
      ['0056.49' '0056.49' '0056.49']
 - Column 8 :
      ['00' '00' '00']
 - Column 9 :
      ['00' '00' '00']
 - Column 10 :
      ['-9.999' '-9.999' '-9.999']
 - Column 11 :
      ['9999' '9999' '9999']
 - Column 12 :
      ['12611' '12617' '12600']
 - Column 13 :
      ['00000' '00000' '00000']
 - Column 14 :
      ['035' '035' '035']
 - Column 15 :
      ['0.06' '0.06' '0.06']
 - Column 16 :
      ['24.9' '24.9' '24.9']
 - Column 17 :
      ['0' '0' '0']
 - Column 18 :
      ['005.649' '005.649' '005.649']
 - Column 19 :
      ['000' '000' '000']
 - Column 20 :
      ['-9.999,-9.999,-9.999,-9

In [29]:
df_raw.head(3)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,14,15,16,17,18,19,20,21,22,23
0,362511,4612.0301,847.4977,01-08-2018 12:44:30,,OK,0.0,56.49,0,0,...,35,0.06,24.9,0,5.649,0,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
1,362512,4612.0301,847.4978,01-08-2018 12:45:01,,OK,0.0,56.49,0,0,...,35,0.06,24.9,0,5.649,0,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0
2,362513,4612.0301,847.4985,01-08-2018 12:45:30,,OK,0.0,56.49,0,0,...,35,0.06,24.9,0,5.649,0,"-9.999,-9.999,-9.999,-9.999,-9.999,-9.999,-9.9...","00.000,00.000,00.000,00.000,00.000,00.000,00.0...","000,000,000,000,000,000,000,000,000,000,000,00...",0


We print the content of n rows picked randomly : 

In [30]:
print_df_random_n_rows(df_raw, n=6, with_column_names=False) 

- Column 0 : ['362582' '363501' '367139' '364888' '363502' '364372']
- Column 1 : ['4612.0334' '4612.0312' '4612.0284' '4612.0330' '4612.0312' '4612.0306']
- Column 2 : ['00847.4961' '00847.4952' '00847.4968' '00847.4958' '00847.4953'
 '00847.4965']
- Column 3 : ['01-08-2018 13:20:01' '01-08-2018 20:59:31' '03-08-2018 03:18:31'
 '02-08-2018 08:33:01' '01-08-2018 21:00:01' '02-08-2018 04:15:01']
- Column 4 : [nan nan nan nan nan nan]
- Column 5 : ['OK' 'OK' 'OK' 'OK' 'OK' 'OK']
- Column 6 : ['0000.000' '0000.000' '0000.000' '0000.000' '0000.000' '0000.000']
- Column 7 : ['0056.49' '0056.67' '0056.71' '0056.67' '0056.67' '0056.67']
- Column 8 : ['00' '00' '00' '00' '00' '00']
- Column 9 : ['00' '00' '00' '00' '00' '00']
- Column 10 : ['-9.999' '-9.999' '-9.999' '-9.999' '-9.999' '-9.999']
- Column 11 : ['9999' '9999' '9999' '9999' '9999' '9999']
- Column 12 : ['12637' '12514' '11413' '12662' '12515' '12575']
- Column 13 : ['00000' '00000' '00000' '00000' '00000' '00000']
- Column 14 : ['

Get the number of columns:

In [31]:
len(df_raw.columns)

24

Look at unique values for a single column :

In [14]:
print_df_columns_unique_values(df_raw, column_indices=11, column_names=False)

 - Column 11 :
      ['0824', '0906', '1363', '1397', '2921', '3203', '3326', '3816', '4465', '9999']


Look at unique values for a few columns :

*Use `column_indices=None` to look at unique values for all columns*

In [15]:
print_df_columns_unique_values(df_raw, column_indices=slice(10, 12), column_names=False)

 - Column 10 :
      ['-9.999', '02.669', '04.241', '04.745', '04.826', '04.879', '05.430', '06.095', '06.220', '07.415', '08.436', '08.489', '08.506', '08.724', '08.956', '09.079', '09.894', '10.057', '10.567', '11.705', '12.097', '12.390', '12.923', '13.114', '13.407', '13.684', '14.324', '15.060', '16.530', '16.636', '16.668', '17.194', '17.382', '17.829', '17.918', '18.334', '18.655', '19.526', '20.329', '21.134', '21.426', '23.098', '23.664', '23.760', '24.472', '25.473', '25.957', '29.270', '31.271', '32.255', '33.844', '36.196']
 - Column 11 :
      ['0824', '0906', '1363', '1397', '2921', '3203', '3326', '3816', '4465', '9999']


Get the unique values as a dictionary:

In [16]:
get_df_columns_unique_values_dict(df_raw, column_indices=slice(10, 12), column_names=False)

{'Column 10': ['-9.999',
  '02.669',
  '04.241',
  '04.745',
  '04.826',
  '04.879',
  '05.430',
  '06.095',
  '06.220',
  '07.415',
  '08.436',
  '08.489',
  '08.506',
  '08.724',
  '08.956',
  '09.079',
  '09.894',
  '10.057',
  '10.567',
  '11.705',
  '12.097',
  '12.390',
  '12.923',
  '13.114',
  '13.407',
  '13.684',
  '14.324',
  '15.060',
  '16.530',
  '16.636',
  '16.668',
  '17.194',
  '17.382',
  '17.829',
  '17.918',
  '18.334',
  '18.655',
  '19.526',
  '20.329',
  '21.134',
  '21.426',
  '23.098',
  '23.664',
  '23.760',
  '24.472',
  '25.473',
  '25.957',
  '29.270',
  '31.271',
  '32.255',
  '33.844',
  '36.196'],
 'Column 11': ['0824',
  '0906',
  '1363',
  '1397',
  '2921',
  '3203',
  '3326',
  '3816',
  '4465',
  '9999']}

**7. Column names**

Now that we have validated the content of our data, it is time to look at its structure (column names).

The function `infer_df_str_column_names()` tries to guess the column names based on string patterns (according to `L0A_encodings.yml` and the type of sensor).

In [32]:
infer_df_str_column_names(df_raw, sensor_name=sensor_name)

{0: [],
 1: [],
 2: [],
 3: [],
 4: [],
 5: [],
 6: ['rainfall_rate_32bit'],
 7: ['rainfall_accumulated_32bit'],
 8: ['weather_code_synop_4677', 'weather_code_synop_4680'],
 9: ['weather_code_synop_4677', 'weather_code_synop_4680'],
 10: ['reflectivity_32bit'],
 11: ['mor_visibility'],
 12: ['number_particles', 'laser_amplitude'],
 13: ['number_particles', 'laser_amplitude'],
 14: ['sensor_temperature', 'error_code'],
 15: ['sensor_heating_current'],
 16: ['sensor_battery_voltage'],
 17: ['sensor_status'],
 18: ['rainfall_amount_absolute_32bit'],
 19: ['sensor_temperature', 'error_code'],
 20: ['raw_drop_average_velocity', 'raw_drop_concentration'],
 21: ['raw_drop_average_velocity', 'raw_drop_concentration'],
 22: ['raw_drop_number'],
 23: ['sensor_status']}

This can help us define the column names later.

As a reference, here is the list of valid column names (taken from `L0A_encodings.yml`):
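
The idea of guessing a name from string patterns can be sketched as follows. This is not the disdrodb implementation: the patterns below are hypothetical and were simply read off the values printed earlier, whereas the real rules come from `L0A_encodings.yml`.

```python
import re

# Hypothetical string formats, loosely modelled on the printed values.
# The real rules live in L0A_encodings.yml and may differ.
HYPOTHETICAL_PATTERNS = {
    "rainfall_rate_32bit": r"\d{4}\.\d{3}",
    "rainfall_accumulated_32bit": r"\d{4}\.\d{2}",
    "reflectivity_32bit": r"-?\d{1,2}\.\d{3}",
    "mor_visibility": r"\d{4}",
}


def infer_candidates(values, patterns=HYPOTHETICAL_PATTERNS):
    """Return the names whose pattern matches every non-missing value."""
    values = [v for v in values if v is not None]
    return [
        name
        for name, pattern in patterns.items()
        if values and all(re.fullmatch(pattern, v) for v in values)
    ]


# A few values per column, copied from the outputs shown earlier.
columns = {
    5: ["OK", "OK", "OK"],
    6: ["0000.000", "0000.000", "0000.000"],
    7: ["0056.49", "0056.49", "0056.49"],
    10: ["-9.999", "02.669", "36.196"],
    11: ["9999", "0824", "0906"],
}
guesses = {index: infer_candidates(values) for index, values in columns.items()}
print(guesses)
```

Several variables can share the same string format, so an inference of this kind only narrows down the candidates. The sensor user guide remains the reference.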

In [18]:
print_valid_L0_column_names(sensor_name)

['rainfall_rate_32bit', 'rainfall_accumulated_32bit', 'weather_code_synop_4680', 'weather_code_synop_4677', 'weather_code_metar_4678', 'weather_code_nws', 'reflectivity_32bit', 'mor_visibility', 'sample_interval', 'laser_amplitude', 'number_particles', 'sensor_temperature', 'sensor_serial_number', 'firmware_iop', 'firmware_dsp', 'sensor_heating_current', 'sensor_battery_voltage', 'sensor_status', 'start_time', 'sensor_time', 'sensor_date', 'station_name', 'station_number', 'rainfall_amount_absolute_32bit', 'error_code', 'rainfall_rate_16bit', 'rainfall_rate_12bit', 'rainfall_accumulated_16bit', 'reflectivity_16bit', 'raw_drop_concentration', 'raw_drop_average_velocity', 'raw_drop_number']


It is now time to define our column names:

Hints to define the names:
* get information from the disdrometer user guide.
* use `infer_df_str_column_names()` to help you
* analyse the content column by column with `print_df_columns_unique_values()`

In [34]:
column_names = [
    "unknown1",
    "unknown2",
    "unknown3",
    "time",
    "unknown4",
    "unknown5",
    "rainfall_rate_32bit",
    "rainfall_accumulated_32bit",
    "weather_code_synop_4680",
    "weather_code_synop_4677",
    "reflectivity_32bit",
    "mor_visibility",
    "laser_amplitude",
    "number_particles",
    "sensor_temperature",
    "sensor_heating_current",
    "sensor_battery_voltage",
    "sensor_status",
    "rainfall_amount_absolute_32bit",
    "error_code",
    "raw_drop_concentration",
    "raw_drop_average_velocity",
    "raw_drop_number",
    "unknown6",
]

> 🚨 The `column_names` list will be transferred  to the reader at the end of this notebook. 

Check the validity of your definition 

In [35]:
check_column_names(column_names,sensor_name)

The following columns do no met the DISDRODB standards: ['unknown3', 'unknown4', 'unknown1', 'unknown6', 'unknown5', 'unknown2'].
Please remove such columns within the df_sanitizer_fun


Ok, fair enough. As mentioned in the error message, the unknown columns will be removed later.
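
The list of columns to remove can be derived from the proposed names rather than typed by hand. The sketch below is a hypothetical helper, not a disdrodb function. It uses a small subset of the valid names for brevity and assumes that `time` is always accepted, since the check above did not flag it.

```python
# Hypothetical subset of the valid names listed earlier, for illustration only.
VALID_NAMES = {
    "rainfall_rate_32bit",
    "rainfall_accumulated_32bit",
    "raw_drop_number",
}
# Assumption: "time" is accepted although it is not in the printed list.
ALWAYS_ALLOWED = {"time"}


def split_column_names(column_names, valid_names=VALID_NAMES):
    """Return (duplicates, names_to_drop) for a proposed list of column names."""
    allowed = set(valid_names) | ALWAYS_ALLOWED
    seen, duplicates = set(), []
    for name in column_names:
        if name in seen and name not in duplicates:
            duplicates.append(name)
        seen.add(name)
    to_drop = [name for name in column_names if name not in allowed]
    return duplicates, to_drop


proposed = [
    "unknown1",
    "time",
    "rainfall_rate_32bit",
    "unknown2",
    "raw_drop_number",
    "rainfall_rate_32bit",  # deliberate duplicate
]
duplicates, to_drop = split_column_names(proposed)
print(duplicates)
print(to_drop)
```

The second list is what would go into the `columns_to_drop` variable of the sanitizer function. A non-empty first list points to a name that was assigned twice.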

**8. Read the dataframe with the correct column names**

We can now create a new dataframe with the column names:

In [38]:
df = read_raw_data(filepath=filepath,column_names=column_names,reader_kwargs=reader_kwargs,lazy=False)

And print the dataframe column names : 

In [39]:
print_df_column_names(df)

 - Column 0 : unknown1
 - Column 1 : unknown2
 - Column 2 : unknown3
 - Column 3 : time
 - Column 4 : unknown4
 - Column 5 : unknown5
 - Column 6 : rainfall_rate_32bit
 - Column 7 : rainfall_accumulated_32bit
 - Column 8 : weather_code_synop_4680
 - Column 9 : weather_code_synop_4677
 - Column 10 : reflectivity_32bit
 - Column 11 : mor_visibility
 - Column 12 : laser_amplitude
 - Column 13 : number_particles
 - Column 14 : sensor_temperature
 - Column 15 : sensor_heating_current
 - Column 16 : sensor_battery_voltage
 - Column 17 : sensor_status
 - Column 18 : rainfall_amount_absolute_32bit
 - Column 19 : error_code
 - Column 20 : raw_drop_concentration
 - Column 21 : raw_drop_average_velocity
 - Column 22 : raw_drop_number
 - Column 23 : unknown6


Check that lazy loading (dask) works:

In [40]:
df_dask = read_raw_data(filepath=filepath, column_names=column_names, reader_kwargs=reader_kwargs, lazy=True)
df_dask = df_dask.compute()

And print the dataframe column names : 

In [41]:
print_df_column_names(df_dask)

 - Column 0 : unknown1
 - Column 1 : unknown2
 - Column 2 : unknown3
 - Column 3 : time
 - Column 4 : unknown4
 - Column 5 : unknown5
 - Column 6 : rainfall_rate_32bit
 - Column 7 : rainfall_accumulated_32bit
 - Column 8 : weather_code_synop_4680
 - Column 9 : weather_code_synop_4677
 - Column 10 : reflectivity_32bit
 - Column 11 : mor_visibility
 - Column 12 : laser_amplitude
 - Column 13 : number_particles
 - Column 14 : sensor_temperature
 - Column 15 : sensor_heating_current
 - Column 16 : sensor_battery_voltage
 - Column 17 : sensor_status
 - Column 18 : rainfall_amount_absolute_32bit
 - Column 19 : error_code
 - Column 20 : raw_drop_concentration
 - Column 21 : raw_drop_average_velocity
 - Column 22 : raw_drop_number
 - Column 23 : unknown6


We can now verify that the pandas and the dask readings are identical:

In [42]:
assert df.equals(df_dask)

**9. Run some tests and analysis**

This must be done once `reader_kwargs` and `column_names` are correctly defined.

Compute some statistics on a column. We use `rainfall_rate_32bit` here, but feel free to change it:

In [45]:
stats = df.loc[:, ["rainfall_rate_32bit"]]
stats = stats.astype('float')

print_df_summary_stats(stats)

 - Column 0 ( rainfall_rate_32bit ):
                    
mean  0.005426
min   0.000000
25%   0.000000
50%   0.000000
75%   0.000000
max   2.881000


Check if the file is empty : 

In [46]:
if len(df.index) == 0:
    raise ValueError(f"{filepath} is empty and has been skipped.")

Check the number of columns :

In [47]:
if len(df.columns) != len(column_names):
    raise ValueError(f"{filepath} has wrong columns number, and has been skipped.")

**10. Final columns formatting**

It is now time to remove all the columns that do not match the standard.

In [48]:
df = df.drop(columns=["unknown1", "unknown2", "unknown3","unknown4",'unknown5','unknown6'])

In [49]:
# The raw timestamps are day-first (01-08-2018 is 1 August 2018)
df["time"] = pd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S")

Check that the column names meet DISDRODB standards after custom processing :

In [50]:
check_L0A_column_names(df, sensor_name=sensor_name)

Determine dtype based on standards :

In [51]:
df = cast_column_dtypes(df, sensor_name=sensor_name)

Check the dataframe looks as desired :

In [52]:
print_df_column_names(df)

 - Column 0 : time
 - Column 1 : rainfall_rate_32bit
 - Column 2 : rainfall_accumulated_32bit
 - Column 3 : weather_code_synop_4680
 - Column 4 : weather_code_synop_4677
 - Column 5 : reflectivity_32bit
 - Column 6 : mor_visibility
 - Column 7 : laser_amplitude
 - Column 8 : number_particles
 - Column 9 : sensor_temperature
 - Column 10 : sensor_heating_current
 - Column 11 : sensor_battery_voltage
 - Column 12 : sensor_status
 - Column 13 : rainfall_amount_absolute_32bit
 - Column 14 : error_code
 - Column 15 : raw_drop_concentration
 - Column 16 : raw_drop_average_velocity
 - Column 17 : raw_drop_number


In [53]:
print_df_random_n_rows(df, n=5)

- Column 0 (time) : ['2018-08-01T19:45:31.000000000' '2018-08-02T22:22:01.000000000'
 '2018-08-02T19:51:31.000000000' '2018-08-02T17:38:01.000000000'
 '2018-08-02T03:10:30.000000000']
- Column 1 (rainfall_rate_32bit) : [0. 0. 0. 0. 0.]
- Column 2 (rainfall_accumulated_32bit) : [56.67 56.71 56.71 56.71 56.67]
- Column 3 (weather_code_synop_4680) : [0 0 0 0 0]
- Column 4 (weather_code_synop_4677) : [0 0 0 0 0]
- Column 5 (reflectivity_32bit) : [-9.999 -9.999 -9.999 -9.999 -9.999]
- Column 6 (mor_visibility) : [9999 9999 9999 9999 9999]
- Column 7 (laser_amplitude) : [12507 12213 12471 12476 12577]
- Column 8 (number_particles) : [0 0 0 0 0]
- Column 9 (sensor_temperature) : [18 17 18 20 17]
- Column 10 (sensor_heating_current) : [0.06 0.06 0.06 0.06 0.06]
- Column 11 (sensor_battery_voltage) : [24.9 24.9 24.9 24.9 24.9]
- Column 12 (sensor_status) : [0 0 0 0 0]
- Column 13 (rainfall_amount_absolute_32bit) : [5.667 5.671 5.671 5.671 5.667]
- Column 14 (error_code) : [0 0 0 0 0]
- Column 1

In [54]:
print_df_columns_unique_values(df, column_indices=2, column_names=True)

 - Column 2 ( rainfall_accumulated_32bit ):
      [56.4900016784668, 56.52000045776367, 56.529998779296875, 56.54999923706055, 56.56999969482422, 56.58000183105469, 56.59000015258789, 56.599998474121094, 56.61000061035156, 56.619998931884766, 56.630001068115234, 56.63999938964844, 56.650001525878906, 56.65999984741211, 56.66999816894531, 56.68000030517578, 56.70000076293945, 56.709999084472656]
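
A date such as `01-08-2018` can be read day-first or month-first, and both formats parse without error as long as the first two fields stay at or below 12. One way to decide is to compare the time span of the file under each interpretation with the span implied by its number of rows. The sketch below uses the first and last timestamps of the sample file and its row count (4740), and assumes roughly one record every 30 seconds, as the first rows suggest.

```python
from datetime import datetime, timedelta

# First and last timestamps of the sample file, as printed earlier,
# together with its number of rows.
first, last = "01-08-2018 12:44:30", "03-08-2018 04:14:55"
n_rows = 4740

# Assumption: one record roughly every 30 seconds.
expected_span = timedelta(seconds=30) * (n_rows - 1)

# Time span of the file under each candidate format.
spans = {}
for fmt in ("%d-%m-%Y %H:%M:%S", "%m-%d-%Y %H:%M:%S"):
    spans[fmt] = datetime.strptime(last, fmt) - datetime.strptime(first, fmt)

# Keep the format whose span is closest to what the row count implies.
best_format = min(spans, key=lambda fmt: abs(spans[fmt] - expected_span))

for fmt, span in spans.items():
    print(fmt, "->", span)
print("expected:", expected_span)
print("best format:", best_format)
```

Read day-first, the file covers about a day and a half, which agrees with the row count. Read month-first, it would span about two months.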


**11. Define the sanitizer function**

This function will be used in the reader. It contains the list of columns we have to drop, as defined previously. 

In [61]:
def df_sanitizer_fun(df, lazy=False):
    # Import dask or pandas
    if lazy:
        import dask.dataframe as dd
    else:
        import pandas as dd


    # - Drop datalogger columns
    columns_to_drop = ["unknown1", "unknown2", "unknown3","unknown4",'unknown5','unknown6']

    df = df.drop(columns=columns_to_drop)
    
    # - Convert time column to datetime format (raw timestamps are day-first)
    df["time"] = dd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S")
    
    return df

> 🚨 The `df_sanitizer_fun()` function will be transferred to the reader at the end of this notebook.

**12. Try calling the reader function, as it will be called in the reader script**

* You may try with an increasing number of files (update `file_list`)
* Try reading with pandas (`lazy=False`), then with dask (`lazy=True`)


In [60]:
lazy = False 
subset_file_list = file_list[:1]

df = read_L0A_raw_file_list(
    file_list=subset_file_list,
    column_names=column_names,
    reader_kwargs=reader_kwargs,
    sensor_name=sensor_name,
    verbose=verbose,
    df_sanitizer_fun=df_sanitizer_fun,
    lazy=lazy,
)

 -  - 0 of 2 have been skipped.
 -  - Concatenation of dataframes started.
 -  - Concatenation of dataframes has finished.


The function `read_L0A_raw_file_list` takes the following arguments:
* `file_list`: list of files, based on the analysis of the data folder
* `column_names`: the list of column names, defined previously
* `reader_kwargs`: dictionary of options for loading the data into the dataframe, defined previously
* `sensor_name`: taken from the metadata file
* `df_sanitizer_fun`: the function to clean up the dataframe, defined previously

All these arguments are defined either in the data folder, or earlier in the code. It is now time to create the reader !
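
To see how these pieces fit together, here is a standard-library sketch of a multi-file reader. It is a hypothetical stand-in, not the disdrodb implementation: `read_file_list` and `drop_unknown` are invented names, rows are kept as plain dictionaries instead of dataframes, and the test files are generated on the fly.

```python
import csv
import gzip
import tempfile
from pathlib import Path


def read_file_list(file_list, column_names, sanitizer=None):
    """Hypothetical stand-in for a multi-file reader (not the disdrodb code)."""
    records, skipped = [], []
    for filepath in file_list:
        try:
            with gzip.open(filepath, mode="rt", encoding="utf-8", newline="") as f:
                rows = list(csv.reader(f))
            # Same sanity checks as in section 9: empty file, column count.
            if not rows:
                raise ValueError("file is empty")
            if any(len(row) != len(column_names) for row in rows):
                raise ValueError("wrong number of columns")
        except (OSError, ValueError) as err:
            skipped.append((Path(filepath).name, str(err)))
            continue
        dicts = [dict(zip(column_names, row)) for row in rows]
        if sanitizer is not None:
            dicts = [sanitizer(d) for d in dicts]
        records.extend(dicts)
    print(f"{len(skipped)} of {len(file_list)} have been skipped.")
    return records, skipped


def drop_unknown(record):
    """Toy sanitizer: remove the columns whose name starts with 'unknown'."""
    return {k: v for k, v in record.items() if not k.startswith("unknown")}


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.dat.gz"
    empty = Path(tmp) / "empty.dat.gz"
    good.write_bytes(gzip.compress(b"362511,01-08-2018 12:44:30,0000.000\n"))
    empty.write_bytes(gzip.compress(b""))
    records, skipped = read_file_list(
        [good, empty],
        column_names=["unknown1", "time", "rainfall_rate_32bit"],
        sanitizer=drop_unknown,
    )

print(records)
print(skipped)
```

A file that fails a check is skipped and reported instead of stopping the whole run, which is the behaviour suggested by the "0 of 2 have been skipped" message above.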

## Step 3 : Create the reader

We now have all the elements to start creating the new reader. All the modifications made in this notebook must now be transcribed into a reader file.

1. Copy the file `disdrodb\L0\readers\parser_template.py` into the folder `disdrodb\L0\readers\TUTORIAL`

2. Rename the copied file to `parser_TUTORIAL.py`

3. Add the root folder to the Python path:
   
   Before :

    ``` python

        import click
        from disdrodb.L0 import run_L0

    ```
    After : 

    ``` python
        import os
        import sys

        # Add project root folder into sys path
        root_path = os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))
        sys.path.insert(0,root_path)
        import click
        from disdrodb.L0.L0_processing import run_L0
    ```

4. Define the column names:

   Before :

    ``` python

        column_names = []
    ```
    After : 

    ``` python
        column_names = [
            "unknown1",
            "unknown2",
            "unknown3",
            "time",
            "unknown4",
            "unknown5",
            "rainfall_rate_32bit",
            "rainfall_accumulated_32bit",
            "weather_code_synop_4680",
            "weather_code_synop_4677",
            "reflectivity_32bit",
            "mor_visibility",
            "laser_amplitude",
            "number_particles",
            "sensor_temperature",
            "sensor_heating_current",
            "sensor_battery_voltage",
            "sensor_status",
            "rainfall_amount_absolute_32bit",
            "error_code",
            "raw_drop_concentration",
            "raw_drop_average_velocity",
            "raw_drop_number",
            "unknown6",
        ]
    ```


5. Add the raw data loading parameter

    Before :

    ``` python

        reader_kwargs["blocksize"] = None # "50MB"

    ```
    After : 

    ``` python
        reader_kwargs["blocksize"] = None # "50MB"
        reader_kwargs['header'] = None
    ```

6. Modify the `df_sanitizer_fun()` function 

   Before :

    ``` python

        def df_sanitizer_fun(df, lazy=False):
            # Import dask or pandas
            if lazy:
                    import dask.dataframe as dd
            else:
                    import pandas as dd

            # - Drop datalogger columns
            columns_to_drop = ['id', 'datalogger_temperature', 'datalogger_voltage', 'datalogger_error']
            df = df.drop(columns=columns_to_drop)

            # - Drop latitude and longitude
            # --> Latitude and longitude is specified in the the metadata.yaml
            df = df.drop(columns=['latitude', 'longitude'])

            # - Convert time column to datetime with resolution in seconds
            df['time'] = dd.to_datetime(df['time'], format='%d-%m-%Y %H:%M:%S')

            return df
    ```
    
    After : 

    ``` python
        def df_sanitizer_fun(df, lazy=False):
            # Import dask or pandas
            if lazy:
                import dask.dataframe as dd
            else:
                import pandas as dd

            # - Drop datalogger columns
            columns_to_drop = ["unknown1", "unknown2", "unknown3","unknown4",'unknown5','unknown6']

            df = df.drop(columns=columns_to_drop)

            # - Convert time column to datetime format (raw timestamps are day-first)
            df["time"] = dd.to_datetime(df["time"], format="%d-%m-%Y %H:%M:%S")

            return df
    ```
 
 
7. Run the script

    From the root folder, run one of the following commands (commands to be updated).

    Windows:

    ``` batch
    python .\disdrodb\L0\readers\TUTORIAL\parser_TUTORIAL.py  <data_folder>\DISDRODB\Raw\INSTITUTION_or_COUNTRY\CAMPAIGN\ <data_folder>\DISDRODB\Processed\INSTITUTION_or_COUNTRY\CAMPAIGN\ -l0b True -f True -v True -d False
    ```

    Mac/Linux:

    ``` bash
    python disdrodb/L0/readers/TUTORIAL/parser_TUTORIAL.py  <data_folder>/DISDRODB/Raw/INSTITUTION_or_COUNTRY/CAMPAIGN/ <data_folder>/DISDRODB/Processed/INSTITUTION_or_COUNTRY/CAMPAIGN/ -l0b True -f True -v True -d False
    ```

    You need to adapt the `<data_folder>` parameter to your local data folder. For instance, with the sample `data` folder of this repository and the script `parser_TUTORIAL_correction.py`:

    `python disdrodb/L0/readers/TUTORIAL/parser_TUTORIAL_correction.py  data/DISDRODB/Raw/INSTITUTION_or_COUNTRY/CAMPAIGN/ data/DISDRODB/Processed/INSTITUTION_or_COUNTRY/CAMPAIGN/ -l0b True -f True -v True -d False`

    Have a look here if you want to customize this command. (TODO link still to update)

 
8. Check that the script has run correctly

    The output folder should be as follows:
    
    ```
    📁 DISDRODB/
    ├── 📁 Processed/
       ├── 📁 INSTITUTION_or_COUNTRY/
          ├── 📁 CAMPAIGN/
              ├── 📁 info
                  ├── 📜 ID_station_1.yml
                  ├── 📜 ID_station_2.yml
              ├── 📁 L0A
                  ├── 📁 ID_station_1/
                     ├── 📜 _sID_station_1.parquet
                  ├── 📁 ID_station_2/
                     ├── 📜 _sID_station_2.parquet
              ├── 📁 L0B
                  ├── 📁 ID_station_1/
                     ├── 📜 _sID_station_1.nc
                  ├── 📁 ID_station_2/
                     ├── 📜 _sID_station_2.nc
              ├── 📁 logs
                  ├── 📜 <date>_LO_parser.log
              ├── 📁 metadata
                  ├── 📜 ID_station_1.yml
                  ├── 📜 ID_station_2.yml

    ```

Well done 👋 you have created a new reader. You can now :

-   Create your own reader based on your data.
-   Run this reader over your entire dataset to generate L0 files.
-   Publish this reader to the main GitHub repository to enrich the
    DISDRODB project! Have a look at the [contributors
    guidelines](https://disdrodb.readthedocs.io/en/latest/contributors_guidelines.html)

    