In [1]:
#| default_exp load

# Load

> Methods for loading data

In [2]:
#| export
import pandas as pd
import numpy as np
from fastcore.all import *
import wandb
from datetime import datetime, timedelta
from dvats.imports import *
from dvats.utils import *
import pickle
import pyarrow.feather as ft

In [3]:
#| hide
from tsai.imports import beep

In [4]:
base_path = Path.home()
base_path

Path('/home/macu')

## Time series artifacts (to be used with weights and biases)

This class is meant to extend `wandb.Artifact` for logging/using files with time series data.

In [5]:
#| export
class TSArtifact(wandb.Artifact):

    default_storage_path = Path(Path.home()/'data/wandb_artifacts/')
    date_format = '%Y-%m-%d %H:%M:%S' # TODO add milliseconds
    handle_missing_values_techniques = {
        'linear_interpolation': lambda df : df.interpolate(method='linear', limit_direction='both'),
        'overall_mean': lambda df : df.fillna(df.mean()),
        'overall_median': lambda df : df.fillna(df.median()),
        'backward_fill' : lambda df : df.fillna(method='bfill'),
        'forward_fill' : lambda df : df.fillna(method='ffill')
    }

    "Class that represents a wandb artifact containing time series data. sd stands for start_date \
    and ed for end_date. Both should be pd.Timestamps"

    @delegates(wandb.Artifact.__init__)
    def __init__(self, name, sd:pd.Timestamp, ed:pd.Timestamp, **kwargs):
        super().__init__(type='dataset', name=name, **kwargs)
        self.sd = sd
        self.ed = ed
        if self.metadata is None:
            self.metadata = dict()
        self.metadata['TS'] = dict(sd = self.sd.strftime(self.date_format),
                                   ed = self.ed.strftime(self.date_format))


    @classmethod
    def from_daily_csv_files(cls, root_path, fread=pd.read_csv, start_date=None, end_date=None, metadata=None, **kwargs):

        "Create a wandb artifact of type `dataset`, containing the CSV files from `start_date` \
        to `end_date`. Dates must be pased as `datetime.datetime` objects. If a `wandb_run` is \
        defined, the created artifact will be logged to that run, using the longwall name as \
        artifact name, and the date range as version."

        return None


    @classmethod
    @delegates(__init__)
    def from_df(cls, df:pd.DataFrame, name:str, path:str=None, sd:pd.Timestamp=None, ed:pd.Timestamp=None,
                normalize:bool=False, missing_values_technique:str=None, resampling_freq:str=None, **kwargs):

        """
        Create a TSArtifact of type `dataset`, using the DataFrame `df` samples from \
        `sd` (start date) to `ed` (end date). Dates must be passed as `datetime.datetime` \
        objects. The transformed DataFrame is stored as a pickle file in the path `path` \
        and its reference is added to the artifact entries. Additionally, the dataset can \
        be normalized (see `normalize` argument) or transformed using missing values \
        handling techniques (see `missing_values_technique` argument) or resampling (see \
        `resampling_freq` argument).

        Arguments:
            df: (DataFrame) The dataframe you want to convert into an artifact.
            name: (str) The artifact name.
            path: (str, optional) The path where the file, containing the new transformed \
                dataframe, is saved. Default None.
            sd: (sd, optional) Start date. By default, the first index of `df` is taken.
            ed: (ed, optional) End date. By default, the last index of `df` is taken.
            normalize: (bool, optional) If the dataset values should be normalized. Default\
                False.
            missing_values_technique: (str, optional) The technique used to handle missing \
                values. Options: "linear_iterpolation", "overall_mean", "overall_median" or \
                None. Default None.
            resampling_freq: (str, optional) The offset string or object representing \
                frequency conversion for time series resampling. Default None.

        Returns:
            TSArtifact object.
        """
        sd = df.index[0] if sd is None else sd
        ed = df.index[-1] if ed is None else ed
        obj = cls(name, sd=sd, ed=ed, **kwargs)
        df = df.query('@obj.sd <= index <= @obj.ed')
        obj.metadata['TS']['created'] = 'from-df'
        obj.metadata['TS']['n_vars'] = df.columns.__len__()

        # Handle Missing Values
        df = obj.handle_missing_values_techniques[missing_values_technique](df) if missing_values_technique is not None else df
        obj.metadata['TS']['handle_missing_values_technique'] = missing_values_technique.__str__()
        obj.metadata['TS']['has_missing_values'] = np.any(df.isna().values).__str__()

        # Indexing and Resampling
        if resampling_freq: df = df.resample(resampling_freq).mean()
        obj.metadata['TS']['n_samples'] = len(df)
        obj.metadata['TS']['freq'] = str(df.index.freq)

        # Time Series Variables
        obj.metadata['TS']['vars'] = list(df.columns)

        # Normalization - Save the previous means and stds
        if normalize:
            obj.metadata['TS']['normalization'] = dict(means = df.describe().loc['mean'].to_dict(),
                                                       stds = df.describe().loc['std'].to_dict())
            df = normalize_columns(df)

        # Hash and save
        hash_code = str(pd.util.hash_pandas_object(df).sum()) # str(hash(df.values.tobytes()))
        path = obj.default_storage_path/f'{hash_code}' if path is None else Path(path)/f'{hash_code}'
        print("About to write df to ", path)
        ft.write_feather(df, path, compression = 'lz4')
        #feather.write_dataframe
        obj.metadata['TS']['hash'] = hash_code
        obj.add_file(str(path))

        return obj

