# Transfer Tables

This notebook transfers selected data tables, along with a list of URIs, from the current workspace to a new one (for example, a new workspace with the Autoclass feature enabled). After copying the files to the same paths in the new bucket, the notebook creates new tables whose gs:// URIs point to the new bucket. The names of the new tables end with "_transferred"; they can then be imported into the new workspace.

## Define functions, load packages and workspace info 

In [None]:
!pip install terra_notebook_utils terra_pandas

In [None]:
from terra_notebook_utils import WORKSPACE_NAME
import terra_pandas as tp
import numpy as np
import pandas as pd
import os
import copy
import random 
import statistics as stats
import matplotlib.patches as mplpatches
import matplotlib.pyplot as plt
from google.cloud import storage
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed 
import datetime

In [None]:
# Show the workspace name reported by terra_notebook_utils, for reference.
WORKSPACE_NAME

In [None]:
# This function is adapted from 
# https://cloud.google.com/storage/docs/samples/storage-copy-file#storage_copy_file-python
def copy_blob(transfer_tuple):
    """Copy one object between buckets using the GCS rewrite API.

    Args:
        transfer_tuple: ``(bucket_name, blob_name, destination_bucket_name,
            destination_blob_name)``. Bucket names are given without the
            ``gs://`` prefix; blob names are object paths inside the bucket.

    Returns:
        The last path component of the source object name (via
        ``get_object_name``), used by the caller for progress messages.
    """
    bucket_name, blob_name, destination_bucket_name, destination_blob_name = transfer_tuple

    # A fresh client is created per call; the caller runs this function in a
    # ThreadPoolExecutor.
    storage_client = storage.Client()

    source_blob = storage_client.bucket(bucket_name).blob(blob_name)
    # Honour the requested destination name (previously the source name was reused).
    destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)

    # rewrite() may need several calls for large objects: it returns a
    # continuation token until the copy has completed.
    rewrite_token, bytes_rewritten, bytes_to_rewrite = destination_blob.rewrite(source_blob)

    while rewrite_token is not None:
        rewrite_token, bytes_rewritten, bytes_to_rewrite = destination_blob.rewrite(source_blob, token=rewrite_token)
        print(f'Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes.')

    return get_object_name(blob_name)

In [None]:
# These functions are taken from 
# https://github.com/mobinasri/terra_scripts/blob/main/pull_terra_table.py
def get_time():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")

def get_bucket_name(uri):
    """Return the bucket part of a ``scheme://bucket/path`` URI (third "/"-separated field)."""
    pieces = uri.split("/", 3)
    return pieces[2]

def get_blob_name(uri):
    """Return the object path of a ``scheme://bucket/path`` URI.

    Everything after ``scheme://bucket/`` is returned, e.g.
    ``"gs://b/dir/file.txt"`` -> ``"dir/file.txt"``. A URI with no object
    part (``"gs://b"`` or ``"gs://b/"``) yields ``""``.

    The split is on "/" rather than on a fixed offset, so it works for any
    scheme length. The previous ``len(bucket) + 5`` assumed a 5-character
    ``gs://``-style prefix.
    """
    parts = uri.split("/", 3)  # ['scheme:', '', bucket, rest-of-path]
    return parts[3] if len(parts) > 3 else ""

def get_object_name(uri):
    """Return the final "/"-separated component of ``uri`` (the file name)."""
    *_, last = uri.rsplit("/", 1)
    return last

def is_external(uri, workspace_bucket_name):
    """Return True when ``uri`` lives in a bucket other than ``workspace_bucket_name``."""
    uri_bucket = get_bucket_name(uri)
    return not uri_bucket == workspace_bucket_name


