In [1]:
"""
  A manager that facilitates reading and writing files to GCP Storage
"""
import logging
import os
import subprocess
from io import BytesIO
from typing import List, Dict, Callable, Tuple, Union
from mypy_extensions import TypedDict

from google.cloud import storage # type: ignore

def get_path_prefix(root_dir: str, relative_path: str) -> str:
    prefix = ''
    if root_dir:
        prefix = root_dir.rstrip('/') + '/'

    if relative_path and relative_path != '/':
        prefix = prefix + relative_path.strip('/') + '/'

    return prefix

class PathNode(TypedDict, total=False):
    name: str
    type: str
    size: float
        
class GCPStorageManager(object):

    def __init__(self, storage_details: Dict, verbose: bool) -> None:
        self._bucket_name = storage_details.get('bucket')
        self._root_dir = storage_details.get('root')
        self.client = storage.Client()
        self.verbose = verbose

    def _abs_path(self, rel_path: str) -> str:
        if not self._root_dir:
            return rel_path

        return os.path.join(self._root_dir, rel_path)

    def _build_current_url(self) -> str:
        if self._root_dir:
            return "https://console.cloud.google.com/storage/browser/" + self._bucket_name + "/" + self._root_dir
        else:
            return "https://console.cloud.google.com/storage/browser/" + self._bucket_name

    def get_root_dir(self) -> str:
        return self._root_dir

    def get_storage_details(self) -> Dict:
        return {
            'provider': file_utils.ProviderList.GCP_STORAGE,
            'bucket': self._bucket_name,
            'root': self._root_dir
        }

    def get_sync_login_command(self, env_vars: Dict) -> List[str]:
        return None

    def get_sync_url(self, path: str) -> str:
        if not path or path == '/':
            abs_path = self._root_dir
        else:
            abs_path = self._abs_path(path)
        return f'gs://{self._bucket_name}/{abs_path}'

    def get_sync_command(self, src_dir: str, remote_path: str) -> Callable:

        def sync_call() -> Tuple[int, str]:
            cmd = ['gsutil', 'rsync', '-r', src_dir, self.get_sync_url(remote_path)]
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            err_code = 0
            stderr_val = ''

            if p.returncode != 0:
                logging.error('Copy response is: {}'.format(stderr.decode('utf-8')))
                stderr_val = stderr.decode('utf-8')
                err_code = p.returncode

            return err_code, stderr_val

        return sync_call

    def rm_file(self, relative_path: str) -> None:
        bucket = self.client.bucket(self._bucket_name)
        path = self._abs_path(relative_path)
        blob = bucket.blob(path)
        logging.info('Deleting file at ' + path)
        blob.delete()

    def rm_dir(self, relative_path: str) -> None:
        bucket = self.client.bucket(self._bucket_name)
        prefix = get_path_prefix(self._root_dir, relative_path)
        blob = bucket.blob(prefix)
        logging.info('Deleting files at ' + prefix)
        blob.delete()

    def read_content(self, path: str, throw_exception: bool, read_range: str = None, streaming: bool = False) -> bytes:
        try:
            path = self._abs_path(path)
            bucket = self.client.bucket(self._bucket_name)
            blob = bucket.blob(path)
            result = blob.download_as_bytes()

            if self.verbose:
                logging.info(f"Downloading content from {self._build_current_url()}/{path}")

            return result
        except Exception as e:
            raise

    #def gen_presigned_url(self, path: str) -> str:
    #    # dask and other frameworks explicit s3 link rather than a byte stream or contet (e.g. s3://bucket/foo.csv)
    #    return self._s3.generate_presigned_url('get_object',
    #                                           Params={'Bucket': self._bucket_name, 'Key': self._abs_path(path)})

    # Checks to see if the job directory exists.  No side-effects.
    def check_dir_exists(self, path: str) -> bool:
        # Create the prefix for this particular job.
        exists = False
        prefix = get_path_prefix(self._root_dir, path)
        blobs = list(self.client.list_blobs(
            self._bucket_name, prefix=prefix
        ))
        if len(blobs) > 0:
            exists = True

        return exists

    def _download_content(self, remote_path: str) -> bytes:
        remote_path = self._abs_path(remote_path)
        bucket = self.client.bucket(self._bucket_name)
        blob = bucket.blob(remote_path)
        if self.verbose:
            logging.info(f"Downloading content from {self._build_current_url()}/{remote_path}")

        fileobj = BytesIO()
        blob.download_to_file(fileobj)
        return fileobj.getvalue()

    def download_file(self, remote_path: str, file_name: str) -> None:
        # Move references to large data items across folders
        remote_path = self._abs_path(remote_path)
        bucket = self.client.bucket(self._bucket_name)
        blob = bucket.blob(remote_path)
        if self.verbose:
            logging.info(f"Downloading file {self._build_current_url()}/{remote_path} to {file_name}")

        dirname = os.path.dirname(file_name)

        if dirname:
            # Only create a directory if it's not ''
            if not os.path.exists(dirname):
                os.makedirs(dirname)

        blob.download_to_filename(file_name)

    def download_and_unzip(self, remote_path: str, local_dir: str) -> None:
        zip_bytes = self._download_content(remote_path)
        file_utils.unzip_into_dir(zip_bytes, local_dir)

    def download_dir(self, remote_path: str, local_path: str) -> int:
        nFiles = 0
        if self.verbose:
            logging.info("Downloading folder: " + remote_path + " to " + local_path)

        prefix = file_utils.get_path_prefix(self._root_dir, remote_path)
        for blob in self.client.list_blobs(self._bucket_name, prefix=prefix):
            if blob.name.endswith('/'):
                continue

            rel_path = os.path.relpath(blob.name, prefix)
            dest_pathname = os.path.join(local_path, rel_path)

            if not os.path.exists(os.path.dirname(dest_pathname)):
                os.makedirs(os.path.dirname(dest_pathname))

            if self.verbose:
                logging.info(f"Downloading file {blob.name} to {dest_pathname}")

            blob.download_to_filename(dest_pathname)  # Download
            nFiles += 1

        return nFiles

    def upload_content(self, content: bytes, file_name: str) -> None:
        # Uploads file content to a specific filename location
        bucket = self.client.bucket(self._bucket_name)
        if self.verbose:
            logging.info(f"Uploading content to {self._build_current_url()}/{file_name}")

        blob = bucket.blob(self._abs_path(file_name))
        blob.upload_from_file(BytesIO(content))

    def list_directory(self, path: str, with_size: bool = False) -> Dict:
        prefix = get_path_prefix(self._root_dir, path)
        bucket = self.client.bucket(self._bucket_name)
        blobs = bucket.list_blobs(prefix=prefix)
        nodes = []
        for blob in blobs:
            if blob.name.endswith('/'):
                nodes.append(PathNode(name=os.path.basename(blob.name[:-1]), type='folder'))
            else:
                nodes.append(PathNode(name=os.path.basename(blob.name), type='file'))

        return {
            'nodes': nodes
        }


