In [None]:
!pip install sovai[full]

In [4]:
import sovai as sov
import pandas as pd

sov.token_auth(token="visit https://sov.ai/profile for your token")

In [13]:
tickers_meta = pd.read_parquet("data/tickers.parq")

In [15]:
tickers_meta[tickers_meta["ticker"]=="WFM"]

Unnamed: 0,table,permaticker,ticker,name,exchange,isdelisted,category,cusips,siccode,sicsector,sicindustry,famasector,famaindustry,sector,industry,scalemarketcap,scalerevenue,relatedtickers,currency,location,lastupdated,firstadded,firstpricedate,lastpricedate,firstquarter,lastquarter,secfilings,companysite,active,foreign,class
16616,SF1,198012,WFM,WHOLE FOODS MARKET INC,NASDAQ,Y,Domestic Common Stock,966837106,5411.0,Retail Trade,Retail-Grocery Stores,,Retail,Consumer Defensive,Grocery Stores,large,large,WFMI,USD,Texas; U.S.A,2019-05-07,2015-02-19,1992-01-23,2017-08-28,1996-12-31,2017-06-30,https://www.sec.gov/cgi-bin/browse-edgar?actio...,,Active,Domestic,Common Stock


In [None]:
df_code = pd.read_parquet("gs://sovai-public/sovai-master/output/df_codes.parquet")

In [None]:
df_code.head()

In [5]:
df_institute = sov.data("institutional/trading", start_date="2004-04-30", tickers=["AMZN", "DDD"])

In [None]:
# Fetch the "ratios/normal" dataset for WFM, then display its last rows as the cell output.
df_ratios = sov.data("ratios/normal", tickers=["WFM"])
df_ratios.tail()

In [6]:
import pyarrow.dataset as ds
from pyarrow.fs import S3FileSystem
import pyarrow as pa
import pandas as pd
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
from sovai.tools.authentication import authentication

# CustomDataFrame is an optional sovai extension. When its module cannot be imported,
# alias the name to pandas' DataFrame so that `CustomDataFrame` is always defined, and
# record which variant is in use in HAS_CUSTOM_DATAFRAME.
try:
    from sovai.extensions.pandas_extensions import CustomDataFrame
except ImportError:
    CustomDataFrame = pd.DataFrame  # plain pandas fallback
    HAS_CUSTOM_DATAFRAME = False
else:
    HAS_CUSTOM_DATAFRAME = True


@lru_cache(maxsize=2)
def get_cached_s3_filesystem(storage_provider):
    """
    Return the filesystem object for ``storage_provider``, memoised per provider.

    Delegates to ``authentication.get_s3_filesystem_pickle`` (called with
    ``verbose=True``). ``lru_cache(maxsize=2)`` keeps the results for up to two
    distinct ``storage_provider`` values, so repeated calls with the same
    provider reuse the previously returned object.

    Parameters:
    - storage_provider: hashable provider identifier (strings such as 'wasabi'
      or 'digitalocean' are what the rest of this notebook passes).

    Returns:
    - Whatever ``authentication.get_s3_filesystem_pickle`` returns.
      NOTE(review): presumably a pyarrow-compatible filesystem, since it is
      handed to ``pyarrow.dataset.dataset`` below -- confirm.
    """
    filesystem = authentication.get_s3_filesystem_pickle(storage_provider, verbose=True)
    return filesystem

@lru_cache(maxsize=2)
def get_cached_s3fs_filesystem(storage_provider):
    """
    Return the s3fs-style filesystem for ``storage_provider``, memoised per provider.

    Delegates to ``authentication.get_s3fs_filesystem_json`` (called with
    ``verbose=True``). ``lru_cache(maxsize=2)`` keeps the results for up to two
    distinct ``storage_provider`` values, so repeated calls with the same
    provider reuse the previously returned object.

    Parameters:
    - storage_provider: hashable provider identifier (strings such as 'wasabi'
      or 'digitalocean' are what the rest of this notebook passes).

    Returns:
    - Whatever ``authentication.get_s3fs_filesystem_json`` returns.
      NOTE(review): presumably an object exposing ``ls`` -- confirm.
    """
    filesystem = authentication.get_s3fs_filesystem_json(storage_provider, verbose=True)
    return filesystem

