<a href="https://colab.research.google.com/github/marvande/master-thesis/blob/main/ProcessingNcFiles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Processing google cloud files of all years into one NC file

Authenticate

In [7]:
# Authenticate
from google.colab import auth
auth.authenticate_user()
# Give our project ID
project_id = 'ee-iceshelf-gee4geo'
!gcloud config set project {project_id}
# Download the file from a given Google Cloud Storage bucket.
!gsutil cp gs://ee-downscalingclimatemodels/test.txt /tmp/gsutil_download.txt
# Print the result to make sure the transfer worked.
!cat /tmp/gsutil_download.txt 

Are you sure you wish to set property [core/project] to ee-iceshelf-gee4geo?

Do you want to continue (Y/n)?  y

Updated property [core/project].
Copying gs://ee-downscalingclimatemodels/test.txt...
/ [1 files][   13.0 B/   13.0 B]                                                
Operation completed over 1 objects/13.0 B.                                       
AIAIAIAIAIAIA

## Imports

In [8]:
! pip install zarr xarray fsspec gcsfs



In [9]:
import xarray as xr
import zarr
import os
import pandas as pd
from os import listdir
from os.path import isfile, join

import ftplib
import sys
from re import search
from tqdm import tqdm 
import glob

import fsspec
import gcsfs


## Setting up

In [10]:
PROJECT = 'ee-iceshelf-gee4geo'
BUCKET = "ee-downscalingclimatemodels"

In [11]:
os.environ["GCLOUD_PROJECT"] = PROJECT

In [97]:
# Google cloud
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET)

# Download files from google cloud

Copy all files from the google cloud bucket:

Other way of doing it:

`! gsutil -m cp gs://ee-downscalingclimatemodels/Chris_data/RawData/MAR-ACCESS1.3/RH/* /tmp/RH/`

and upload to google cloud
` !gsutil -m cp -r /tmp/CC_zarr/CC*.zarr gs://ee-downscalingclimatemodels/Chris_data/RawData/MAR-ACCESS1.3/zarr_data/CC_zarr/`

In [126]:
def empty_dir(pathLocal):
  # delete all files as precaution
  for file_name in os.listdir(pathLocal):
      # construct full file path
      file = pathLocal + file_name
      if os.path.isfile(file):
          os.remove(file)

def create_dir(path):
  # create empty directory for variable
  if not os.path.exists(path):
      os.makedirs(path)