In [2]:
import os

from dotenv import load_dotenv
load_dotenv(verbose=True)

print(os.environ.get('GOOGLE_APPLICATION_CREDENTIALS'))

/Users/warren/sweat-equity-ventures/bespoke/flowhub_gcp_credentials.json


In [3]:
manager = GCPStorageManager({
    'bucket': 'partnerships-data-reporting',
    'root': ''
}, verbose=True)

In [4]:
manager.list_directory('')

{'nodes': [{'name': '4A5DE8Zj5gDtSmCbn_customers.csv', 'type': 'file'},
  {'name': '4A5DE8Zj5gDtSmCbn_locations.csv', 'type': 'file'},
  {'name': 'JYAZRDbztXRLwJ5r6_sales.csv', 'type': 'file'},
  {'name': '4xWeGuAg4jDmZSWxA_customers.csv', 'type': 'file'},
  {'name': '4xWeGuAg4jDmZSWxA_locations.csv', 'type': 'file'},
  {'name': 'PY9ucfFTZc4XPSuPC_sales.csv', 'type': 'file'},
  {'name': 'b5KEZ723yYvGdccgv_sales.csv', 'type': 'file'},
  {'name': '6DwQ3xhCWDGBRFFss_customers.csv', 'type': 'file'},
  {'name': '6DwQ3xhCWDGBRFFss_locations.csv', 'type': 'file'},
  {'name': 'CaNMh4HdcNdPFkoYK_sales.csv', 'type': 'file'},
  {'name': 'YLEEhTz7umoffAPqd_sales.csv', 'type': 'file'},
  {'name': 'bS7R8w9hgETJuiZoj_sales.csv', 'type': 'file'},
  {'name': 'xiJBqHKLRED7J2vHe_sales.csv', 'type': 'file'},
  {'name': '6K9bzPik9WxNFCceT_customers.csv', 'type': 'file'},
  {'name': '6K9bzPik9WxNFCceT_locations.csv', 'type': 'file'},
  {'name': 'nMdvN7zKuFLBwkye7_sales.csv', 'type': 'file'},
  {'name': '6YR

In [None]:
manager.read_content('4A5DE8Zj5gDtSmCbn/4A5DE8Zj5gDtSmCbn_customers.csv', throw_exception=True)

In [7]:
manager.download_file('4A5DE8Zj5gDtSmCbn/4A5DE8Zj5gDtSmCbn_customers.csv', '4A5DE8Zj5gDtSmCbn_customers.csv')

In [8]:
manager.download_file('4A5DE8Zj5gDtSmCbn/4A5DE8Zj5gDtSmCbn_locations.csv', '4A5DE8Zj5gDtSmCbn_locations.csv')

In [14]:
manager.download_file('4A5DE8Zj5gDtSmCbn/JYAZRDbztXRLwJ5r6/JYAZRDbztXRLwJ5r6_sales.csv', 'JYAZRDbztXRLwJ5r6_sales.csv')

In [15]:
import pandas as pd

In [30]:
customers_csv_string = manager.read_content('4A5DE8Zj5gDtSmCbn/4A5DE8Zj5gDtSmCbn_customers.csv', throw_exception=True)
locations_csv_string = manager.read_content('4A5DE8Zj5gDtSmCbn/4A5DE8Zj5gDtSmCbn_locations.csv', throw_exception=True)
sales_csv_string = manager.read_content('4A5DE8Zj5gDtSmCbn/JYAZRDbztXRLwJ5r6/JYAZRDbztXRLwJ5r6_sales.csv', throw_exception=True)

In [20]:
import sys
if sys.version_info[0] < 3: 
    from StringIO import StringIO
else:
    from io import StringIO

In [31]:
customers_io = StringIO(customers_csv_string.decode("utf-8"))
customers_dataframe = pd.read_csv(customers_io)
customers_dataframe

Unnamed: 0.1,Unnamed: 0,_id,isLoyal,type,state,isPrimary,clientId,createdAt,updatedAt,source,...,flowerGramsAvailable,isTaxExempt,lastPurchaseDate,loyaltyPoints,thcGramsAvailable,plantCount,gramLimit,checkInTime,enforceDailyLimit,lastPurchasedSaleId
0,0,YNzRaJfWyomGTroBn,True,recCustomer,CO,,4A5DE8Zj5gDtSmCbn,2017-06-18 20:26:02.266000+00:00,2018-06-27 11:19:27.090000+00:00,,...,,False,NaT,4.0,,,,1899-01-01 00:00:00+00:00,False,
1,1,nlrmkdfecyob,True,recCustomer,TX,,4A5DE8Zj5gDtSmCbn,2020-08-04 03:42:44.352000+00:00,2021-03-08 23:13:17.403000+00:00,,...,,False,2021-03-08 23:13:17.403,180.0,,,,1899-01-01 00:00:00+00:00,False,
2,2,78gklpf8nau,True,recCustomer,KS,,4A5DE8Zj5gDtSmCbn,2021-02-28 17:22:43.058000+00:00,2021-02-28 21:42:53.462000+00:00,,...,,False,NaT,0.0,,,,1899-01-01 00:00:00+00:00,False,
3,3,c0gjnxmkdrwgcaf,True,recCustomer,CO,,4A5DE8Zj5gDtSmCbn,2020-08-12 21:42:29.171000+00:00,2020-08-12 21:44:15.919000+00:00,,...,,False,2020-08-12 21:44:15.362,90.0,,,,1899-01-01 00:00:00+00:00,False,
4,4,LdJ6Mgmp7wRfsMEDY,True,recCustomer,OH,,4A5DE8Zj5gDtSmCbn,2019-05-01 21:22:57.535000+00:00,2019-08-03 14:48:42.502000+00:00,,...,,False,2019-05-01 21:27:54.106,75.0,,,,1899-01-01 00:00:00+00:00,False,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10156,10156,CSFqpu66dnggc4Ehf,False,medCustomer,CO,False,4A5DE8Zj5gDtSmCbn,2019-03-12 21:37:03.766000+00:00,2019-04-12 22:34:03.027000+00:00,,...,,False,2019-04-12 22:34:01.653,0.0,,6.0,56.0,1899-01-01 00:00:00+00:00,False,
10157,10157,evvBkvPTxqFp9mzXp,False,medCustomer,CO,False,4A5DE8Zj5gDtSmCbn,2017-09-04 20:39:44.248000+00:00,2018-06-27 10:55:20.868000+00:00,,...,,False,NaT,0.0,,6.0,56.0,1899-01-01 00:00:00+00:00,False,
10158,10158,8qRLsKuYjtbvW5oBC,False,medCustomer,CO,False,4A5DE8Zj5gDtSmCbn,2018-03-13 16:57:18.307000+00:00,2018-05-22 11:52:54.396000+00:00,,...,,False,NaT,0.0,,6.0,56.0,1899-01-01 00:00:00+00:00,False,
10159,10159,AFtKWpLM5F3NxjMc7,False,medCustomer,CO,False,4A5DE8Zj5gDtSmCbn,2017-07-15 20:17:01.149000+00:00,2018-06-27 10:33:44.810000+00:00,,...,,False,NaT,0.0,,6.0,56.0,1899-01-01 00:00:00+00:00,False,


In [36]:
locations_io = StringIO(locations_csv_string.decode("utf-8"))
locations_dataframe = pd.read_csv(locations_io)
locations_dataframe

Unnamed: 0.1,Unnamed: 0,clientsId,locationId,state,medLicense,recLicense
0,51,4A5DE8Zj5gDtSmCbn,JYAZRDbztXRLwJ5r6,CO,1,1


In [34]:
sales_io = StringIO(sales_csv_string.decode("utf-8"))
sales_dataframe = pd.read_csv(sales_io)
sales_dataframe

Unnamed: 0.1,Unnamed: 0,SalesCreatedAt,orderType,customerType,category,productName,strainName,numberOfItems,GrossSales,totalTaxInDollars,NetSales,costInDollars,reportUOM,DiscountAmount
0,0,2022-01-15 01:31:56.727000+00:00,in-store,recCustomer,Concentrate,OG Kush 1000mg Syringe,,1.0,49.98,9.00,40.98,11.00,Each,
1,1,2022-01-15 01:31:56.727000+00:00,in-store,recCustomer,Concentrate,Sesh Distillate Syringe - (1000mg ea) Mango Ku...,,1.0,49.98,9.00,40.98,11.50,Each,
2,2,2022-01-15 01:29:04.315000+00:00,in-store,recCustomer,Concentrate,Shwazzberry 1000mg Cartridge,Schwazzberry,1.0,77.02,13.89,63.13,17.00,Each,
3,3,2022-01-15 01:29:04.315000+00:00,in-store,recCustomer,Edible,10pk Espresso Hazelnut Truffle (s) 100MG,,1.0,77.02,13.89,63.13,8.00,,
4,4,2022-01-15 01:29:04.315000+00:00,in-store,recCustomer,Concentrate,Dazzleberry 1000mg Cartridge,Dazzleberry,1.0,77.02,13.89,63.13,17.00,Each,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
40486,40486,2021-04-15 00:29:04.626000+00:00,in-store,recCustomer,Concentrate,Mac N Cheese & OG Kush Oil Cone 2pk,,1.0,30.00,5.40,24.60,5.00,Each,
40487,40487,2021-04-15 00:20:01.536000+00:00,in-store,recCustomer,BulkBud,,Con Leche,1.0,39.99,7.20,32.79,5.04,,2050.0
40488,40488,2021-04-15 00:20:01.536000+00:00,in-store,recCustomer,Concentrate,Mac N Cheese & OG Kush Oil Cone 2pk,,1.0,39.99,7.20,32.79,5.00,Each,
40489,40489,2021-04-15 00:20:01.536000+00:00,in-store,recCustomer,Concentrate,Gelato Cake & Mango Kush Oil Cone 2pk,,1.0,39.99,7.20,32.79,5.00,Each,