In [7]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import s3fs
import logging
import datetime
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from pyarrow.fs import S3FileSystem

# -------------------------------
# Configuration and Credential Management
# -------------------------------

# -------------------------------
# Logging Configuration
# -------------------------------

# Root logging setup: records at INFO and above are written both to the file
# "data_operations.log" (relative to the working directory) and to a stream handler.
logging.basicConfig(
    handlers=[logging.FileHandler("data_operations.log"), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger used by the helper functions defined in this notebook.
logger = logging.getLogger(__name__)

# -------------------------------
# Helper Functions
# -------------------------------

def construct_s3_path(type_name, provider, partition_type, ticker=None, publish_date=None, year=None, all_dates_after=False):
    """
    Build the bucket-qualified path for one partition of a dataset.

    Parameters:
    - type_name (str): Dataset name used as the first path component (e.g., 'applications')
    - provider (str): 'digitalocean' selects the 'sovai/sovai-patents-export' bucket prefix;
      any other value selects 'sovai-patents-export'
    - partition_type (str): 'date' or 'ticker'
    - ticker (str, optional): Ticker symbol; required for 'ticker' partitioning
    - publish_date (str, optional): Partition date string; required for 'date' partitioning
      unless all_dates_after is True
    - year (int, optional): Year sub-partition; required for 'ticker' partitioning
    - all_dates_after (bool, optional): For 'date' partitioning, return the parent
      '<type_name>/date/' directory instead of a single date partition

    Returns:
    - str: '<bucket>/<partition path>/' (always ends with a slash)

    Raises:
    - ValueError: if partition_type is neither 'date' nor 'ticker', or if an argument
      required by the chosen partitioning is missing/falsy
    """
    bucket = "sovai/sovai-patents-export" if provider == "digitalocean" else "sovai-patents-export"

    if partition_type not in ("date", "ticker"):
        raise ValueError("Invalid partition_type. Choose 'date' or 'ticker'.")

    if partition_type == "ticker":
        if not (ticker and year):
            raise ValueError("Both ticker and year must be provided for ticker partitioning.")
        return f"{bucket}/{type_name}/ticker/ticker_partitioned={ticker}/year={year}/"

    # From here on partition_type == "date".
    if all_dates_after:
        # Parent directory: lets the caller enumerate every date partition.
        return f"{bucket}/{type_name}/date/"

    if not publish_date:
        raise ValueError("publish_date must be provided for date partitioning.")
    return f"{bucket}/{type_name}/date/date_partitioned={publish_date}/"


def list_date_partitions(provider, type_name):
    """
    Enumerate the date partitions stored for ``type_name`` on ``provider``.

    Lists the entries under the '<type_name>/date/' directory and keeps those whose
    last path component starts with 'date_partitioned=' and carries a 'YYYY-MM-DD' date.

    Parameters:
    - provider (str): Storage provider passed to the path builder and filesystem cache
    - type_name (str): Type of data (e.g., 'applications')

    Returns:
    - list[tuple[datetime.date, str]]: (partition date, partition path) pairs in listing
      order. Entries with an unparseable date are skipped with a warning. If listing
      fails for any reason the error is logged and an empty list is returned.
    """
    marker = 'date_partitioned='
    s3_path = construct_s3_path(type_name, provider, partition_type="date", all_dates_after=True)
    fs = get_cached_s3fs_filesystem(provider)
    try:
        found = []
        for entry in fs.ls(s3_path):
            leaf = os.path.basename(entry.rstrip('/'))
            if not leaf.startswith(marker):
                continue
            raw_date = leaf.split('=')[1]
            try:
                parsed = datetime.datetime.strptime(raw_date, '%Y-%m-%d').date()
            except ValueError:
                logger.warning(f"Unable to parse date from partition {entry}")
                continue
            found.append((parsed, entry))
        return found
    except Exception as e:
        # Best-effort listing: report the failure and let the caller continue with no partitions.
        logger.error(f"Error listing date partitions in {s3_path} ({provider}): {e}")
        return []

def load_parquet_from_s3(s3_path, provider, columns=None, start_date=None):
    """
    Read a Parquet file or directory into a pandas DataFrame, optionally keeping only
    rows whose 'date' field is on or after ``start_date``.

    Parameters:
    - s3_path (str): Path to the Parquet file or directory
    - provider (str): Storage provider used to look up the cached filesystem
    - columns (list, optional): Columns to read; None reads all columns
    - start_date (str, optional): Minimum date (anything ``pd.to_datetime`` accepts,
      e.g. 'YYYY-MM-DD'); a falsy value disables the filter

    Returns:
    - pandas.DataFrame: The loaded rows with 'ticker' and 'date' (when present) moved to
      the front. Any exception raised while opening, filtering or converting the data is
      logged and an empty DataFrame is returned instead.
    """
    fs = get_cached_s3_filesystem(provider)

    try:
        dataset = ds.dataset(s3_path, filesystem=fs, format='parquet')

        if start_date:
            # Compare against a date32 scalar built from the calendar date of start_date.
            cutoff = pa.scalar(pd.to_datetime(start_date).date(), type=pa.date32())
            dataset = dataset.filter(ds.field('date') >= cutoff)

        table = dataset.to_table(columns=columns)

        # Schema is logged to help debug mismatches between partitions.
        logger.info(f"Schema for {s3_path} ({provider}):")
        logger.info(table.schema)

        frame = table.to_pandas()

        # Column order: ticker, date (those that exist), then everything else as loaded.
        leading_names = ('ticker', 'date')
        front = [name for name in leading_names if name in frame.columns]
        rest = [name for name in frame.columns if name not in leading_names]
        return frame[front + rest]
    except Exception as e:
        # Best-effort load: a missing or unreadable partition yields an empty frame.
        logger.error(f"Error loading data from {s3_path} ({provider}): {e}")
        return pd.DataFrame()

# -------------------------------
# Loading Functions
# -------------------------------

# First year of ticker partitions scanned when no start_date narrows the range.
# NOTE: the original author flagged that this should eventually become 2008.
_DEFAULT_FIRST_YEAR = 2006


def _build_ticker_tasks(type_name, providers, tickers, columns, first_year):
    """Return one (s3_path, provider, columns) task per provider/ticker/year, nested in that order."""
    current_year = datetime.datetime.now().year
    return [
        (
            construct_s3_path(
                type_name, provider, partition_type="ticker",
                ticker=ticker, year=year
            ),
            provider,
            columns,
        )
        for provider in providers
        for ticker in tickers
        for year in range(first_year, current_year + 1)
    ]


def _build_date_tasks(type_name, providers, columns, min_date):
    """Return one (s3_path, provider, columns) task per date partition dated on/after min_date."""
    tasks = []
    for provider in providers:
        for date_obj, partition_path in list_date_partitions(provider, type_name):
            if date_obj >= min_date:
                tasks.append((partition_path, provider, columns))
    return tasks


def load_data_by_ticker(
    type_name,
    providers=['wasabi', 'digitalocean'],
    tickers=None,
    start_date=None,
    columns=None,
    max_workers=4
):
    """
    Load data for specified tickers and/or dates from S3 storage providers.

    Partition selection:
    - tickers given: the ticker/year partitions of every ticker are read, from the year
      of ``start_date`` (or 2006 when no start_date is given) through the current year.
    - only start_date given: every date partition dated on or after start_date is read.
    In both cases a provided start_date is also applied as a row-level filter on 'date'.

    Parameters:
    - type_name (str): Type of data (e.g., 'applications', 'prediction_all', etc.)
    - providers (list, optional): Storage providers to load from ('wasabi', 'digitalocean').
      The default list is only iterated, never mutated.
      NOTE(review): rows from every listed provider are concatenated; if the providers
      hold the same data the result contains duplicates -- confirm this is intended.
    - tickers (str or list, optional): Ticker symbol(s) to load data for
    - start_date (str, optional): Minimum date in 'YYYY-MM-DD' format
    - columns (list, optional): List of columns to select
    - max_workers (int, optional): Number of parallel threads

    Returns:
    - pandas.DataFrame: Concatenated DataFrame containing the loaded data, in task order
      (provider, then ticker, then year / partition listing order) regardless of which
      download finishes first. Empty if neither tickers nor start_date is given, if
      start_date is not in 'YYYY-MM-DD' format, or if nothing could be loaded.
    """
    # Normalize tickers to a list; a bare string is a single ticker.
    if not tickers:
        tickers = []
    elif isinstance(tickers, str):
        tickers = [tickers]

    if not tickers and not start_date:
        logger.warning("No tickers or start_date provided. Please provide at least one.")
        return pd.DataFrame()

    # Validate start_date once, up front, for both partitioning modes.
    min_date_obj = None
    if start_date:
        try:
            min_date_obj = datetime.datetime.strptime(start_date, '%Y-%m-%d').date()
        except ValueError:
            logger.error(f"Invalid start_date format: {start_date}. Expected 'YYYY-MM-DD'.")
            return pd.DataFrame()

    if tickers:
        if min_date_obj is not None:
            first_year = min_date_obj.year
            logger.info(f"Minimum date provided: {min_date_obj} (Year: {first_year})")
        else:
            first_year = _DEFAULT_FIRST_YEAR
        all_tasks = _build_ticker_tasks(type_name, providers, tickers, columns, first_year)
    else:
        logger.info(f"Minimum date provided: {min_date_obj}")
        all_tasks = _build_date_tasks(type_name, providers, columns, min_date_obj)

    if not all_tasks:
        logger.warning("No tasks to process. Exiting.")
        return pd.DataFrame()

    def load_task(task):
        # start_date (possibly None) is forwarded so rows before it are dropped at read time.
        s3_path, provider, task_columns = task
        return load_parquet_from_s3(s3_path, provider, task_columns, start_date=start_date)

    # Frames are stored by task index so the concatenated result does not depend on
    # thread completion order; as_completed is still used to drive the progress bar.
    frames_by_task = [None] * len(all_tasks)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(load_task, task): index
            for index, task in enumerate(all_tasks)
        }
        for future in tqdm(as_completed(future_to_index), total=len(all_tasks), desc="Loading data"):
            frame = future.result()
            if not frame.empty:
                frames_by_task[future_to_index[future]] = frame

    results = [frame for frame in frames_by_task if frame is not None]

    if results:
        final_df = pd.concat(results, ignore_index=True)
        logger.info(f"Successfully loaded data with {len(final_df)} records.")
    else:
        final_df = pd.DataFrame()
        logger.warning("No data found.")

    return final_df

# -------------------------------
# Example Usage
# -------------------------------



In [8]:
load_data_by_ticker(tickers=["GM"],type_name="applications")

Loading data: 100%|████████████████████████████████████████████████████████████████████████████| 40/40 [00:13<00:00,  2.90it/s]


Unnamed: 0,ticker,date,application_id,org_name,source,subsidiary,title,abstract,description,claims,country,location,ipc3,naics,app_type,kind,us_series_code,claims_num,drawings_num,publication_id,file_name,file_date,date_partitioned
0,GM,2010-01-07,11528566,SAAB AB,listed,Saab Automobile AB,LASER TARGET SEEKER DEVICE,A laser target seeker device arranged to recei...,CROSS-REFERENCE TO RELATED APPLICATIONS This ...,1 . A laser target seeker device arranged to r...,SE,Linkoping,F41,332994.000,utility,A1,11,26,9,20100001119,/tmp/2010_application_ipa100107/ipa100107.xml,2006-09-28,2010-01-07
1,GM,2010-01-07,12369820,"GENERAL MOTORS GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"General Motors Global Service Operations, Inc",APPARATUS AND METHOD FOR COOLING AN EXHAUST GAS,An exhaust gas cooling apparatus and method fo...,CROSS REFERENCE TO RELATED APPLICATION This a...,"1 . An exhaust gas cooling apparatus, comprisi...",US,Michigan,F01,333611.000,utility,A1,12,16,4,20100000205,/tmp/2010_application_ipa100107/ipa100107.xml,2009-02-12,2010-01-07
2,GM,2010-01-07,12165899,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",PRECHARGING A HIGH-VOLTAGE BUS USING A VOLTAGE...,Systems and methods are provided for prechargi...,TECHNICAL FIELD Embodiments of the subject ma...,"1 . A precharge system for use in a vehicle, t...",US,Michigan,B60,336111.000,utility,A1,12,20,5,20100001582,/tmp/2010_application_ipa100107/ipa100107.xml,2008-07-01,2010-01-07
3,GM,2010-01-07,12437202,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",MOTOR VEHICLE WITH A-PILLAR AND AIRBAG MOUNTED...,A motor vehicle is provided with a passenger c...,CROSS-REFERENCE TO RELATED APPLICATION This a...,"1 . A motor vehicle, comprising: a passenger c...",US,Michigan,B60,336111.000,utility,A1,12,6,4,20100001496,/tmp/2010_application_ipa100107/ipa100107.xml,2009-05-07,2010-01-07
4,GM,2010-01-07,12180777,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",IGNITION COIL MODULE FUSE DIAGNOSTIC,A control system comprising an ignition fuse d...,CROSS-REFERENCE TO RELATED APPLICATIONS This ...,"1 . A control system, comprising: an ignition ...",US,Michigan,B60,336111.000,utility,A1,12,20,5,20100004846,/tmp/2010_application_ipa100107/ipa100107.xml,2008-07-28,2010-01-07
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1632,GM,2010-12-30,12492540,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",ROOF ASSEMBLY FOR A VEHICLE,A roof assembly for a vehicle that includes a ...,TECHNICAL FIELD The present invention general...,"1 . A roof assembly for a vehicle, comprising:...",US,Michigan,B60,336111.000,utility,A1,12,18,3,20100327620,/tmp/2010_application_ipa101230/ipa101230.xml,2009-06-26,2010-12-30
1633,GM,2010-12-30,12821028,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",SIDE STRUCTURE OF A VEHICLE,A side structure of a vehicle is provided with...,CROSS-REFERENCE TO RELATED APPLICATION This a...,1 . A side structure of a vehicle with a side ...,US,Michigan,B60,336111.000,utility,A1,12,11,4,20100327630,/tmp/2010_application_ipa101230/ipa101230.xml,2010-06-22,2010-12-30
1634,GM,2010-12-30,12491291,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",CONTROL SYSTEM FOR AN AUTOMATIC TRANSMISSION H...,A hydraulic control system for a automatic tra...,TECHNICAL FIELD The disclosure relates to a c...,1 . A hydraulic control system for actuating a...,US,Michigan,B60,336111.000,utility,A1,12,21,9,20100326542,/tmp/2010_application_ipa101230/ipa101230.xml,2009-06-25,2010-12-30
1635,GM,2010-12-30,12495232,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",listed,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.",METHODS AND APPARATUS FOR INITIATING SERVICE S...,A method for initiating a service session with...,TECHNICAL FIELD The technical field generally...,1 . A method for initiating a service session ...,US,Michigan,B60,336111.000,utility,A1,12,20,4,20100332073,/tmp/2010_application_ipa101230/ipa101230.xml,2009-06-30,2010-12-30


In [1]:
import sovai as sov
import pandas as pd

sov.token_auth(token="visit https://sov.ai/profile for your token")

df = sov.data("patents/applications", tickers=["GM","MSFT"])

Downloading and saving new file.
Downloading and saving new file.


Loading data: 100%|████████████████████████████████████████████████████████████████████████████| 32/32 [00:09<00:00,  3.29it/s]


In [2]:
df

In [5]:
df.shape

(3428, 21)

In [6]:
df.head()

Unnamed: 0,ticker,date,applicationid,publicationid,orgname,subsidiary_name,location,country,ipc3,naics,title,abstract,description,claims,claimsnum,drawingsnum,filedate,apptype,kind,usseriescode,filename
0,GM,2011-01-06,12496939,20110000195,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.","GM Global Technology Operations, Inc",Michigan,US,F16,333612.0,REDUCED VOLUME ELECTRICALLY HEATED PARTICULATE...,A control system comprises an exhaust treatmen...,FIELD The present disclosure relates to engin...,1 . An exhaust treatment system comprising: a ...,20,7,2009-07-02,utility,A1,12,/tmp/2011_application_ipa110106/ipa110106.xml
1,GM,2011-01-06,12496773,20110000194,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.","GM Global Technology Operations, Inc",Michigan,US,F16,333612.0,SELECTIVE CATALYTIC REDUCTION SYSTEM USING ELE...,An exhaust system includes N heating elements ...,FIELD The present disclosure relates to emiss...,1 . An exhaust system comprising: N heating el...,18,5,2009-07-02,utility,A1,12,/tmp/2011_application_ipa110106/ipa110106.xml
2,GM,2011-01-06,12497742,20110000421,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.","GM Global Technology Operations, Inc",Michigan,US,F16,333612.0,METHOD AND APPARATUS TO ESTIMATE AUTOMOTIVE AL...,An embodiment contemplates a method for determ...,BACKGROUND OF INVENTION An embodiment relates...,1 . A method for determining belt slip in a ve...,20,5,2009-07-06,utility,A1,12,/tmp/2011_application_ipa110106/ipa110106.xml
3,GM,2011-01-06,12497000,20110000596,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.","GM Global Technology Operations, Inc",Michigan,US,F16,333612.0,LOW NOISE RUN-FLAT TIRES,A tire includes a pair of sidewalls in spaced ...,BACKGROUND OF THE INVENTION The subject matte...,1 . A tire comprising: a pair of sidewalls in ...,25,6,2009-07-02,utility,A1,12,/tmp/2011_application_ipa110106/ipa110106.xml
4,GM,2011-01-06,12830187,20110000729,"GM GLOBAL TECHNOLOGY OPERATIONS, INC.","GM Global Technology Operations, Inc",Michigan,US,F16,333612.0,FLOOR STRUCTURE FOR A MOTOR VEHICLE,A floor structure of a motor vehicle is provid...,CROSS-REFERENCE TO RELATED APPLICATION This a...,"1 . A floor structure for a motor vehicle, com...",16,5,2010-07-02,utility,A1,12,/tmp/2011_application_ipa110106/ipa110106.xml


In [35]:
df.shape

(14631, 22)

In [20]:
# Example: read three columns of the "applications" data for two tickers from a single
# provider, passing both a ticker list and a start date.
tickers_to_load = ["AAPL", "MSFT"]
type_of_data = "applications"
selected_columns = ["applicationid", "ticker", "date"]
start_date = "2010-10-24"

logger.info("Starting data load with both tickers and start_date.")
loaded_df_both = load_data_by_ticker(
    type_name=type_of_data,
    providers=['digitalocean'],
    tickers=tickers_to_load,
    start_date=start_date,
    columns=selected_columns,
    max_workers=8,  # Adjust based on your system's capabilities
)

Loading data: 100%|████████████████████████████████████████████████████████████████████████| 30/30 [00:00<00:00, 49.00it/s]


In [21]:
loaded_df_both

Unnamed: 0,ticker,date,applicationid
0,AAPL,2011-01-20,12504392
1,AAPL,2011-01-27,12692433
2,AAPL,2011-01-27,12509413
3,AAPL,2011-02-03,12902094
4,AAPL,2011-02-10,12535974
...,...,...,...
463,MSFT,2011-11-03,13179098
464,MSFT,2011-11-10,13187206
465,MSFT,2011-11-10,13185000
466,MSFT,2011-11-17,13190538


In [13]:
del loaded_df_both