def listFilesGC(path, VAR):
  # Get all files already on GC:
  filesGC = []
  for blob in storage_client.list_blobs(bucket, prefix=f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'):
    #print(str(blob))
    file_ = str(blob)
    if search(VAR, file_):
      span = search(f"{VAR}ant(.*?).nc", file_).span(0)
      filesGC.append(file_[span[0]:span[1]])
  return filesGC

def downloadFromGC(destName, filesGC):
  N = len(filesGC)
  for i in tqdm(range(N)):
    file_name = filesGC[i]
    # Download from GC locally
    blob = bucket.blob(destName + file_name)
    blob.download_to_filename(pathLocal+file_name)

def filesInDir(pathLocal):
  return sorted([f for f in listdir(pathLocal) if isfile(join(pathLocal, f))])

def ZarrPerDecade(onlyfiles, NumDecades, pathLocal, pathLocalZarr, pathGC):
  m = 10
  for j in tqdm(range(NumDecades)):
    # open first file
    f0 = onlyfiles[j*m]
    df = xr.open_dataset(pathLocal+f0)
    if j < NumDecades-1:
      for f in onlyfiles[(j*m)+1:m*(j+1)]:
        df2 = xr.open_dataset(pathLocal+f)
        df = df.merge(df2)
    else:
      for f in onlyfiles[(j*m)+1:]:
        df2 = xr.open_dataset(pathLocal+f)
        df = df.merge(df2)
    # upload to GC
    df.to_zarr('gs://'+pathGC+f'{path}_decade_{j+1}.zarr', mode = 'w', consolidated = True)

In [129]:
def CreateDecadeData(path, VAR):
  pathLocal = f'/tmp/{path}/'
  pathLocalZarr = f'/tmp/{path}_zarr/'

  # create empty dir:
  create_dir(pathLocal)
  create_dir(pathLocalZarr)

  # empty dir out of precaution
  empty_dir(pathLocal)
  empty_dir(pathLocalZarr)

  # list files on GC
  filesGC = listFilesGC(path, VAR)
  print(f'Number of files already on GC: {len(filesGC)}')

  # Download files from GC
  destName = f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'
  downloadFromGC(destName, filesGC)

  # Get all file names locally
  onlyfiles = filesInDir(pathLocal)
  print(f'Number of files downloaded from GC: {len(onlyfiles)}')
  print('Examples of files: \n {}'.format(onlyfiles[0]))

  # Create a zarr file per decade:
  pathZarr = f'ee-downscalingclimatemodels/Chris_data/RawData/MAR-ACCESS1.3/zarr_data/{path}_zarr/'
  print(f'Saving zarr files at: {pathZarr}')
  m = 10 # num years per zarr folder
  NumDecades = int(len(onlyfiles)/m)
  print(f'Number of decades: {NumDecades}, number of files: {len(onlyfiles)}')

  ZarrPerDecade(onlyfiles, NumDecades, pathLocal, pathLocalZarr, pathZarr)

  # empty dir out of precaution
  empty_dir(pathLocal)
  empty_dir(pathLocalZarr)

In [None]:
# RH
path = 'RH'
VAR = path+'_'
CreateDecadeData(path, VAR)

In [None]:
# CC
path = 'CC'
VAR = path+'_'
CreateDecadeData(path, VAR)

In [131]:
# RU
path = 'RU'
VAR = path+'_'
CreateDecadeData(path, VAR)

Number of files already on GC: 121


100%|██████████| 121/121 [01:47<00:00,  1.13it/s]

Number of files downloaded from GC: 0





IndexError: ignored

In [None]:
# VVP
path = 'VVP'
VAR = path+'_'
CreateDecadeData(path, VAR)

# FTP download files and move to google cloud

## Prepare ftp session:

In [130]:
# test connection to server:
ftp_server = 'ftp.climato.be'
ftp_session= ftplib.FTP(ftp_server)
ftp_session.login()
ftp_session.quit()

'221 Goodbye.'

In [None]:
# Write a function that initiates a FTP session
def open_ftp_session(ftp_server):
    """
       Open a ftp session given the server ftp address,
       the user's ID and the user's password.
       
       @param ftp_server: name of the ftp server (string)
       @param my_userid:  user ID on the ftp server (string)
       @param my_passwd:  user password on the ftp server (string)
    """
    
    ftp_session = ftplib.FTP(ftp_server)
    ftp_session.login()
    
    return ftp_session


def ftp_dir_content(ftp_session, dir_name=None):
    """
       List the content of a diirectory in a ftp server.
       If the directory is not provided, will list the content
       of the top directory.
       
       @param ftp_session: ftp session object
       @param dir_name:    name of the directory you want to access (string)
        
       Returned Value:
          - List of directories and files 
           (similar to the Unix command 'ls -l')
    """ 
 
    data = []
    
    if dir_name != None:
        # Change directory
        ftp_session.cwd(dir_name)

    # Get the list of files
    ftp_session.dir(data.append)

    return data

def ftp_get_file(ftp_session, file_name):
    """
         Get a file from a ftp server

         @param ftp_session: ftp session object
         @param file_name: name of the file you want to download  
    """
    try:
        ftp_session.retrbinary("RETR " + file_name ,open(file_name, 'wb').write)
    except:
        print("Error - Cannot obtain file: "+ file_name)

In [None]:
ftp_session = open_ftp_session(ftp_server)
# To list the top directories in the server
ftp_session.retrlines('LIST')

ConnectionRefusedError: ignored

In [None]:
data = ftp_dir_content(ftp_session, dir_name='climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/')
for line in data[:5]:
    print("-", line)

- -rw-r--r--    1 1028     1000     38208563 Mar 17 09:43 CC_ant-35km_ACCESS1.3_rcp8.5_r1i1p1_ULg-MAR311_v1_day_19800101-19801231.nc
- -rw-r--r--    1 1028     1000     38104367 Mar 17 09:43 CC_ant-35km_ACCESS1.3_rcp8.5_r1i1p1_ULg-MAR311_v1_day_19810101-19811231.nc
- -rw-r--r--    1 1028     1000     38104367 Mar 17 09:43 CC_ant-35km_ACCESS1.3_rcp8.5_r1i1p1_ULg-MAR311_v1_day_19820101-19821231.nc
- -rw-r--r--    1 1028     1000     38104367 Mar 17 09:43 CC_ant-35km_ACCESS1.3_rcp8.5_r1i1p1_ULg-MAR311_v1_day_19830101-19831231.nc
- -rw-r--r--    1 1028     1000     38208563 Mar 17 09:43 CC_ant-35km_ACCESS1.3_rcp8.5_r1i1p1_ULg-MAR311_v1_day_19840101-19841231.nc


## RH

In [None]:
# create empty directory for variable
path = 'RH'
VAR = path+'_'

# Get all files already on GC:
filesGC = []
for blob in storage_client.list_blobs(bucket, prefix=f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'):
  #print(str(blob))
  file_ = str(blob)
  if search(VAR, file_):
    span = search(f"{VAR}ant(.*?).nc", file_).span(0)
    filesGC.append(file_[span[0]:span[1]])
print(f'Number of files already on GC: {len(filesGC)}')

# Get all filenames in FTP server
ftpFiles = []
ftp_session = open_ftp_session(ftp_server)
data = ftp_dir_content(ftp_session, dir_name='climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/')
for line in data:
  if search(VAR, line):
    span = search(f"{VAR}ant(.*?).nc", line).span(0)
    ftpFiles.append(line[span[0]:span[1]])

print(f'Number of files on ftp: {len(ftpFiles)}')

# Find difference of two lists
remainingFiles = list(set(ftpFiles) - set(filesGC))
print(f'Remaining files to put on GC: {len(remainingFiles)}')

# Copy remaining files from FTP to GC:

dir_name  = 'climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/'
destName = f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'
N = len(remainingFiles)
for i in tqdm(range(N)):
  file_name = remainingFiles[i]
  ftp_session = open_ftp_session(ftp_server)
  ftp_session.cwd(dir_name)   
  ftp_get_file(ftp_session, file_name)

  # upload to google cloud:
  blob = bucket.blob(destName+file_name)
  blob.upload_from_filename(file_name)

NameError: ignored

## RU

In [None]:
# create empty directory for variable
path = 'RU'
VAR = path+'_'

# Get all files already on GC:
filesGC = []
for blob in storage_client.list_blobs(bucket, prefix=f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'):
  #print(str(blob))
  file_ = str(blob)
  if search(VAR, file_):
    span = search(f"{VAR}ant(.*?).nc", file_).span(0)
    filesGC.append(file_[span[0]:span[1]])
print(f'Number of files already on GC: {len(filesGC)}')

# Get all filenames in FTP server
ftpFiles = []
ftp_session = open_ftp_session(ftp_server)
data = ftp_dir_content(ftp_session, dir_name='climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/')
for line in data:
  if search(VAR, line):
    span = search(f"{VAR}ant(.*?).nc", line).span(0)
    ftpFiles.append(line[span[0]:span[1]])

print(f'Number of files on ftp: {len(ftpFiles)}')

# Find difference of two lists
remainingFiles = list(set(ftpFiles) - set(filesGC))
print(f'Remaining files to put on GC: {len(remainingFiles)}')

# Copy remaining files from FTP to GC:

dir_name  = 'climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/'
destName = f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'
N = len(remainingFiles)
for i in tqdm(range(N)):
  file_name = remainingFiles[i]
  ftp_session = open_ftp_session(ftp_server)
  ftp_session.cwd(dir_name)   
  ftp_get_file(ftp_session, file_name)

  # upload to google cloud:
  blob = bucket.blob(destName+file_name)
  blob.upload_from_filename(file_name)

0

## VVP

In [None]:
# create empty directory for variable
path = 'VVP'
VAR = path+'_'

# Get all files already on GC:
filesGC = []
for blob in storage_client.list_blobs(bucket, prefix=f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'):
  #print(str(blob))
  file_ = str(blob)
  if search(VAR, file_):
    span = search(f"{VAR}ant(.*?).nc", file_).span(0)
    filesGC.append(file_[span[0]:span[1]])
print(f'Number of files already on GC: {len(filesGC)}')

# Get all filenames in FTP server
ftpFiles = []
ftp_session = open_ftp_session(ftp_server)
data = ftp_dir_content(ftp_session, dir_name='climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/')
for line in data:
  if search(VAR, line):
    span = search(f"{VAR}ant(.*?).nc", line).span(0)
    ftpFiles.append(line[span[0]:span[1]])

print(f'Number of files on ftp: {len(ftpFiles)}')

# Find difference of two lists
remainingFiles = list(set(ftpFiles) - set(filesGC))
print(f'Remaining files to put on GC: {len(remainingFiles)}')

# Copy remaining files from FTP to GC:

dir_name  = 'climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/'
destName = f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'
N = len(remainingFiles)
for i in tqdm(range(N)):
  file_name = remainingFiles[i]
  ftp_session = open_ftp_session(ftp_server)
  ftp_session.cwd(dir_name)   
  ftp_get_file(ftp_session, file_name)

  # upload to google cloud:
  blob = bucket.blob(destName+file_name)
  blob.upload_from_filename(file_name)

0

## UUP:

In [None]:
# create empty directory for variable
path = 'UUP'
VAR = path+'_'

# Get all files already on GC:
filesGC = []
for blob in storage_client.list_blobs(bucket, prefix=f'Chris_data/RawData/MAR-ACCESS1.3/{path}/'):
  #print(str(blob))
  file_ = str(blob)
  if search(VAR, file_):
    span = search(f"{VAR}ant(.*?).nc", file_).span(0)
    filesGC.append(file_[span[0]:span[1]])
print(f'Number of files already on GC: {len(filesGC)}')

# Get all filenames in FTP server
ftpFiles = []
ftp_session = open_ftp_session(ftp_server)
data = ftp_dir_content(ftp_session, dir_name='climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/')
for line in data:
  if search(VAR, line):
    span = search(f"{VAR}ant(.*?).nc", line).span(0)
    ftpFiles.append(line[span[0]:span[1]])

print(f'Number of files on ftp: {len(ftpFiles)}')

# Find difference of two lists
remainingFiles = list(set(ftpFiles) - set(filesGC))
print(f'Remaining files to put on GC: {len(remainingFiles)}')

# Copy remaining files from FTP to GC:

dir_name  = 'climato/ckittel/MARv3.11/Marijn/MAR-ACCESS1.3/'
destName = 'Chris_data/RawData/MAR-ACCESS1.3/UUP/'
N = len(remainingFiles)
for i in tqdm(range(N)):
  file_name = remainingFiles[i]
  ftp_session = open_ftp_session(ftp_server)
  ftp_session.cwd(dir_name)   
  ftp_get_file(ftp_session, file_name)

  # upload to google cloud:
  blob = bucket.blob(destName+file_name)
  blob.upload_from_filename(file_name)

Number of files already on GC: 93


NameError: ignored