In [6]:
# TSArtifact class TEST

# resampling frequency
resampling_freq = '5s'
# handle missing values technique
missing_values_technique='overall_median'

# testing dataframe
df_test = pd.util.testing.makeMissingDataframe()
df_test.index = pd.date_range(start='2021-01-01', periods=len(df_test), freq='s')

artifact = TSArtifact.from_df(df_test, 
                              name='JNK', 
                              missing_values_technique=missing_values_technique,
                              resampling_freq=resampling_freq, 
                              normalize=True)
artifact.metadata

About to write df to  /home/macu/data/wandb_artifacts/-1511496075876420927


  import pandas.util.testing


{'TS': {'sd': '2021-01-01 00:00:00',
  'ed': '2021-01-01 00:00:29',
  'created': 'from-df',
  'n_vars': 4,
  'handle_missing_values_technique': 'overall_median',
  'has_missing_values': 'False',
  'n_samples': 6,
  'freq': '<5 * Seconds>',
  'vars': ['A', 'B', 'C', 'D'],
  'normalization': {'means': {'A': -0.1399466609903674,
    'B': 0.07151543123042768,
    'C': -0.11465545551938301,
    'D': -0.07301630858076928},
   'stds': {'A': 0.2342238262385836,
    'B': 0.45031771410710775,
    'C': 0.396302602268012,
    'D': 0.37714560431048827}},
  'hash': '-1511496075876420927'}}

In [7]:
hash = artifact.metadata['TS']['hash']
path = "../../data/wandb_artifacts/"+hash
print(path)
f = ft.read_feather(path)
print(f)

../../data/wandb_artifacts/-1511496075876420927
                            A         B         C         D
2021-01-01 00:00:00 -0.816941  0.275302 -0.789387  1.041437
2021-01-01 00:00:05 -0.959533 -0.822516  0.286192 -0.576697
2021-01-01 00:00:10  0.908326  1.414363  1.488951 -1.190392
2021-01-01 00:00:15  0.227122  0.455470 -1.409933 -0.702359
2021-01-01 00:00:20  1.399819 -1.424572  0.219548  1.285354
2021-01-01 00:00:25 -0.758792  0.101953  0.204630  0.142657


In [8]:
df = ft.read_feather("/home/macu/data/wandb_artifacts/-2535364569820284064")

In [9]:
type(df)

pandas.core.frame.DataFrame

At the end, we are interested in working with time series as a dataframe. So we need a function to download the files contained in a `wandb.apis.public.Artifact` object and process them into a TS dataframe. The process of passing from files to dataframe must be different depending on what type of creation method was used to generate the original `TSArtifact`.

In [10]:
#| export
@patch
def to_df(self:wandb.apis.public.Artifact):
    "Download the files of a saved wandb artifact and process them as a single dataframe. The artifact must \
    come from a call to `run.use_artifact` with a proper wandb run."
    # The way we have to ensure that the argument comes from a TS arfitact is the metadata
    if self.metadata.get('TS') is None:
        print(f'ERROR:{self} does not come from a logged TSArtifact')
        return None
    dir = Path(self.download())
    if self.metadata['TS']['created'] == 'from-df':
        # Call read_pickle with the single file from dir
        #return pd.read_pickle(dir.ls()[0])
        return ft.read_feather(dir.ls()[0])
    else:
        print("ERROR: Only from_df method is allowed yet")

