In [17]:
from collections import deque
import sys
from pathlib import Path
import pandas as pd
import zipfile
import io
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from fin_data_utils import FinDataUtils


In [18]:
min_buffer_size = 10
max_buffer_size = 1000

my_queue = deque(maxlen=max_buffer_size)

In [None]:
class DeltaNeutralReader:
    def __init__(self, credentials_file: str, folder_id: str, definitions_file: str, cache_dir: str = 'cache'):
        #folder_id = "google_drive_folder_id"
        #credentials_file = "path_to_credentials_file.json"
        #definitions_file = "path_to_definitions_file.json"
        #cache_dir = "path_to_cache_directory"

        self.folder_id = folder_id
        self.ivy_db = FinDataUtils(definitions_file)
        
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Set up Google Drive service
        credentials = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=['https://www.googleapis.com/auth/drive.readonly']
        )
        self.service = build('drive', 'v3', credentials=credentials)

    def search_file(self, file_name: str) -> str:
        query = f"name='{file_name}' and '{self.folder_id}' in parents"
        results = self.service.files().list(
            q=query, spaces='drive', fields='files(id, name, modifiedTime)'
        ).execute()
        items = results.get('files', [])
        
        if not items:
            raise FileNotFoundError(f"File {file_name} not found in the specified folder.")
        
        return items[0]

    def get_cache_path(self, file_name: str) -> Path:
        return self.cache_dir / file_name

    def download_and_cache(self, file_id: str, file_name: str) -> Path:
        request = self.service.files().get_media(fileId=file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            print(f"Download {int(status.progress() * 100)}%.")
        
        cache_path = self.get_cache_path(file_name)
        with open(cache_path, 'wb') as f:
            f.write(file.getvalue())
        
        return cache_path
    

In [21]:
class cache():
    def __init__(self, cache_dir):
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
    
    def get_cache_path(self, file_name: str) -> Path:
        return self.cache_dir / file_name
    
    
    def is_cached(self,fname):
        return (self.cache_dir / fname).exists()

    def __contains__(self, key):
        print(f'__contains__ checking for {(self.cache_dir / key)}')
        print(f'__contains__ checking for {(self.cache_dir / key).exists()}')
        return (self.cache_dir / key).exists()
    
    def is_cache_valid(self, file_name: str, modified_time: str) -> bool:
        cache_path = self.get_cache_path(file_name)
        if not cache_path.exists():
            return False
        
        cache_modified_time = cache_path.stat().st_mtime
        drive_modified_time = pd.to_datetime(modified_time).timestamp()
        
        return cache_modified_time >= drive_modified_time
    
    def extract_zip(self, zip_file: Path):
        with zipfile.ZipFile(zip_file) as zip_ref:
            zip_ref.extractall(self.cache_dir)
    
    def file_exists_in_zip(self,zip_path, filename):
        """
        Check if a file exists in a ZIP archive.

        :param zip_path: Path to the ZIP archive.
        :param filename: Name of the file to check for within the archive.
        :return: True if the file exists in the archive, False otherwise.
        """
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            return filename in zip_ref.namelist()

class ds_info():
    def __init__(self, cache):
        self.cache = cache
    def get_archive_file_name(self,date)->str:
        fname=f'L3_{date.strftime("%Y")}_{date.strftime("%B")}.zip'
        return fname

    def get_archive_file_name_daily(self,date)->str:
        fname=f'L3_{date.strftime("%Y%m%d")}.zip'
        return fname

    def get_daily_options_file_name(self,date)->str:
        fname=f'L3_options_{date.strftime("%Y%m%d")}.csv'
        return fname



my_date=pd.to_datetime("2024-04-15")
my_cache = cache(Path("cache"))
dsource=ds_info(my_cache)
#get the filenames for daily and monthly files:
verbose=True

daily_fname=dsource.get_daily_options_file_name(my_date) 
monthly_fname=dsource.get_archive_file_name(my_date)
if verbose:
    print(f'daily fname {daily_fname}')
    print(f'monthly fname {monthly_fname}')

if not (daily_fname in my_cache):
    print(f'not cached')

    if monthly_fname in my_cache: 
        if my_cache.file_exists_in_zip(my_cache.get_cache_path(monthly_fname),daily_fname):
            my_cache.extract_zip(my_cache.get_cache_path(monthly_fname))
    else:
        print("monthly not cached") 
        #load from google drive

    if my_cache.is_cached(daily_fname):
        print("daily cached")
        with open(my_cache.get_cache_path(daily_fname)) as file:
            df=pd.read_csv(file,header=None)
            print(df.head())
        


daily fname L3_options_20240415.csv
monthly fname L3_2024_April.zip
__contains__ checking for cache/L3_options_20240415.csv
__contains__ checking for True


# quick detour can I do better:
# right now i have a cache class that returns the local name of the filename based on a date
1) my_cache= cache(cache_dir=dir)
2) if (daily_file(date) in cache)
3)      fpath=cache(daily_file(date))

# different way
# try
    try:
        ds=load_local_daily(my_date)
    catch:
        if (not_in_cache):
            try
                ds=load_from_monthly_archive(my_date)
            catch:
                if (not_in_cache):
                    ds=load_from_remote_drive(my_date)
    


In [10]:
def get_cache_path(self, file_name: str) -> Path:
    return self.cache_dir / file_name

