# Task 1: Converting NetCDF Files to ZARR Format

# Step 1: Data Extract

There are three ways to access the `.aust.ipar.nc` file: 
1. OPENDAP
2. HTTPServer
3. WMS

I accessed the `.aust.ipar.nc` file through HTTPServer. The link of the NetCDF file at the first day of 2023 is (https://thredds.aodn.org.au/thredds/fileServer/IMOS/SRS/OC/gridded/aqua/P1D/2023/01/A.P1D.20230101T053000Z.aust.ipar.nc).

In [None]:
import requests
url = 'https://thredds.aodn.org.au/thredds/fileServer/IMOS/SRS/OC/gridded/aqua/P1D/2023/01/A.P1D.20230101T053000Z.aust.ipar.nc'
response = requests.get(url) 

filename = 'A.P1D.20230101T053000Z.aust.ipar.nc'

if response.status_code == 200:
    with open(filename, 'wb') as f:
        f.write(response.content)
else:
    print('Failed to download dataset file')

# Step 2: Data Transform

I used xarray library to open this NetCDF dataset, and to ingest its information.

In [None]:
!pip install xarray

In [None]:
import numpy as np
import xarray as xr

ds = xr.open_dataset(filename)
ds

# Step 3: Chunking Strategy

Following this, I noticed that 'time', 'latitude', and 'longitude' are 1D, whereas 'ipar' is 3D with a relatively large size. I decided to divide 'ipar' into chunks by 'latitude' and 'longitude'. I simply divided 'ipar' into chunks follows '(1,70,100)', refering that each chunk contains 100 elements. This can be improved by providing more details, such as the partitioning strategy for latitude and longitude, and the specific range that represents a region in terms of latitude and longitude.

In [None]:
chunks={'time':1,'latitude':70, 'longitude':100}
chuncked_ds = ds.chunk(chunks)
chuncked_ds.to_zarr('zarr_store', mode='w')

for var in chuncked_ds.variables:
    data_array = chuncked_ds[var]
    if isinstance(data_array, xr.DataArray):
        print(f"Variable '{var}':")
        if data_array.chunks:
            print(f" - Chunks: {data_array.chunks}")
        else:
            print(" - Not chunked")


def convert2zarr(filename):
    ds = xr.open_dataset(filename)
    chunks={'time':1,'latitude':70, 'longitude':100}
    chuncked_ds = ds.chunk(chunks)
    chuncked_ds.to_zarr('zarr_store', mode='w')

# Step 4: Automation Strategy

Lastly, I consider how to automatically execute the Zarr conversion to update daily. I noticed that PID distinguished daily datasets. In the form of PID, the date was displayed as YYYYMMDD. For instance, the PID of 01/01/2023 is 20230101T053000Z, and the PID of 02/01/2023 is 20230102T053000Z. Meanhwile, the url shows a pattern as ''https://thredds.aodn.org.au/thredds/fileServer/IMOS/SRS/OC/gridded/aqua/P1D/**YEAR**/**MONTH**/A.P1D.**PID**.aust.ipar.nc''

The dataset of current day can be extracred with the following function `auto_conversion()`.

The whole automatic conversion process is demonstrated as:
 1. create a python file with the `auto_conversion` code
 2. deploy this program on the server
 3. set up a schedule to run this program daily. 

In [None]:
from datetime import datetime
def auto_conversion():
    today = datetime.now()
    year = today.year
    month = f'{today.month:02d}'
    current_date = today.strftime("%Y%m%d")
    url = f'https://thredds.aodn.org.au/thredds/fileServer/IMOS/SRS/OC/gridded/aqua/P1D/{year}/{month}/A.P1D.{current_date}T053000Z.aust.ipar.nc'
    print(url)
    response = requests.get(url)
    if response.status_code == 200:
        filename = f'A.P1D.{current_date}T053000Z.aust.ipar.nc'
        with open(filename, 'wb') as f:
            f.write(response.content)
        convert2zarr(filename)
        print('Successfully convet to Zarr format')
    else:
        print('Failed to download dataset file')

auto_conversion()

Notably, this depends on the update schema of the NetCDF dataset. Specifically, the program should run after the data becomes available on the HTTP Server. Therefore, the aforementioned function can be enhanced to collect a specific dataset with a given date (in the format of 'YYYYMMDD') as follows:

In [None]:
from datetime import datetime
def auto_conversion_by_date(date):
    year = date[:4]
    month = date[4:6]
    url = f'https://thredds.aodn.org.au/thredds/fileServer/IMOS/SRS/OC/gridded/aqua/P1D/{year}/{month}/A.P1D.{date}T053000Z.aust.ipar.nc'
    response = requests.get(url)
    if response.status_code == 200:
        filename = f'A.P1D.{date}T053000Z.aust.ipar.nc'
        with open(filename, 'wb') as f:
            f.write(response.content)
        convert2Zarr(filename)
        print('Successfully convet to Zarr format')
    else:
        print('Failed to download dataset file')

auto_conversion_by_date('20240201')

# Task 2: Converting CSV to GeoParquet
This task requires to convert the `abs-regional-lga-2021` dataset from a csv file to the GeoParquet format. 

# Step 1: Data Cleaning

After go through the dataset, I determined these tasks to clean the source data:
- Check and remove duplicate values: identify and remove duplicate data.
- Process empty values: identify empty values in the dataset and decide how to handle them (e.g., remove rows with empty values or fill in empty values).
- Check data types: Ensure that each column of data is of the correct type (as denoted in column 'UNIT_MEASURE: Unit of Measure').

In [None]:
!pip install pandas pyarrow fastparquet

In [None]:
!pip install s3fs

In [None]:
import pandas as pd

filename = 'ABS_ABS_REGIONAL_LGA2021_1.0.0.csv'
df = pd.read_csv(filename)

parquet_file = "s3://gbr-dms-data-public/abs-regional-lga-2021/data.parquet"
parquet_df = pd.read_parquet(parquet_file, storage_options={'anon': True})
print(parquet_df.head())

By analysing the expected formats of results, I preprocessed the dataset to ensure data align with the expected outcome.

In [None]:
df['FREQUENCY'] = df['FREQUENCY: Frequency'].apply(lambda x:x.split(':')[0])
df['REGIONTYPE'] = df['REGIONTYPE: Region Type'].apply(lambda x:x.split(':')[0])
df['REGION_CODE'] = df['LGA_2021: Region'].apply(lambda x:x.split(':')[0])
df['REGION_NAME'] = df['LGA_2021: Region'].apply(lambda x:x.split(':')[1])
df['MEASURE'] = df['MEASURE: Data Item'].apply(lambda x:x.split(':')[0])
df.loc[df['UNIT_MULT: Unit of Multiplier'].notnull(), 'OBS_VALUE'] = df['OBS_VALUE'] * 1000000

df = df.drop(['FREQUENCY: Frequency', 'REGIONTYPE: Region Type', 'LGA_2021: Region', 'MEASURE: Data Item', 
              'UNIT_MEASURE: Unit of Measure','OBS_STATUS: Observation Status', 'OBS_COMMENT: Observation Comment', 
              'UNIT_MULT: Unit of Multiplier'], axis=1)

df.rename(columns={'TIME_PERIOD: Time Period': 'TIME_PERIOD'}, inplace=True)
column_order = ['DATAFLOW', 'FREQUENCY', 'TIME_PERIOD', 'REGIONTYPE', 'REGION_CODE', 'REGION_NAME', 'MEASURE', 'OBS_VALUE']
df = df[column_order]

print(df.head())

From the printed information,  I observed that columns 0 to 7 are of the same size and contain no null data. To clean the data, I will first check for and remove any duplicate data in these columns, and then address any empty data in the last two columns. As for column 8, which is labeled 'UNIT_MULT: Unit of Multiplier', the values in this column are mostly null, but some of them have the same value, '6: Millions'. This indicates the measured value should be multiplied by one million. IF this column is non-null, I can adjust the values in column 6 'OBS_VALUE' accordingly and drop this column as well.

Meanwhile, column 7, labeled 'UNIT_MEASURE', indicates the unit of measurement. Particularly, values representing numbers should be integers.

Therefore, the data clean strategy is improved as:
- Check and remove duplicate values: identify and remove duplicate data.
- Process empty values: adjust values in column 6 'OBS_VALUE' if column 8 'UNIT_MULT' is not-null. Remove columns 8-10.
- Check data types: Ensure that each column of data is of the correct type (as denoted in column 'UNIT_MEASURE: Unit of Measure'). 

# Step 2: Pivot Table

Since this is a regional dataset, I selected columns 'REGION_CODE', 'REGION_NAME', 'TIME_PERIOD' as rows, 'MEASURE: Data Item' as a column, 'OBS_VALUE' as values.

In [None]:
pivot_table = df.pivot_table(
    index=['REGION_CODE', 'REGION_NAME', 'TIME_PERIOD'], 
    columns='MEASURE',
    values='OBS_VALUE'
)
pivot_table = pivot_table.reset_index()
df_sorted = pivot_table.sort_values(by=['TIME_PERIOD', 'REGION_CODE'])
pivot_table_sorted = df_sorted.reset_index(drop=True)

print(pivot_table_sorted.head())

# Step 3: Add Reference Geometry

Add a geometry column for the pivot table. And access AWS S3 to get reference geometry.

In [None]:
credential_df = pd.read_csv('rootkey.csv')
access_key_id = credential_df.iloc[0, 0]
secret_access_key = credential_df.iloc[0, 1]

geoms_file = "s3://gbr-dms-data-public/tasks/geoms.parquet"
geoms_df = pd.read_parquet(geoms_file, storage_options={'anon': True})
print(geoms_df)

result = pd.merge(pivot_table_sorted, geoms_df, left_on='REGION_CODE', right_on='LGA_CODE21')
result = result.drop(['minx', 'miny', 'maxx', 'maxy'], axis=1)

result = result.sort_values(by=['TIME_PERIOD', 'REGION_CODE'])
result = result.reset_index(drop=True)

print(result)

# Step 4: Convert to the GeoParquet Format

Convert the updated dataframe to the GeoParquet format through Python package GeoPandas.

In [None]:
import geopandas as gpd

gdf = gpd.GeoDataFrame(result, geometry='geometry')

gdf.to_parquet('data.parquet')

# Step 5: Save to AWS S3
Save this dataset in the GeoParquet format to AWS S3. *An error occurs when excuting this cell. This error is due to there is no bucket on my S3 server. It can be replaced to the server which has the bucket*.

In [None]:
import boto3
from botocore.exceptions import NoCredentialsError

# Initialize a boto3 client with S3 access
s3_client = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)

file_name = 'data.parquet'
folder_name = 'abs-regional-lga-2021'
bucket_name = 'gbr-dms-data-public'

# Upload the file
try:
    s3_client.upload_file(file_name, bucket_name, 'folder_name/{}'.format(filename))
    print(f"File uploaded to s3://gbr-dms-data-public/{folder_name}/{file_name}")
except NoCredentialsError:
    print("Credentials not available")