For convenience, we can write a method to cast a downloaded wandb artifact (instance from `wandb.apis.public,Artifact`) to a TSArtifact

In [11]:
#| export
@patch
def to_tsartifact(self:wandb.apis.public.Artifact):
    "Cast an artifact as a TS artifact. The artifact must have been created from one of the \
    class creation methods of the class `TSArtifact`. This is useful to go back to a TSArtifact \
    after downloading an artifact through the wand API"
    return TSArtifact(name=self.digest, #TODO change this
                      sd=pd.to_datetime(self.metadata['TS']['sd'], format=TSArtifact.date_format),
                      ed=pd.to_datetime(self.metadata['TS']['sd'], format=TSArtifact.date_format),
                      description=self.description,
                      metadata=self.metadata)

## Inject or infer frequencies in a dataframe

In [12]:
#| export
@delegates(pd.to_datetime)
def infer_or_inject_freq(df, injected_freq='1s', start_date=None, **kwargs):
    """
        Infer index frequency. If there's not a proper time index, create fake timestamps,
        keeping the desired `injected_freq`. If that is None, set a default one of 1 second.
        start_date: the first date of the index (int or string).
    """
    inferred_freq = pd.infer_freq(df.index)
    if inferred_freq == 'N':
        timedelta = pd.to_timedelta(injected_freq)
        df.index = pd.to_datetime(ifnone(start_date, 0), **kwargs) + timedelta*df.index
        df.index.freq = pd.infer_freq(df.index)
    else:
        df.index.freq = inferred_freq
    return df

In [13]:
#| hide
foo = pd.DataFrame([1, 2, 3])
bar = pd.DataFrame([1, 2, 3])
foo = infer_or_inject_freq(foo)
bar = infer_or_inject_freq(bar, injected_freq='2s')
test_eq(foo.index.freq, '1s')
test_eq(bar.index.freq, '2s')
foo, bar

(                     0
 1970-01-01 00:00:00  1
 1970-01-01 00:00:01  2
 1970-01-01 00:00:02  3,
                      0
 1970-01-01 00:00:00  1
 1970-01-01 00:00:02  2
 1970-01-01 00:00:04  3)

In [14]:
#| hide 
foo = pd.DataFrame([1, 2, 3])
bar = infer_or_inject_freq(foo, injected_freq='1W', start_date='01/01/2020')
baz = infer_or_inject_freq(foo, injected_freq='1W', start_date='2020-01-01', format = '%Y-%m-%d')
test_eq(bar, baz)


In [15]:
#| export
import requests
from pathlib import Path

def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"

    session = requests.Session()
    response = session.get(URL, params={'id': id}, stream=True)
    token = get_confirm_token(response)

    if token:
        params = {'id': id, 'confirm': token}
        response = session.get(URL, params=params, stream=True)

    save_response_content(response, destination)    
    print(f"File downloaded as: {destination}")

def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith('download_warning'):
            return value

    return None

def save_response_content(response, destination):
    CHUNK_SIZE = 32768
    print(destination)
    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk: # filter out keep-alive new chunks
                f.write(chunk)
import zipfile