def is_cache_valid(self, file_name: str, modified_time: str) -> bool:
    cache_path = self.get_cache_path(file_name)
    if not cache_path.exists():
        return False
    
    cache_modified_time = cache_path.stat().st_mtime
    drive_modified_time = pd.to_datetime(modified_time).timestamp()
    
    return cache_modified_time >= drive_modified_time


In [28]:
import fin_data_utils
from fin_data_utils import FinDataUtils
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

class DeltaNeutralReader:
    def __init__(self, credentials_file: str, folder_id: str, definitions_file: str, cache_dir: str = 'cache'):
        #folder_id = "google_drive_folder_id"
        #credentials_file = "path_to_credentials_file.json"
        #definitions_file = "path_to_definitions_file.json"
        #cache_dir = "path_to_cache_directory"

        self.folder_id = folder_id
        self.ivy_db = FinDataUtils(definitions_file)
        
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Set up Google Drive service
        credentials = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=['https://www.googleapis.com/auth/drive.readonly']
        )
        self.service = build('drive', 'v3', credentials=credentials)

    def search_file(self, file_name: str) -> str:
        query = f"name='{file_name}' and '{self.folder_id}' in parents"
        results = self.service.files().list(
            q=query, spaces='drive', fields='files(id, name, modifiedTime)'
        ).execute()
        items = results.get('files', [])
        
        if not items:
            raise FileNotFoundError(f"File {file_name} not found in the specified folder.")
        
        return items[0]

    def get_cache_path(self, file_name: str) -> Path:
        return self.cache_dir / file_name

    def download_and_cache(self, file_id: str, file_name: str) -> Path:
        request = self.service.files().get_media(fileId=file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            print(f"Download {int(status.progress() * 100)}%.")
        
        cache_path = self.get_cache_path(file_name)
        with open(cache_path, 'wb') as f:
            f.write(file.getvalue())
        
        return cache_path
    

In [30]:
from pathlib import Path
import pandas as pd
import zipfile

class FileNotInArchiveError(Exception):
    """A custom exception for specific error cases."""
    pass

class ds_info():
    def __init__(self):
        pass

    @staticmethod
    def get_archive_file_name(date)->str:
        fname=f'L3_{date.strftime("%Y")}_{date.strftime("%B")}.zip'
        return fname

    @staticmethod
    def get_archive_file_name_daily(date)->str:
        fname=f'L3_{date.strftime("%Y%m%d")}.zip'
        return fname

    @staticmethod
    def get_daily_options_file_name(date)->str:
        fname=f'L3_options_{date.strftime("%Y%m%d")}.csv'
        return fname


def load_daily_file_from_cache(fdate, cache_dir="cache"):
    fname=ds_info.get_daily_options_file_name(fdate)
    cache_path = Path(cache_dir) / fname
    if not cache_path.exists():
        raise FileNotFoundError(f"File {fname} not found in the cache directory.")
    return fname

def load_daily_file_from_monthly_cache(fdate, cache_dir="cache"):
    fname=ds_info.get_archive_file_name(fdate)
    daily_fname=ds_info.get_daily_options_file_name(fdate)
    cache_path = Path(cache_dir) / fname
    if not cache_path.exists():
        raise FileNotFoundError(f"File {fname} not found in the cache directory.")

    with zipfile.ZipFile(cache_path) as zip_ref:
        if  (daily_fname in zip_ref.namelist()):
            zip_ref.extractall(cache_dir)
        else:
            raise FileNotInArchiveError(f"File {fname} not found in the cache directory.")
    return daily_fname
def load_daily_file_from_remote_monthly(self,fdate, cache_dir="cache"):
    fname=ds_info.get_archive_file_name(fdate)
    file_info=self.remote_reader.search_file(fname)
    raise FileNotFoundError(f"File {fdate} not found in the cache directory.")

def load_daily_file_from_remote_daily(fdate, cache_dir="cache"):
    pass

load_functions=[
    load_daily_file_from_cache,
    load_daily_file_from_monthly_cache
]

class data_file_cache():
    def __init__(self,remote_reader,cache_dir="cache"):
        if not isinstance(cache_dir, Path):
            cache_dir = Path(cache_dir)
        self.cache_dir=cache_dir
        self.cache_dir.mkdir(exist_ok=True)
        self.remote_reader=remote_reader
        self.load_functions=load_functions
        self.load_functions.append(self.load_daily_file_from_remote_monthly)

    def load_daily_file_from_remote_monthly(self,fdate, cache_dir="cache"):
        fname=ds_info.get_archive_file_name(fdate)
        file_info=self.remote_reader.search_file(fname)
        raise FileNotFoundError(f"File {fdate} not found in the cache directory.")
    
    def load_file(self,fdate):
        success=False
        fname=None
        success=False
        for load_func in load_functions:
            try:
                fname=load_func(fdate,self.cache_dir)
                success=True
                break
            except(FileNotFoundError):
                continue

        return fname



d_reader = DeltaNeutralReader(
    credentials_file='testproject1-419520-61c1efd44a96.json',
    folder_id='1jkJR0INuQLWSVMfaZjnMa2NkfJnt4w0q',
    definitions_file='deltaneutral_table_definitions.json'
)

my_cache=data_file_cache(remote_reader=d_reader)
my_cache.load_file(pd.to_datetime("2024-04-15"))

'L3_options_20240415.csv'

In [8]:
import numpy as np
from functools import reduce

a=np.array([1,2,3,4,5])
b=reduce(lambda x,y: x+y, a,0)
b

c=a.sum()
d=0
for i in a:
    d+=i
d

15