## Objective:
- Import the libraries required for data pre-processing: numpy, pyarrow
- Convert entire OCO2/OCO3 netCDF files to Parquet format

#### Why Parquet format?
Parquet is an open source file format built to handle flat columnar storage data formats. Parquet operates well with complex data in large volumes. It is known for both its performant data compression and its ability to handle a wide variety of encoding types.

- Parquet deploys Google's record-shredding and assembly algorithm that can address complex data structures within data storage. Some Parquet benefits include:

    * Fast queries that can fetch specific column values without reading full row data
    * Highly efficient column-wise compression
    * High compatibility with OLAP
    
#### Benefits of using Parquet format over CSV
- How is Parquet different from CSV?

While CSV is simple and the most widely used data format (Excel, Google Sheets), there are several distinct advantages for Parquet, including:

- Parquet is column oriented and CSV is row oriented. Row-oriented formats are optimized for OLTP workloads while column-oriented formats are better suited for analytical workloads.
- Column-oriented databases such as AWS Redshift Spectrum bill by the amount of data scanned per query.

Therefore, converting CSV to Parquet with partitioning and compression lowers overall costs and improves performance.

[source: Snowflake](https://www.snowflake.com/guides/what-parquet#:~:text=Parquet%20is%20an%20open%20source,wide%20variety%20of%20encoding%20types.)

In [1]:
# !pip install pyarrow

In [2]:
import os
import numpy as np
import pandas as pd
import pyarrow as pa
import netCDF4 as nc

from pyarrow import parquet as parq

In [3]:
## Example: create a parquet file from a table built from a NumPy array
arr= np.arange(1.0, 20.0)
pa_table= pa.table({"float_data": arr})
parq.write_table(pa_table, 'test_table_data.parquet')

### NETCDF files
- PATH: directory containing the netCDF files downloaded from the source
- EG: list of files initially downloaded locally

In [4]:
path_= '../../../Cluster_machine/OCO2/B_11_new_version/2020/'

In [5]:
file_path_= []

for root, dirs, files in os.walk(path_):
    
    for filename in files:
        print(os.path.join(root, filename))
        
        # Append the files into list
        file_path_.append(os.path.join(root, filename))
        
file_path_[:3]

../../../Cluster_machine/OCO2/B_11_new_version/2020/01\01\LtCO2\oco2_LtCO2_200101_B11014Ar_220902231034s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\02\LtCO2\oco2_LtCO2_200102_B11014Ar_220902231109s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\03\LtCO2\oco2_LtCO2_200103_B11014Ar_220902231140s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\08\LtCO2\oco2_LtCO2_200108_B11014Ar_220902231412s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\09\LtCO2\oco2_LtCO2_200109_B11014Ar_220902231443s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\10\LtCO2\oco2_LtCO2_200110_B11014Ar_220902231518s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\11\LtCO2\oco2_LtCO2_200111_B11014Ar_220902231544s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\12\LtCO2\oco2_LtCO2_200112_B11014Ar_220902231615s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/01\13\LtCO2\oco2_LtCO2_200113_B11014Ar_220902231645s.nc4
../../../Cluster_ma

../../../Cluster_machine/OCO2/B_11_new_version/2020/09\03\LtCO2\oco2_LtCO2_200903_B11014Ar_220825210657s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\04\LtCO2\oco2_LtCO2_200904_B11014Ar_220825210718s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\05\LtCO2\oco2_LtCO2_200905_B11014Ar_220825210808s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\06\LtCO2\oco2_LtCO2_200906_B11014Ar_220825210823s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\07\LtCO2\oco2_LtCO2_200907_B11014Ar_220825210912s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\08\LtCO2\oco2_LtCO2_200908_B11014Ar_220825210925s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\09\LtCO2\oco2_LtCO2_200909_B11014Ar_220825211019s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\10\LtCO2\oco2_LtCO2_200910_B11014Ar_220825211030s.nc4
../../../Cluster_machine/OCO2/B_11_new_version/2020/09\11\LtCO2\oco2_LtCO2_200911_B11014Ar_220825211122s.nc4
../../../Cluster_ma

['../../../Cluster_machine/OCO2/B_11_new_version/2020/01\\01\\LtCO2\\oco2_LtCO2_200101_B11014Ar_220902231034s.nc4',
 '../../../Cluster_machine/OCO2/B_11_new_version/2020/01\\02\\LtCO2\\oco2_LtCO2_200102_B11014Ar_220902231109s.nc4',
 '../../../Cluster_machine/OCO2/B_11_new_version/2020/01\\03\\LtCO2\\oco2_LtCO2_200103_B11014Ar_220902231140s.nc4']
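
The same file listing can be sketched with `pathlib`, which filters by extension and returns the paths in sorted order in one step. The helper name `list_nc4_files` and the temporary directory tree are assumptions made for this example; they do not reflect the real folder layout beyond what the output above shows.

```python
import tempfile
from pathlib import Path


def list_nc4_files(root_dir):
    """Return the sorted paths of all .nc4 files below root_dir (hypothetical helper)."""
    return sorted(str(p) for p in Path(root_dir).rglob("*.nc4"))


# Build a tiny throwaway tree so the sketch runs anywhere.
demo_root = Path(tempfile.mkdtemp())
(demo_root / "01" / "01" / "LtCO2").mkdir(parents=True)
(demo_root / "01" / "01" / "LtCO2" / "demo_a.nc4").touch()
(demo_root / "01" / "01" / "LtCO2" / "notes.txt").touch()

found = list_nc4_files(demo_root)
print(found)
```

Filtering at listing time also makes the later `endswith(".nc4")` check unnecessary.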

In [8]:
file_names= file_path_

## Convert dateTime

In [9]:
from datetime import datetime

def conv_date(d):
    return datetime.strptime(str(d), '%Y%m%d%H%M%S%f')
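
A possible way to apply this conversion to `sounding_id` values is sketched below. It assumes that the first 14 digits of a `sounding_id` encode `YYYYMMDDhhmmss` and that any trailing digits carry sub-second or footprint information; that layout is an assumption here and should be checked against the product documentation. The helper name and the example ID are made up.

```python
from datetime import datetime


def sounding_id_to_datetime(sounding_id):
    """Parse the leading YYYYMMDDhhmmss digits of a sounding_id (assumed layout)."""
    digits = str(int(sounding_id))
    return datetime.strptime(digits[:14], "%Y%m%d%H%M%S")


example_id = 2020010112345678  # made-up value, not taken from a real file
parsed = sounding_id_to_datetime(example_id)
print(parsed)
```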

In [10]:
%%time
countFiles=0

lon_list= []
lat_list= []
xco2_list= []
qual_flag_list= []
lat=[]
lon=[]
dateTime= []

for j in file_names:
    if j.endswith(".nc4"):
        # Open the file and close it again once its variables are copied
        with nc.Dataset(j) as var_:
            lon_list.append(np.array(var_.variables['vertex_longitude'][:]).tolist())
            lat_list.append(np.array(var_.variables['vertex_latitude'][:]).tolist())
            xco2_list.append(np.array(var_.variables['xco2'][:]).tolist())
            qual_flag_list.append(np.array(var_.variables['xco2_quality_flag'][:]).tolist())

            # DateTime formatting (raw sounding_id values)
            dateTime.append(np.array(var_.variables['sounding_id'][:]).tolist())

            # lat and lon of the sounding centre (without vertices)
            lat.append(np.array(var_.variables['latitude'][:]).tolist())
            lon.append(np.array(var_.variables['longitude'][:]).tolist())

        countFiles += 1

#print('\nTotalFiles: ', countFiles)

Wall time: 6min 5s


In [11]:
%%time
lon_list_a= [element for sublist in lon_list for element in sublist]
lat_list_a= [element for sublist in lat_list for element in sublist]
xco2_list_a= [element for sublist in xco2_list for element in sublist]
lon_a= [element for sublist in lon for element in sublist]
lat_a= [element for sublist in lat for element in sublist]
xco2_qual_flag= [ element for sublist in qual_flag_list for element in sublist]
dateTime_list= [ element for sublist in dateTime for element in sublist]

Wall time: 1min 47s
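
The flattening step can also be sketched with `np.concatenate`, which joins the per-file arrays directly instead of iterating over every element in Python. This assumes the per-file values are kept as NumPy arrays rather than converted with `.tolist()`; the small arrays below are placeholders standing in for the contents of two files.

```python
import numpy as np

# Placeholder per-file arrays (assumption: two files with 2 and 1 soundings).
per_file_xco2 = [np.array([410.1, 411.2]), np.array([409.8])]
per_file_vertices = [np.zeros((2, 4)), np.ones((1, 4))]

# 1-D variables are joined end to end.
xco2_all = np.concatenate(per_file_xco2)

# 2-D vertex arrays are stacked along the sounding axis, keeping 4 vertices per row.
vertices_all = np.concatenate(per_file_vertices, axis=0)

print(xco2_all.shape, vertices_all.shape)
```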


### Transformation to parquet format
- Creating table format

In [12]:
%%time
df_oco3= pa.table({
    'Latitude_vertices': lat_list_a,
    'Longitude_vertices': lon_list_a,
    'Latitude': lat_a,
    'Longitude': lon_a,
    'Xco2': xco2_list_a,
    'quality_flag': xco2_qual_flag,
    'DateTime': dateTime_list
})

Wall time: 7min 20s


## Write table to parquet

In [None]:
parq.write_table(df_oco3, 'oco2_2020_parq.parquet')

In [None]:
## WRITE POLYGON values 