def zip_contents(zip_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_file:
        return zip_file.namelist()


def unzip_mat(all_one, zip_path, extract_path, case = '', print_flag = True):
    if print_flag: print("--> Unzip_mat", all_one, zip_path, extract_path, case, print_flag)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        mat_files = [file for file in zip_ref.namelist() if file.endswith('.mat') and not file.startswith('__MACOSX/')]
        if print_flag: print(mat_files)
        if all_one == "all":
            # Extract
            for file in mat_files:
                zip_ref.extract(file, extract_path)
            return f"{mat_files} extracted to {extract_path}"
        
        elif all_one == "one":
            if case == "":
                # Extract first .mat
                zip_ref.extract(mat_files[0], extract_path)
                return f"{mat_files[0]} extracted to  {extract_path}"
            else:
                # Extract <case>.mat
                mat_file = next((file for file in mat_files if case in file), None)
                if mat_file:
                    zip_ref.extract(mat_file, extract_path)
                    return f"{mat_file} extracted to {extract_path}"
                else:
                    return "None "+case+".mat found."
        else:
            return "First parameter must be 'all' or 'one'."
        if print_flag: print("unzip_path -->")


In [16]:
#| hide
# Downloading insects (by Eamonn Keogh)
file_id = '1qq1z2mVRd7PzDqX0TDAwY7BcWVjnXUfQ'
data_path = os.path.expanduser('~/data')
destination = os.path.join(data_path, 'InsectData-fig11.zip')

# Llamada a la función
download_file_from_google_drive(file_id, destination)
zip_contents(destination)
print(unzip_mat('all', destination,  data_path))
print(unzip_mat('one', destination,  data_path))
print(unzip_mat('one', destination,  data_path, 'Insect_one_million'))
print(unzip_mat('one', destination,  data_path, 'Insect_one_millione'))

/home/macu/data/InsectData-fig11.zip
File downloaded as: /home/macu/data/InsectData-fig11.zip
--> Unzip_mat all /home/macu/data/InsectData-fig11.zip /home/macu/data  True
['Insect_one_million.mat']
['Insect_one_million.mat'] extracted to /home/macu/data
--> Unzip_mat one /home/macu/data/InsectData-fig11.zip /home/macu/data  True
['Insect_one_million.mat']
Insect_one_million.mat extracted to  /home/macu/data
--> Unzip_mat one /home/macu/data/InsectData-fig11.zip /home/macu/data Insect_one_million True
['Insect_one_million.mat']
Insect_one_million.mat extracted to /home/macu/data
--> Unzip_mat one /home/macu/data/InsectData-fig11.zip /home/macu/data Insect_one_millione True
['Insect_one_million.mat']
None Insect_one_millione.mat found.


In [17]:
#| export
import scipy.io
import pandas as pd

def mat2csv(mat_file_path, csv_file_folder = '~/data/', print_flag=False):
    # Carga el archivo .mat, omitiendo las variables meta de MATLAB
    mat = scipy.io.loadmat(mat_file_path, squeeze_me=True, struct_as_record=False)
    
    # Itera sobre todas las variables encontradas en el archivo .mat
    for variable_name, data in mat.items():
        if variable_name.startswith('__') or isinstance(data, scipy.io.matlab.mio5_params.mat_struct):
            # Omite variables meta de MATLAB o estructuras (que requieren un manejo especial)
            continue
        
        # Convierte la data a un DataFrame de pandas, manejando diferentes tipos de datos
        if isinstance(data, np.ndarray):
            if data.dtype.names:  # Es un ndarray estructurado
                data_df = pd.DataFrame(data)
            else:  # Es un ndarray regular
                data_df = pd.DataFrame(data, columns=[variable_name])
        else:
            # Para otros tipos de datos, los convertimos en un DataFrame simple
            data_df = pd.DataFrame([data], columns=[variable_name])
        
        # Define la ruta del archivo .csv de salida
        csv_file_path = csv_file_folder+ variable_name + '.csv'
        
        # Guarda el DataFrame como un archivo .csv
        data_df.to_csv(csv_file_path, index=False)
        if print_flag:
            print(data_df.shape)
            display(data_df.head(5))
            print(f"Matlab matrix '{variable_name}' converted to CSV in: {csv_file_path}")
        return data_df



In [18]:
#| hide
path = '/home/macu/data/MP_first_test_penguin_sample.mat'
mat2csv(path, print_flag = True)

(109842, 1)


  if variable_name.startswith('__') or isinstance(data, scipy.io.matlab.mio5_params.mat_struct):


Unnamed: 0,penguin_sample
0,0.253906
1,0.259033
2,0.269287
3,0.27124
4,0.265137


Matlab matrix 'penguin_sample' converted to CSV in: ~/data/penguin_sample.csv


Unnamed: 0,penguin_sample
0,0.253906
1,0.259033
2,0.269287
3,0.271240
4,0.265137
...,...
109837,0.070312
109838,0.053955
109839,0.055908
109840,0.064209


In [19]:
#| hide
path = '/home/macu/data/Insect_one_million.mat'
mat2csv(path, print_flag = True)

  if variable_name.startswith('__') or isinstance(data, scipy.io.matlab.mio5_params.mat_struct):


(1000000, 1)


Unnamed: 0,Insect_one_million
0,0.23682
1,0.23682
2,0.23804
3,0.2063
4,0.026855


Matlab matrix 'Insect_one_million' converted to CSV in: ~/data/Insect_one_million.csv


Unnamed: 0,Insect_one_million
0,0.236820
1,0.236820
2,0.238040
3,0.206300
4,0.026855
...,...
999995,0.014648
999996,0.030518
999997,0.034180
999998,0.013428


## Export

In [20]:
#| hide
#from nbdev.export import *
#notebook2script()
beep(1)