def get_size_uri(uri):
    """Return the size in bytes of the GCS object at ``uri``.

    Best-effort: on any failure (object missing, permission error, malformed
    URI) a message is printed and ``False`` is returned instead of raising.
    An existing empty object returns ``0``, which is also falsy. Callers that
    need to tell the two apart should test ``result is not False``.
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(get_bucket_name(uri))
        blob = bucket.get_blob(get_blob_name(uri))
        if blob is None:
            # get_blob returns None (rather than raising) when the object is absent.
            print("Error for ", uri)
            print("Object not found")
            return False
        return blob.size
    except Exception as e:
        print("Error for ", uri)
        print(e)
        return False

In [None]:
def keep_existant_files(uris):
    """Return the URIs from ``uris`` whose objects exist, preserving order.

    ``get_size_uri`` returns ``False`` on failure and the byte size otherwise.
    Comparing with ``is not False`` keeps existing zero-byte objects, which a
    plain truthiness test would wrongly drop.
    """
    return [uri for uri in uris if get_size_uri(uri) is not False]

In [None]:
# Get the Google billing project name and workspace name
PROJECT = os.environ['WORKSPACE_NAMESPACE']
# NOTE(review): the workspace name is taken from the parent directory of the
# current working directory; confirm it matches WORKSPACE_NAME shown above.
WORKSPACE = os.path.basename(os.path.dirname(os.getcwd()))
# Normalize to exactly one trailing "/" so `bucket` is a whole-bucket prefix
# ("gs://fc-abc/" cannot match "gs://fc-abcdef/..."). A value that already
# ended in "/" would otherwise become "...//" and match nothing.
bucket = os.environ['WORKSPACE_BUCKET'].rstrip("/") + "/"

# Verify that we've captured the environment variables
print("Billing project: " + PROJECT)
print("Workspace: " + WORKSPACE)
print("Workspace storage bucket: " + bucket)

In [None]:
def read_uris_from_file(gs_uri):
    """Download a text file from GCS and return the whitespace-separated URIs in it.

    Args:
        gs_uri: ``gs://`` URI of the list file. ``""`` or ``None`` means
            "no extra URIs".

    Returns:
        List of URI strings (empty when ``gs_uri`` is empty or None).

    Raises:
        subprocess.CalledProcessError: if ``gsutil cp`` fails. This replaces the
            old behaviour of silently reading a stale ``other_uris.txt`` left
            behind by an earlier run.
    """
    if not gs_uri:
        return []
    import subprocess
    # Argument list (no shell), so the URI is passed verbatim and never shell-interpreted.
    subprocess.run(["gsutil", "cp", gs_uri, "other_uris.txt"], check=True)
    with open("other_uris.txt", "r") as f:
        return f.read().split()

## Inputs for the notebook

Below is the only code block that has to be filled in with the proper values. You can then run the remaining code blocks as they are.

In [None]:
# Write the name of the new bucket (a gs:// URI)
new_bucket = "gs://fc-dc39cbb8-a30e-4cd6-b/"
# URIs are rewritten by swapping the `bucket` prefix (which ends in "/") for
# `new_bucket`, so make sure it also ends in exactly one "/".
new_bucket = new_bucket.rstrip("/") + "/"
assert new_bucket.startswith("gs://"), f"new_bucket must be a gs:// URI, got {new_bucket!r}"

# Which tables do you want to transfer?
# Please note that uris from other
# buckets/workspaces will not be transferred
table_names_to_transfer = ["table_2", "table_1"]

# Optional gs:// uri of a text file listing extra (whitespace-separated) uris
# that should be transferred but may not exist in the given tables; "" for none
other_uris_to_transfer = read_uris_from_file("")

## Parse tables and transferable URIs

In [None]:
# Load each requested Terra table as a DataFrame, in the order listed above
tables = [
    tp.table_to_dataframe(table_name=name,
                          workspace=WORKSPACE,
                          workspace_namespace=PROJECT)
    for name in table_names_to_transfer
]

In [None]:
def flatten_entities(entities):
    """Flatten one level of list-valued cells into a single numpy array.

    Each item of ``entities`` is either a list (its elements are spliced in)
    or any other value (kept as a single element). ``np.array`` then picks a
    common dtype, so mixed strings and numbers come out as strings.
    """
    flat = []
    for item in entities:
        flat += item if isinstance(item, list) else [item]
    return np.array(flat)

In [None]:
def get_uris_from_this_bucket(elements, bucket):
    """Return the elements that are strings starting with ``bucket``.

    Args:
        elements: iterable of table cell values (e.g. the array built by
            ``flatten_entities``); may contain non-string values.
        bucket: bucket prefix such as ``"gs://fc-.../"``.

    Returns:
        List of matching URIs as plain ``str``, in input order.
    """
    # Check values one by one. The old np.char.startswith raised TypeError
    # whenever the array had a numeric or object dtype (e.g. an all-numeric
    # table, or cells containing None).
    return [str(e) for e in elements if isinstance(e, str) and e.startswith(bucket)]

In [None]:
# Extract uris to transfer
uris_to_transfer = []

for table in tables:
    # Flatten the table to a 1-D array of cell values (the DataFrame index is not included)
    entities = table.to_numpy().flatten()

    # get all elements 
    # (if an entity is a list it will be 
    # flattened before appending to the 
    # output list of elements)
    elements = flatten_entities(entities)

    # keep uris from this bucket
    uris_to_transfer.extend(get_uris_from_this_bucket(elements, bucket))

# Add other uris given in a separate list
uris_to_transfer.extend(other_uris_to_transfer)

# The same object can be referenced by several rows or tables. Drop repeats
# (keeping first-seen order) so each object is checked, sized and counted once.
uris_to_transfer = list(dict.fromkeys(uris_to_transfer))

uris_to_transfer = keep_existant_files(uris_to_transfer)

# Show some uris
print("First 10 uris:")
uris_to_transfer[:10]

## Transfer objects

In [None]:
# Sum the sizes (in bytes) of all objects selected above; get_size_uri returns
# False (counted as 0) if an object vanished since it was checked.
total_size = sum(get_size_uri(uri) for uri in uris_to_transfer)
print(f"Total size of the objects to be transferred = {total_size/1e9:.3f} GB (Count = {len(uris_to_transfer)})")

Prepare a list of inputs to pass to the copying function

In [None]:
# Every object keeps its path; only the bucket changes.
new_bucket_name = get_bucket_name(new_bucket)
transfer_list = []
for old_uri in uris_to_transfer:
    if not old_uri.startswith(bucket):
        # Only objects from this workspace's bucket are copied. Without this
        # guard, an external URI (e.g. from other_uris_to_transfer) would be
        # rewritten onto itself, because the prefix swap leaves it unchanged.
        print(f"Skipping (not in {bucket}): {old_uri}")
        continue
    blob_name = get_blob_name(old_uri)
    transfer_list.append((get_bucket_name(old_uri), blob_name, new_bucket_name, blob_name))
# remove redundant uris
transfer_set = set(transfer_list)

Start transferring data. It may take a while depending on the size of the files.

In [None]:
transferred_count = 0
failed_transfers = []
threads = 4
# make a pool of threads for copying objects in parallel
with ThreadPoolExecutor(max_workers=threads) as executor:
    future_to_transfer = {executor.submit(copy_blob, x): x for x in transfer_set}
    for future in as_completed(future_to_transfer):
        try:
            object_name = future.result()
        except Exception as e:
            # Record the failure and keep reporting the remaining copies.
            source_bucket_name, source_blob_name = future_to_transfer[future][:2]
            failed_uri = f"gs://{source_bucket_name}/{source_blob_name}"
            failed_transfers.append(failed_uri)
            print(f"Failed to transfer {failed_uri}: {e}")
            continue
        transferred_count += 1
        print(f"Object ({transferred_count}/{len(transfer_set)}) is transferred:\t{object_name}")

# Stop here if anything failed, so the next cells don't create tables that
# point at objects missing from the new bucket.
if failed_transfers:
    raise RuntimeError(f"{len(failed_transfers)} object(s) failed to transfer: {failed_transfers}")

## Make new tables

Make a new list of tables in which the URIs are modified to point to the new bucket. URIs from other buckets and numeric entities are left unchanged.

In [None]:
def _to_new_bucket(value):
    """Return ``value`` re-pointed at ``new_bucket`` if it is a URI in ``bucket``.

    Strings starting with ``bucket`` get that prefix (only) swapped for
    ``new_bucket``. Lists are rebuilt element-wise into a *new* list. Any
    other value (numbers, URIs in other buckets, ...) is returned unchanged.
    """
    if isinstance(value, str) and value.startswith(bucket):
        return new_bucket + value[len(bucket):]
    if isinstance(value, list):
        return [_to_new_bucket(element) for element in value]
    return value


copied_tables = []
for table in tables:
    # copy(deep=True) does not copy Python objects held in cells (e.g. lists),
    # so values are rebuilt rather than edited in place. In-place edits would
    # also modify the original `tables`. Whole columns are assigned instead of
    # using chained `table[col][row] = ...`, which is a no-op under pandas
    # Copy-on-Write.
    new_table = table.copy(deep=True)
    for column in new_table.columns:
        new_table[column] = new_table[column].map(_to_new_bucket)
    copied_tables.append(new_table)

In [None]:
# Preview the first rewritten table to spot-check the updated URIs.
copied_tables[0]

Save new tables with their names ending with "_transferred"

In [None]:
# copied_tables is built from `tables`, which was loaded in the same order as
# table_names_to_transfer, so the source names can be paired up directly.
assert len(copied_tables) == len(table_names_to_transfer)
for table_name, table in zip(table_names_to_transfer, copied_tables):
    new_table_name = f"{table_name}_transferred"
    # Rename the index *name* (the "<table>_id" id column) to match the new table.
    # rename(index={...}) would instead relabel the row whose id happens to be '1'.
    upload_df = table.rename_axis(f"{new_table_name}_id")
    tp.dataframe_to_table(table_name = new_table_name, 
                          df = upload_df,
                          workspace = WORKSPACE,
                          workspace_namespace = PROJECT)

Make a new list of other uris after modifying the uris to point to the new bucket

In [None]:
# Always (re)write the file, so that an empty list leaves an empty file rather than stale content.
with open("other_uris_new_bucket.txt", "w") as f:
    for uri in other_uris_to_transfer:
        # Swap only the leading bucket prefix; URIs from other buckets are written unchanged.
        new_uri = new_bucket + uri[len(bucket):] if uri.startswith(bucket) else uri
        f.write(f"{new_uri}\n")

In [None]:
# Show the rewritten list of extra URIs (the file is empty if none were given).
!cat other_uris_new_bucket.txt